S Morita / mbed-mros2

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers SPDPAgent.cpp Source File

SPDPAgent.cpp

00001 /*
00002 The MIT License
00003 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
00004 Permission is hereby granted, free of charge, to any person obtaining a copy
00005 of this software and associated documentation files (the "Software"), to deal
00006 in the Software without restriction, including without limitation the rights
00007 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00008 copies of the Software, and to permit persons to whom the Software is
00009 furnished to do so, subject to the following conditions:
00010 The above copyright notice and this permission notice shall be included in
00011 all copies or substantial portions of the Software.
00012 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00013 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00014 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00015 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00016 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00017 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00018 THE SOFTWARE
00019 
00020 This file is part of embeddedRTPS.
00021 
00022 Author: i11 - Embedded Software, RWTH Aachen University
00023 */
00024 
00025 #include "rtps/discovery/SPDPAgent.h"
00026 #include "lwip/sys.h"
00027 #include "rtps/discovery/ParticipantProxyData.h"
00028 #include "rtps/entities/Participant.h"
00029 #include "rtps/entities/Reader.h"
00030 #include "rtps/entities/Writer.h"
00031 #include "rtps/messages/MessageTypes.h"
00032 #include "rtps/utils/udpUtils.h"
00033 
00034 using rtps::SPDPAgent;
00035 using rtps::SMElement::BuildInEndpointSet;
00036 using rtps::SMElement::ParameterId;
00037 
00038 #define SPDP_VERBOSE 0
00039 
00040 #if SPDP_VERBOSE
00041 #include "rtps/utils/printutils.h"
00042 #endif
00043 
00044 SPDPAgent::~SPDPAgent() {
00045   if (initialized) {
00046     sys_mutex_free(&m_mutex);
00047   }
00048 }
00049 
00050 void SPDPAgent::init(Participant &participant, BuiltInEndpoints &endpoints) {
00051   if (sys_mutex_new(&m_mutex) != ERR_OK) {
00052 #if SPDP_VERBOSE
00053     printf("Could not alloc mutex");
00054 #endif
00055     return;
00056   }
00057   mp_participant = &participant;
00058   m_buildInEndpoints = endpoints;
00059   m_buildInEndpoints.spdpReader->registerCallback(receiveCallback, this);
00060 
00061   ucdr_init_buffer(&m_microbuffer, m_outputBuffer.data(),
00062                    m_outputBuffer.size());
00063   // addInlineQos();
00064   addParticipantParameters();
00065   initialized = true;
00066 }
00067 
00068 void SPDPAgent::start() {
00069   if (m_running) {
00070     return;
00071   }
00072   m_running = true;
00073 #ifdef MROS2_USE_EMBEDDEDRTPS
00074   sys_thread_new("SPDPThread", callRunBroadcast, this,
00075                  Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO);
00076 #else
00077   sys_thread_new("SPDPThread", runBroadcast, this,
00078                    Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO);
00079 #endif
00080 }
00081 
00082 void SPDPAgent::stop() { m_running = false; }
00083 
00084 void SPDPAgent::runBroadcast(void *args) {
00085   SPDPAgent &agent = *static_cast<SPDPAgent *>(args);
00086   const DataSize_t size = ucdr_buffer_length(&agent.m_microbuffer);
00087   agent.m_buildInEndpoints.spdpWriter->newChange(
00088       ChangeKind_t::ALIVE, agent.m_microbuffer.init, size);
00089   while (agent.m_running) {
00090     sys_msleep(Config::SPDP_RESEND_PERIOD_MS);
00091     agent.m_buildInEndpoints.spdpWriter->setAllChangesToUnsent();
00092   }
00093 }
00094 
00095 void SPDPAgent::receiveCallback(void *callee,
00096                                 const ReaderCacheChange &cacheChange) {
00097   auto agent = static_cast<SPDPAgent *>(callee);
00098   agent->handleSPDPPackage(cacheChange);
00099 }
00100 
00101 void SPDPAgent::handleSPDPPackage(const ReaderCacheChange &cacheChange) {
00102   if (!initialized) {
00103 #if SPDP_VERBOSE
00104     printf("SPDP: Callback called without initialization\n");
00105 #endif
00106     return;
00107   }
00108 
00109   Lock lock{m_mutex};
00110   if (cacheChange.size > m_inputBuffer.size()) {
00111 #if SPDP_VERBOSE
00112     printf("SPDP: Input buffer to small\n");
00113 #endif
00114     return;
00115   }
00116 
00117   // Something went wrong deserializing remote participant
00118   if (!cacheChange.copyInto(m_inputBuffer.data(), m_inputBuffer.size())) {
00119     return;
00120   }
00121 
00122   ucdrBuffer buffer;
00123   ucdr_init_buffer(&buffer, m_inputBuffer.data(), m_inputBuffer.size());
00124 
00125   if (cacheChange.kind == ChangeKind_t::ALIVE) {
00126     configureEndianessAndOptions(buffer);
00127     volatile bool success = m_proxyDataBuffer.readFromUcdrBuffer(buffer);
00128     if (success) {
00129       // TODO In case we store the history we can free the history mutex here
00130       processProxyData();
00131     }
00132   } else {
00133     // TODO RemoveParticipant
00134   }
00135 }
00136 
00137 void SPDPAgent::configureEndianessAndOptions(ucdrBuffer &buffer) {
00138   std::array<uint8_t, 2> encapsulation{};
00139   // Endianess doesn't matter for this since those are single bytes
00140   ucdr_deserialize_array_uint8_t(&buffer, encapsulation.data(),
00141                                  encapsulation.size());
00142   if (encapsulation == SMElement::SCHEME_PL_CDR_LE) {
00143     buffer.endianness = UCDR_LITTLE_ENDIANNESS;
00144   } else {
00145     buffer.endianness = UCDR_BIG_ENDIANNESS;
00146   }
00147   // Reuse encapsulation buffer to skip options
00148   ucdr_deserialize_array_uint8_t(&buffer, encapsulation.data(),
00149                                  encapsulation.size());
00150 }
00151 
00152 void SPDPAgent::processProxyData() {
00153   if (m_proxyDataBuffer.m_guid.prefix.id == mp_participant->m_guidPrefix.id) {
00154     return; // Our own packet
00155   }
00156 
00157   if (mp_participant->findRemoteParticipant(m_proxyDataBuffer.m_guid.prefix) !=
00158       nullptr) {
00159     m_buildInEndpoints.spdpWriter->setAllChangesToUnsent();
00160     return; // Already in our list
00161   }
00162 
00163   // New participant, help him join fast by broadcasting data again
00164   // if(mp_participant->getRemoteParticipantCount() == 1){
00165   //    return;
00166   //}
00167 
00168   if (mp_participant->addNewRemoteParticipant(m_proxyDataBuffer)) {
00169     addProxiesForBuiltInEndpoints();
00170     m_buildInEndpoints.spdpWriter->setAllChangesToUnsent();
00171 #if SPDP_VERBOSE
00172     printf("Added new participant with guid: ");
00173     printGuidPrefix(m_proxyDataBuffer.m_guid.prefix);
00174     printf("\n");
00175   } else {
00176     printf("Failed to add new participant");
00177 #endif
00178   }
00179 }
00180 
00181 bool SPDPAgent::addProxiesForBuiltInEndpoints() {
00182 
00183   Locator *locator = nullptr;
00184 
00185   // Check if the remote participants has a locator in our subnet
00186   for (unsigned int i = 0;
00187        i < m_proxyDataBuffer.m_metatrafficUnicastLocatorList.size(); i++) {
00188     Locator *l = &(m_proxyDataBuffer.m_metatrafficUnicastLocatorList[i]);
00189     if (l->isValid() && l->isSameSubnet()) {
00190       locator = l;
00191       break;
00192     }
00193   }
00194 
00195   if (!locator) {
00196     return false;
00197   }
00198 
00199   ip4_addr_t ip4addr = locator->getIp4Address();
00200   const char *addr = ip4addr_ntoa(&ip4addr);
00201   printf("Adding IPv4 Locator %s", addr);
00202 
00203   if (m_proxyDataBuffer.hasPublicationWriter()) {
00204     const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix,
00205                              ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER},
00206                             *locator};
00207     m_buildInEndpoints.sedpPubReader->addNewMatchedWriter(proxy);
00208   }
00209 
00210   if (m_proxyDataBuffer.hasSubscriptionWriter()) {
00211     const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix,
00212                              ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER},
00213                             *locator};
00214     m_buildInEndpoints.sedpSubReader->addNewMatchedWriter(proxy);
00215   }
00216 
00217   if (m_proxyDataBuffer.hasPublicationReader()) {
00218     const ReaderProxy proxy{{m_proxyDataBuffer.m_guid.prefix,
00219                              ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER},
00220                             *locator};
00221     m_buildInEndpoints.sedpPubWriter->addNewMatchedReader(proxy);
00222   }
00223 
00224   if (m_proxyDataBuffer.hasSubscriptionReader()) {
00225     const ReaderProxy proxy{{m_proxyDataBuffer.m_guid.prefix,
00226                              ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER},
00227                             *locator};
00228     m_buildInEndpoints.sedpSubWriter->addNewMatchedReader(proxy);
00229   }
00230 
00231   return true;
00232 }
00233 
00234 void SPDPAgent::addInlineQos() {
00235   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_KEY_HASH);
00236   ucdr_serialize_uint16_t(&m_microbuffer, 16);
00237   ucdr_serialize_array_uint8_t(&m_microbuffer,
00238                                mp_participant->m_guidPrefix.id.data(),
00239                                sizeof(GuidPrefix_t::id));
00240   ucdr_serialize_array_uint8_t(&m_microbuffer,
00241                                ENTITYID_BUILD_IN_PARTICIPANT.entityKey.data(),
00242                                sizeof(EntityId_t::entityKey));
00243   ucdr_serialize_uint8_t(
00244       &m_microbuffer,
00245       static_cast<uint8_t>(ENTITYID_BUILD_IN_PARTICIPANT.entityKind));
00246 
00247   endCurrentList();
00248 }
00249 
00250 void SPDPAgent::endCurrentList() {
00251   //adding node name and node namespace
00252 /*
00253   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_ENTITY_NAME); //TODO: clean this
00254   ucdr_serialize_uint16_t(&m_microbuffer, 8);
00255   ucdr_serialize_uint16_t(&m_microbuffer, 4);
00256   ucdr_serialize_uint16_t(&m_microbuffer, 0);
00257   ucdr_serialize_uint8_t(&m_microbuffer, 's');
00258   ucdr_serialize_uint8_t(&m_microbuffer, 't');
00259   ucdr_serialize_uint8_t(&m_microbuffer, 'm');
00260   ucdr_serialize_uint8_t(&m_microbuffer, 0);
00261 
00262   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_USER_DATA); //TODO: clean this
00263     ucdr_serialize_uint16_t(&m_microbuffer, 28);
00264   ucdr_serialize_uint16_t(&m_microbuffer, 22);
00265   ucdr_serialize_uint16_t(&m_microbuffer, 0);
00266   ucdr_serialize_uint8_t(&m_microbuffer, 'n');
00267   ucdr_serialize_uint8_t(&m_microbuffer, 'a');
00268   ucdr_serialize_uint8_t(&m_microbuffer, 'm');
00269   ucdr_serialize_uint8_t(&m_microbuffer, 'e');
00270   ucdr_serialize_uint8_t(&m_microbuffer, '=');
00271   ucdr_serialize_uint8_t(&m_microbuffer, 's');
00272   ucdr_serialize_uint8_t(&m_microbuffer, 't');
00273   ucdr_serialize_uint8_t(&m_microbuffer, 'm');
00274   ucdr_serialize_uint8_t(&m_microbuffer, ';');
00275   ucdr_serialize_uint8_t(&m_microbuffer, 'n');
00276   ucdr_serialize_uint8_t(&m_microbuffer, 'a');
00277   ucdr_serialize_uint8_t(&m_microbuffer, 'm');
00278   ucdr_serialize_uint8_t(&m_microbuffer, 'e');
00279   ucdr_serialize_uint8_t(&m_microbuffer, 's');
00280   ucdr_serialize_uint8_t(&m_microbuffer, 'p');
00281   ucdr_serialize_uint8_t(&m_microbuffer, 'a');
00282   ucdr_serialize_uint8_t(&m_microbuffer, 'c');
00283   ucdr_serialize_uint8_t(&m_microbuffer, 'e');
00284   ucdr_serialize_uint8_t(&m_microbuffer, '=');
00285   ucdr_serialize_uint8_t(&m_microbuffer, '/');
00286   ucdr_serialize_uint8_t(&m_microbuffer, ';');
00287   ucdr_serialize_uint8_t(&m_microbuffer, 0);
00288   ucdr_serialize_uint16_t(&m_microbuffer, 0);
00289 */
00290   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_SENTINEL);
00291   ucdr_serialize_uint16_t(&m_microbuffer, 0);
00292 }
00293 
00294 void SPDPAgent::addParticipantParameters() {
00295   const uint16_t zero_options = 0;
00296   const uint16_t protocolVersionSize =
00297       sizeof(PROTOCOLVERSION.major) + sizeof(PROTOCOLVERSION.minor);
00298   const uint16_t vendorIdSize = Config::VENDOR_ID.vendorId.size();
00299   const uint16_t locatorSize = sizeof(Locator);
00300   const uint16_t durationSize =
00301       sizeof(Duration_t::seconds) + sizeof(Duration_t::fraction);
00302   const uint16_t entityKeySize = 3;
00303   const uint16_t entityKindSize = 1;
00304   const uint16_t entityIdSize = entityKeySize + entityKindSize;
00305   const uint16_t guidSize = sizeof(GuidPrefix_t::id) + entityIdSize;
00306 
00307   const Locator userUniCastLocator =
00308       getUserUnicastLocator(mp_participant->m_participantId);
00309   const Locator builtInUniCastLocator =
00310       getBuiltInUnicastLocator(mp_participant->m_participantId);
00311   const Locator builtInMultiCastLocator = getBuiltInMulticastLocator();
00312 
00313   ucdr_serialize_array_uint8_t(&m_microbuffer,
00314                                rtps::SMElement::SCHEME_PL_CDR_LE.data(),
00315                                rtps::SMElement::SCHEME_PL_CDR_LE.size());
00316   ucdr_serialize_uint16_t(&m_microbuffer, zero_options);
00317 
00318   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_PROTOCOL_VERSION);
00319   ucdr_serialize_uint16_t(&m_microbuffer, protocolVersionSize + 2);
00320   ucdr_serialize_uint8_t(&m_microbuffer, PROTOCOLVERSION.major);
00321   ucdr_serialize_uint8_t(&m_microbuffer, PROTOCOLVERSION.minor);
00322   m_microbuffer.iterator += 2;      // padding
00323   m_microbuffer.last_data_size = 4; // to 4 byte
00324 
00325   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_VENDORID);
00326   ucdr_serialize_uint16_t(&m_microbuffer, vendorIdSize + 2);
00327   ucdr_serialize_array_uint8_t(&m_microbuffer,
00328                                Config::VENDOR_ID.vendorId.data(), vendorIdSize);
00329   m_microbuffer.iterator += 2;      // padding
00330   m_microbuffer.last_data_size = 4; // to 4 byte
00331 
00332   ucdr_serialize_uint16_t(&m_microbuffer,
00333                           ParameterId::PID_DEFAULT_UNICAST_LOCATOR);
00334   ucdr_serialize_uint16_t(&m_microbuffer, locatorSize);
00335   ucdr_serialize_array_uint8_t(
00336       &m_microbuffer, reinterpret_cast<const uint8_t *>(&userUniCastLocator),
00337       locatorSize);
00338 
00339   ucdr_serialize_uint16_t(&m_microbuffer,
00340                           ParameterId::PID_METATRAFFIC_UNICAST_LOCATOR);
00341   ucdr_serialize_uint16_t(&m_microbuffer, locatorSize);
00342   ucdr_serialize_array_uint8_t(
00343       &m_microbuffer, reinterpret_cast<const uint8_t *>(&builtInUniCastLocator),
00344       locatorSize);
00345 
00346   ucdr_serialize_uint16_t(&m_microbuffer,
00347                           ParameterId::PID_METATRAFFIC_MULTICAST_LOCATOR);
00348   ucdr_serialize_uint16_t(&m_microbuffer, locatorSize);
00349   ucdr_serialize_array_uint8_t(
00350       &m_microbuffer,
00351       reinterpret_cast<const uint8_t *>(&builtInMultiCastLocator), locatorSize);
00352 
00353   ucdr_serialize_uint16_t(&m_microbuffer,
00354                           ParameterId::PID_PARTICIPANT_LEASE_DURATION);
00355   ucdr_serialize_uint16_t(&m_microbuffer, durationSize);
00356   ucdr_serialize_int32_t(&m_microbuffer, Config::SPDP_LEASE_DURATION.seconds);
00357   ucdr_serialize_uint32_t(&m_microbuffer, Config::SPDP_LEASE_DURATION.fraction);
00358 
00359   ucdr_serialize_uint16_t(&m_microbuffer, ParameterId::PID_PARTICIPANT_GUID);
00360   ucdr_serialize_uint16_t(&m_microbuffer, guidSize);
00361   ucdr_serialize_array_uint8_t(&m_microbuffer,
00362                                mp_participant->m_guidPrefix.id.data(),
00363                                sizeof(GuidPrefix_t::id));
00364   ucdr_serialize_array_uint8_t(&m_microbuffer,
00365                                ENTITYID_BUILD_IN_PARTICIPANT.entityKey.data(),
00366                                entityKeySize);
00367   ucdr_serialize_uint8_t(
00368       &m_microbuffer,
00369       static_cast<uint8_t>(ENTITYID_BUILD_IN_PARTICIPANT.entityKind));
00370 
00371   ucdr_serialize_uint16_t(&m_microbuffer,
00372                           ParameterId::PID_BUILTIN_ENDPOINT_SET);
00373   ucdr_serialize_uint16_t(&m_microbuffer, sizeof(BuildInEndpointSet));
00374   ucdr_serialize_uint32_t(
00375       &m_microbuffer, BuildInEndpointSet::DISC_BIE_PARTICIPANT_ANNOUNCER |
00376                           BuildInEndpointSet::DISC_BIE_PARTICIPANT_DETECTOR |
00377                           BuildInEndpointSet::DISC_BIE_PUBLICATION_ANNOUNCER |
00378                           BuildInEndpointSet::DISC_BIE_PUBLICATION_DETECTOR |
00379                           BuildInEndpointSet::DISC_BIE_SUBSCRIPTION_ANNOUNCER |
00380                           BuildInEndpointSet::DISC_BIE_SUBSCRIPTION_DETECTOR);
00381 
00382   endCurrentList();
00383 }
00384 
00385 void callRunBroadcast(void *arg){
00386     SPDPAgent::runBroadcast(arg);
00387 }
00388 
00389 #undef SPDP_VERBOSE