Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Committer:
smoritaemb
Date:
Thu Dec 30 21:06:29 2021 +0900
Revision:
0:580aba13d1a1
Updated to catch up to mros2 v2.3

Who changed what in which revision?

UserRevisionLine numberNew contents of line
smoritaemb 0:580aba13d1a1 1 /*
smoritaemb 0:580aba13d1a1 2 The MIT License
smoritaemb 0:580aba13d1a1 3 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
smoritaemb 0:580aba13d1a1 4 Permission is hereby granted, free of charge, to any person obtaining a copy
smoritaemb 0:580aba13d1a1 5 of this software and associated documentation files (the "Software"), to deal
smoritaemb 0:580aba13d1a1 6 in the Software without restriction, including without limitation the rights
smoritaemb 0:580aba13d1a1 7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
smoritaemb 0:580aba13d1a1 8 copies of the Software, and to permit persons to whom the Software is
smoritaemb 0:580aba13d1a1 9 furnished to do so, subject to the following conditions:
smoritaemb 0:580aba13d1a1 10 The above copyright notice and this permission notice shall be included in
smoritaemb 0:580aba13d1a1 11 all copies or substantial portions of the Software.
smoritaemb 0:580aba13d1a1 12 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
smoritaemb 0:580aba13d1a1 13 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
smoritaemb 0:580aba13d1a1 14 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
smoritaemb 0:580aba13d1a1 15 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
smoritaemb 0:580aba13d1a1 16 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
smoritaemb 0:580aba13d1a1 17 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
smoritaemb 0:580aba13d1a1 18 THE SOFTWARE
smoritaemb 0:580aba13d1a1 19
smoritaemb 0:580aba13d1a1 20 This file is part of embeddedRTPS.
smoritaemb 0:580aba13d1a1 21
smoritaemb 0:580aba13d1a1 22 Author: i11 - Embedded Software, RWTH Aachen University
smoritaemb 0:580aba13d1a1 23 */
smoritaemb 0:580aba13d1a1 24
smoritaemb 0:580aba13d1a1 25 #include <rtps/entities/ReaderProxy.h>
smoritaemb 0:580aba13d1a1 26
smoritaemb 0:580aba13d1a1 27 #include "lwip/tcpip.h"
smoritaemb 0:580aba13d1a1 28 #include "rtps/ThreadPool.h"
smoritaemb 0:580aba13d1a1 29 #include "rtps/communication/UdpDriver.h"
smoritaemb 0:580aba13d1a1 30 #include "rtps/messages/MessageFactory.h"
smoritaemb 0:580aba13d1a1 31 #include "rtps/storages/PBufWrapper.h"
smoritaemb 0:580aba13d1a1 32 #include "rtps/utils/Log.h"
smoritaemb 0:580aba13d1a1 33 #include "rtps/utils/udpUtils.h"
smoritaemb 0:580aba13d1a1 34
smoritaemb 0:580aba13d1a1 35 using rtps::CacheChange;
smoritaemb 0:580aba13d1a1 36 using rtps::SequenceNumber_t;
smoritaemb 0:580aba13d1a1 37 using rtps::StatelessWriterT;
smoritaemb 0:580aba13d1a1 38
smoritaemb 0:580aba13d1a1 39 #define SLW_VERBOSE 0
smoritaemb 0:580aba13d1a1 40
smoritaemb 0:580aba13d1a1 41 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 42 #include "rtps/utils/printutils.h"
smoritaemb 0:580aba13d1a1 43 #endif
smoritaemb 0:580aba13d1a1 44
smoritaemb 0:580aba13d1a1 45 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 46 StatelessWriterT<NetworkDriver>::~StatelessWriterT() {
smoritaemb 0:580aba13d1a1 47 // if(sys_mutex_valid(&m_mutex)){
smoritaemb 0:580aba13d1a1 48 sys_mutex_free(&m_mutex);
smoritaemb 0:580aba13d1a1 49 //}
smoritaemb 0:580aba13d1a1 50 }
smoritaemb 0:580aba13d1a1 51
smoritaemb 0:580aba13d1a1 52 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 53 bool StatelessWriterT<NetworkDriver>::init(TopicData attributes,
smoritaemb 0:580aba13d1a1 54 TopicKind_t topicKind,
smoritaemb 0:580aba13d1a1 55 ThreadPool *threadPool,
smoritaemb 0:580aba13d1a1 56 NetworkDriver &driver) {
smoritaemb 0:580aba13d1a1 57 if (sys_mutex_new(&m_mutex) != ERR_OK) {
smoritaemb 0:580aba13d1a1 58 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 59 Log::printLine("SFW:Failed to create mutex \n");
smoritaemb 0:580aba13d1a1 60 #endif
smoritaemb 0:580aba13d1a1 61 return false;
smoritaemb 0:580aba13d1a1 62 }
smoritaemb 0:580aba13d1a1 63
smoritaemb 0:580aba13d1a1 64 m_attributes = attributes;
smoritaemb 0:580aba13d1a1 65 m_packetInfo.srcPort = attributes.unicastLocator.port;
smoritaemb 0:580aba13d1a1 66 m_topicKind = topicKind;
smoritaemb 0:580aba13d1a1 67 mp_threadPool = threadPool;
smoritaemb 0:580aba13d1a1 68 m_transport = &driver;
smoritaemb 0:580aba13d1a1 69
smoritaemb 0:580aba13d1a1 70 m_is_initialized_ = true;
smoritaemb 0:580aba13d1a1 71 return true;
smoritaemb 0:580aba13d1a1 72 }
smoritaemb 0:580aba13d1a1 73
smoritaemb 0:580aba13d1a1 74 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 75 bool StatelessWriterT<NetworkDriver>::addNewMatchedReader(
smoritaemb 0:580aba13d1a1 76 const ReaderProxy &newProxy) {
smoritaemb 0:580aba13d1a1 77 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 78 printf("StatefulWriter[%s]: New reader added with id: ",
smoritaemb 0:580aba13d1a1 79 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 80 printGuid(newProxy.remoteReaderGuid);
smoritaemb 0:580aba13d1a1 81 printf("\n");
smoritaemb 0:580aba13d1a1 82 #endif
smoritaemb 0:580aba13d1a1 83 return m_proxies.add(newProxy);
smoritaemb 0:580aba13d1a1 84 }
smoritaemb 0:580aba13d1a1 85
smoritaemb 0:580aba13d1a1 86 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 87 void StatelessWriterT<NetworkDriver>::removeReader(const Guid &guid) {
smoritaemb 0:580aba13d1a1 88 auto isElementToRemove = [&](const ReaderProxy &proxy) {
smoritaemb 0:580aba13d1a1 89 return proxy.remoteReaderGuid == guid;
smoritaemb 0:580aba13d1a1 90 };
smoritaemb 0:580aba13d1a1 91 auto thunk = [](void *arg, const ReaderProxy &value) {
smoritaemb 0:580aba13d1a1 92 return (*static_cast<decltype(isElementToRemove) *>(arg))(value);
smoritaemb 0:580aba13d1a1 93 };
smoritaemb 0:580aba13d1a1 94
smoritaemb 0:580aba13d1a1 95 m_proxies.remove(thunk, &isElementToRemove);
smoritaemb 0:580aba13d1a1 96 }
smoritaemb 0:580aba13d1a1 97
smoritaemb 0:580aba13d1a1 98 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 99 const CacheChange *StatelessWriterT<NetworkDriver>::newChange(
smoritaemb 0:580aba13d1a1 100 rtps::ChangeKind_t kind, const uint8_t *data, DataSize_t size) {
smoritaemb 0:580aba13d1a1 101 if (isIrrelevant(kind)) {
smoritaemb 0:580aba13d1a1 102 return nullptr;
smoritaemb 0:580aba13d1a1 103 }
smoritaemb 0:580aba13d1a1 104 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 105
smoritaemb 0:580aba13d1a1 106 if (m_history.isFull()) {
smoritaemb 0:580aba13d1a1 107 SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin());
smoritaemb 0:580aba13d1a1 108 if (m_nextSequenceNumberToSend < newMin) {
smoritaemb 0:580aba13d1a1 109 m_nextSequenceNumberToSend =
smoritaemb 0:580aba13d1a1 110 newMin; // Make sure we have the correct sn to send
smoritaemb 0:580aba13d1a1 111 }
smoritaemb 0:580aba13d1a1 112 }
smoritaemb 0:580aba13d1a1 113
smoritaemb 0:580aba13d1a1 114 auto *result = m_history.addChange(data, size);
smoritaemb 0:580aba13d1a1 115 if (mp_threadPool != nullptr) {
smoritaemb 0:580aba13d1a1 116 mp_threadPool->addWorkload(this);
smoritaemb 0:580aba13d1a1 117 }
smoritaemb 0:580aba13d1a1 118
smoritaemb 0:580aba13d1a1 119 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 120 printf("StatelessWriter[%s]: Adding new data.\n",
smoritaemb 0:580aba13d1a1 121 this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 122 #endif
smoritaemb 0:580aba13d1a1 123 return result;
smoritaemb 0:580aba13d1a1 124 }
smoritaemb 0:580aba13d1a1 125
smoritaemb 0:580aba13d1a1 126 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 127 void StatelessWriterT<NetworkDriver>::setAllChangesToUnsent() {
smoritaemb 0:580aba13d1a1 128 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 129
smoritaemb 0:580aba13d1a1 130 m_nextSequenceNumberToSend = m_history.getSeqNumMin();
smoritaemb 0:580aba13d1a1 131
smoritaemb 0:580aba13d1a1 132 if (mp_threadPool != nullptr) {
smoritaemb 0:580aba13d1a1 133 mp_threadPool->addWorkload(this);
smoritaemb 0:580aba13d1a1 134 }
smoritaemb 0:580aba13d1a1 135 }
smoritaemb 0:580aba13d1a1 136
smoritaemb 0:580aba13d1a1 137 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 138 void StatelessWriterT<NetworkDriver>::onNewAckNack(
smoritaemb 0:580aba13d1a1 139 const SubmessageAckNack & /*msg*/, const GuidPrefix_t &sourceGuidPrefix) {
smoritaemb 0:580aba13d1a1 140 // Too lazy to respond
smoritaemb 0:580aba13d1a1 141 }
smoritaemb 0:580aba13d1a1 142
smoritaemb 0:580aba13d1a1 143 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 144 bool StatelessWriterT<NetworkDriver>::isIrrelevant(ChangeKind_t kind) const {
smoritaemb 0:580aba13d1a1 145 // Right now we only allow alive changes
smoritaemb 0:580aba13d1a1 146 // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY
smoritaemb 0:580aba13d1a1 147 // && kind != ChangeKind_t::ALIVE);
smoritaemb 0:580aba13d1a1 148 return kind != ChangeKind_t::ALIVE;
smoritaemb 0:580aba13d1a1 149 }
smoritaemb 0:580aba13d1a1 150
smoritaemb 0:580aba13d1a1 151 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 152 void StatelessWriterT<NetworkDriver>::progress() {
smoritaemb 0:580aba13d1a1 153 // TODO smarter packaging e.g. by creating MessageStruct and serialize after
smoritaemb 0:580aba13d1a1 154 // adjusting values Reusing the pbuf is not possible. See
smoritaemb 0:580aba13d1a1 155 // https://www.nongnu.org/lwip/2_1_x/raw_api.html (Zero-Copy MACs)
smoritaemb 0:580aba13d1a1 156
smoritaemb 0:580aba13d1a1 157 for (const auto &proxy : m_proxies) {
smoritaemb 0:580aba13d1a1 158
smoritaemb 0:580aba13d1a1 159 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 160 printf("StatelessWriter[%s]: Progess.\n", this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 161 #endif
smoritaemb 0:580aba13d1a1 162
smoritaemb 0:580aba13d1a1 163 PacketInfo info;
smoritaemb 0:580aba13d1a1 164 info.srcPort = m_packetInfo.srcPort;
smoritaemb 0:580aba13d1a1 165
smoritaemb 0:580aba13d1a1 166 MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
smoritaemb 0:580aba13d1a1 167 //MessageFactory::addSubMessageTimeStamp(info.buffer);
smoritaemb 0:580aba13d1a1 168
smoritaemb 0:580aba13d1a1 169 {
smoritaemb 0:580aba13d1a1 170 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 171 const CacheChange *next =
smoritaemb 0:580aba13d1a1 172 m_history.getChangeBySN(m_nextSequenceNumberToSend);
smoritaemb 0:580aba13d1a1 173 if (next == nullptr) {
smoritaemb 0:580aba13d1a1 174 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 175 printf("StatelessWriter[%s]: Couldn't get a new CacheChange with SN "
smoritaemb 0:580aba13d1a1 176 "(%i,%i)\n",
smoritaemb 0:580aba13d1a1 177 &m_attributes.topicName[0], m_nextSequenceNumberToSend.high,
smoritaemb 0:580aba13d1a1 178 m_nextSequenceNumberToSend.low);
smoritaemb 0:580aba13d1a1 179 #endif
smoritaemb 0:580aba13d1a1 180 return;
smoritaemb 0:580aba13d1a1 181 } else {
smoritaemb 0:580aba13d1a1 182 #if SLW_VERBOSE
smoritaemb 0:580aba13d1a1 183 printf("StatelessWriter[%s]: Sending change with SN (%i,%i)\n",
smoritaemb 0:580aba13d1a1 184 &m_attributes.topicName[0], m_nextSequenceNumberToSend.high,
smoritaemb 0:580aba13d1a1 185 m_nextSequenceNumberToSend.low);
smoritaemb 0:580aba13d1a1 186 #endif
smoritaemb 0:580aba13d1a1 187 }
smoritaemb 0:580aba13d1a1 188 //TODO: these should be called only when the message data is published.
smoritaemb 0:580aba13d1a1 189 char hoge[2];
smoritaemb 0:580aba13d1a1 190 pbuf_copy_partial(next->data.firstElement, &hoge, 2,0);
smoritaemb 0:580aba13d1a1 191 if(hoge[0] != 0 || hoge[1] != 3) {
smoritaemb 0:580aba13d1a1 192 MessageFactory::addSubMessageDestination(info.buffer);
smoritaemb 0:580aba13d1a1 193 //next->data = next->data[1];
smoritaemb 0:580aba13d1a1 194 //next->size - next->size - 1;
smoritaemb 0:580aba13d1a1 195 }
smoritaemb 0:580aba13d1a1 196 MessageFactory::addSubMessageTimeStamp(info.buffer);
smoritaemb 0:580aba13d1a1 197 MessageFactory::addSubMessageData(
smoritaemb 0:580aba13d1a1 198 info.buffer, next->data, false, next->sequenceNumber,
smoritaemb 0:580aba13d1a1 199 m_attributes.endpointGuid.entityId,
smoritaemb 0:580aba13d1a1 200 proxy.remoteReaderGuid.entityId); // TODO
smoritaemb 0:580aba13d1a1 201 }
smoritaemb 0:580aba13d1a1 202
smoritaemb 0:580aba13d1a1 203 // Just usable for IPv4
smoritaemb 0:580aba13d1a1 204 const Locator &locator = proxy.remoteLocator;
smoritaemb 0:580aba13d1a1 205
smoritaemb 0:580aba13d1a1 206 info.destAddr = locator.getIp4Address();
smoritaemb 0:580aba13d1a1 207 info.destPort = (Ip4Port_t)locator.port;
smoritaemb 0:580aba13d1a1 208
smoritaemb 0:580aba13d1a1 209 m_transport->sendPacket(info);
smoritaemb 0:580aba13d1a1 210 }
smoritaemb 0:580aba13d1a1 211
smoritaemb 0:580aba13d1a1 212 ++m_nextSequenceNumberToSend;
smoritaemb 0:580aba13d1a1 213 }