S Morita / mbed-mros2

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers SEDPAgent.cpp Source File

SEDPAgent.cpp

00001 /*
00002 The MIT License
00003 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
00004 Permission is hereby granted, free of charge, to any person obtaining a copy
00005 of this software and associated documentation files (the "Software"), to deal
00006 in the Software without restriction, including without limitation the rights
00007 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00008 copies of the Software, and to permit persons to whom the Software is
00009 furnished to do so, subject to the following conditions:
00010 The above copyright notice and this permission notice shall be included in
00011 all copies or substantial portions of the Software.
00012 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00013 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00014 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00015 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00016 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00017 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00018 THE SOFTWARE
00019 
00020 This file is part of embeddedRTPS.
00021 
00022 Author: i11 - Embedded Software, RWTH Aachen University
00023 */
00024 
00025 #include "rtps/discovery/SEDPAgent.h"
00026 #include "rtps/discovery/TopicData.h"
00027 #include "rtps/entities/Participant.h"
00028 #include "rtps/entities/Reader.h"
00029 #include "rtps/entities/Writer.h"
00030 #include "rtps/messages/MessageTypes.h"
00031 #include "ucdr/microcdr.h"
00032 
00033 #define SEDP_VERBOSE 0
00034 
00035 using rtps::SEDPAgent;
00036 
00037 #if SEDP_VERBOSE
00038 uint32_t line_ = 0;
00039 char bf_[100];
00040 
00041 #include <asoa/driver/os.h>
00042 
00043 #define SEDP_LOG(...)                                                          \
00044   if (true) {                                                                  \
00045     size_t t = snprintf(bf_, sizeof(bf_), __VA_ARGS__);                        \
00046     ASOA_ASSERT(t < sizeof(bf_), "overflow");                                  \
00047     TFT_PrintLine(line_ + 3, bf_);                                             \
00048     line_ = (line_ + 1) % 30;                                                  \
00049   }
00050 
00051 #endif
00052 
00053 void SEDPAgent::init(Participant &part, const BuiltInEndpoints &endpoints) {
00054   // TODO move
00055   if (sys_mutex_new(&m_mutex) != ERR_OK) {
00056 #if SEDP_VERBOSE
00057     printf("SEDPAgent failed to create mutex\n");
00058 #endif
00059     return;
00060   }
00061 
00062   m_part = &part;
00063   m_endpoints = endpoints;
00064   if (m_endpoints.sedpPubReader != nullptr) {
00065     m_endpoints.sedpPubReader->registerCallback(receiveCallbackPublisher, this);
00066   }
00067   if (m_endpoints.sedpSubReader != nullptr) {
00068     m_endpoints.sedpSubReader->registerCallback(receiveCallbackSubscriber,
00069                                                 this);
00070   }
00071 }
00072 
00073 void SEDPAgent::registerOnNewPublisherMatchedCallback(
00074     void (*callback)(void *arg), void *args) {
00075   mfp_onNewPublisherCallback = callback;
00076   m_onNewPublisherArgs = args;
00077 }
00078 
00079 void SEDPAgent::registerOnNewSubscriberMatchedCallback(
00080     void (*callback)(void *arg), void *args) {
00081   mfp_onNewSubscriberCallback = callback;
00082   m_onNewSubscriberArgs = args;
00083 }
00084 
00085 void SEDPAgent::receiveCallbackPublisher(void *callee,
00086                                          const ReaderCacheChange &cacheChange) {
00087   auto agent = static_cast<SEDPAgent *>(callee);
00088   agent->onNewPublisher(cacheChange);
00089 }
00090 
00091 void SEDPAgent::receiveCallbackSubscriber(
00092     void *callee, const ReaderCacheChange &cacheChange) {
00093   auto agent = static_cast<SEDPAgent *>(callee);
00094   agent->onNewSubscriber(cacheChange);
00095 }
00096 
00097 void SEDPAgent::onNewPublisher(const ReaderCacheChange &change) {
00098   Lock lock{m_mutex};
00099 #if SEDP_VERBOSE
00100   SEDP_LOG("New publisher\n");
00101 #endif
00102 
00103   if (!change.copyInto(m_buffer, sizeof(m_buffer) / sizeof(m_buffer[0]))) {
00104 #if SEDP_VERBOSE
00105     SEDP_LOG("EDPAgent: Buffer too small.\n");
00106 #endif
00107     return;
00108   }
00109   ucdrBuffer cdrBuffer;
00110   ucdr_init_buffer(&cdrBuffer, m_buffer, sizeof(m_buffer));
00111 
00112   TopicData topicData;
00113   if (topicData.readFromUcdrBuffer(cdrBuffer)) {
00114     onNewPublisher(topicData);
00115   }
00116 }
00117 
00118 void SEDPAgent::onNewPublisher(const TopicData &writerData) {
00119   // TODO Is it okay to add Endpoint if the respective participant is unknown
00120   // participant?
00121   if (!m_part->findRemoteParticipant(writerData.endpointGuid.prefix)) {
00122     return;
00123   }
00124 #if SEDP_VERBOSE
00125   SEDP_LOG("PUB T/D %s/%s", writerData.topicName, writerData.typeName);
00126 #endif
00127   Reader *reader = m_part->getMatchingReader(writerData);
00128   if (reader == nullptr) {
00129 #if SEDP_VERBOSE
00130     SEDP_LOG("SEDPAgent: Couldn't find reader for new Publisher[%s, %s]\n",
00131              writerData.topicName, writerData.typeName);
00132 #endif
00133     return;
00134   }
00135   // TODO check policies
00136 #if SEDP_VERBOSE
00137   SEDP_LOG("Found a new ");
00138   if (writerData.reliabilityKind == ReliabilityKind_t::RELIABLE) {
00139     SEDP_LOG("reliable ");
00140   } else {
00141     SEDP_LOG("best-effort ");
00142   }
00143   SEDP_LOG("publisher\n");
00144 #endif
00145   reader->addNewMatchedWriter(
00146       WriterProxy{writerData.endpointGuid, writerData.unicastLocator});
00147   if (mfp_onNewPublisherCallback != nullptr) {
00148     mfp_onNewPublisherCallback(m_onNewPublisherArgs);
00149   }
00150 }
00151 
00152 void SEDPAgent::onNewSubscriber(const ReaderCacheChange &change) {
00153   Lock lock{m_mutex};
00154 #if SEDP_VERBOSE
00155   SEDP_LOG("New subscriber\n");
00156 #endif
00157 
00158   if (!change.copyInto(m_buffer, sizeof(m_buffer) / sizeof(m_buffer[0]))) {
00159 #if SEDP_VERBOSE
00160     SEDP_LOG("SEDPAgent: Buffer too small.");
00161 #endif
00162     return;
00163   }
00164   ucdrBuffer cdrBuffer;
00165   ucdr_init_buffer(&cdrBuffer, m_buffer, sizeof(m_buffer));
00166 
00167   TopicData topicData;
00168   if (topicData.readFromUcdrBuffer(cdrBuffer)) {
00169     onNewSubscriber(topicData);
00170   }
00171 }
00172 
00173 void SEDPAgent::onNewSubscriber(const TopicData &readerData) {
00174   if (!m_part->findRemoteParticipant(readerData.endpointGuid.prefix)) {
00175     return;
00176   }
00177   Writer *writer = m_part->getMatchingWriter(readerData);
00178 #if SEDP_VERBOSE
00179   SEDP_LOG("SUB T/D %s/%s", readerData.topicName, readerData.typeName);
00180 #endif
00181   if (writer == nullptr) {
00182 #if SEDP_VERBOSE
00183     SEDP_LOG("SEDPAgent: Couldn't find writer for new subscriber[%s, %s]\n",
00184              readerData.topicName, readerData.typeName);
00185 #endif
00186     return;
00187   }
00188 
00189   // TODO check policies
00190 #if SEDP_VERBOSE
00191   SEDP_LOG("Found a new ");
00192   if (readerData.reliabilityKind == ReliabilityKind_t::RELIABLE) {
00193     SEDP_LOG("reliable ");
00194   } else {
00195     SEDP_LOG("best-effort ");
00196   }
00197   SEDP_LOG("Subscriber\n");
00198 #endif
00199   writer->addNewMatchedReader(
00200       ReaderProxy{readerData.endpointGuid, readerData.unicastLocator});
00201   if (mfp_onNewSubscriberCallback != nullptr) {
00202     mfp_onNewSubscriberCallback(m_onNewSubscriberArgs);
00203   }
00204 }
00205 
00206 void SEDPAgent::addWriter(Writer &writer) {
00207   if (m_endpoints.sedpPubWriter == nullptr) {
00208     return;
00209   }
00210   EntityKind_t writerKind =
00211       writer.m_attributes.endpointGuid.entityId.entityKind;
00212   if (writerKind == EntityKind_t::BUILD_IN_WRITER_WITH_KEY ||
00213       writerKind == EntityKind_t::BUILD_IN_WRITER_WITHOUT_KEY) {
00214     return; // No need to announce builtin endpoints
00215   }
00216 
00217   Lock lock{m_mutex};
00218   ucdrBuffer microbuffer;
00219   ucdr_init_buffer(&microbuffer, m_buffer,
00220                    sizeof(m_buffer) / sizeof(m_buffer[0]));
00221   const uint16_t zero_options = 0;
00222 
00223   ucdr_serialize_array_uint8_t(&microbuffer,
00224                                rtps::SMElement::SCHEME_PL_CDR_LE.data(),
00225                                rtps::SMElement::SCHEME_PL_CDR_LE.size());
00226   ucdr_serialize_uint16_t(&microbuffer, zero_options);
00227   writer.m_attributes.serializeIntoUcdrBuffer(microbuffer);
00228   m_endpoints.sedpPubWriter->newChange(ChangeKind_t::ALIVE, m_buffer,
00229                                        ucdr_buffer_length(&microbuffer));
00230 #if SEDP_VERBOSE
00231   SEDP_LOG("Added new change to sedpPubWriter.\n");
00232 #endif
00233 }
00234 
00235 void SEDPAgent::addReader(Reader &reader) {
00236   if (m_endpoints.sedpSubWriter == nullptr) {
00237     return;
00238   }
00239 
00240   EntityKind_t readerKind =
00241       reader.m_attributes.endpointGuid.entityId.entityKind;
00242   if (readerKind == EntityKind_t::BUILD_IN_READER_WITH_KEY ||
00243       readerKind == EntityKind_t::BUILD_IN_READER_WITHOUT_KEY) {
00244     return; // No need to announce builtin endpoints
00245   }
00246 
00247   Lock lock{m_mutex};
00248   ucdrBuffer microbuffer;
00249   ucdr_init_buffer(&microbuffer, m_buffer,
00250                    sizeof(m_buffer) / sizeof(m_buffer[0]));
00251   const uint16_t zero_options = 0;
00252 
00253   ucdr_serialize_array_uint8_t(&microbuffer,
00254                                rtps::SMElement::SCHEME_PL_CDR_LE.data(),
00255                                rtps::SMElement::SCHEME_PL_CDR_LE.size());
00256   ucdr_serialize_uint16_t(&microbuffer, zero_options);
00257   reader.m_attributes.serializeIntoUcdrBuffer(microbuffer);
00258   m_endpoints.sedpSubWriter->newChange(ChangeKind_t::ALIVE, m_buffer,
00259                                        ucdr_buffer_length(&microbuffer));
00260 
00261 #if SEDP_VERBOSE
00262   SEDP_LOG("Added new change to sedpSubWriter.\n");
00263 #endif
00264 }