Pavle Radojkovic / mbed-mqtt
Committer:
pavleradojkovic
Date:
Mon Jun 20 16:24:43 2022 +0000
Revision:
0:ba7e439238ab
Inital commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
pavleradojkovic 0:ba7e439238ab 1 /**************************************************************************************
pavleradojkovic 0:ba7e439238ab 2 * Copyright (c) 2016, Tomoaki Yamaguchi
pavleradojkovic 0:ba7e439238ab 3 *
pavleradojkovic 0:ba7e439238ab 4 * All rights reserved. This program and the accompanying materials
pavleradojkovic 0:ba7e439238ab 5 * are made available under the terms of the Eclipse Public License v1.0
pavleradojkovic 0:ba7e439238ab 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
pavleradojkovic 0:ba7e439238ab 7 *
pavleradojkovic 0:ba7e439238ab 8 * The Eclipse Public License is available at
pavleradojkovic 0:ba7e439238ab 9 * http://www.eclipse.org/legal/epl-v10.html
pavleradojkovic 0:ba7e439238ab 10 * and the Eclipse Distribution License is available at
pavleradojkovic 0:ba7e439238ab 11 * http://www.eclipse.org/org/documents/edl-v10.php.
pavleradojkovic 0:ba7e439238ab 12 *
pavleradojkovic 0:ba7e439238ab 13 * Contributors:
pavleradojkovic 0:ba7e439238ab 14 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
pavleradojkovic 0:ba7e439238ab 15 * Tieto Poland Sp. z o.o. - Gateway improvements
pavleradojkovic 0:ba7e439238ab 16 **************************************************************************************/
pavleradojkovic 0:ba7e439238ab 17
pavleradojkovic 0:ba7e439238ab 18 #include "MQTTGWPublishHandler.h"
pavleradojkovic 0:ba7e439238ab 19 #include "MQTTSNGateway.h"
pavleradojkovic 0:ba7e439238ab 20 #include "MQTTSNPacket.h"
pavleradojkovic 0:ba7e439238ab 21 #include <string>
pavleradojkovic 0:ba7e439238ab 22
pavleradojkovic 0:ba7e439238ab 23 using namespace std;
pavleradojkovic 0:ba7e439238ab 24 using namespace MQTTSNGW;
pavleradojkovic 0:ba7e439238ab 25
pavleradojkovic 0:ba7e439238ab 26 char* currentDateTime(void);
pavleradojkovic 0:ba7e439238ab 27
pavleradojkovic 0:ba7e439238ab 28 MQTTGWPublishHandler::MQTTGWPublishHandler(Gateway* gateway)
pavleradojkovic 0:ba7e439238ab 29 {
pavleradojkovic 0:ba7e439238ab 30 _gateway = gateway;
pavleradojkovic 0:ba7e439238ab 31 }
pavleradojkovic 0:ba7e439238ab 32
pavleradojkovic 0:ba7e439238ab 33 MQTTGWPublishHandler::~MQTTGWPublishHandler()
pavleradojkovic 0:ba7e439238ab 34 {
pavleradojkovic 0:ba7e439238ab 35
pavleradojkovic 0:ba7e439238ab 36 }
pavleradojkovic 0:ba7e439238ab 37
pavleradojkovic 0:ba7e439238ab 38 void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
pavleradojkovic 0:ba7e439238ab 39 {
pavleradojkovic 0:ba7e439238ab 40 if ( !client->isActive() && !client->isSleep() && !client->isAwake())
pavleradojkovic 0:ba7e439238ab 41 {
pavleradojkovic 0:ba7e439238ab 42 WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER);
pavleradojkovic 0:ba7e439238ab 43 return;
pavleradojkovic 0:ba7e439238ab 44 }
pavleradojkovic 0:ba7e439238ab 45
pavleradojkovic 0:ba7e439238ab 46 /* client is sleeping. save PUBLISH */
pavleradojkovic 0:ba7e439238ab 47 if ( client->isSleep() )
pavleradojkovic 0:ba7e439238ab 48 {
pavleradojkovic 0:ba7e439238ab 49 Publish pub;
pavleradojkovic 0:ba7e439238ab 50 packet->getPUBLISH(&pub);
pavleradojkovic 0:ba7e439238ab 51
pavleradojkovic 0:ba7e439238ab 52 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
pavleradojkovic 0:ba7e439238ab 53 RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
pavleradojkovic 0:ba7e439238ab 54
pavleradojkovic 0:ba7e439238ab 55 if (pub.header.bits.qos == 1)
pavleradojkovic 0:ba7e439238ab 56 {
pavleradojkovic 0:ba7e439238ab 57 replyACK(client, &pub, PUBACK);
pavleradojkovic 0:ba7e439238ab 58 }
pavleradojkovic 0:ba7e439238ab 59 else if ( pub.header.bits.qos == 2)
pavleradojkovic 0:ba7e439238ab 60 {
pavleradojkovic 0:ba7e439238ab 61 replyACK(client, &pub, PUBREC);
pavleradojkovic 0:ba7e439238ab 62 }
pavleradojkovic 0:ba7e439238ab 63
pavleradojkovic 0:ba7e439238ab 64 MQTTGWPacket* msg = new MQTTGWPacket();
pavleradojkovic 0:ba7e439238ab 65 *msg = *packet;
pavleradojkovic 0:ba7e439238ab 66 if ( msg->getType() == 0 )
pavleradojkovic 0:ba7e439238ab 67 {
pavleradojkovic 0:ba7e439238ab 68 WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
pavleradojkovic 0:ba7e439238ab 69 delete msg;
pavleradojkovic 0:ba7e439238ab 70 return;
pavleradojkovic 0:ba7e439238ab 71 }
pavleradojkovic 0:ba7e439238ab 72 client->setClientSleepPacket(msg);
pavleradojkovic 0:ba7e439238ab 73 return;
pavleradojkovic 0:ba7e439238ab 74 }
pavleradojkovic 0:ba7e439238ab 75
pavleradojkovic 0:ba7e439238ab 76 Publish pub;
pavleradojkovic 0:ba7e439238ab 77 packet->getPUBLISH(&pub);
pavleradojkovic 0:ba7e439238ab 78
pavleradojkovic 0:ba7e439238ab 79 MQTTSNPacket* snPacket = new MQTTSNPacket();
pavleradojkovic 0:ba7e439238ab 80
pavleradojkovic 0:ba7e439238ab 81 /* create MQTTSN_topicid */
pavleradojkovic 0:ba7e439238ab 82 MQTTSN_topicid topicId;
pavleradojkovic 0:ba7e439238ab 83 uint16_t id = 0;
pavleradojkovic 0:ba7e439238ab 84
pavleradojkovic 0:ba7e439238ab 85 if (pub.topiclen <= 2)
pavleradojkovic 0:ba7e439238ab 86 {
pavleradojkovic 0:ba7e439238ab 87 topicId.type = MQTTSN_TOPIC_TYPE_SHORT;
pavleradojkovic 0:ba7e439238ab 88 *(topicId.data.short_name) = *pub.topic;
pavleradojkovic 0:ba7e439238ab 89 *(topicId.data.short_name + 1) = *(pub.topic + 1);
pavleradojkovic 0:ba7e439238ab 90 }
pavleradojkovic 0:ba7e439238ab 91 else
pavleradojkovic 0:ba7e439238ab 92 {
pavleradojkovic 0:ba7e439238ab 93 topicId.data.long_.len = pub.topiclen;
pavleradojkovic 0:ba7e439238ab 94 topicId.data.long_.name = pub.topic;
pavleradojkovic 0:ba7e439238ab 95 Topic* tp = client->getTopics()->getTopicByName(&topicId);
pavleradojkovic 0:ba7e439238ab 96
pavleradojkovic 0:ba7e439238ab 97 if ( tp )
pavleradojkovic 0:ba7e439238ab 98 {
pavleradojkovic 0:ba7e439238ab 99 topicId.type = tp->getType();
pavleradojkovic 0:ba7e439238ab 100 topicId.data.long_.len = pub.topiclen;
pavleradojkovic 0:ba7e439238ab 101 topicId.data.long_.name = pub.topic;
pavleradojkovic 0:ba7e439238ab 102 topicId.data.id = tp->getTopicId();
pavleradojkovic 0:ba7e439238ab 103 }
pavleradojkovic 0:ba7e439238ab 104 else
pavleradojkovic 0:ba7e439238ab 105 {
pavleradojkovic 0:ba7e439238ab 106 /* This message might be subscribed with wild card. */
pavleradojkovic 0:ba7e439238ab 107 topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
pavleradojkovic 0:ba7e439238ab 108 Topic* topic = client->getTopics()->match(&topicId);
pavleradojkovic 0:ba7e439238ab 109 if (topic == nullptr)
pavleradojkovic 0:ba7e439238ab 110 {
pavleradojkovic 0:ba7e439238ab 111 WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
pavleradojkovic 0:ba7e439238ab 112 if (pub.header.bits.qos == 1)
pavleradojkovic 0:ba7e439238ab 113 {
pavleradojkovic 0:ba7e439238ab 114 replyACK(client, &pub, PUBACK);
pavleradojkovic 0:ba7e439238ab 115 }
pavleradojkovic 0:ba7e439238ab 116 else if ( pub.header.bits.qos == 2 )
pavleradojkovic 0:ba7e439238ab 117 {
pavleradojkovic 0:ba7e439238ab 118 replyACK(client, &pub, PUBREC);
pavleradojkovic 0:ba7e439238ab 119 }
pavleradojkovic 0:ba7e439238ab 120
pavleradojkovic 0:ba7e439238ab 121 delete snPacket;
pavleradojkovic 0:ba7e439238ab 122 return;
pavleradojkovic 0:ba7e439238ab 123 }
pavleradojkovic 0:ba7e439238ab 124
pavleradojkovic 0:ba7e439238ab 125 /* add the Topic and get a TopicId */
pavleradojkovic 0:ba7e439238ab 126 topic = client->getTopics()->add(&topicId);
pavleradojkovic 0:ba7e439238ab 127 id = topic->getTopicId();
pavleradojkovic 0:ba7e439238ab 128
pavleradojkovic 0:ba7e439238ab 129 if (id > 0)
pavleradojkovic 0:ba7e439238ab 130 {
pavleradojkovic 0:ba7e439238ab 131 /* create REGISTER */
pavleradojkovic 0:ba7e439238ab 132 MQTTSNPacket* regPacket = new MQTTSNPacket();
pavleradojkovic 0:ba7e439238ab 133
pavleradojkovic 0:ba7e439238ab 134 MQTTSNString topicName = MQTTSNString_initializer;
pavleradojkovic 0:ba7e439238ab 135 topicName.lenstring.len = topicId.data.long_.len;
pavleradojkovic 0:ba7e439238ab 136 topicName.lenstring.data = topicId.data.long_.name;
pavleradojkovic 0:ba7e439238ab 137
pavleradojkovic 0:ba7e439238ab 138 uint16_t regackMsgId = client->getNextSnMsgId();
pavleradojkovic 0:ba7e439238ab 139 regPacket->setREGISTER(id, regackMsgId, &topicName);
pavleradojkovic 0:ba7e439238ab 140
pavleradojkovic 0:ba7e439238ab 141 /* send REGISTER */
pavleradojkovic 0:ba7e439238ab 142 Event* evrg = new Event();
pavleradojkovic 0:ba7e439238ab 143 evrg->setClientSendEvent(client, regPacket);
pavleradojkovic 0:ba7e439238ab 144 _gateway->getClientSendQue()->post(evrg);
pavleradojkovic 0:ba7e439238ab 145
pavleradojkovic 0:ba7e439238ab 146 /* send PUBLISH */
pavleradojkovic 0:ba7e439238ab 147 topicId.data.id = id;
pavleradojkovic 0:ba7e439238ab 148 snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
pavleradojkovic 0:ba7e439238ab 149 (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
pavleradojkovic 0:ba7e439238ab 150 pub.payloadlen);
pavleradojkovic 0:ba7e439238ab 151 client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
pavleradojkovic 0:ba7e439238ab 152 return;
pavleradojkovic 0:ba7e439238ab 153 }
pavleradojkovic 0:ba7e439238ab 154 else
pavleradojkovic 0:ba7e439238ab 155 {
pavleradojkovic 0:ba7e439238ab 156 WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
pavleradojkovic 0:ba7e439238ab 157 delete snPacket;
pavleradojkovic 0:ba7e439238ab 158 return;
pavleradojkovic 0:ba7e439238ab 159 }
pavleradojkovic 0:ba7e439238ab 160 }
pavleradojkovic 0:ba7e439238ab 161 }
pavleradojkovic 0:ba7e439238ab 162
pavleradojkovic 0:ba7e439238ab 163 snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain,
pavleradojkovic 0:ba7e439238ab 164 (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen);
pavleradojkovic 0:ba7e439238ab 165 Event* ev1 = new Event();
pavleradojkovic 0:ba7e439238ab 166 ev1->setClientSendEvent(client, snPacket);
pavleradojkovic 0:ba7e439238ab 167 _gateway->getClientSendQue()->post(ev1);
pavleradojkovic 0:ba7e439238ab 168
pavleradojkovic 0:ba7e439238ab 169 }
pavleradojkovic 0:ba7e439238ab 170
pavleradojkovic 0:ba7e439238ab 171 void MQTTGWPublishHandler::replyACK(Client* client, Publish* pub, int type)
pavleradojkovic 0:ba7e439238ab 172 {
pavleradojkovic 0:ba7e439238ab 173 MQTTGWPacket* pubAck = new MQTTGWPacket();
pavleradojkovic 0:ba7e439238ab 174 pubAck->setAck(type, (uint16_t)pub->msgId);
pavleradojkovic 0:ba7e439238ab 175 Event* ev1 = new Event();
pavleradojkovic 0:ba7e439238ab 176 ev1->setBrokerSendEvent(client, pubAck);
pavleradojkovic 0:ba7e439238ab 177 _gateway->getBrokerSendQue()->post(ev1);
pavleradojkovic 0:ba7e439238ab 178 }
pavleradojkovic 0:ba7e439238ab 179
pavleradojkovic 0:ba7e439238ab 180 void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet)
pavleradojkovic 0:ba7e439238ab 181 {
pavleradojkovic 0:ba7e439238ab 182 Ack ack;
pavleradojkovic 0:ba7e439238ab 183 packet->getAck(&ack);
pavleradojkovic 0:ba7e439238ab 184 TopicIdMapElement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
pavleradojkovic 0:ba7e439238ab 185 if (topicId)
pavleradojkovic 0:ba7e439238ab 186 {
pavleradojkovic 0:ba7e439238ab 187 MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
pavleradojkovic 0:ba7e439238ab 188 mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0);
pavleradojkovic 0:ba7e439238ab 189
pavleradojkovic 0:ba7e439238ab 190 client->eraseWaitedPubTopicId((uint16_t)ack.msgId);
pavleradojkovic 0:ba7e439238ab 191 Event* ev1 = new Event();
pavleradojkovic 0:ba7e439238ab 192 ev1->setClientSendEvent(client, mqttsnPacket);
pavleradojkovic 0:ba7e439238ab 193 _gateway->getClientSendQue()->post(ev1);
pavleradojkovic 0:ba7e439238ab 194 return;
pavleradojkovic 0:ba7e439238ab 195 }
pavleradojkovic 0:ba7e439238ab 196 WRITELOG(" PUBACK from the Broker is invalid. PacketID : %04X ClientID : %s \n", (uint16_t)ack.msgId, client->getClientId());
pavleradojkovic 0:ba7e439238ab 197 }
pavleradojkovic 0:ba7e439238ab 198
pavleradojkovic 0:ba7e439238ab 199 void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int type)
pavleradojkovic 0:ba7e439238ab 200 {
pavleradojkovic 0:ba7e439238ab 201 Ack ack;
pavleradojkovic 0:ba7e439238ab 202 packet->getAck(&ack);
pavleradojkovic 0:ba7e439238ab 203
pavleradojkovic 0:ba7e439238ab 204 if ( client->isActive() || client->isAwake() )
pavleradojkovic 0:ba7e439238ab 205 {
pavleradojkovic 0:ba7e439238ab 206 MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
pavleradojkovic 0:ba7e439238ab 207 if (type == PUBREC)
pavleradojkovic 0:ba7e439238ab 208 {
pavleradojkovic 0:ba7e439238ab 209 mqttsnPacket->setPUBREC((uint16_t) ack.msgId);
pavleradojkovic 0:ba7e439238ab 210 }
pavleradojkovic 0:ba7e439238ab 211 else if (type == PUBREL)
pavleradojkovic 0:ba7e439238ab 212 {
pavleradojkovic 0:ba7e439238ab 213 mqttsnPacket->setPUBREL((uint16_t) ack.msgId);
pavleradojkovic 0:ba7e439238ab 214 }
pavleradojkovic 0:ba7e439238ab 215 else if (type == PUBCOMP)
pavleradojkovic 0:ba7e439238ab 216 {
pavleradojkovic 0:ba7e439238ab 217 mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId);
pavleradojkovic 0:ba7e439238ab 218 }
pavleradojkovic 0:ba7e439238ab 219
pavleradojkovic 0:ba7e439238ab 220 Event* ev1 = new Event();
pavleradojkovic 0:ba7e439238ab 221 ev1->setClientSendEvent(client, mqttsnPacket);
pavleradojkovic 0:ba7e439238ab 222 _gateway->getClientSendQue()->post(ev1);
pavleradojkovic 0:ba7e439238ab 223 }
pavleradojkovic 0:ba7e439238ab 224 else if ( client->isSleep() )
pavleradojkovic 0:ba7e439238ab 225 {
pavleradojkovic 0:ba7e439238ab 226 if (type == PUBREL)
pavleradojkovic 0:ba7e439238ab 227 {
pavleradojkovic 0:ba7e439238ab 228 MQTTGWPacket* pubComp = new MQTTGWPacket();
pavleradojkovic 0:ba7e439238ab 229 pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId);
pavleradojkovic 0:ba7e439238ab 230 Event* ev1 = new Event();
pavleradojkovic 0:ba7e439238ab 231 ev1->setBrokerSendEvent(client, pubComp);
pavleradojkovic 0:ba7e439238ab 232 _gateway->getBrokerSendQue()->post(ev1);
pavleradojkovic 0:ba7e439238ab 233 }
pavleradojkovic 0:ba7e439238ab 234 }
pavleradojkovic 0:ba7e439238ab 235 }
pavleradojkovic 0:ba7e439238ab 236
pavleradojkovic 0:ba7e439238ab 237
pavleradojkovic 0:ba7e439238ab 238
pavleradojkovic 0:ba7e439238ab 239 void MQTTGWPublishHandler::handleAggregatePuback(Client* client, MQTTGWPacket* packet)
pavleradojkovic 0:ba7e439238ab 240 {
pavleradojkovic 0:ba7e439238ab 241 uint16_t msgId = packet->getMsgId();
pavleradojkovic 0:ba7e439238ab 242 uint16_t clientMsgId = 0;
pavleradojkovic 0:ba7e439238ab 243 Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId);
pavleradojkovic 0:ba7e439238ab 244 if ( newClient != nullptr )
pavleradojkovic 0:ba7e439238ab 245 {
pavleradojkovic 0:ba7e439238ab 246 packet->setMsgId((int)clientMsgId);
pavleradojkovic 0:ba7e439238ab 247 handlePuback(newClient, packet);
pavleradojkovic 0:ba7e439238ab 248 }
pavleradojkovic 0:ba7e439238ab 249 }
pavleradojkovic 0:ba7e439238ab 250
pavleradojkovic 0:ba7e439238ab 251 void MQTTGWPublishHandler::handleAggregateAck(Client* client, MQTTGWPacket* packet, int type)
pavleradojkovic 0:ba7e439238ab 252 {
pavleradojkovic 0:ba7e439238ab 253 uint16_t msgId = packet->getMsgId();
pavleradojkovic 0:ba7e439238ab 254 uint16_t clientMsgId = 0;
pavleradojkovic 0:ba7e439238ab 255 Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId);
pavleradojkovic 0:ba7e439238ab 256 if ( newClient != nullptr )
pavleradojkovic 0:ba7e439238ab 257 {
pavleradojkovic 0:ba7e439238ab 258 packet->setMsgId((int)clientMsgId);
pavleradojkovic 0:ba7e439238ab 259 handleAck(newClient, packet,type);
pavleradojkovic 0:ba7e439238ab 260 }
pavleradojkovic 0:ba7e439238ab 261 }
pavleradojkovic 0:ba7e439238ab 262
pavleradojkovic 0:ba7e439238ab 263 void MQTTGWPublishHandler::handleAggregatePubrel(Client* client, MQTTGWPacket* packet)
pavleradojkovic 0:ba7e439238ab 264 {
pavleradojkovic 0:ba7e439238ab 265 Publish pub;
pavleradojkovic 0:ba7e439238ab 266 packet->getPUBLISH(&pub);
pavleradojkovic 0:ba7e439238ab 267 replyACK(client, &pub, PUBCOMP);
pavleradojkovic 0:ba7e439238ab 268 }
pavleradojkovic 0:ba7e439238ab 269
pavleradojkovic 0:ba7e439238ab 270 void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* packet)
pavleradojkovic 0:ba7e439238ab 271 {
pavleradojkovic 0:ba7e439238ab 272 Publish pub;
pavleradojkovic 0:ba7e439238ab 273 packet->getPUBLISH(&pub);
pavleradojkovic 0:ba7e439238ab 274
pavleradojkovic 0:ba7e439238ab 275 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
pavleradojkovic 0:ba7e439238ab 276 RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
pavleradojkovic 0:ba7e439238ab 277
pavleradojkovic 0:ba7e439238ab 278 if (pub.header.bits.qos == 1)
pavleradojkovic 0:ba7e439238ab 279 {
pavleradojkovic 0:ba7e439238ab 280 replyACK(client, &pub, PUBACK);
pavleradojkovic 0:ba7e439238ab 281 }
pavleradojkovic 0:ba7e439238ab 282 else if ( pub.header.bits.qos == 2)
pavleradojkovic 0:ba7e439238ab 283 {
pavleradojkovic 0:ba7e439238ab 284 replyACK(client, &pub, PUBREC);
pavleradojkovic 0:ba7e439238ab 285 }
pavleradojkovic 0:ba7e439238ab 286
pavleradojkovic 0:ba7e439238ab 287
pavleradojkovic 0:ba7e439238ab 288
pavleradojkovic 0:ba7e439238ab 289 string* topicName = new string(pub.topic, pub.topiclen);
pavleradojkovic 0:ba7e439238ab 290 Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
pavleradojkovic 0:ba7e439238ab 291 AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic);
pavleradojkovic 0:ba7e439238ab 292 if ( list != nullptr )
pavleradojkovic 0:ba7e439238ab 293 {
pavleradojkovic 0:ba7e439238ab 294 ClientTopicElement* p = list->getFirstElement();
pavleradojkovic 0:ba7e439238ab 295
pavleradojkovic 0:ba7e439238ab 296 while ( p )
pavleradojkovic 0:ba7e439238ab 297 {
pavleradojkovic 0:ba7e439238ab 298 Client* devClient = p->getClient();
pavleradojkovic 0:ba7e439238ab 299 if ( devClient != nullptr )
pavleradojkovic 0:ba7e439238ab 300 {
pavleradojkovic 0:ba7e439238ab 301 MQTTGWPacket* msg = new MQTTGWPacket();
pavleradojkovic 0:ba7e439238ab 302 *msg = *packet;
pavleradojkovic 0:ba7e439238ab 303 if ( msg->getType() == 0 )
pavleradojkovic 0:ba7e439238ab 304 {
pavleradojkovic 0:ba7e439238ab 305 WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
pavleradojkovic 0:ba7e439238ab 306 delete msg;
pavleradojkovic 0:ba7e439238ab 307 break;
pavleradojkovic 0:ba7e439238ab 308 }
pavleradojkovic 0:ba7e439238ab 309 Event* ev = new Event();
pavleradojkovic 0:ba7e439238ab 310 ev->setBrokerRecvEvent(devClient, msg);
pavleradojkovic 0:ba7e439238ab 311 _gateway->getPacketEventQue()->post(ev);
pavleradojkovic 0:ba7e439238ab 312 }
pavleradojkovic 0:ba7e439238ab 313 else
pavleradojkovic 0:ba7e439238ab 314 {
pavleradojkovic 0:ba7e439238ab 315 break;
pavleradojkovic 0:ba7e439238ab 316 }
pavleradojkovic 0:ba7e439238ab 317
pavleradojkovic 0:ba7e439238ab 318 p = list->getNextElement(p);
pavleradojkovic 0:ba7e439238ab 319 }
pavleradojkovic 0:ba7e439238ab 320 delete list;
pavleradojkovic 0:ba7e439238ab 321 }
pavleradojkovic 0:ba7e439238ab 322 }
pavleradojkovic 0:ba7e439238ab 323