S Morita / mbed-mros2

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Committer:
smoritaemb
Date:
Sat Mar 19 09:23:37 2022 +0900
Revision:
7:c80f65422d99
Parent:
0:580aba13d1a1
Merge test_assortment_of_msgs branch.

Who changed what in which revision?

UserRevisionLine numberNew contents of line
smoritaemb 0:580aba13d1a1 1 /*
smoritaemb 0:580aba13d1a1 2 The MIT License
smoritaemb 0:580aba13d1a1 3 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
smoritaemb 0:580aba13d1a1 4 Permission is hereby granted, free of charge, to any person obtaining a copy
smoritaemb 0:580aba13d1a1 5 of this software and associated documentation files (the "Software"), to deal
smoritaemb 0:580aba13d1a1 6 in the Software without restriction, including without limitation the rights
smoritaemb 0:580aba13d1a1 7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
smoritaemb 0:580aba13d1a1 8 copies of the Software, and to permit persons to whom the Software is
smoritaemb 0:580aba13d1a1 9 furnished to do so, subject to the following conditions:
smoritaemb 0:580aba13d1a1 10 The above copyright notice and this permission notice shall be included in
smoritaemb 0:580aba13d1a1 11 all copies or substantial portions of the Software.
smoritaemb 0:580aba13d1a1 12 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
smoritaemb 0:580aba13d1a1 13 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
smoritaemb 0:580aba13d1a1 14 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
smoritaemb 0:580aba13d1a1 15 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
smoritaemb 0:580aba13d1a1 16 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
smoritaemb 0:580aba13d1a1 17 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
smoritaemb 0:580aba13d1a1 18 THE SOFTWARE
smoritaemb 0:580aba13d1a1 19
smoritaemb 0:580aba13d1a1 20 This file is part of embeddedRTPS.
smoritaemb 0:580aba13d1a1 21
smoritaemb 0:580aba13d1a1 22 Author: i11 - Embedded Software, RWTH Aachen University
smoritaemb 0:580aba13d1a1 23 */
smoritaemb 0:580aba13d1a1 24
smoritaemb 0:580aba13d1a1 25 #include "rtps/messages/MessageReceiver.h"
smoritaemb 0:580aba13d1a1 26 #include <rtps/entities/Participant.h>
smoritaemb 0:580aba13d1a1 27
smoritaemb 0:580aba13d1a1 28 #include "rtps/entities/Reader.h"
smoritaemb 0:580aba13d1a1 29 #include "rtps/entities/Writer.h"
smoritaemb 0:580aba13d1a1 30 #include "rtps/messages/MessageTypes.h"
smoritaemb 0:580aba13d1a1 31
smoritaemb 0:580aba13d1a1 32 using rtps::MessageReceiver;
smoritaemb 0:580aba13d1a1 33
smoritaemb 0:580aba13d1a1 34 #define RECV_VERBOSE 0
smoritaemb 0:580aba13d1a1 35
smoritaemb 0:580aba13d1a1 36 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 37 #include "rtps/utils/printutils.h"
smoritaemb 0:580aba13d1a1 38 #endif
smoritaemb 0:580aba13d1a1 39
smoritaemb 0:580aba13d1a1 40 MessageReceiver::MessageReceiver(Participant *part) : mp_part(part) {}
smoritaemb 0:580aba13d1a1 41
smoritaemb 0:580aba13d1a1 42 void MessageReceiver::resetState() {
smoritaemb 0:580aba13d1a1 43 sourceGuidPrefix = GUIDPREFIX_UNKNOWN;
smoritaemb 0:580aba13d1a1 44 sourceVersion = PROTOCOLVERSION;
smoritaemb 0:580aba13d1a1 45 sourceVendor = VENDOR_UNKNOWN;
smoritaemb 0:580aba13d1a1 46 haveTimeStamp = false;
smoritaemb 0:580aba13d1a1 47 }
smoritaemb 0:580aba13d1a1 48
smoritaemb 0:580aba13d1a1 49 bool MessageReceiver::processMessage(const uint8_t *data, DataSize_t size) {
smoritaemb 0:580aba13d1a1 50 resetState();
smoritaemb 0:580aba13d1a1 51 MessageProcessingInfo msgInfo(data, size);
smoritaemb 0:580aba13d1a1 52
smoritaemb 0:580aba13d1a1 53 if (!processHeader(msgInfo)) {
smoritaemb 0:580aba13d1a1 54 return false;
smoritaemb 0:580aba13d1a1 55 }
smoritaemb 0:580aba13d1a1 56 SubmessageHeader submsgHeader;
smoritaemb 0:580aba13d1a1 57 while (msgInfo.nextPos < msgInfo.size) {
smoritaemb 0:580aba13d1a1 58 if (!deserializeMessage(msgInfo, submsgHeader)) {
smoritaemb 0:580aba13d1a1 59 return false;
smoritaemb 0:580aba13d1a1 60 }
smoritaemb 0:580aba13d1a1 61 processSubmessage(msgInfo, submsgHeader);
smoritaemb 0:580aba13d1a1 62 }
smoritaemb 0:580aba13d1a1 63
smoritaemb 0:580aba13d1a1 64 return true;
smoritaemb 0:580aba13d1a1 65 }
smoritaemb 0:580aba13d1a1 66
smoritaemb 0:580aba13d1a1 67 bool MessageReceiver::processHeader(MessageProcessingInfo &msgInfo) {
smoritaemb 0:580aba13d1a1 68 Header header;
smoritaemb 0:580aba13d1a1 69 if (!deserializeMessage(msgInfo, header)) {
smoritaemb 0:580aba13d1a1 70 return false;
smoritaemb 0:580aba13d1a1 71 }
smoritaemb 0:580aba13d1a1 72
smoritaemb 0:580aba13d1a1 73 if (header.guidPrefix.id == mp_part->m_guidPrefix.id) {
smoritaemb 0:580aba13d1a1 74 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 75 printf("[MessageReceiver]: Received own message.\n");
smoritaemb 0:580aba13d1a1 76 #endif
smoritaemb 0:580aba13d1a1 77 return false; // Don't process our own packet
smoritaemb 0:580aba13d1a1 78 }
smoritaemb 0:580aba13d1a1 79
smoritaemb 0:580aba13d1a1 80 if (header.protocolName != RTPS_PROTOCOL_NAME ||
smoritaemb 0:580aba13d1a1 81 header.protocolVersion.major != PROTOCOLVERSION.major) {
smoritaemb 0:580aba13d1a1 82 return false;
smoritaemb 0:580aba13d1a1 83 }
smoritaemb 0:580aba13d1a1 84
smoritaemb 0:580aba13d1a1 85 sourceGuidPrefix = header.guidPrefix;
smoritaemb 0:580aba13d1a1 86 sourceVendor = header.vendorId;
smoritaemb 0:580aba13d1a1 87 sourceVersion = header.protocolVersion;
smoritaemb 0:580aba13d1a1 88
smoritaemb 0:580aba13d1a1 89 msgInfo.nextPos += Header::getRawSize();
smoritaemb 0:580aba13d1a1 90 return true;
smoritaemb 0:580aba13d1a1 91 }
smoritaemb 0:580aba13d1a1 92
smoritaemb 0:580aba13d1a1 93 bool MessageReceiver::processSubmessage(MessageProcessingInfo &msgInfo,
smoritaemb 0:580aba13d1a1 94 const SubmessageHeader &submsgHeader) {
smoritaemb 0:580aba13d1a1 95 bool success = false;
smoritaemb 0:580aba13d1a1 96
smoritaemb 0:580aba13d1a1 97 switch (submsgHeader.submessageId) {
smoritaemb 0:580aba13d1a1 98 case SubmessageKind::ACKNACK:
smoritaemb 0:580aba13d1a1 99 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 100 printf("Processing AckNack submessage\n");
smoritaemb 0:580aba13d1a1 101 #endif
smoritaemb 0:580aba13d1a1 102 success = processAckNackSubmessage(msgInfo);
smoritaemb 0:580aba13d1a1 103 break;
smoritaemb 0:580aba13d1a1 104 case SubmessageKind::DATA:
smoritaemb 0:580aba13d1a1 105 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 106 printf("Processing Data submessage\n");
smoritaemb 0:580aba13d1a1 107 #endif
smoritaemb 0:580aba13d1a1 108 success = processDataSubmessage(msgInfo);
smoritaemb 0:580aba13d1a1 109 break;
smoritaemb 0:580aba13d1a1 110 case SubmessageKind::HEARTBEAT:
smoritaemb 0:580aba13d1a1 111 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 112 printf("Processing Heartbeat submessage\n");
smoritaemb 0:580aba13d1a1 113 #endif
smoritaemb 0:580aba13d1a1 114 success = processHeartbeatSubmessage(msgInfo);
smoritaemb 0:580aba13d1a1 115 break;
smoritaemb 0:580aba13d1a1 116 case SubmessageKind::INFO_DST:
smoritaemb 0:580aba13d1a1 117 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 118 printf("Info_DST submessage not relevant.\n");
smoritaemb 0:580aba13d1a1 119 #endif
smoritaemb 0:580aba13d1a1 120 success = true; // Not relevant
smoritaemb 0:580aba13d1a1 121 break;
smoritaemb 0:580aba13d1a1 122 case SubmessageKind::INFO_TS:
smoritaemb 0:580aba13d1a1 123 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 124 printf("Info_TS submessage not relevant.\n");
smoritaemb 0:580aba13d1a1 125 #endif
smoritaemb 0:580aba13d1a1 126 success = true; // Not relevant now
smoritaemb 0:580aba13d1a1 127 break;
smoritaemb 0:580aba13d1a1 128 default:
smoritaemb 0:580aba13d1a1 129 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 130 printf("Submessage of type %u currently not supported. Skipping..\n",
smoritaemb 0:580aba13d1a1 131 static_cast<uint8_t>(submsgHeader->submessageId));
smoritaemb 0:580aba13d1a1 132 #endif
smoritaemb 0:580aba13d1a1 133 success = false;
smoritaemb 0:580aba13d1a1 134 }
smoritaemb 0:580aba13d1a1 135 msgInfo.nextPos +=
smoritaemb 0:580aba13d1a1 136 submsgHeader.submessageLength + SubmessageHeader::getRawSize();
smoritaemb 0:580aba13d1a1 137 return success;
smoritaemb 0:580aba13d1a1 138 }
smoritaemb 0:580aba13d1a1 139
smoritaemb 0:580aba13d1a1 140 bool MessageReceiver::processDataSubmessage(MessageProcessingInfo &msgInfo) {
smoritaemb 0:580aba13d1a1 141 SubmessageData dataSubmsg;
smoritaemb 0:580aba13d1a1 142 if (!deserializeMessage(msgInfo, dataSubmsg)) {
smoritaemb 0:580aba13d1a1 143 return false;
smoritaemb 0:580aba13d1a1 144 }
smoritaemb 0:580aba13d1a1 145
smoritaemb 0:580aba13d1a1 146 const uint8_t *serializedData =
smoritaemb 0:580aba13d1a1 147 msgInfo.getPointerToCurrentPos() + SubmessageData::getRawSize();
smoritaemb 0:580aba13d1a1 148 const DataSize_t size =
smoritaemb 0:580aba13d1a1 149 msgInfo.size - (msgInfo.nextPos + SubmessageData::getRawSize());
smoritaemb 0:580aba13d1a1 150
smoritaemb 0:580aba13d1a1 151 // bool isLittleEndian = (submsgHeader->flags &
smoritaemb 0:580aba13d1a1 152 // SubMessageFlag::FLAG_ENDIANESS); bool hasInlineQos = (submsgHeader->flags &
smoritaemb 0:580aba13d1a1 153 // SubMessageFlag::FLAG_INLINE_QOS);
smoritaemb 0:580aba13d1a1 154
smoritaemb 0:580aba13d1a1 155 Reader *reader = mp_part->getReader(dataSubmsg.readerId);
smoritaemb 0:580aba13d1a1 156 if (reader != nullptr) {
smoritaemb 0:580aba13d1a1 157 Guid writerGuid{sourceGuidPrefix, dataSubmsg.writerId};
smoritaemb 0:580aba13d1a1 158 ReaderCacheChange change{ChangeKind_t::ALIVE, writerGuid,
smoritaemb 0:580aba13d1a1 159 dataSubmsg.writerSN, serializedData, size};
smoritaemb 0:580aba13d1a1 160 reader->newChange(change);
smoritaemb 0:580aba13d1a1 161 } else {
smoritaemb 0:580aba13d1a1 162 #if RECV_VERBOSE
smoritaemb 0:580aba13d1a1 163 printf("Couldn't find a reader with id: ");
smoritaemb 0:580aba13d1a1 164 printEntityId(dataSubmsg->readerId);
smoritaemb 0:580aba13d1a1 165 printf("\n");
smoritaemb 0:580aba13d1a1 166 #endif
smoritaemb 0:580aba13d1a1 167 }
smoritaemb 0:580aba13d1a1 168
smoritaemb 0:580aba13d1a1 169 return true;
smoritaemb 0:580aba13d1a1 170 }
smoritaemb 0:580aba13d1a1 171
smoritaemb 0:580aba13d1a1 172 bool MessageReceiver::processHeartbeatSubmessage(
smoritaemb 0:580aba13d1a1 173 MessageProcessingInfo &msgInfo) {
smoritaemb 0:580aba13d1a1 174 SubmessageHeartbeat submsgHB;
smoritaemb 0:580aba13d1a1 175 if (!deserializeMessage(msgInfo, submsgHB)) {
smoritaemb 0:580aba13d1a1 176 return false;
smoritaemb 0:580aba13d1a1 177 }
smoritaemb 0:580aba13d1a1 178
smoritaemb 0:580aba13d1a1 179 Reader *reader = mp_part->getReader(submsgHB.readerId);
smoritaemb 0:580aba13d1a1 180 if (reader != nullptr) {
smoritaemb 0:580aba13d1a1 181 reader->onNewHeartbeat(submsgHB, sourceGuidPrefix);
smoritaemb 0:580aba13d1a1 182 return true;
smoritaemb 0:580aba13d1a1 183 } else {
smoritaemb 0:580aba13d1a1 184 return false;
smoritaemb 0:580aba13d1a1 185 }
smoritaemb 0:580aba13d1a1 186 }
smoritaemb 0:580aba13d1a1 187
smoritaemb 0:580aba13d1a1 188 bool MessageReceiver::processAckNackSubmessage(MessageProcessingInfo &msgInfo) {
smoritaemb 0:580aba13d1a1 189 SubmessageAckNack submsgAckNack;
smoritaemb 0:580aba13d1a1 190 if (!deserializeMessage(msgInfo, submsgAckNack)) {
smoritaemb 0:580aba13d1a1 191 return false;
smoritaemb 0:580aba13d1a1 192 }
smoritaemb 0:580aba13d1a1 193
smoritaemb 0:580aba13d1a1 194 Writer *writer = mp_part->getWriter(submsgAckNack.writerId);
smoritaemb 0:580aba13d1a1 195 if (writer != nullptr) {
smoritaemb 0:580aba13d1a1 196 writer->onNewAckNack(submsgAckNack, sourceGuidPrefix);
smoritaemb 0:580aba13d1a1 197 return true;
smoritaemb 0:580aba13d1a1 198 } else {
smoritaemb 0:580aba13d1a1 199 return false;
smoritaemb 0:580aba13d1a1 200 }
smoritaemb 0:580aba13d1a1 201 }
smoritaemb 0:580aba13d1a1 202
smoritaemb 0:580aba13d1a1 203 #undef RECV_VERBOSE