Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop
SEDPAgent.cpp
00001 /* 00002 The MIT License 00003 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University 00004 Permission is hereby granted, free of charge, to any person obtaining a copy 00005 of this software and associated documentation files (the "Software"), to deal 00006 in the Software without restriction, including without limitation the rights 00007 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00008 copies of the Software, and to permit persons to whom the Software is 00009 furnished to do so, subject to the following conditions: 00010 The above copyright notice and this permission notice shall be included in 00011 all copies or substantial portions of the Software. 00012 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00013 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00014 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00015 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00016 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00017 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00018 THE SOFTWARE 00019 00020 This file is part of embeddedRTPS. 00021 00022 Author: i11 - Embedded Software, RWTH Aachen University 00023 */ 00024 00025 #include "rtps/discovery/SEDPAgent.h" 00026 #include "rtps/discovery/TopicData.h" 00027 #include "rtps/entities/Participant.h" 00028 #include "rtps/entities/Reader.h" 00029 #include "rtps/entities/Writer.h" 00030 #include "rtps/messages/MessageTypes.h" 00031 #include "ucdr/microcdr.h" 00032 00033 #define SEDP_VERBOSE 0 00034 00035 using rtps::SEDPAgent; 00036 00037 #if SEDP_VERBOSE 00038 uint32_t line_ = 0; 00039 char bf_[100]; 00040 00041 #include <asoa/driver/os.h> 00042 00043 #define SEDP_LOG(...) \ 00044 if (true) { \ 00045 size_t t = snprintf(bf_, sizeof(bf_), __VA_ARGS__); \ 00046 ASOA_ASSERT(t < sizeof(bf_), "overflow"); \ 00047 TFT_PrintLine(line_ + 3, bf_); \ 00048 line_ = (line_ + 1) % 30; \ 00049 } 00050 00051 #endif 00052 00053 void SEDPAgent::init(Participant &part, const BuiltInEndpoints &endpoints) { 00054 // TODO move 00055 if (sys_mutex_new(&m_mutex) != ERR_OK) { 00056 #if SEDP_VERBOSE 00057 printf("SEDPAgent failed to create mutex\n"); 00058 #endif 00059 return; 00060 } 00061 00062 m_part = ∂ 00063 m_endpoints = endpoints; 00064 if (m_endpoints.sedpPubReader != nullptr) { 00065 m_endpoints.sedpPubReader->registerCallback(receiveCallbackPublisher, this); 00066 } 00067 if (m_endpoints.sedpSubReader != nullptr) { 00068 m_endpoints.sedpSubReader->registerCallback(receiveCallbackSubscriber, 00069 this); 00070 } 00071 } 00072 00073 void SEDPAgent::registerOnNewPublisherMatchedCallback( 00074 void (*callback)(void *arg), void *args) { 00075 mfp_onNewPublisherCallback = callback; 00076 m_onNewPublisherArgs = args; 00077 } 00078 00079 void SEDPAgent::registerOnNewSubscriberMatchedCallback( 00080 void (*callback)(void *arg), void *args) { 00081 mfp_onNewSubscriberCallback = callback; 00082 m_onNewSubscriberArgs = args; 00083 } 00084 00085 void SEDPAgent::receiveCallbackPublisher(void *callee, 00086 const ReaderCacheChange &cacheChange) { 00087 auto agent = static_cast<SEDPAgent *>(callee); 00088 agent->onNewPublisher(cacheChange); 00089 } 00090 00091 void SEDPAgent::receiveCallbackSubscriber( 00092 void *callee, const ReaderCacheChange &cacheChange) { 00093 auto agent = static_cast<SEDPAgent *>(callee); 00094 agent->onNewSubscriber(cacheChange); 00095 } 00096 00097 void SEDPAgent::onNewPublisher(const ReaderCacheChange &change) { 00098 Lock lock{m_mutex}; 00099 #if SEDP_VERBOSE 00100 SEDP_LOG("New publisher\n"); 00101 #endif 00102 00103 if (!change.copyInto(m_buffer, sizeof(m_buffer) / sizeof(m_buffer[0]))) { 00104 #if SEDP_VERBOSE 00105 SEDP_LOG("EDPAgent: Buffer too small.\n"); 00106 #endif 00107 return; 00108 } 00109 ucdrBuffer cdrBuffer; 00110 ucdr_init_buffer(&cdrBuffer, m_buffer, sizeof(m_buffer)); 00111 00112 TopicData topicData; 00113 if (topicData.readFromUcdrBuffer(cdrBuffer)) { 00114 onNewPublisher(topicData); 00115 } 00116 } 00117 00118 void SEDPAgent::onNewPublisher(const TopicData &writerData) { 00119 // TODO Is it okay to add Endpoint if the respective participant is unknown 00120 // participant? 00121 if (!m_part->findRemoteParticipant(writerData.endpointGuid.prefix)) { 00122 return; 00123 } 00124 #if SEDP_VERBOSE 00125 SEDP_LOG("PUB T/D %s/%s", writerData.topicName, writerData.typeName); 00126 #endif 00127 Reader *reader = m_part->getMatchingReader(writerData); 00128 if (reader == nullptr) { 00129 #if SEDP_VERBOSE 00130 SEDP_LOG("SEDPAgent: Couldn't find reader for new Publisher[%s, %s]\n", 00131 writerData.topicName, writerData.typeName); 00132 #endif 00133 return; 00134 } 00135 // TODO check policies 00136 #if SEDP_VERBOSE 00137 SEDP_LOG("Found a new "); 00138 if (writerData.reliabilityKind == ReliabilityKind_t::RELIABLE) { 00139 SEDP_LOG("reliable "); 00140 } else { 00141 SEDP_LOG("best-effort "); 00142 } 00143 SEDP_LOG("publisher\n"); 00144 #endif 00145 reader->addNewMatchedWriter( 00146 WriterProxy{writerData.endpointGuid, writerData.unicastLocator}); 00147 if (mfp_onNewPublisherCallback != nullptr) { 00148 mfp_onNewPublisherCallback(m_onNewPublisherArgs); 00149 } 00150 } 00151 00152 void SEDPAgent::onNewSubscriber(const ReaderCacheChange &change) { 00153 Lock lock{m_mutex}; 00154 #if SEDP_VERBOSE 00155 SEDP_LOG("New subscriber\n"); 00156 #endif 00157 00158 if (!change.copyInto(m_buffer, sizeof(m_buffer) / sizeof(m_buffer[0]))) { 00159 #if SEDP_VERBOSE 00160 SEDP_LOG("SEDPAgent: Buffer too small."); 00161 #endif 00162 return; 00163 } 00164 ucdrBuffer cdrBuffer; 00165 ucdr_init_buffer(&cdrBuffer, m_buffer, sizeof(m_buffer)); 00166 00167 TopicData topicData; 00168 if (topicData.readFromUcdrBuffer(cdrBuffer)) { 00169 onNewSubscriber(topicData); 00170 } 00171 } 00172 00173 void SEDPAgent::onNewSubscriber(const TopicData &readerData) { 00174 if (!m_part->findRemoteParticipant(readerData.endpointGuid.prefix)) { 00175 return; 00176 } 00177 Writer *writer = m_part->getMatchingWriter(readerData); 00178 #if SEDP_VERBOSE 00179 SEDP_LOG("SUB T/D %s/%s", readerData.topicName, readerData.typeName); 00180 #endif 00181 if (writer == nullptr) { 00182 #if SEDP_VERBOSE 00183 SEDP_LOG("SEDPAgent: Couldn't find writer for new subscriber[%s, %s]\n", 00184 readerData.topicName, readerData.typeName); 00185 #endif 00186 return; 00187 } 00188 00189 // TODO check policies 00190 #if SEDP_VERBOSE 00191 SEDP_LOG("Found a new "); 00192 if (readerData.reliabilityKind == ReliabilityKind_t::RELIABLE) { 00193 SEDP_LOG("reliable "); 00194 } else { 00195 SEDP_LOG("best-effort "); 00196 } 00197 SEDP_LOG("Subscriber\n"); 00198 #endif 00199 writer->addNewMatchedReader( 00200 ReaderProxy{readerData.endpointGuid, readerData.unicastLocator}); 00201 if (mfp_onNewSubscriberCallback != nullptr) { 00202 mfp_onNewSubscriberCallback(m_onNewSubscriberArgs); 00203 } 00204 } 00205 00206 void SEDPAgent::addWriter(Writer &writer) { 00207 if (m_endpoints.sedpPubWriter == nullptr) { 00208 return; 00209 } 00210 EntityKind_t writerKind = 00211 writer.m_attributes.endpointGuid.entityId.entityKind; 00212 if (writerKind == EntityKind_t::BUILD_IN_WRITER_WITH_KEY || 00213 writerKind == EntityKind_t::BUILD_IN_WRITER_WITHOUT_KEY) { 00214 return; // No need to announce builtin endpoints 00215 } 00216 00217 Lock lock{m_mutex}; 00218 ucdrBuffer microbuffer; 00219 ucdr_init_buffer(µbuffer, m_buffer, 00220 sizeof(m_buffer) / sizeof(m_buffer[0])); 00221 const uint16_t zero_options = 0; 00222 00223 ucdr_serialize_array_uint8_t(µbuffer, 00224 rtps::SMElement::SCHEME_PL_CDR_LE.data(), 00225 rtps::SMElement::SCHEME_PL_CDR_LE.size()); 00226 ucdr_serialize_uint16_t(µbuffer, zero_options); 00227 writer.m_attributes.serializeIntoUcdrBuffer(microbuffer); 00228 m_endpoints.sedpPubWriter->newChange(ChangeKind_t::ALIVE, m_buffer, 00229 ucdr_buffer_length(µbuffer)); 00230 #if SEDP_VERBOSE 00231 SEDP_LOG("Added new change to sedpPubWriter.\n"); 00232 #endif 00233 } 00234 00235 void SEDPAgent::addReader(Reader &reader) { 00236 if (m_endpoints.sedpSubWriter == nullptr) { 00237 return; 00238 } 00239 00240 EntityKind_t readerKind = 00241 reader.m_attributes.endpointGuid.entityId.entityKind; 00242 if (readerKind == EntityKind_t::BUILD_IN_READER_WITH_KEY || 00243 readerKind == EntityKind_t::BUILD_IN_READER_WITHOUT_KEY) { 00244 return; // No need to announce builtin endpoints 00245 } 00246 00247 Lock lock{m_mutex}; 00248 ucdrBuffer microbuffer; 00249 ucdr_init_buffer(µbuffer, m_buffer, 00250 sizeof(m_buffer) / sizeof(m_buffer[0])); 00251 const uint16_t zero_options = 0; 00252 00253 ucdr_serialize_array_uint8_t(µbuffer, 00254 rtps::SMElement::SCHEME_PL_CDR_LE.data(), 00255 rtps::SMElement::SCHEME_PL_CDR_LE.size()); 00256 ucdr_serialize_uint16_t(µbuffer, zero_options); 00257 reader.m_attributes.serializeIntoUcdrBuffer(microbuffer); 00258 m_endpoints.sedpSubWriter->newChange(ChangeKind_t::ALIVE, m_buffer, 00259 ucdr_buffer_length(µbuffer)); 00260 00261 #if SEDP_VERBOSE 00262 SEDP_LOG("Added new change to sedpSubWriter.\n"); 00263 #endif 00264 }
Generated on Sun Jul 31 2022 04:49:48 by
1.7.2