Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop
SPDPAgent.cpp
00001 /* 00002 The MIT License 00003 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University 00004 Permission is hereby granted, free of charge, to any person obtaining a copy 00005 of this software and associated documentation files (the "Software"), to deal 00006 in the Software without restriction, including without limitation the rights 00007 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00008 copies of the Software, and to permit persons to whom the Software is 00009 furnished to do so, subject to the following conditions: 00010 The above copyright notice and this permission notice shall be included in 00011 all copies or substantial portions of the Software. 00012 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00013 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00014 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00015 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00016 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00017 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00018 THE SOFTWARE 00019 00020 This file is part of embeddedRTPS. 
00021 00022 Author: i11 - Embedded Software, RWTH Aachen University 00023 */ 00024 00025 #include "rtps/discovery/SPDPAgent.h" 00026 #include "lwip/sys.h" 00027 #include "rtps/discovery/ParticipantProxyData.h" 00028 #include "rtps/entities/Participant.h" 00029 #include "rtps/entities/Reader.h" 00030 #include "rtps/entities/Writer.h" 00031 #include "rtps/messages/MessageTypes.h" 00032 #include "rtps/utils/udpUtils.h" 00033 00034 using rtps::SPDPAgent; 00035 using rtps::SMElement::BuildInEndpointSet; 00036 using rtps::SMElement::ParameterId; 00037 00038 #define SPDP_VERBOSE 0 00039 00040 #if SPDP_VERBOSE 00041 #include "rtps/utils/printutils.h" 00042 #endif 00043 00044 SPDPAgent::~SPDPAgent() { 00045 if (initialized) { 00046 sys_mutex_free(&m_mutex); 00047 } 00048 } 00049 00050 void SPDPAgent::init(Participant &participant, BuiltInEndpoints &endpoints) { 00051 if (sys_mutex_new(&m_mutex) != ERR_OK) { 00052 #if SPDP_VERBOSE 00053 printf("Could not alloc mutex"); 00054 #endif 00055 return; 00056 } 00057 mp_participant = &participant; 00058 m_buildInEndpoints = endpoints; 00059 m_buildInEndpoints.spdpReader->registerCallback(receiveCallback, this); 00060 00061 ucdr_init_buffer(&m_microbuffer, m_outputBuffer.data(), 00062 m_outputBuffer.size()); 00063 // addInlineQos(); 00064 addParticipantParameters(); 00065 initialized = true; 00066 } 00067 00068 void SPDPAgent::start() { 00069 if (m_running) { 00070 return; 00071 } 00072 m_running = true; 00073 #ifdef MROS2_USE_EMBEDDEDRTPS 00074 sys_thread_new("SPDPThread", callRunBroadcast, this, 00075 Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO); 00076 #else 00077 sys_thread_new("SPDPThread", runBroadcast, this, 00078 Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO); 00079 #endif 00080 } 00081 00082 void SPDPAgent::stop() { m_running = false; } 00083 00084 void SPDPAgent::runBroadcast(void *args) { 00085 SPDPAgent &agent = *static_cast<SPDPAgent *>(args); 00086 const DataSize_t size = 
ucdr_buffer_length(&agent.m_microbuffer); 00087 agent.m_buildInEndpoints.spdpWriter->newChange( 00088 ChangeKind_t::ALIVE, agent.m_microbuffer.init, size); 00089 while (agent.m_running) { 00090 sys_msleep(Config::SPDP_RESEND_PERIOD_MS); 00091 agent.m_buildInEndpoints.spdpWriter->setAllChangesToUnsent(); 00092 } 00093 } 00094 00095 void SPDPAgent::receiveCallback(void *callee, 00096 const ReaderCacheChange &cacheChange) { 00097 auto agent = static_cast<SPDPAgent *>(callee); 00098 agent->handleSPDPPackage(cacheChange); 00099 } 00100 00101 void SPDPAgent::handleSPDPPackage(const ReaderCacheChange &cacheChange) { 00102 if (!initialized) { 00103 #if SPDP_VERBOSE 00104 printf("SPDP: Callback called without initialization\n"); 00105 #endif 00106 return; 00107 } 00108 00109 Lock lock{m_mutex}; 00110 if (cacheChange.size > m_inputBuffer.size()) { 00111 #if SPDP_VERBOSE 00112 printf("SPDP: Input buffer to small\n"); 00113 #endif 00114 return; 00115 } 00116 00117 // Something went wrong deserializing remote participant 00118 if (!cacheChange.copyInto(m_inputBuffer.data(), m_inputBuffer.size())) { 00119 return; 00120 } 00121 00122 ucdrBuffer buffer; 00123 ucdr_init_buffer(&buffer, m_inputBuffer.data(), m_inputBuffer.size()); 00124 00125 if (cacheChange.kind == ChangeKind_t::ALIVE) { 00126 configureEndianessAndOptions(buffer); 00127 volatile bool success = m_proxyDataBuffer.readFromUcdrBuffer(buffer); 00128 if (success) { 00129 // TODO In case we store the history we can free the history mutex here 00130 processProxyData(); 00131 } 00132 } else { 00133 // TODO RemoveParticipant 00134 } 00135 } 00136 00137 void SPDPAgent::configureEndianessAndOptions(ucdrBuffer &buffer) { 00138 std::array<uint8_t, 2> encapsulation{}; 00139 // Endianess doesn't matter for this since those are single bytes 00140 ucdr_deserialize_array_uint8_t(&buffer, encapsulation.data(), 00141 encapsulation.size()); 00142 if (encapsulation == SMElement::SCHEME_PL_CDR_LE) { 00143 buffer.endianness = 
UCDR_LITTLE_ENDIANNESS; 00144 } else { 00145 buffer.endianness = UCDR_BIG_ENDIANNESS; 00146 } 00147 // Reuse encapsulation buffer to skip options 00148 ucdr_deserialize_array_uint8_t(&buffer, encapsulation.data(), 00149 encapsulation.size()); 00150 } 00151 00152 void SPDPAgent::processProxyData() { 00153 if (m_proxyDataBuffer.m_guid.prefix.id == mp_participant->m_guidPrefix.id) { 00154 return; // Our own packet 00155 } 00156 00157 if (mp_participant->findRemoteParticipant(m_proxyDataBuffer.m_guid.prefix) != 00158 nullptr) { 00159 m_buildInEndpoints.spdpWriter->setAllChangesToUnsent(); 00160 return; // Already in our list 00161 } 00162 00163 // New participant, help him join fast by broadcasting data again 00164 // if(mp_participant->getRemoteParticipantCount() == 1){ 00165 // return; 00166 //} 00167 00168 if (mp_participant->addNewRemoteParticipant(m_proxyDataBuffer)) { 00169 addProxiesForBuiltInEndpoints(); 00170 m_buildInEndpoints.spdpWriter->setAllChangesToUnsent(); 00171 #if SPDP_VERBOSE 00172 printf("Added new participant with guid: "); 00173 printGuidPrefix(m_proxyDataBuffer.m_guid.prefix); 00174 printf("\n"); 00175 } else { 00176 printf("Failed to add new participant"); 00177 #endif 00178 } 00179 } 00180 00181 bool SPDPAgent::addProxiesForBuiltInEndpoints() { 00182 00183 Locator *locator = nullptr; 00184 00185 // Check if the remote participants has a locator in our subnet 00186 for (unsigned int i = 0; 00187 i < m_proxyDataBuffer.m_metatrafficUnicastLocatorList.size(); i++) { 00188 Locator *l = &(m_proxyDataBuffer.m_metatrafficUnicastLocatorList[i]); 00189 if (l->isValid() && l->isSameSubnet()) { 00190 locator = l; 00191 break; 00192 } 00193 } 00194 00195 if (!locator) { 00196 return false; 00197 } 00198 00199 ip4_addr_t ip4addr = locator->getIp4Address(); 00200 const char *addr = ip4addr_ntoa(&ip4addr); 00201 printf("Adding IPv4 Locator %s", addr); 00202 00203 if (m_proxyDataBuffer.hasPublicationWriter()) { 00204 const WriterProxy 
proxy{{m_proxyDataBuffer.m_guid.prefix, 00205 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER}, 00206 *locator}; 00207 m_buildInEndpoints.sedpPubReader->addNewMatchedWriter(proxy); 00208 } 00209 00210 if (m_proxyDataBuffer.hasSubscriptionWriter()) { 00211 const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, 00212 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER}, 00213 *locator}; 00214 m_buildInEndpoints.sedpSubReader->addNewMatchedWriter(proxy); 00215 } 00216 00217 if (m_proxyDataBuffer.hasPublicationReader()) { 00218 const ReaderProxy proxy{{m_proxyDataBuffer.m_guid.prefix, 00219 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER}, 00220 *locator}; 00221 m_buildInEndpoints.sedpPubWriter->addNewMatchedReader(proxy); 00222 } 00223 00224 if (m_proxyDataBuffer.hasSubscriptionReader()) { 00225 const ReaderProxy proxy{{m_proxyDataBuffer.m_guid.prefix, 00226 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER}, 00227 *locator}; 00228 m_buildInEndpoints.sedpSubWriter->addNewMatchedReader(proxy); 00229 } 00230 00231 return true; 00232 } 00233 00234 void SPDPAgent::addInlineQos() { 00235 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_KEY_HASH); 00236 ucdr_serialize_uint16_t(&m_microbuffer, 16); 00237 ucdr_serialize_array_uint8_t(&m_microbuffer, 00238 mp_participant->m_guidPrefix.id.data(), 00239 sizeof(GuidPrefix_t::id)); 00240 ucdr_serialize_array_uint8_t(&m_microbuffer, 00241 ENTITYID_BUILD_IN_PARTICIPANT.entityKey.data(), 00242 sizeof(EntityId_t::entityKey)); 00243 ucdr_serialize_uint8_t( 00244 &m_microbuffer, 00245 static_cast<uint8_t>(ENTITYID_BUILD_IN_PARTICIPANT.entityKind)); 00246 00247 endCurrentList(); 00248 } 00249 00250 void SPDPAgent::endCurrentList() { 00251 //adding node name and node namespace 00252 /* 00253 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_ENTITY_NAME); //TODO: clean this 00254 ucdr_serialize_uint16_t(&m_microbuffer, 8); 00255 ucdr_serialize_uint16_t(&m_microbuffer, 4); 00256 ucdr_serialize_uint16_t(&m_microbuffer, 0); 00257 
ucdr_serialize_uint8_t(&m_microbuffer, 's'); 00258 ucdr_serialize_uint8_t(&m_microbuffer, 't'); 00259 ucdr_serialize_uint8_t(&m_microbuffer, 'm'); 00260 ucdr_serialize_uint8_t(&m_microbuffer, 0); 00261 00262 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_USER_DATA); //TODO: clean this 00263 ucdr_serialize_uint16_t(&m_microbuffer, 28); 00264 ucdr_serialize_uint16_t(&m_microbuffer, 22); 00265 ucdr_serialize_uint16_t(&m_microbuffer, 0); 00266 ucdr_serialize_uint8_t(&m_microbuffer, 'n'); 00267 ucdr_serialize_uint8_t(&m_microbuffer, 'a'); 00268 ucdr_serialize_uint8_t(&m_microbuffer, 'm'); 00269 ucdr_serialize_uint8_t(&m_microbuffer, 'e'); 00270 ucdr_serialize_uint8_t(&m_microbuffer, '='); 00271 ucdr_serialize_uint8_t(&m_microbuffer, 's'); 00272 ucdr_serialize_uint8_t(&m_microbuffer, 't'); 00273 ucdr_serialize_uint8_t(&m_microbuffer, 'm'); 00274 ucdr_serialize_uint8_t(&m_microbuffer, ';'); 00275 ucdr_serialize_uint8_t(&m_microbuffer, 'n'); 00276 ucdr_serialize_uint8_t(&m_microbuffer, 'a'); 00277 ucdr_serialize_uint8_t(&m_microbuffer, 'm'); 00278 ucdr_serialize_uint8_t(&m_microbuffer, 'e'); 00279 ucdr_serialize_uint8_t(&m_microbuffer, 's'); 00280 ucdr_serialize_uint8_t(&m_microbuffer, 'p'); 00281 ucdr_serialize_uint8_t(&m_microbuffer, 'a'); 00282 ucdr_serialize_uint8_t(&m_microbuffer, 'c'); 00283 ucdr_serialize_uint8_t(&m_microbuffer, 'e'); 00284 ucdr_serialize_uint8_t(&m_microbuffer, '='); 00285 ucdr_serialize_uint8_t(&m_microbuffer, '/'); 00286 ucdr_serialize_uint8_t(&m_microbuffer, ';'); 00287 ucdr_serialize_uint8_t(&m_microbuffer, 0); 00288 ucdr_serialize_uint16_t(&m_microbuffer, 0); 00289 */ 00290 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_SENTINEL); 00291 ucdr_serialize_uint16_t(&m_microbuffer, 0); 00292 } 00293 00294 void SPDPAgent::addParticipantParameters() { 00295 const uint16_t zero_options = 0; 00296 const uint16_t protocolVersionSize = 00297 sizeof(PROTOCOLVERSION.major) + sizeof(PROTOCOLVERSION.minor); 00298 const uint16_t 
vendorIdSize = Config::VENDOR_ID.vendorId.size(); 00299 const uint16_t locatorSize = sizeof(Locator); 00300 const uint16_t durationSize = 00301 sizeof(Duration_t::seconds) + sizeof(Duration_t::fraction); 00302 const uint16_t entityKeySize = 3; 00303 const uint16_t entityKindSize = 1; 00304 const uint16_t entityIdSize = entityKeySize + entityKindSize; 00305 const uint16_t guidSize = sizeof(GuidPrefix_t::id) + entityIdSize; 00306 00307 const Locator userUniCastLocator = 00308 getUserUnicastLocator(mp_participant->m_participantId); 00309 const Locator builtInUniCastLocator = 00310 getBuiltInUnicastLocator(mp_participant->m_participantId); 00311 const Locator builtInMultiCastLocator = getBuiltInMulticastLocator(); 00312 00313 ucdr_serialize_array_uint8_t(&m_microbuffer, 00314 rtps::SMElement::SCHEME_PL_CDR_LE.data(), 00315 rtps::SMElement::SCHEME_PL_CDR_LE.size()); 00316 ucdr_serialize_uint16_t(&m_microbuffer, zero_options); 00317 00318 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_PROTOCOL_VERSION); 00319 ucdr_serialize_uint16_t(&m_microbuffer, protocolVersionSize + 2); 00320 ucdr_serialize_uint8_t(&m_microbuffer, PROTOCOLVERSION.major); 00321 ucdr_serialize_uint8_t(&m_microbuffer, PROTOCOLVERSION.minor); 00322 m_microbuffer.iterator += 2; // padding 00323 m_microbuffer.last_data_size = 4; // to 4 byte 00324 00325 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_VENDORID); 00326 ucdr_serialize_uint16_t(&m_microbuffer, vendorIdSize + 2); 00327 ucdr_serialize_array_uint8_t(&m_microbuffer, 00328 Config::VENDOR_ID.vendorId.data(), vendorIdSize); 00329 m_microbuffer.iterator += 2; // padding 00330 m_microbuffer.last_data_size = 4; // to 4 byte 00331 00332 ucdr_serialize_uint16_t(&m_microbuffer, 00333 ParameterId::PID_DEFAULT_UNICAST_LOCATOR); 00334 ucdr_serialize_uint16_t(&m_microbuffer, locatorSize); 00335 ucdr_serialize_array_uint8_t( 00336 &m_microbuffer, reinterpret_cast<const uint8_t *>(&userUniCastLocator), 00337 locatorSize); 00338 00339 
ucdr_serialize_uint16_t(&m_microbuffer, 00340 ParameterId::PID_METATRAFFIC_UNICAST_LOCATOR); 00341 ucdr_serialize_uint16_t(&m_microbuffer, locatorSize); 00342 ucdr_serialize_array_uint8_t( 00343 &m_microbuffer, reinterpret_cast<const uint8_t *>(&builtInUniCastLocator), 00344 locatorSize); 00345 00346 ucdr_serialize_uint16_t(&m_microbuffer, 00347 ParameterId::PID_METATRAFFIC_MULTICAST_LOCATOR); 00348 ucdr_serialize_uint16_t(&m_microbuffer, locatorSize); 00349 ucdr_serialize_array_uint8_t( 00350 &m_microbuffer, 00351 reinterpret_cast<const uint8_t *>(&builtInMultiCastLocator), locatorSize); 00352 00353 ucdr_serialize_uint16_t(&m_microbuffer, 00354 ParameterId::PID_PARTICIPANT_LEASE_DURATION); 00355 ucdr_serialize_uint16_t(&m_microbuffer, durationSize); 00356 ucdr_serialize_int32_t(&m_microbuffer, Config::SPDP_LEASE_DURATION.seconds); 00357 ucdr_serialize_uint32_t(&m_microbuffer, Config::SPDP_LEASE_DURATION.fraction); 00358 00359 ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_PARTICIPANT_GUID); 00360 ucdr_serialize_uint16_t(&m_microbuffer, guidSize); 00361 ucdr_serialize_array_uint8_t(&m_microbuffer, 00362 mp_participant->m_guidPrefix.id.data(), 00363 sizeof(GuidPrefix_t::id)); 00364 ucdr_serialize_array_uint8_t(&m_microbuffer, 00365 ENTITYID_BUILD_IN_PARTICIPANT.entityKey.data(), 00366 entityKeySize); 00367 ucdr_serialize_uint8_t( 00368 &m_microbuffer, 00369 static_cast<uint8_t>(ENTITYID_BUILD_IN_PARTICIPANT.entityKind)); 00370 00371 ucdr_serialize_uint16_t(&m_microbuffer, 00372 ParameterId::PID_BUILTIN_ENDPOINT_SET); 00373 ucdr_serialize_uint16_t(&m_microbuffer, sizeof(BuildInEndpointSet)); 00374 ucdr_serialize_uint32_t( 00375 &m_microbuffer, BuildInEndpointSet::DISC_BIE_PARTICIPANT_ANNOUNCER | 00376 BuildInEndpointSet::DISC_BIE_PARTICIPANT_DETECTOR | 00377 BuildInEndpointSet::DISC_BIE_PUBLICATION_ANNOUNCER | 00378 BuildInEndpointSet::DISC_BIE_PUBLICATION_DETECTOR | 00379 BuildInEndpointSet::DISC_BIE_SUBSCRIPTION_ANNOUNCER | 00380 
BuildInEndpointSet::DISC_BIE_SUBSCRIPTION_DETECTOR); 00381 00382 endCurrentList(); 00383 } 00384 00385 void callRunBroadcast(void *arg){ 00386 SPDPAgent::runBroadcast(arg); 00387 } 00388 00389 #undef SPDP_VERBOSE
Generated on Sun Jul 31 2022 04:49:48 by
1.7.2