Dependents: mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop
Diff: embeddedRTPS/include/rtps/entities/StatefulWriter.tpp
- Revision:
- 0:580aba13d1a1
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/embeddedRTPS/include/rtps/entities/StatefulWriter.tpp Thu Dec 30 21:06:29 2021 +0900 @@ -0,0 +1,374 @@ +/* +The MIT License +Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE + +This file is part of embeddedRTPS. + +Author: i11 - Embedded Software, RWTH Aachen University +*/ + +#include "rtps/messages/MessageFactory.h" +#include <cstring> +#include <stdio.h> + +using rtps::StatefulWriterT; + +/* +static int line_cnt_ = 0; +static char tft_buffer_[150]; + +#define log(args...) if(true){ \ +Lock lock{m_mutex}; \ +snprintf(tft_buffer_, sizeof(tft_buffer_), args); \ +TFT_PrintLine(line_cnt_, tft_buffer_); \ +line_cnt_ = (line_cnt_+1)%5; \ +} +*/ +#define SFW_VERBOSE 0 + +#if SFW_VERBOSE +#include "rtps/utils/printutils.h" +#endif + +template <class NetworkDriver> +StatefulWriterT<NetworkDriver>::~StatefulWriterT() { + m_running = false; + sys_msleep(10); // Required for tests/ Join currently not available + // if(sys_mutex_valid(&m_mutex)){ + sys_mutex_free(&m_mutex); + //} +} + +template <class NetworkDriver> +bool StatefulWriterT<NetworkDriver>::init(TopicData attributes, + TopicKind_t topicKind, + ThreadPool * /*threadPool*/, + NetworkDriver &driver) { + if (sys_mutex_new(&m_mutex) != ERR_OK) { +#if SFW_VERBOSE + log("StatefulWriter: Failed to create mutex.\n"); +#endif + return false; + } + + m_transport = &driver; + m_attributes = attributes; + m_topicKind = topicKind; + m_packetInfo.srcPort = attributes.unicastLocator.port; + if (m_attributes.endpointGuid.entityId == + ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) { + #ifdef MROS2_USE_EMBEDDEDRTPS + m_heartbeatThread = sys_thread_new("HBThreadPub", callHbPubFunc, this, + Config::HEARTBEAT_STACKSIZE, + Config::THREAD_POOL_WRITER_PRIO); + networkPubDriverPtr = this; + hbPubFuncPtr = hbFunctionJumppad; + #else + m_heartbeatThread = sys_thread_new("HBThreadPub", hbFunctionJumppad, this, + Config::HEARTBEAT_STACKSIZE, + Config::THREAD_POOL_WRITER_PRIO); + #endif + } else if (m_attributes.endpointGuid.entityId == + ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) { + #ifdef MROS2_USE_EMBEDDEDRTPS + m_heartbeatThread = sys_thread_new("HBThreadSub", callHbSubFunc, this, + Config::HEARTBEAT_STACKSIZE, + Config::THREAD_POOL_WRITER_PRIO); + networkSubDriverPtr = this; + hbSubFuncPtr = hbFunctionJumppad; + #else + m_heartbeatThread = sys_thread_new("HBThreadSub", hbFunctionJumppad, this, + Config::HEARTBEAT_STACKSIZE, + Config::THREAD_POOL_WRITER_PRIO); + #endif + } else { + m_heartbeatThread = sys_thread_new("HBThread", hbFunctionJumppad, this, + Config::HEARTBEAT_STACKSIZE, + Config::THREAD_POOL_WRITER_PRIO); + } + /* + if(hbFuncPointer == NULL) + { + hbFuncPointer = hbFunctionJumppad; + }*/ + m_is_initialized_ = true; + return true; +} + +template <class NetworkDriver> +bool StatefulWriterT<NetworkDriver>::addNewMatchedReader( + const ReaderProxy &newProxy) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: New reader added with id: ", + &this->m_attributes.topicName[0]); + printGuid(newProxy.remoteReaderGuid); + log("\n"); +#endif + return m_proxies.add(newProxy); +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::removeReader(const Guid &guid) { + auto isElementToRemove = [&](const ReaderProxy &proxy) { + return proxy.remoteReaderGuid == guid; + }; + auto thunk = [](void *arg, const ReaderProxy &value) { + return (*static_cast<decltype(isElementToRemove) *>(arg))(value); + }; + + m_proxies.remove(thunk, &isElementToRemove); +} + +template <class NetworkDriver> +const rtps::CacheChange *StatefulWriterT<NetworkDriver>::newChange( + ChangeKind_t kind, const uint8_t *data, DataSize_t size) { + if (isIrrelevant(kind)) { + return nullptr; + } + + Lock lock{m_mutex}; + + if (m_history.isFull()) { + // Right now we drop elements anyway because we cannot detect non-responding + // readers yet. return nullptr; + SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin()); + if (m_nextSequenceNumberToSend < newMin) { + m_nextSequenceNumberToSend = + newMin; // Make sure we have the correct sn to send + } + } + + auto *result = m_history.addChange(data, size); + if (mp_threadPool != nullptr) { + mp_threadPool->addWorkload(this); + } + +#if SFW_VERBOSE + log("StatefulWriter[%s]: Adding new data.\n", this->m_attributes.topicName); +#endif + return result; +} + +template <class NetworkDriver> void StatefulWriterT<NetworkDriver>::progress() { + for (const auto &proxy : m_proxies) { + if (!sendData(proxy, m_nextSequenceNumberToSend)) { + continue; + } + } + ++m_nextSequenceNumberToSend; +} + +template <typename NetworkDriver> +bool StatefulWriterT<NetworkDriver>::isIrrelevant(ChangeKind_t kind) const { + // Right now we only allow alive changes + // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY + // && kind != ChangeKind_t::ALIVE); + return kind != ChangeKind_t::ALIVE; +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::setAllChangesToUnsent() { + Lock lock(m_mutex); + + m_nextSequenceNumberToSend = m_history.getSeqNumMin(); + + if (mp_threadPool != nullptr) { + mp_threadPool->addWorkload(this); + } +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::onNewAckNack( + const SubmessageAckNack &msg, const GuidPrefix_t &sourceGuidPrefix) { + // Lock lock{m_mutex}; + // Search for reader + ReaderProxy *reader = nullptr; + for (auto &proxy : m_proxies) { + if (proxy.remoteReaderGuid.prefix == sourceGuidPrefix && + proxy.remoteReaderGuid.entityId == msg.readerId) { + reader = &proxy; + break; + } + } + + if (reader == nullptr) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: No proxy found with id: ", + &this->m_attributes.topicName[0]); + printEntityId(msg.readerId); + log(" Dropping acknack.\n"); +#endif + return; + } + + uint8_t hash = 0; + for (int i = 0; i < sourceGuidPrefix.id.size(); i++) { + hash += sourceGuidPrefix.id.at(i); + } + + char bfr[20]; + size_t size = snprintf(bfr, sizeof(bfr), "%u <= %u", msg.count.value, + reader->ackNackCount.value); + if (!(size < sizeof(bfr))) { + while (1) + ; + } + + if (msg.count.value <= reader->ackNackCount.value) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: Count too small. Dropping acknack.\n", + &this->m_attributes.topicName[0]); +#endif + return; + } + + reader->ackNackCount = msg.count; + + // Send missing packets + SequenceNumber_t nextSN = msg.readerSNState.base; +#if SFW_VERBOSE + if (nextSN.low == 0 && nextSN.high == 0) { + log("StatefulWriter[%s]: Received preemptive acknack. Ignored.\n", + &this->m_attributes.topicName[0]); + } else { + log("StatefulWriter[%s]: Received non-preemptive acknack.\n", + &this->m_attributes.topicName[0]); + } +#endif + for (uint32_t i = 0; i < msg.readerSNState.numBits; ++i, ++nextSN) { + if (msg.readerSNState.isSet(i)) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: Send Packet on acknack.\n", + this->m_attributes.topicName); +#endif + sendData(*reader, nextSN); + } + } + // Check for sequence numbers after defined range + SequenceNumber_t maxSN; + { + Lock lock(m_mutex); + maxSN = m_history.getSeqNumMax(); + } + while (nextSN <= maxSN) { + sendData(*reader, nextSN); + ++nextSN; + } +} + +template <class NetworkDriver> +bool StatefulWriterT<NetworkDriver>::sendData( + const ReaderProxy &reader, const SequenceNumber_t &snMissing) { + + // TODO smarter packaging e.g. by creating MessageStruct and serialize after + // adjusting values Reusing the pbuf is not possible. See + // https://www.nongnu.org/lwip/2_0_x/raw_api.html (Zero-Copy MACs) + + PacketInfo info; + info.srcPort = m_packetInfo.srcPort; + + MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); + MessageFactory::addSubMessageDestination(info.buffer); + MessageFactory::addSubMessageTimeStamp(info.buffer); + + // Just usable for IPv4 + const Locator &locator = reader.remoteLocator; + + info.destAddr = locator.getIp4Address(); + info.destPort = (Ip4Port_t)locator.port; + + { + Lock lock(m_mutex); + const CacheChange *next = m_history.getChangeBySN(snMissing); + if (next == nullptr) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: Couldn't get a CacheChange with SN (%i,%u)\n", + &this->m_attributes.topicName[0], snMissing.high, snMissing.low); +#endif + return false; + } + MessageFactory::addSubMessageData( + info.buffer, next->data, false, next->sequenceNumber, + m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId); + } + + m_transport->sendPacket(info); + return true; +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::hbFunctionJumppad(void *thisPointer) { + auto *writer = static_cast<StatefulWriterT<NetworkDriver> *>(thisPointer); + writer->sendHeartBeatLoop(); +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::sendHeartBeatLoop() { + while (m_running) { + sendHeartBeat(); + sys_msleep(Config::SF_WRITER_HB_PERIOD_MS); + } +} + +template <class NetworkDriver> +void StatefulWriterT<NetworkDriver>::sendHeartBeat() { + if (m_proxies.isEmpty()) { +#if SFW_VERBOSE + log("StatefulWriter[%s]: Skipping heartbeat. No proxies.\n", + this->m_attributes.topicName); +#endif + return; + } + + for (auto &proxy : m_proxies) { + + PacketInfo info; + info.srcPort = m_packetInfo.srcPort; + + SequenceNumber_t firstSN; + SequenceNumber_t lastSN; + MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); + { + Lock lock(m_mutex); + firstSN = m_history.getSeqNumMin(); + lastSN = m_history.getSeqNumMax(); + } + if (firstSN == SEQUENCENUMBER_UNKNOWN || lastSN == SEQUENCENUMBER_UNKNOWN) { +#if SFW_VERBOSE + if (strlen(&this->m_attributes.typeName[0]) != 0) { + log("StatefulWriter[%s]: Skipping heartbeat. No data.\n", + this->m_attributes.topicName); + } +#endif + return; + } + MessageFactory::addSubMessageDestination(info.buffer, proxy.remoteReaderGuid.prefix.id.data()); + MessageFactory::addHeartbeat( + info.buffer, m_attributes.endpointGuid.entityId, + proxy.remoteReaderGuid.entityId, firstSN, lastSN, m_hbCount); + + info.destAddr = proxy.remoteLocator.getIp4Address(); + info.destPort = proxy.remoteLocator.port; + + m_transport->sendPacket(info); + } + m_hbCount.value++; +} + + +#undef SFW_VERBOSE