Dependents: mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop
embeddedRTPS/include/rtps/entities/StatelessWriter.tpp@0:580aba13d1a1, 2021-12-30 (annotated)
- Committer:
- smoritaemb
- Date:
- Thu Dec 30 21:06:29 2021 +0900
- Revision:
- 0:580aba13d1a1
Updated to catch up to mros2 v2.3
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
smoritaemb | 0:580aba13d1a1 | 1 | /* |
smoritaemb | 0:580aba13d1a1 | 2 | The MIT License |
smoritaemb | 0:580aba13d1a1 | 3 | Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University |
smoritaemb | 0:580aba13d1a1 | 4 | Permission is hereby granted, free of charge, to any person obtaining a copy |
smoritaemb | 0:580aba13d1a1 | 5 | of this software and associated documentation files (the "Software"), to deal |
smoritaemb | 0:580aba13d1a1 | 6 | in the Software without restriction, including without limitation the rights |
smoritaemb | 0:580aba13d1a1 | 7 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
smoritaemb | 0:580aba13d1a1 | 8 | copies of the Software, and to permit persons to whom the Software is |
smoritaemb | 0:580aba13d1a1 | 9 | furnished to do so, subject to the following conditions: |
smoritaemb | 0:580aba13d1a1 | 10 | The above copyright notice and this permission notice shall be included in |
smoritaemb | 0:580aba13d1a1 | 11 | all copies or substantial portions of the Software. |
smoritaemb | 0:580aba13d1a1 | 12 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
smoritaemb | 0:580aba13d1a1 | 13 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
smoritaemb | 0:580aba13d1a1 | 14 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
smoritaemb | 0:580aba13d1a1 | 15 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
smoritaemb | 0:580aba13d1a1 | 16 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
smoritaemb | 0:580aba13d1a1 | 17 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
smoritaemb | 0:580aba13d1a1 | 18 | THE SOFTWARE |
smoritaemb | 0:580aba13d1a1 | 19 | |
smoritaemb | 0:580aba13d1a1 | 20 | This file is part of embeddedRTPS. |
smoritaemb | 0:580aba13d1a1 | 21 | |
smoritaemb | 0:580aba13d1a1 | 22 | Author: i11 - Embedded Software, RWTH Aachen University |
smoritaemb | 0:580aba13d1a1 | 23 | */ |
smoritaemb | 0:580aba13d1a1 | 24 | |
smoritaemb | 0:580aba13d1a1 | 25 | #include <rtps/entities/ReaderProxy.h> |
smoritaemb | 0:580aba13d1a1 | 26 | |
smoritaemb | 0:580aba13d1a1 | 27 | #include "lwip/tcpip.h" |
smoritaemb | 0:580aba13d1a1 | 28 | #include "rtps/ThreadPool.h" |
smoritaemb | 0:580aba13d1a1 | 29 | #include "rtps/communication/UdpDriver.h" |
smoritaemb | 0:580aba13d1a1 | 30 | #include "rtps/messages/MessageFactory.h" |
smoritaemb | 0:580aba13d1a1 | 31 | #include "rtps/storages/PBufWrapper.h" |
smoritaemb | 0:580aba13d1a1 | 32 | #include "rtps/utils/Log.h" |
smoritaemb | 0:580aba13d1a1 | 33 | #include "rtps/utils/udpUtils.h" |
smoritaemb | 0:580aba13d1a1 | 34 | |
smoritaemb | 0:580aba13d1a1 | 35 | using rtps::CacheChange; |
smoritaemb | 0:580aba13d1a1 | 36 | using rtps::SequenceNumber_t; |
smoritaemb | 0:580aba13d1a1 | 37 | using rtps::StatelessWriterT; |
smoritaemb | 0:580aba13d1a1 | 38 | |
smoritaemb | 0:580aba13d1a1 | 39 | #define SLW_VERBOSE 0 |
smoritaemb | 0:580aba13d1a1 | 40 | |
smoritaemb | 0:580aba13d1a1 | 41 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 42 | #include "rtps/utils/printutils.h" |
smoritaemb | 0:580aba13d1a1 | 43 | #endif |
smoritaemb | 0:580aba13d1a1 | 44 | |
smoritaemb | 0:580aba13d1a1 | 45 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 46 | StatelessWriterT<NetworkDriver>::~StatelessWriterT() { |
smoritaemb | 0:580aba13d1a1 | 47 | // if(sys_mutex_valid(&m_mutex)){ |
smoritaemb | 0:580aba13d1a1 | 48 | sys_mutex_free(&m_mutex); |
smoritaemb | 0:580aba13d1a1 | 49 | //} |
smoritaemb | 0:580aba13d1a1 | 50 | } |
smoritaemb | 0:580aba13d1a1 | 51 | |
smoritaemb | 0:580aba13d1a1 | 52 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 53 | bool StatelessWriterT<NetworkDriver>::init(TopicData attributes, |
smoritaemb | 0:580aba13d1a1 | 54 | TopicKind_t topicKind, |
smoritaemb | 0:580aba13d1a1 | 55 | ThreadPool *threadPool, |
smoritaemb | 0:580aba13d1a1 | 56 | NetworkDriver &driver) { |
smoritaemb | 0:580aba13d1a1 | 57 | if (sys_mutex_new(&m_mutex) != ERR_OK) { |
smoritaemb | 0:580aba13d1a1 | 58 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 59 | Log::printLine("SFW:Failed to create mutex \n"); |
smoritaemb | 0:580aba13d1a1 | 60 | #endif |
smoritaemb | 0:580aba13d1a1 | 61 | return false; |
smoritaemb | 0:580aba13d1a1 | 62 | } |
smoritaemb | 0:580aba13d1a1 | 63 | |
smoritaemb | 0:580aba13d1a1 | 64 | m_attributes = attributes; |
smoritaemb | 0:580aba13d1a1 | 65 | m_packetInfo.srcPort = attributes.unicastLocator.port; |
smoritaemb | 0:580aba13d1a1 | 66 | m_topicKind = topicKind; |
smoritaemb | 0:580aba13d1a1 | 67 | mp_threadPool = threadPool; |
smoritaemb | 0:580aba13d1a1 | 68 | m_transport = &driver; |
smoritaemb | 0:580aba13d1a1 | 69 | |
smoritaemb | 0:580aba13d1a1 | 70 | m_is_initialized_ = true; |
smoritaemb | 0:580aba13d1a1 | 71 | return true; |
smoritaemb | 0:580aba13d1a1 | 72 | } |
smoritaemb | 0:580aba13d1a1 | 73 | |
smoritaemb | 0:580aba13d1a1 | 74 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 75 | bool StatelessWriterT<NetworkDriver>::addNewMatchedReader( |
smoritaemb | 0:580aba13d1a1 | 76 | const ReaderProxy &newProxy) { |
smoritaemb | 0:580aba13d1a1 | 77 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 78 | printf("StatefulWriter[%s]: New reader added with id: ", |
smoritaemb | 0:580aba13d1a1 | 79 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 80 | printGuid(newProxy.remoteReaderGuid); |
smoritaemb | 0:580aba13d1a1 | 81 | printf("\n"); |
smoritaemb | 0:580aba13d1a1 | 82 | #endif |
smoritaemb | 0:580aba13d1a1 | 83 | return m_proxies.add(newProxy); |
smoritaemb | 0:580aba13d1a1 | 84 | } |
smoritaemb | 0:580aba13d1a1 | 85 | |
smoritaemb | 0:580aba13d1a1 | 86 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 87 | void StatelessWriterT<NetworkDriver>::removeReader(const Guid &guid) { |
smoritaemb | 0:580aba13d1a1 | 88 | auto isElementToRemove = [&](const ReaderProxy &proxy) { |
smoritaemb | 0:580aba13d1a1 | 89 | return proxy.remoteReaderGuid == guid; |
smoritaemb | 0:580aba13d1a1 | 90 | }; |
smoritaemb | 0:580aba13d1a1 | 91 | auto thunk = [](void *arg, const ReaderProxy &value) { |
smoritaemb | 0:580aba13d1a1 | 92 | return (*static_cast<decltype(isElementToRemove) *>(arg))(value); |
smoritaemb | 0:580aba13d1a1 | 93 | }; |
smoritaemb | 0:580aba13d1a1 | 94 | |
smoritaemb | 0:580aba13d1a1 | 95 | m_proxies.remove(thunk, &isElementToRemove); |
smoritaemb | 0:580aba13d1a1 | 96 | } |
smoritaemb | 0:580aba13d1a1 | 97 | |
smoritaemb | 0:580aba13d1a1 | 98 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 99 | const CacheChange *StatelessWriterT<NetworkDriver>::newChange( |
smoritaemb | 0:580aba13d1a1 | 100 | rtps::ChangeKind_t kind, const uint8_t *data, DataSize_t size) { |
smoritaemb | 0:580aba13d1a1 | 101 | if (isIrrelevant(kind)) { |
smoritaemb | 0:580aba13d1a1 | 102 | return nullptr; |
smoritaemb | 0:580aba13d1a1 | 103 | } |
smoritaemb | 0:580aba13d1a1 | 104 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 105 | |
smoritaemb | 0:580aba13d1a1 | 106 | if (m_history.isFull()) { |
smoritaemb | 0:580aba13d1a1 | 107 | SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin()); |
smoritaemb | 0:580aba13d1a1 | 108 | if (m_nextSequenceNumberToSend < newMin) { |
smoritaemb | 0:580aba13d1a1 | 109 | m_nextSequenceNumberToSend = |
smoritaemb | 0:580aba13d1a1 | 110 | newMin; // Make sure we have the correct sn to send |
smoritaemb | 0:580aba13d1a1 | 111 | } |
smoritaemb | 0:580aba13d1a1 | 112 | } |
smoritaemb | 0:580aba13d1a1 | 113 | |
smoritaemb | 0:580aba13d1a1 | 114 | auto *result = m_history.addChange(data, size); |
smoritaemb | 0:580aba13d1a1 | 115 | if (mp_threadPool != nullptr) { |
smoritaemb | 0:580aba13d1a1 | 116 | mp_threadPool->addWorkload(this); |
smoritaemb | 0:580aba13d1a1 | 117 | } |
smoritaemb | 0:580aba13d1a1 | 118 | |
smoritaemb | 0:580aba13d1a1 | 119 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 120 | printf("StatelessWriter[%s]: Adding new data.\n", |
smoritaemb | 0:580aba13d1a1 | 121 | this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 122 | #endif |
smoritaemb | 0:580aba13d1a1 | 123 | return result; |
smoritaemb | 0:580aba13d1a1 | 124 | } |
smoritaemb | 0:580aba13d1a1 | 125 | |
smoritaemb | 0:580aba13d1a1 | 126 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 127 | void StatelessWriterT<NetworkDriver>::setAllChangesToUnsent() { |
smoritaemb | 0:580aba13d1a1 | 128 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 129 | |
smoritaemb | 0:580aba13d1a1 | 130 | m_nextSequenceNumberToSend = m_history.getSeqNumMin(); |
smoritaemb | 0:580aba13d1a1 | 131 | |
smoritaemb | 0:580aba13d1a1 | 132 | if (mp_threadPool != nullptr) { |
smoritaemb | 0:580aba13d1a1 | 133 | mp_threadPool->addWorkload(this); |
smoritaemb | 0:580aba13d1a1 | 134 | } |
smoritaemb | 0:580aba13d1a1 | 135 | } |
smoritaemb | 0:580aba13d1a1 | 136 | |
smoritaemb | 0:580aba13d1a1 | 137 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 138 | void StatelessWriterT<NetworkDriver>::onNewAckNack( |
smoritaemb | 0:580aba13d1a1 | 139 | const SubmessageAckNack & /*msg*/, const GuidPrefix_t &sourceGuidPrefix) { |
smoritaemb | 0:580aba13d1a1 | 140 | // Too lazy to respond |
smoritaemb | 0:580aba13d1a1 | 141 | } |
smoritaemb | 0:580aba13d1a1 | 142 | |
smoritaemb | 0:580aba13d1a1 | 143 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 144 | bool StatelessWriterT<NetworkDriver>::isIrrelevant(ChangeKind_t kind) const { |
smoritaemb | 0:580aba13d1a1 | 145 | // Right now we only allow alive changes |
smoritaemb | 0:580aba13d1a1 | 146 | // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY |
smoritaemb | 0:580aba13d1a1 | 147 | // && kind != ChangeKind_t::ALIVE); |
smoritaemb | 0:580aba13d1a1 | 148 | return kind != ChangeKind_t::ALIVE; |
smoritaemb | 0:580aba13d1a1 | 149 | } |
smoritaemb | 0:580aba13d1a1 | 150 | |
smoritaemb | 0:580aba13d1a1 | 151 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 152 | void StatelessWriterT<NetworkDriver>::progress() { |
smoritaemb | 0:580aba13d1a1 | 153 | // TODO smarter packaging e.g. by creating MessageStruct and serialize after |
smoritaemb | 0:580aba13d1a1 | 154 | // adjusting values Reusing the pbuf is not possible. See |
smoritaemb | 0:580aba13d1a1 | 155 | // https://www.nongnu.org/lwip/2_1_x/raw_api.html (Zero-Copy MACs) |
smoritaemb | 0:580aba13d1a1 | 156 | |
smoritaemb | 0:580aba13d1a1 | 157 | for (const auto &proxy : m_proxies) { |
smoritaemb | 0:580aba13d1a1 | 158 | |
smoritaemb | 0:580aba13d1a1 | 159 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 160 | printf("StatelessWriter[%s]: Progess.\n", this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 161 | #endif |
smoritaemb | 0:580aba13d1a1 | 162 | |
smoritaemb | 0:580aba13d1a1 | 163 | PacketInfo info; |
smoritaemb | 0:580aba13d1a1 | 164 | info.srcPort = m_packetInfo.srcPort; |
smoritaemb | 0:580aba13d1a1 | 165 | |
smoritaemb | 0:580aba13d1a1 | 166 | MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); |
smoritaemb | 0:580aba13d1a1 | 167 | //MessageFactory::addSubMessageTimeStamp(info.buffer); |
smoritaemb | 0:580aba13d1a1 | 168 | |
smoritaemb | 0:580aba13d1a1 | 169 | { |
smoritaemb | 0:580aba13d1a1 | 170 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 171 | const CacheChange *next = |
smoritaemb | 0:580aba13d1a1 | 172 | m_history.getChangeBySN(m_nextSequenceNumberToSend); |
smoritaemb | 0:580aba13d1a1 | 173 | if (next == nullptr) { |
smoritaemb | 0:580aba13d1a1 | 174 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 175 | printf("StatelessWriter[%s]: Couldn't get a new CacheChange with SN " |
smoritaemb | 0:580aba13d1a1 | 176 | "(%i,%i)\n", |
smoritaemb | 0:580aba13d1a1 | 177 | &m_attributes.topicName[0], m_nextSequenceNumberToSend.high, |
smoritaemb | 0:580aba13d1a1 | 178 | m_nextSequenceNumberToSend.low); |
smoritaemb | 0:580aba13d1a1 | 179 | #endif |
smoritaemb | 0:580aba13d1a1 | 180 | return; |
smoritaemb | 0:580aba13d1a1 | 181 | } else { |
smoritaemb | 0:580aba13d1a1 | 182 | #if SLW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 183 | printf("StatelessWriter[%s]: Sending change with SN (%i,%i)\n", |
smoritaemb | 0:580aba13d1a1 | 184 | &m_attributes.topicName[0], m_nextSequenceNumberToSend.high, |
smoritaemb | 0:580aba13d1a1 | 185 | m_nextSequenceNumberToSend.low); |
smoritaemb | 0:580aba13d1a1 | 186 | #endif |
smoritaemb | 0:580aba13d1a1 | 187 | } |
smoritaemb | 0:580aba13d1a1 | 188 | //TODO: these should be called only when the message data is published. |
smoritaemb | 0:580aba13d1a1 | 189 | char hoge[2]; |
smoritaemb | 0:580aba13d1a1 | 190 | pbuf_copy_partial(next->data.firstElement, &hoge, 2,0); |
smoritaemb | 0:580aba13d1a1 | 191 | if(hoge[0] != 0 || hoge[1] != 3) { |
smoritaemb | 0:580aba13d1a1 | 192 | MessageFactory::addSubMessageDestination(info.buffer); |
smoritaemb | 0:580aba13d1a1 | 193 | //next->data = next->data[1]; |
smoritaemb | 0:580aba13d1a1 | 194 | //next->size - next->size - 1; |
smoritaemb | 0:580aba13d1a1 | 195 | } |
smoritaemb | 0:580aba13d1a1 | 196 | MessageFactory::addSubMessageTimeStamp(info.buffer); |
smoritaemb | 0:580aba13d1a1 | 197 | MessageFactory::addSubMessageData( |
smoritaemb | 0:580aba13d1a1 | 198 | info.buffer, next->data, false, next->sequenceNumber, |
smoritaemb | 0:580aba13d1a1 | 199 | m_attributes.endpointGuid.entityId, |
smoritaemb | 0:580aba13d1a1 | 200 | proxy.remoteReaderGuid.entityId); // TODO |
smoritaemb | 0:580aba13d1a1 | 201 | } |
smoritaemb | 0:580aba13d1a1 | 202 | |
smoritaemb | 0:580aba13d1a1 | 203 | // Just usable for IPv4 |
smoritaemb | 0:580aba13d1a1 | 204 | const Locator &locator = proxy.remoteLocator; |
smoritaemb | 0:580aba13d1a1 | 205 | |
smoritaemb | 0:580aba13d1a1 | 206 | info.destAddr = locator.getIp4Address(); |
smoritaemb | 0:580aba13d1a1 | 207 | info.destPort = (Ip4Port_t)locator.port; |
smoritaemb | 0:580aba13d1a1 | 208 | |
smoritaemb | 0:580aba13d1a1 | 209 | m_transport->sendPacket(info); |
smoritaemb | 0:580aba13d1a1 | 210 | } |
smoritaemb | 0:580aba13d1a1 | 211 | |
smoritaemb | 0:580aba13d1a1 | 212 | ++m_nextSequenceNumberToSend; |
smoritaemb | 0:580aba13d1a1 | 213 | } |