Porting mros2 as an Mbed library.
Dependents: mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop
embeddedRTPS/include/rtps/entities/StatefulWriter.tpp@7:c80f65422d99, 2022-03-19 (annotated)
- Committer:
- smoritaemb
- Date:
- Sat Mar 19 09:23:37 2022 +0900
- Revision:
- 7:c80f65422d99
- Parent:
- 0:580aba13d1a1
Merge test_assortment_of_msgs branch.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
smoritaemb | 0:580aba13d1a1 | 1 | /* |
smoritaemb | 0:580aba13d1a1 | 2 | The MIT License |
smoritaemb | 0:580aba13d1a1 | 3 | Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University |
smoritaemb | 0:580aba13d1a1 | 4 | Permission is hereby granted, free of charge, to any person obtaining a copy |
smoritaemb | 0:580aba13d1a1 | 5 | of this software and associated documentation files (the "Software"), to deal |
smoritaemb | 0:580aba13d1a1 | 6 | in the Software without restriction, including without limitation the rights |
smoritaemb | 0:580aba13d1a1 | 7 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
smoritaemb | 0:580aba13d1a1 | 8 | copies of the Software, and to permit persons to whom the Software is |
smoritaemb | 0:580aba13d1a1 | 9 | furnished to do so, subject to the following conditions: |
smoritaemb | 0:580aba13d1a1 | 10 | The above copyright notice and this permission notice shall be included in |
smoritaemb | 0:580aba13d1a1 | 11 | all copies or substantial portions of the Software. |
smoritaemb | 0:580aba13d1a1 | 12 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
smoritaemb | 0:580aba13d1a1 | 13 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
smoritaemb | 0:580aba13d1a1 | 14 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
smoritaemb | 0:580aba13d1a1 | 15 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
smoritaemb | 0:580aba13d1a1 | 16 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
smoritaemb | 0:580aba13d1a1 | 17 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
smoritaemb | 0:580aba13d1a1 | 18 | THE SOFTWARE |
smoritaemb | 0:580aba13d1a1 | 19 | |
smoritaemb | 0:580aba13d1a1 | 20 | This file is part of embeddedRTPS. |
smoritaemb | 0:580aba13d1a1 | 21 | |
smoritaemb | 0:580aba13d1a1 | 22 | Author: i11 - Embedded Software, RWTH Aachen University |
smoritaemb | 0:580aba13d1a1 | 23 | */ |
smoritaemb | 0:580aba13d1a1 | 24 | |
smoritaemb | 0:580aba13d1a1 | 25 | #include "rtps/messages/MessageFactory.h" |
smoritaemb | 0:580aba13d1a1 | 26 | #include <cstring> |
smoritaemb | 0:580aba13d1a1 | 27 | #include <stdio.h> |
smoritaemb | 0:580aba13d1a1 | 28 | |
smoritaemb | 0:580aba13d1a1 | 29 | using rtps::StatefulWriterT; |
smoritaemb | 0:580aba13d1a1 | 30 | |
smoritaemb | 0:580aba13d1a1 | 31 | /* |
smoritaemb | 0:580aba13d1a1 | 32 | static int line_cnt_ = 0; |
smoritaemb | 0:580aba13d1a1 | 33 | static char tft_buffer_[150]; |
smoritaemb | 0:580aba13d1a1 | 34 | |
smoritaemb | 0:580aba13d1a1 | 35 | #define log(args...) if(true){ \ |
smoritaemb | 0:580aba13d1a1 | 36 | Lock lock{m_mutex}; \ |
smoritaemb | 0:580aba13d1a1 | 37 | snprintf(tft_buffer_, sizeof(tft_buffer_), args); \ |
smoritaemb | 0:580aba13d1a1 | 38 | TFT_PrintLine(line_cnt_, tft_buffer_); \ |
smoritaemb | 0:580aba13d1a1 | 39 | line_cnt_ = (line_cnt_+1)%5; \ |
smoritaemb | 0:580aba13d1a1 | 40 | } |
smoritaemb | 0:580aba13d1a1 | 41 | */ |
smoritaemb | 0:580aba13d1a1 | 42 | #define SFW_VERBOSE 0 |
smoritaemb | 0:580aba13d1a1 | 43 | |
smoritaemb | 0:580aba13d1a1 | 44 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 45 | #include "rtps/utils/printutils.h" |
smoritaemb | 0:580aba13d1a1 | 46 | #endif |
smoritaemb | 0:580aba13d1a1 | 47 | |
smoritaemb | 0:580aba13d1a1 | 48 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 49 | StatefulWriterT<NetworkDriver>::~StatefulWriterT() { |
smoritaemb | 0:580aba13d1a1 | 50 | m_running = false; |
smoritaemb | 0:580aba13d1a1 | 51 | sys_msleep(10); // Required for tests/ Join currently not available |
smoritaemb | 0:580aba13d1a1 | 52 | // if(sys_mutex_valid(&m_mutex)){ |
smoritaemb | 0:580aba13d1a1 | 53 | sys_mutex_free(&m_mutex); |
smoritaemb | 0:580aba13d1a1 | 54 | //} |
smoritaemb | 0:580aba13d1a1 | 55 | } |
smoritaemb | 0:580aba13d1a1 | 56 | |
smoritaemb | 0:580aba13d1a1 | 57 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 58 | bool StatefulWriterT<NetworkDriver>::init(TopicData attributes, |
smoritaemb | 0:580aba13d1a1 | 59 | TopicKind_t topicKind, |
smoritaemb | 0:580aba13d1a1 | 60 | ThreadPool * /*threadPool*/, |
smoritaemb | 0:580aba13d1a1 | 61 | NetworkDriver &driver) { |
smoritaemb | 0:580aba13d1a1 | 62 | if (sys_mutex_new(&m_mutex) != ERR_OK) { |
smoritaemb | 0:580aba13d1a1 | 63 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 64 | log("StatefulWriter: Failed to create mutex.\n"); |
smoritaemb | 0:580aba13d1a1 | 65 | #endif |
smoritaemb | 0:580aba13d1a1 | 66 | return false; |
smoritaemb | 0:580aba13d1a1 | 67 | } |
smoritaemb | 0:580aba13d1a1 | 68 | |
smoritaemb | 0:580aba13d1a1 | 69 | m_transport = &driver; |
smoritaemb | 0:580aba13d1a1 | 70 | m_attributes = attributes; |
smoritaemb | 0:580aba13d1a1 | 71 | m_topicKind = topicKind; |
smoritaemb | 0:580aba13d1a1 | 72 | m_packetInfo.srcPort = attributes.unicastLocator.port; |
smoritaemb | 0:580aba13d1a1 | 73 | if (m_attributes.endpointGuid.entityId == |
smoritaemb | 0:580aba13d1a1 | 74 | ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) { |
smoritaemb | 0:580aba13d1a1 | 75 | #ifdef MROS2_USE_EMBEDDEDRTPS |
smoritaemb | 0:580aba13d1a1 | 76 | m_heartbeatThread = sys_thread_new("HBThreadPub", callHbPubFunc, this, |
smoritaemb | 0:580aba13d1a1 | 77 | Config::HEARTBEAT_STACKSIZE, |
smoritaemb | 0:580aba13d1a1 | 78 | Config::THREAD_POOL_WRITER_PRIO); |
smoritaemb | 0:580aba13d1a1 | 79 | networkPubDriverPtr = this; |
smoritaemb | 0:580aba13d1a1 | 80 | hbPubFuncPtr = hbFunctionJumppad; |
smoritaemb | 0:580aba13d1a1 | 81 | #else |
smoritaemb | 0:580aba13d1a1 | 82 | m_heartbeatThread = sys_thread_new("HBThreadPub", hbFunctionJumppad, this, |
smoritaemb | 0:580aba13d1a1 | 83 | Config::HEARTBEAT_STACKSIZE, |
smoritaemb | 0:580aba13d1a1 | 84 | Config::THREAD_POOL_WRITER_PRIO); |
smoritaemb | 0:580aba13d1a1 | 85 | #endif |
smoritaemb | 0:580aba13d1a1 | 86 | } else if (m_attributes.endpointGuid.entityId == |
smoritaemb | 0:580aba13d1a1 | 87 | ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) { |
smoritaemb | 0:580aba13d1a1 | 88 | #ifdef MROS2_USE_EMBEDDEDRTPS |
smoritaemb | 0:580aba13d1a1 | 89 | m_heartbeatThread = sys_thread_new("HBThreadSub", callHbSubFunc, this, |
smoritaemb | 0:580aba13d1a1 | 90 | Config::HEARTBEAT_STACKSIZE, |
smoritaemb | 0:580aba13d1a1 | 91 | Config::THREAD_POOL_WRITER_PRIO); |
smoritaemb | 0:580aba13d1a1 | 92 | networkSubDriverPtr = this; |
smoritaemb | 0:580aba13d1a1 | 93 | hbSubFuncPtr = hbFunctionJumppad; |
smoritaemb | 0:580aba13d1a1 | 94 | #else |
smoritaemb | 0:580aba13d1a1 | 95 | m_heartbeatThread = sys_thread_new("HBThreadSub", hbFunctionJumppad, this, |
smoritaemb | 0:580aba13d1a1 | 96 | Config::HEARTBEAT_STACKSIZE, |
smoritaemb | 0:580aba13d1a1 | 97 | Config::THREAD_POOL_WRITER_PRIO); |
smoritaemb | 0:580aba13d1a1 | 98 | #endif |
smoritaemb | 0:580aba13d1a1 | 99 | } else { |
smoritaemb | 0:580aba13d1a1 | 100 | m_heartbeatThread = sys_thread_new("HBThread", hbFunctionJumppad, this, |
smoritaemb | 0:580aba13d1a1 | 101 | Config::HEARTBEAT_STACKSIZE, |
smoritaemb | 0:580aba13d1a1 | 102 | Config::THREAD_POOL_WRITER_PRIO); |
smoritaemb | 0:580aba13d1a1 | 103 | } |
smoritaemb | 0:580aba13d1a1 | 104 | /* |
smoritaemb | 0:580aba13d1a1 | 105 | if(hbFuncPointer == NULL) |
smoritaemb | 0:580aba13d1a1 | 106 | { |
smoritaemb | 0:580aba13d1a1 | 107 | hbFuncPointer = hbFunctionJumppad; |
smoritaemb | 0:580aba13d1a1 | 108 | }*/ |
smoritaemb | 0:580aba13d1a1 | 109 | m_is_initialized_ = true; |
smoritaemb | 0:580aba13d1a1 | 110 | return true; |
smoritaemb | 0:580aba13d1a1 | 111 | } |
smoritaemb | 0:580aba13d1a1 | 112 | |
smoritaemb | 0:580aba13d1a1 | 113 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 114 | bool StatefulWriterT<NetworkDriver>::addNewMatchedReader( |
smoritaemb | 0:580aba13d1a1 | 115 | const ReaderProxy &newProxy) { |
smoritaemb | 0:580aba13d1a1 | 116 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 117 | log("StatefulWriter[%s]: New reader added with id: ", |
smoritaemb | 0:580aba13d1a1 | 118 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 119 | printGuid(newProxy.remoteReaderGuid); |
smoritaemb | 0:580aba13d1a1 | 120 | log("\n"); |
smoritaemb | 0:580aba13d1a1 | 121 | #endif |
smoritaemb | 0:580aba13d1a1 | 122 | return m_proxies.add(newProxy); |
smoritaemb | 0:580aba13d1a1 | 123 | } |
smoritaemb | 0:580aba13d1a1 | 124 | |
smoritaemb | 0:580aba13d1a1 | 125 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 126 | void StatefulWriterT<NetworkDriver>::removeReader(const Guid &guid) { |
smoritaemb | 0:580aba13d1a1 | 127 | auto isElementToRemove = [&](const ReaderProxy &proxy) { |
smoritaemb | 0:580aba13d1a1 | 128 | return proxy.remoteReaderGuid == guid; |
smoritaemb | 0:580aba13d1a1 | 129 | }; |
smoritaemb | 0:580aba13d1a1 | 130 | auto thunk = [](void *arg, const ReaderProxy &value) { |
smoritaemb | 0:580aba13d1a1 | 131 | return (*static_cast<decltype(isElementToRemove) *>(arg))(value); |
smoritaemb | 0:580aba13d1a1 | 132 | }; |
smoritaemb | 0:580aba13d1a1 | 133 | |
smoritaemb | 0:580aba13d1a1 | 134 | m_proxies.remove(thunk, &isElementToRemove); |
smoritaemb | 0:580aba13d1a1 | 135 | } |
smoritaemb | 0:580aba13d1a1 | 136 | |
smoritaemb | 0:580aba13d1a1 | 137 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 138 | const rtps::CacheChange *StatefulWriterT<NetworkDriver>::newChange( |
smoritaemb | 0:580aba13d1a1 | 139 | ChangeKind_t kind, const uint8_t *data, DataSize_t size) { |
smoritaemb | 0:580aba13d1a1 | 140 | if (isIrrelevant(kind)) { |
smoritaemb | 0:580aba13d1a1 | 141 | return nullptr; |
smoritaemb | 0:580aba13d1a1 | 142 | } |
smoritaemb | 0:580aba13d1a1 | 143 | |
smoritaemb | 0:580aba13d1a1 | 144 | Lock lock{m_mutex}; |
smoritaemb | 0:580aba13d1a1 | 145 | |
smoritaemb | 0:580aba13d1a1 | 146 | if (m_history.isFull()) { |
smoritaemb | 0:580aba13d1a1 | 147 | // Right now we drop elements anyway because we cannot detect non-responding |
smoritaemb | 0:580aba13d1a1 | 148 | // readers yet. return nullptr; |
smoritaemb | 0:580aba13d1a1 | 149 | SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin()); |
smoritaemb | 0:580aba13d1a1 | 150 | if (m_nextSequenceNumberToSend < newMin) { |
smoritaemb | 0:580aba13d1a1 | 151 | m_nextSequenceNumberToSend = |
smoritaemb | 0:580aba13d1a1 | 152 | newMin; // Make sure we have the correct sn to send |
smoritaemb | 0:580aba13d1a1 | 153 | } |
smoritaemb | 0:580aba13d1a1 | 154 | } |
smoritaemb | 0:580aba13d1a1 | 155 | |
smoritaemb | 0:580aba13d1a1 | 156 | auto *result = m_history.addChange(data, size); |
smoritaemb | 0:580aba13d1a1 | 157 | if (mp_threadPool != nullptr) { |
smoritaemb | 0:580aba13d1a1 | 158 | mp_threadPool->addWorkload(this); |
smoritaemb | 0:580aba13d1a1 | 159 | } |
smoritaemb | 0:580aba13d1a1 | 160 | |
smoritaemb | 0:580aba13d1a1 | 161 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 162 | log("StatefulWriter[%s]: Adding new data.\n", this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 163 | #endif |
smoritaemb | 0:580aba13d1a1 | 164 | return result; |
smoritaemb | 0:580aba13d1a1 | 165 | } |
smoritaemb | 0:580aba13d1a1 | 166 | |
smoritaemb | 0:580aba13d1a1 | 167 | template <class NetworkDriver> void StatefulWriterT<NetworkDriver>::progress() { |
smoritaemb | 0:580aba13d1a1 | 168 | for (const auto &proxy : m_proxies) { |
smoritaemb | 0:580aba13d1a1 | 169 | if (!sendData(proxy, m_nextSequenceNumberToSend)) { |
smoritaemb | 0:580aba13d1a1 | 170 | continue; |
smoritaemb | 0:580aba13d1a1 | 171 | } |
smoritaemb | 0:580aba13d1a1 | 172 | } |
smoritaemb | 0:580aba13d1a1 | 173 | ++m_nextSequenceNumberToSend; |
smoritaemb | 0:580aba13d1a1 | 174 | } |
smoritaemb | 0:580aba13d1a1 | 175 | |
smoritaemb | 0:580aba13d1a1 | 176 | template <typename NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 177 | bool StatefulWriterT<NetworkDriver>::isIrrelevant(ChangeKind_t kind) const { |
smoritaemb | 0:580aba13d1a1 | 178 | // Right now we only allow alive changes |
smoritaemb | 0:580aba13d1a1 | 179 | // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY |
smoritaemb | 0:580aba13d1a1 | 180 | // && kind != ChangeKind_t::ALIVE); |
smoritaemb | 0:580aba13d1a1 | 181 | return kind != ChangeKind_t::ALIVE; |
smoritaemb | 0:580aba13d1a1 | 182 | } |
smoritaemb | 0:580aba13d1a1 | 183 | |
smoritaemb | 0:580aba13d1a1 | 184 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 185 | void StatefulWriterT<NetworkDriver>::setAllChangesToUnsent() { |
smoritaemb | 0:580aba13d1a1 | 186 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 187 | |
smoritaemb | 0:580aba13d1a1 | 188 | m_nextSequenceNumberToSend = m_history.getSeqNumMin(); |
smoritaemb | 0:580aba13d1a1 | 189 | |
smoritaemb | 0:580aba13d1a1 | 190 | if (mp_threadPool != nullptr) { |
smoritaemb | 0:580aba13d1a1 | 191 | mp_threadPool->addWorkload(this); |
smoritaemb | 0:580aba13d1a1 | 192 | } |
smoritaemb | 0:580aba13d1a1 | 193 | } |
smoritaemb | 0:580aba13d1a1 | 194 | |
smoritaemb | 0:580aba13d1a1 | 195 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 196 | void StatefulWriterT<NetworkDriver>::onNewAckNack( |
smoritaemb | 0:580aba13d1a1 | 197 | const SubmessageAckNack &msg, const GuidPrefix_t &sourceGuidPrefix) { |
smoritaemb | 0:580aba13d1a1 | 198 | // Lock lock{m_mutex}; |
smoritaemb | 0:580aba13d1a1 | 199 | // Search for reader |
smoritaemb | 0:580aba13d1a1 | 200 | ReaderProxy *reader = nullptr; |
smoritaemb | 0:580aba13d1a1 | 201 | for (auto &proxy : m_proxies) { |
smoritaemb | 0:580aba13d1a1 | 202 | if (proxy.remoteReaderGuid.prefix == sourceGuidPrefix && |
smoritaemb | 0:580aba13d1a1 | 203 | proxy.remoteReaderGuid.entityId == msg.readerId) { |
smoritaemb | 0:580aba13d1a1 | 204 | reader = &proxy; |
smoritaemb | 0:580aba13d1a1 | 205 | break; |
smoritaemb | 0:580aba13d1a1 | 206 | } |
smoritaemb | 0:580aba13d1a1 | 207 | } |
smoritaemb | 0:580aba13d1a1 | 208 | |
smoritaemb | 0:580aba13d1a1 | 209 | if (reader == nullptr) { |
smoritaemb | 0:580aba13d1a1 | 210 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 211 | log("StatefulWriter[%s]: No proxy found with id: ", |
smoritaemb | 0:580aba13d1a1 | 212 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 213 | printEntityId(msg.readerId); |
smoritaemb | 0:580aba13d1a1 | 214 | log(" Dropping acknack.\n"); |
smoritaemb | 0:580aba13d1a1 | 215 | #endif |
smoritaemb | 0:580aba13d1a1 | 216 | return; |
smoritaemb | 0:580aba13d1a1 | 217 | } |
smoritaemb | 0:580aba13d1a1 | 218 | |
smoritaemb | 0:580aba13d1a1 | 219 | uint8_t hash = 0; |
smoritaemb | 0:580aba13d1a1 | 220 | for (int i = 0; i < sourceGuidPrefix.id.size(); i++) { |
smoritaemb | 0:580aba13d1a1 | 221 | hash += sourceGuidPrefix.id.at(i); |
smoritaemb | 0:580aba13d1a1 | 222 | } |
smoritaemb | 0:580aba13d1a1 | 223 | |
smoritaemb | 0:580aba13d1a1 | 224 | char bfr[20]; |
smoritaemb | 0:580aba13d1a1 | 225 | size_t size = snprintf(bfr, sizeof(bfr), "%u <= %u", msg.count.value, |
smoritaemb | 0:580aba13d1a1 | 226 | reader->ackNackCount.value); |
smoritaemb | 0:580aba13d1a1 | 227 | if (!(size < sizeof(bfr))) { |
smoritaemb | 0:580aba13d1a1 | 228 | while (1) |
smoritaemb | 0:580aba13d1a1 | 229 | ; |
smoritaemb | 0:580aba13d1a1 | 230 | } |
smoritaemb | 0:580aba13d1a1 | 231 | |
smoritaemb | 0:580aba13d1a1 | 232 | if (msg.count.value <= reader->ackNackCount.value) { |
smoritaemb | 0:580aba13d1a1 | 233 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 234 | log("StatefulWriter[%s]: Count too small. Dropping acknack.\n", |
smoritaemb | 0:580aba13d1a1 | 235 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 236 | #endif |
smoritaemb | 0:580aba13d1a1 | 237 | return; |
smoritaemb | 0:580aba13d1a1 | 238 | } |
smoritaemb | 0:580aba13d1a1 | 239 | |
smoritaemb | 0:580aba13d1a1 | 240 | reader->ackNackCount = msg.count; |
smoritaemb | 0:580aba13d1a1 | 241 | |
smoritaemb | 0:580aba13d1a1 | 242 | // Send missing packets |
smoritaemb | 0:580aba13d1a1 | 243 | SequenceNumber_t nextSN = msg.readerSNState.base; |
smoritaemb | 0:580aba13d1a1 | 244 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 245 | if (nextSN.low == 0 && nextSN.high == 0) { |
smoritaemb | 0:580aba13d1a1 | 246 | log("StatefulWriter[%s]: Received preemptive acknack. Ignored.\n", |
smoritaemb | 0:580aba13d1a1 | 247 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 248 | } else { |
smoritaemb | 0:580aba13d1a1 | 249 | log("StatefulWriter[%s]: Received non-preemptive acknack.\n", |
smoritaemb | 0:580aba13d1a1 | 250 | &this->m_attributes.topicName[0]); |
smoritaemb | 0:580aba13d1a1 | 251 | } |
smoritaemb | 0:580aba13d1a1 | 252 | #endif |
smoritaemb | 0:580aba13d1a1 | 253 | for (uint32_t i = 0; i < msg.readerSNState.numBits; ++i, ++nextSN) { |
smoritaemb | 0:580aba13d1a1 | 254 | if (msg.readerSNState.isSet(i)) { |
smoritaemb | 0:580aba13d1a1 | 255 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 256 | log("StatefulWriter[%s]: Send Packet on acknack.\n", |
smoritaemb | 0:580aba13d1a1 | 257 | this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 258 | #endif |
smoritaemb | 0:580aba13d1a1 | 259 | sendData(*reader, nextSN); |
smoritaemb | 0:580aba13d1a1 | 260 | } |
smoritaemb | 0:580aba13d1a1 | 261 | } |
smoritaemb | 0:580aba13d1a1 | 262 | // Check for sequence numbers after defined range |
smoritaemb | 0:580aba13d1a1 | 263 | SequenceNumber_t maxSN; |
smoritaemb | 0:580aba13d1a1 | 264 | { |
smoritaemb | 0:580aba13d1a1 | 265 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 266 | maxSN = m_history.getSeqNumMax(); |
smoritaemb | 0:580aba13d1a1 | 267 | } |
smoritaemb | 0:580aba13d1a1 | 268 | while (nextSN <= maxSN) { |
smoritaemb | 0:580aba13d1a1 | 269 | sendData(*reader, nextSN); |
smoritaemb | 0:580aba13d1a1 | 270 | ++nextSN; |
smoritaemb | 0:580aba13d1a1 | 271 | } |
smoritaemb | 0:580aba13d1a1 | 272 | } |
smoritaemb | 0:580aba13d1a1 | 273 | |
smoritaemb | 0:580aba13d1a1 | 274 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 275 | bool StatefulWriterT<NetworkDriver>::sendData( |
smoritaemb | 0:580aba13d1a1 | 276 | const ReaderProxy &reader, const SequenceNumber_t &snMissing) { |
smoritaemb | 0:580aba13d1a1 | 277 | |
smoritaemb | 0:580aba13d1a1 | 278 | // TODO smarter packaging e.g. by creating MessageStruct and serialize after |
smoritaemb | 0:580aba13d1a1 | 279 | // adjusting values Reusing the pbuf is not possible. See |
smoritaemb | 0:580aba13d1a1 | 280 | // https://www.nongnu.org/lwip/2_0_x/raw_api.html (Zero-Copy MACs) |
smoritaemb | 0:580aba13d1a1 | 281 | |
smoritaemb | 0:580aba13d1a1 | 282 | PacketInfo info; |
smoritaemb | 0:580aba13d1a1 | 283 | info.srcPort = m_packetInfo.srcPort; |
smoritaemb | 0:580aba13d1a1 | 284 | |
smoritaemb | 0:580aba13d1a1 | 285 | MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); |
smoritaemb | 0:580aba13d1a1 | 286 | MessageFactory::addSubMessageDestination(info.buffer); |
smoritaemb | 0:580aba13d1a1 | 287 | MessageFactory::addSubMessageTimeStamp(info.buffer); |
smoritaemb | 0:580aba13d1a1 | 288 | |
smoritaemb | 0:580aba13d1a1 | 289 | // Just usable for IPv4 |
smoritaemb | 0:580aba13d1a1 | 290 | const Locator &locator = reader.remoteLocator; |
smoritaemb | 0:580aba13d1a1 | 291 | |
smoritaemb | 0:580aba13d1a1 | 292 | info.destAddr = locator.getIp4Address(); |
smoritaemb | 0:580aba13d1a1 | 293 | info.destPort = (Ip4Port_t)locator.port; |
smoritaemb | 0:580aba13d1a1 | 294 | |
smoritaemb | 0:580aba13d1a1 | 295 | { |
smoritaemb | 0:580aba13d1a1 | 296 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 297 | const CacheChange *next = m_history.getChangeBySN(snMissing); |
smoritaemb | 0:580aba13d1a1 | 298 | if (next == nullptr) { |
smoritaemb | 0:580aba13d1a1 | 299 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 300 | log("StatefulWriter[%s]: Couldn't get a CacheChange with SN (%i,%u)\n", |
smoritaemb | 0:580aba13d1a1 | 301 | &this->m_attributes.topicName[0], snMissing.high, snMissing.low); |
smoritaemb | 0:580aba13d1a1 | 302 | #endif |
smoritaemb | 0:580aba13d1a1 | 303 | return false; |
smoritaemb | 0:580aba13d1a1 | 304 | } |
smoritaemb | 0:580aba13d1a1 | 305 | MessageFactory::addSubMessageData( |
smoritaemb | 0:580aba13d1a1 | 306 | info.buffer, next->data, false, next->sequenceNumber, |
smoritaemb | 0:580aba13d1a1 | 307 | m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId); |
smoritaemb | 0:580aba13d1a1 | 308 | } |
smoritaemb | 0:580aba13d1a1 | 309 | |
smoritaemb | 0:580aba13d1a1 | 310 | m_transport->sendPacket(info); |
smoritaemb | 0:580aba13d1a1 | 311 | return true; |
smoritaemb | 0:580aba13d1a1 | 312 | } |
smoritaemb | 0:580aba13d1a1 | 313 | |
smoritaemb | 0:580aba13d1a1 | 314 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 315 | void StatefulWriterT<NetworkDriver>::hbFunctionJumppad(void *thisPointer) { |
smoritaemb | 0:580aba13d1a1 | 316 | auto *writer = static_cast<StatefulWriterT<NetworkDriver> *>(thisPointer); |
smoritaemb | 0:580aba13d1a1 | 317 | writer->sendHeartBeatLoop(); |
smoritaemb | 0:580aba13d1a1 | 318 | } |
smoritaemb | 0:580aba13d1a1 | 319 | |
smoritaemb | 0:580aba13d1a1 | 320 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 321 | void StatefulWriterT<NetworkDriver>::sendHeartBeatLoop() { |
smoritaemb | 0:580aba13d1a1 | 322 | while (m_running) { |
smoritaemb | 0:580aba13d1a1 | 323 | sendHeartBeat(); |
smoritaemb | 0:580aba13d1a1 | 324 | sys_msleep(Config::SF_WRITER_HB_PERIOD_MS); |
smoritaemb | 0:580aba13d1a1 | 325 | } |
smoritaemb | 0:580aba13d1a1 | 326 | } |
smoritaemb | 0:580aba13d1a1 | 327 | |
smoritaemb | 0:580aba13d1a1 | 328 | template <class NetworkDriver> |
smoritaemb | 0:580aba13d1a1 | 329 | void StatefulWriterT<NetworkDriver>::sendHeartBeat() { |
smoritaemb | 0:580aba13d1a1 | 330 | if (m_proxies.isEmpty()) { |
smoritaemb | 0:580aba13d1a1 | 331 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 332 | log("StatefulWriter[%s]: Skipping heartbeat. No proxies.\n", |
smoritaemb | 0:580aba13d1a1 | 333 | this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 334 | #endif |
smoritaemb | 0:580aba13d1a1 | 335 | return; |
smoritaemb | 0:580aba13d1a1 | 336 | } |
smoritaemb | 0:580aba13d1a1 | 337 | |
smoritaemb | 0:580aba13d1a1 | 338 | for (auto &proxy : m_proxies) { |
smoritaemb | 0:580aba13d1a1 | 339 | |
smoritaemb | 0:580aba13d1a1 | 340 | PacketInfo info; |
smoritaemb | 0:580aba13d1a1 | 341 | info.srcPort = m_packetInfo.srcPort; |
smoritaemb | 0:580aba13d1a1 | 342 | |
smoritaemb | 0:580aba13d1a1 | 343 | SequenceNumber_t firstSN; |
smoritaemb | 0:580aba13d1a1 | 344 | SequenceNumber_t lastSN; |
smoritaemb | 0:580aba13d1a1 | 345 | MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); |
smoritaemb | 0:580aba13d1a1 | 346 | { |
smoritaemb | 0:580aba13d1a1 | 347 | Lock lock(m_mutex); |
smoritaemb | 0:580aba13d1a1 | 348 | firstSN = m_history.getSeqNumMin(); |
smoritaemb | 0:580aba13d1a1 | 349 | lastSN = m_history.getSeqNumMax(); |
smoritaemb | 0:580aba13d1a1 | 350 | } |
smoritaemb | 0:580aba13d1a1 | 351 | if (firstSN == SEQUENCENUMBER_UNKNOWN || lastSN == SEQUENCENUMBER_UNKNOWN) { |
smoritaemb | 0:580aba13d1a1 | 352 | #if SFW_VERBOSE |
smoritaemb | 0:580aba13d1a1 | 353 | if (strlen(&this->m_attributes.typeName[0]) != 0) { |
smoritaemb | 0:580aba13d1a1 | 354 | log("StatefulWriter[%s]: Skipping heartbeat. No data.\n", |
smoritaemb | 0:580aba13d1a1 | 355 | this->m_attributes.topicName); |
smoritaemb | 0:580aba13d1a1 | 356 | } |
smoritaemb | 0:580aba13d1a1 | 357 | #endif |
smoritaemb | 0:580aba13d1a1 | 358 | return; |
smoritaemb | 0:580aba13d1a1 | 359 | } |
smoritaemb | 0:580aba13d1a1 | 360 | MessageFactory::addSubMessageDestination(info.buffer, proxy.remoteReaderGuid.prefix.id.data()); |
smoritaemb | 0:580aba13d1a1 | 361 | MessageFactory::addHeartbeat( |
smoritaemb | 0:580aba13d1a1 | 362 | info.buffer, m_attributes.endpointGuid.entityId, |
smoritaemb | 0:580aba13d1a1 | 363 | proxy.remoteReaderGuid.entityId, firstSN, lastSN, m_hbCount); |
smoritaemb | 0:580aba13d1a1 | 364 | |
smoritaemb | 0:580aba13d1a1 | 365 | info.destAddr = proxy.remoteLocator.getIp4Address(); |
smoritaemb | 0:580aba13d1a1 | 366 | info.destPort = proxy.remoteLocator.port; |
smoritaemb | 0:580aba13d1a1 | 367 | |
smoritaemb | 0:580aba13d1a1 | 368 | m_transport->sendPacket(info); |
smoritaemb | 0:580aba13d1a1 | 369 | } |
smoritaemb | 0:580aba13d1a1 | 370 | m_hbCount.value++; |
smoritaemb | 0:580aba13d1a1 | 371 | } |
smoritaemb | 0:580aba13d1a1 | 372 | |
smoritaemb | 0:580aba13d1a1 | 373 | |
smoritaemb | 0:580aba13d1a1 | 374 | #undef SFW_VERBOSE |