Porting mros2 as an Mbed library.

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Committer:
smoritaemb
Date:
Sat Mar 19 09:23:37 2022 +0900
Revision:
7:c80f65422d99
Parent:
0:580aba13d1a1
Merge test_assortment_of_msgs branch.

Who changed what in which revision?

UserRevisionLine numberNew contents of line
smoritaemb 0:580aba13d1a1 1 /*
smoritaemb 0:580aba13d1a1 2 The MIT License
smoritaemb 0:580aba13d1a1 3 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
smoritaemb 0:580aba13d1a1 4 Permission is hereby granted, free of charge, to any person obtaining a copy
smoritaemb 0:580aba13d1a1 5 of this software and associated documentation files (the "Software"), to deal
smoritaemb 0:580aba13d1a1 6 in the Software without restriction, including without limitation the rights
smoritaemb 0:580aba13d1a1 7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
smoritaemb 0:580aba13d1a1 8 copies of the Software, and to permit persons to whom the Software is
smoritaemb 0:580aba13d1a1 9 furnished to do so, subject to the following conditions:
smoritaemb 0:580aba13d1a1 10 The above copyright notice and this permission notice shall be included in
smoritaemb 0:580aba13d1a1 11 all copies or substantial portions of the Software.
smoritaemb 0:580aba13d1a1 12 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
smoritaemb 0:580aba13d1a1 13 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
smoritaemb 0:580aba13d1a1 14 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
smoritaemb 0:580aba13d1a1 15 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
smoritaemb 0:580aba13d1a1 16 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
smoritaemb 0:580aba13d1a1 17 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
smoritaemb 0:580aba13d1a1 18 THE SOFTWARE
smoritaemb 0:580aba13d1a1 19
smoritaemb 0:580aba13d1a1 20 This file is part of embeddedRTPS.
smoritaemb 0:580aba13d1a1 21
smoritaemb 0:580aba13d1a1 22 Author: i11 - Embedded Software, RWTH Aachen University
smoritaemb 0:580aba13d1a1 23 */
smoritaemb 0:580aba13d1a1 24
smoritaemb 0:580aba13d1a1 25 #include "rtps/messages/MessageFactory.h"
smoritaemb 0:580aba13d1a1 26 #include <cstring>
smoritaemb 0:580aba13d1a1 27 #include <stdio.h>
smoritaemb 0:580aba13d1a1 28
smoritaemb 0:580aba13d1a1 29 using rtps::StatefulWriterT;
smoritaemb 0:580aba13d1a1 30
smoritaemb 0:580aba13d1a1 31 /*
smoritaemb 0:580aba13d1a1 32 static int line_cnt_ = 0;
smoritaemb 0:580aba13d1a1 33 static char tft_buffer_[150];
smoritaemb 0:580aba13d1a1 34
smoritaemb 0:580aba13d1a1 35 #define log(args...) if(true){ \
smoritaemb 0:580aba13d1a1 36 Lock lock{m_mutex}; \
smoritaemb 0:580aba13d1a1 37 snprintf(tft_buffer_, sizeof(tft_buffer_), args); \
smoritaemb 0:580aba13d1a1 38 TFT_PrintLine(line_cnt_, tft_buffer_); \
smoritaemb 0:580aba13d1a1 39 line_cnt_ = (line_cnt_+1)%5; \
smoritaemb 0:580aba13d1a1 40 }
smoritaemb 0:580aba13d1a1 41 */
smoritaemb 0:580aba13d1a1 42 #define SFW_VERBOSE 0
smoritaemb 0:580aba13d1a1 43
smoritaemb 0:580aba13d1a1 44 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 45 #include "rtps/utils/printutils.h"
smoritaemb 0:580aba13d1a1 46 #endif
smoritaemb 0:580aba13d1a1 47
smoritaemb 0:580aba13d1a1 48 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 49 StatefulWriterT<NetworkDriver>::~StatefulWriterT() {
smoritaemb 0:580aba13d1a1 50 m_running = false;
smoritaemb 0:580aba13d1a1 51 sys_msleep(10); // Required for tests/ Join currently not available
smoritaemb 0:580aba13d1a1 52 // if(sys_mutex_valid(&m_mutex)){
smoritaemb 0:580aba13d1a1 53 sys_mutex_free(&m_mutex);
smoritaemb 0:580aba13d1a1 54 //}
smoritaemb 0:580aba13d1a1 55 }
smoritaemb 0:580aba13d1a1 56
smoritaemb 0:580aba13d1a1 57 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 58 bool StatefulWriterT<NetworkDriver>::init(TopicData attributes,
smoritaemb 0:580aba13d1a1 59 TopicKind_t topicKind,
smoritaemb 0:580aba13d1a1 60 ThreadPool * /*threadPool*/,
smoritaemb 0:580aba13d1a1 61 NetworkDriver &driver) {
smoritaemb 0:580aba13d1a1 62 if (sys_mutex_new(&m_mutex) != ERR_OK) {
smoritaemb 0:580aba13d1a1 63 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 64 log("StatefulWriter: Failed to create mutex.\n");
smoritaemb 0:580aba13d1a1 65 #endif
smoritaemb 0:580aba13d1a1 66 return false;
smoritaemb 0:580aba13d1a1 67 }
smoritaemb 0:580aba13d1a1 68
smoritaemb 0:580aba13d1a1 69 m_transport = &driver;
smoritaemb 0:580aba13d1a1 70 m_attributes = attributes;
smoritaemb 0:580aba13d1a1 71 m_topicKind = topicKind;
smoritaemb 0:580aba13d1a1 72 m_packetInfo.srcPort = attributes.unicastLocator.port;
smoritaemb 0:580aba13d1a1 73 if (m_attributes.endpointGuid.entityId ==
smoritaemb 0:580aba13d1a1 74 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
smoritaemb 0:580aba13d1a1 75 #ifdef MROS2_USE_EMBEDDEDRTPS
smoritaemb 0:580aba13d1a1 76 m_heartbeatThread = sys_thread_new("HBThreadPub", callHbPubFunc, this,
smoritaemb 0:580aba13d1a1 77 Config::HEARTBEAT_STACKSIZE,
smoritaemb 0:580aba13d1a1 78 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 79 networkPubDriverPtr = this;
smoritaemb 0:580aba13d1a1 80 hbPubFuncPtr = hbFunctionJumppad;
smoritaemb 0:580aba13d1a1 81 #else
smoritaemb 0:580aba13d1a1 82 m_heartbeatThread = sys_thread_new("HBThreadPub", hbFunctionJumppad, this,
smoritaemb 0:580aba13d1a1 83 Config::HEARTBEAT_STACKSIZE,
smoritaemb 0:580aba13d1a1 84 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 85 #endif
smoritaemb 0:580aba13d1a1 86 } else if (m_attributes.endpointGuid.entityId ==
smoritaemb 0:580aba13d1a1 87 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
smoritaemb 0:580aba13d1a1 88 #ifdef MROS2_USE_EMBEDDEDRTPS
smoritaemb 0:580aba13d1a1 89 m_heartbeatThread = sys_thread_new("HBThreadSub", callHbSubFunc, this,
smoritaemb 0:580aba13d1a1 90 Config::HEARTBEAT_STACKSIZE,
smoritaemb 0:580aba13d1a1 91 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 92 networkSubDriverPtr = this;
smoritaemb 0:580aba13d1a1 93 hbSubFuncPtr = hbFunctionJumppad;
smoritaemb 0:580aba13d1a1 94 #else
smoritaemb 0:580aba13d1a1 95 m_heartbeatThread = sys_thread_new("HBThreadSub", hbFunctionJumppad, this,
smoritaemb 0:580aba13d1a1 96 Config::HEARTBEAT_STACKSIZE,
smoritaemb 0:580aba13d1a1 97 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 98 #endif
smoritaemb 0:580aba13d1a1 99 } else {
smoritaemb 0:580aba13d1a1 100 m_heartbeatThread = sys_thread_new("HBThread", hbFunctionJumppad, this,
smoritaemb 0:580aba13d1a1 101 Config::HEARTBEAT_STACKSIZE,
smoritaemb 0:580aba13d1a1 102 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 103 }
smoritaemb 0:580aba13d1a1 104 /*
smoritaemb 0:580aba13d1a1 105 if(hbFuncPointer == NULL)
smoritaemb 0:580aba13d1a1 106 {
smoritaemb 0:580aba13d1a1 107 hbFuncPointer = hbFunctionJumppad;
smoritaemb 0:580aba13d1a1 108 }*/
smoritaemb 0:580aba13d1a1 109 m_is_initialized_ = true;
smoritaemb 0:580aba13d1a1 110 return true;
smoritaemb 0:580aba13d1a1 111 }
smoritaemb 0:580aba13d1a1 112
smoritaemb 0:580aba13d1a1 113 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 114 bool StatefulWriterT<NetworkDriver>::addNewMatchedReader(
smoritaemb 0:580aba13d1a1 115 const ReaderProxy &newProxy) {
smoritaemb 0:580aba13d1a1 116 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 117 log("StatefulWriter[%s]: New reader added with id: ",
smoritaemb 0:580aba13d1a1 118 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 119 printGuid(newProxy.remoteReaderGuid);
smoritaemb 0:580aba13d1a1 120 log("\n");
smoritaemb 0:580aba13d1a1 121 #endif
smoritaemb 0:580aba13d1a1 122 return m_proxies.add(newProxy);
smoritaemb 0:580aba13d1a1 123 }
smoritaemb 0:580aba13d1a1 124
smoritaemb 0:580aba13d1a1 125 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 126 void StatefulWriterT<NetworkDriver>::removeReader(const Guid &guid) {
smoritaemb 0:580aba13d1a1 127 auto isElementToRemove = [&](const ReaderProxy &proxy) {
smoritaemb 0:580aba13d1a1 128 return proxy.remoteReaderGuid == guid;
smoritaemb 0:580aba13d1a1 129 };
smoritaemb 0:580aba13d1a1 130 auto thunk = [](void *arg, const ReaderProxy &value) {
smoritaemb 0:580aba13d1a1 131 return (*static_cast<decltype(isElementToRemove) *>(arg))(value);
smoritaemb 0:580aba13d1a1 132 };
smoritaemb 0:580aba13d1a1 133
smoritaemb 0:580aba13d1a1 134 m_proxies.remove(thunk, &isElementToRemove);
smoritaemb 0:580aba13d1a1 135 }
smoritaemb 0:580aba13d1a1 136
smoritaemb 0:580aba13d1a1 137 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 138 const rtps::CacheChange *StatefulWriterT<NetworkDriver>::newChange(
smoritaemb 0:580aba13d1a1 139 ChangeKind_t kind, const uint8_t *data, DataSize_t size) {
smoritaemb 0:580aba13d1a1 140 if (isIrrelevant(kind)) {
smoritaemb 0:580aba13d1a1 141 return nullptr;
smoritaemb 0:580aba13d1a1 142 }
smoritaemb 0:580aba13d1a1 143
smoritaemb 0:580aba13d1a1 144 Lock lock{m_mutex};
smoritaemb 0:580aba13d1a1 145
smoritaemb 0:580aba13d1a1 146 if (m_history.isFull()) {
smoritaemb 0:580aba13d1a1 147 // Right now we drop elements anyway because we cannot detect non-responding
smoritaemb 0:580aba13d1a1 148 // readers yet. return nullptr;
smoritaemb 0:580aba13d1a1 149 SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin());
smoritaemb 0:580aba13d1a1 150 if (m_nextSequenceNumberToSend < newMin) {
smoritaemb 0:580aba13d1a1 151 m_nextSequenceNumberToSend =
smoritaemb 0:580aba13d1a1 152 newMin; // Make sure we have the correct sn to send
smoritaemb 0:580aba13d1a1 153 }
smoritaemb 0:580aba13d1a1 154 }
smoritaemb 0:580aba13d1a1 155
smoritaemb 0:580aba13d1a1 156 auto *result = m_history.addChange(data, size);
smoritaemb 0:580aba13d1a1 157 if (mp_threadPool != nullptr) {
smoritaemb 0:580aba13d1a1 158 mp_threadPool->addWorkload(this);
smoritaemb 0:580aba13d1a1 159 }
smoritaemb 0:580aba13d1a1 160
smoritaemb 0:580aba13d1a1 161 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 162 log("StatefulWriter[%s]: Adding new data.\n", this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 163 #endif
smoritaemb 0:580aba13d1a1 164 return result;
smoritaemb 0:580aba13d1a1 165 }
smoritaemb 0:580aba13d1a1 166
smoritaemb 0:580aba13d1a1 167 template <class NetworkDriver> void StatefulWriterT<NetworkDriver>::progress() {
smoritaemb 0:580aba13d1a1 168 for (const auto &proxy : m_proxies) {
smoritaemb 0:580aba13d1a1 169 if (!sendData(proxy, m_nextSequenceNumberToSend)) {
smoritaemb 0:580aba13d1a1 170 continue;
smoritaemb 0:580aba13d1a1 171 }
smoritaemb 0:580aba13d1a1 172 }
smoritaemb 0:580aba13d1a1 173 ++m_nextSequenceNumberToSend;
smoritaemb 0:580aba13d1a1 174 }
smoritaemb 0:580aba13d1a1 175
smoritaemb 0:580aba13d1a1 176 template <typename NetworkDriver>
smoritaemb 0:580aba13d1a1 177 bool StatefulWriterT<NetworkDriver>::isIrrelevant(ChangeKind_t kind) const {
smoritaemb 0:580aba13d1a1 178 // Right now we only allow alive changes
smoritaemb 0:580aba13d1a1 179 // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY
smoritaemb 0:580aba13d1a1 180 // && kind != ChangeKind_t::ALIVE);
smoritaemb 0:580aba13d1a1 181 return kind != ChangeKind_t::ALIVE;
smoritaemb 0:580aba13d1a1 182 }
smoritaemb 0:580aba13d1a1 183
smoritaemb 0:580aba13d1a1 184 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 185 void StatefulWriterT<NetworkDriver>::setAllChangesToUnsent() {
smoritaemb 0:580aba13d1a1 186 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 187
smoritaemb 0:580aba13d1a1 188 m_nextSequenceNumberToSend = m_history.getSeqNumMin();
smoritaemb 0:580aba13d1a1 189
smoritaemb 0:580aba13d1a1 190 if (mp_threadPool != nullptr) {
smoritaemb 0:580aba13d1a1 191 mp_threadPool->addWorkload(this);
smoritaemb 0:580aba13d1a1 192 }
smoritaemb 0:580aba13d1a1 193 }
smoritaemb 0:580aba13d1a1 194
smoritaemb 0:580aba13d1a1 195 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 196 void StatefulWriterT<NetworkDriver>::onNewAckNack(
smoritaemb 0:580aba13d1a1 197 const SubmessageAckNack &msg, const GuidPrefix_t &sourceGuidPrefix) {
smoritaemb 0:580aba13d1a1 198 // Lock lock{m_mutex};
smoritaemb 0:580aba13d1a1 199 // Search for reader
smoritaemb 0:580aba13d1a1 200 ReaderProxy *reader = nullptr;
smoritaemb 0:580aba13d1a1 201 for (auto &proxy : m_proxies) {
smoritaemb 0:580aba13d1a1 202 if (proxy.remoteReaderGuid.prefix == sourceGuidPrefix &&
smoritaemb 0:580aba13d1a1 203 proxy.remoteReaderGuid.entityId == msg.readerId) {
smoritaemb 0:580aba13d1a1 204 reader = &proxy;
smoritaemb 0:580aba13d1a1 205 break;
smoritaemb 0:580aba13d1a1 206 }
smoritaemb 0:580aba13d1a1 207 }
smoritaemb 0:580aba13d1a1 208
smoritaemb 0:580aba13d1a1 209 if (reader == nullptr) {
smoritaemb 0:580aba13d1a1 210 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 211 log("StatefulWriter[%s]: No proxy found with id: ",
smoritaemb 0:580aba13d1a1 212 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 213 printEntityId(msg.readerId);
smoritaemb 0:580aba13d1a1 214 log(" Dropping acknack.\n");
smoritaemb 0:580aba13d1a1 215 #endif
smoritaemb 0:580aba13d1a1 216 return;
smoritaemb 0:580aba13d1a1 217 }
smoritaemb 0:580aba13d1a1 218
smoritaemb 0:580aba13d1a1 219 uint8_t hash = 0;
smoritaemb 0:580aba13d1a1 220 for (int i = 0; i < sourceGuidPrefix.id.size(); i++) {
smoritaemb 0:580aba13d1a1 221 hash += sourceGuidPrefix.id.at(i);
smoritaemb 0:580aba13d1a1 222 }
smoritaemb 0:580aba13d1a1 223
smoritaemb 0:580aba13d1a1 224 char bfr[20];
smoritaemb 0:580aba13d1a1 225 size_t size = snprintf(bfr, sizeof(bfr), "%u <= %u", msg.count.value,
smoritaemb 0:580aba13d1a1 226 reader->ackNackCount.value);
smoritaemb 0:580aba13d1a1 227 if (!(size < sizeof(bfr))) {
smoritaemb 0:580aba13d1a1 228 while (1)
smoritaemb 0:580aba13d1a1 229 ;
smoritaemb 0:580aba13d1a1 230 }
smoritaemb 0:580aba13d1a1 231
smoritaemb 0:580aba13d1a1 232 if (msg.count.value <= reader->ackNackCount.value) {
smoritaemb 0:580aba13d1a1 233 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 234 log("StatefulWriter[%s]: Count too small. Dropping acknack.\n",
smoritaemb 0:580aba13d1a1 235 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 236 #endif
smoritaemb 0:580aba13d1a1 237 return;
smoritaemb 0:580aba13d1a1 238 }
smoritaemb 0:580aba13d1a1 239
smoritaemb 0:580aba13d1a1 240 reader->ackNackCount = msg.count;
smoritaemb 0:580aba13d1a1 241
smoritaemb 0:580aba13d1a1 242 // Send missing packets
smoritaemb 0:580aba13d1a1 243 SequenceNumber_t nextSN = msg.readerSNState.base;
smoritaemb 0:580aba13d1a1 244 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 245 if (nextSN.low == 0 && nextSN.high == 0) {
smoritaemb 0:580aba13d1a1 246 log("StatefulWriter[%s]: Received preemptive acknack. Ignored.\n",
smoritaemb 0:580aba13d1a1 247 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 248 } else {
smoritaemb 0:580aba13d1a1 249 log("StatefulWriter[%s]: Received non-preemptive acknack.\n",
smoritaemb 0:580aba13d1a1 250 &this->m_attributes.topicName[0]);
smoritaemb 0:580aba13d1a1 251 }
smoritaemb 0:580aba13d1a1 252 #endif
smoritaemb 0:580aba13d1a1 253 for (uint32_t i = 0; i < msg.readerSNState.numBits; ++i, ++nextSN) {
smoritaemb 0:580aba13d1a1 254 if (msg.readerSNState.isSet(i)) {
smoritaemb 0:580aba13d1a1 255 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 256 log("StatefulWriter[%s]: Send Packet on acknack.\n",
smoritaemb 0:580aba13d1a1 257 this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 258 #endif
smoritaemb 0:580aba13d1a1 259 sendData(*reader, nextSN);
smoritaemb 0:580aba13d1a1 260 }
smoritaemb 0:580aba13d1a1 261 }
smoritaemb 0:580aba13d1a1 262 // Check for sequence numbers after defined range
smoritaemb 0:580aba13d1a1 263 SequenceNumber_t maxSN;
smoritaemb 0:580aba13d1a1 264 {
smoritaemb 0:580aba13d1a1 265 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 266 maxSN = m_history.getSeqNumMax();
smoritaemb 0:580aba13d1a1 267 }
smoritaemb 0:580aba13d1a1 268 while (nextSN <= maxSN) {
smoritaemb 0:580aba13d1a1 269 sendData(*reader, nextSN);
smoritaemb 0:580aba13d1a1 270 ++nextSN;
smoritaemb 0:580aba13d1a1 271 }
smoritaemb 0:580aba13d1a1 272 }
smoritaemb 0:580aba13d1a1 273
smoritaemb 0:580aba13d1a1 274 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 275 bool StatefulWriterT<NetworkDriver>::sendData(
smoritaemb 0:580aba13d1a1 276 const ReaderProxy &reader, const SequenceNumber_t &snMissing) {
smoritaemb 0:580aba13d1a1 277
smoritaemb 0:580aba13d1a1 278 // TODO smarter packaging e.g. by creating MessageStruct and serialize after
smoritaemb 0:580aba13d1a1 279 // adjusting values Reusing the pbuf is not possible. See
smoritaemb 0:580aba13d1a1 280 // https://www.nongnu.org/lwip/2_0_x/raw_api.html (Zero-Copy MACs)
smoritaemb 0:580aba13d1a1 281
smoritaemb 0:580aba13d1a1 282 PacketInfo info;
smoritaemb 0:580aba13d1a1 283 info.srcPort = m_packetInfo.srcPort;
smoritaemb 0:580aba13d1a1 284
smoritaemb 0:580aba13d1a1 285 MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
smoritaemb 0:580aba13d1a1 286 MessageFactory::addSubMessageDestination(info.buffer);
smoritaemb 0:580aba13d1a1 287 MessageFactory::addSubMessageTimeStamp(info.buffer);
smoritaemb 0:580aba13d1a1 288
smoritaemb 0:580aba13d1a1 289 // Just usable for IPv4
smoritaemb 0:580aba13d1a1 290 const Locator &locator = reader.remoteLocator;
smoritaemb 0:580aba13d1a1 291
smoritaemb 0:580aba13d1a1 292 info.destAddr = locator.getIp4Address();
smoritaemb 0:580aba13d1a1 293 info.destPort = (Ip4Port_t)locator.port;
smoritaemb 0:580aba13d1a1 294
smoritaemb 0:580aba13d1a1 295 {
smoritaemb 0:580aba13d1a1 296 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 297 const CacheChange *next = m_history.getChangeBySN(snMissing);
smoritaemb 0:580aba13d1a1 298 if (next == nullptr) {
smoritaemb 0:580aba13d1a1 299 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 300 log("StatefulWriter[%s]: Couldn't get a CacheChange with SN (%i,%u)\n",
smoritaemb 0:580aba13d1a1 301 &this->m_attributes.topicName[0], snMissing.high, snMissing.low);
smoritaemb 0:580aba13d1a1 302 #endif
smoritaemb 0:580aba13d1a1 303 return false;
smoritaemb 0:580aba13d1a1 304 }
smoritaemb 0:580aba13d1a1 305 MessageFactory::addSubMessageData(
smoritaemb 0:580aba13d1a1 306 info.buffer, next->data, false, next->sequenceNumber,
smoritaemb 0:580aba13d1a1 307 m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId);
smoritaemb 0:580aba13d1a1 308 }
smoritaemb 0:580aba13d1a1 309
smoritaemb 0:580aba13d1a1 310 m_transport->sendPacket(info);
smoritaemb 0:580aba13d1a1 311 return true;
smoritaemb 0:580aba13d1a1 312 }
smoritaemb 0:580aba13d1a1 313
smoritaemb 0:580aba13d1a1 314 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 315 void StatefulWriterT<NetworkDriver>::hbFunctionJumppad(void *thisPointer) {
smoritaemb 0:580aba13d1a1 316 auto *writer = static_cast<StatefulWriterT<NetworkDriver> *>(thisPointer);
smoritaemb 0:580aba13d1a1 317 writer->sendHeartBeatLoop();
smoritaemb 0:580aba13d1a1 318 }
smoritaemb 0:580aba13d1a1 319
smoritaemb 0:580aba13d1a1 320 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 321 void StatefulWriterT<NetworkDriver>::sendHeartBeatLoop() {
smoritaemb 0:580aba13d1a1 322 while (m_running) {
smoritaemb 0:580aba13d1a1 323 sendHeartBeat();
smoritaemb 0:580aba13d1a1 324 sys_msleep(Config::SF_WRITER_HB_PERIOD_MS);
smoritaemb 0:580aba13d1a1 325 }
smoritaemb 0:580aba13d1a1 326 }
smoritaemb 0:580aba13d1a1 327
smoritaemb 0:580aba13d1a1 328 template <class NetworkDriver>
smoritaemb 0:580aba13d1a1 329 void StatefulWriterT<NetworkDriver>::sendHeartBeat() {
smoritaemb 0:580aba13d1a1 330 if (m_proxies.isEmpty()) {
smoritaemb 0:580aba13d1a1 331 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 332 log("StatefulWriter[%s]: Skipping heartbeat. No proxies.\n",
smoritaemb 0:580aba13d1a1 333 this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 334 #endif
smoritaemb 0:580aba13d1a1 335 return;
smoritaemb 0:580aba13d1a1 336 }
smoritaemb 0:580aba13d1a1 337
smoritaemb 0:580aba13d1a1 338 for (auto &proxy : m_proxies) {
smoritaemb 0:580aba13d1a1 339
smoritaemb 0:580aba13d1a1 340 PacketInfo info;
smoritaemb 0:580aba13d1a1 341 info.srcPort = m_packetInfo.srcPort;
smoritaemb 0:580aba13d1a1 342
smoritaemb 0:580aba13d1a1 343 SequenceNumber_t firstSN;
smoritaemb 0:580aba13d1a1 344 SequenceNumber_t lastSN;
smoritaemb 0:580aba13d1a1 345 MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix);
smoritaemb 0:580aba13d1a1 346 {
smoritaemb 0:580aba13d1a1 347 Lock lock(m_mutex);
smoritaemb 0:580aba13d1a1 348 firstSN = m_history.getSeqNumMin();
smoritaemb 0:580aba13d1a1 349 lastSN = m_history.getSeqNumMax();
smoritaemb 0:580aba13d1a1 350 }
smoritaemb 0:580aba13d1a1 351 if (firstSN == SEQUENCENUMBER_UNKNOWN || lastSN == SEQUENCENUMBER_UNKNOWN) {
smoritaemb 0:580aba13d1a1 352 #if SFW_VERBOSE
smoritaemb 0:580aba13d1a1 353 if (strlen(&this->m_attributes.typeName[0]) != 0) {
smoritaemb 0:580aba13d1a1 354 log("StatefulWriter[%s]: Skipping heartbeat. No data.\n",
smoritaemb 0:580aba13d1a1 355 this->m_attributes.topicName);
smoritaemb 0:580aba13d1a1 356 }
smoritaemb 0:580aba13d1a1 357 #endif
smoritaemb 0:580aba13d1a1 358 return;
smoritaemb 0:580aba13d1a1 359 }
smoritaemb 0:580aba13d1a1 360 MessageFactory::addSubMessageDestination(info.buffer, proxy.remoteReaderGuid.prefix.id.data());
smoritaemb 0:580aba13d1a1 361 MessageFactory::addHeartbeat(
smoritaemb 0:580aba13d1a1 362 info.buffer, m_attributes.endpointGuid.entityId,
smoritaemb 0:580aba13d1a1 363 proxy.remoteReaderGuid.entityId, firstSN, lastSN, m_hbCount);
smoritaemb 0:580aba13d1a1 364
smoritaemb 0:580aba13d1a1 365 info.destAddr = proxy.remoteLocator.getIp4Address();
smoritaemb 0:580aba13d1a1 366 info.destPort = proxy.remoteLocator.port;
smoritaemb 0:580aba13d1a1 367
smoritaemb 0:580aba13d1a1 368 m_transport->sendPacket(info);
smoritaemb 0:580aba13d1a1 369 }
smoritaemb 0:580aba13d1a1 370 m_hbCount.value++;
smoritaemb 0:580aba13d1a1 371 }
smoritaemb 0:580aba13d1a1 372
smoritaemb 0:580aba13d1a1 373
smoritaemb 0:580aba13d1a1 374 #undef SFW_VERBOSE