Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mbed-mqtt-master/paho_mqtt-sn_embedded_c/MQTTSNGateway/src/MQTTGWPublishHandler.cpp@0:ba7e439238ab, 2022-06-20 (annotated)
- Committer:
- pavleradojkovic
- Date:
- Mon Jun 20 16:24:43 2022 +0000
- Revision:
- 0:ba7e439238ab
Initial commit
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
pavleradojkovic | 0:ba7e439238ab | 1 | /************************************************************************************** |
pavleradojkovic | 0:ba7e439238ab | 2 | * Copyright (c) 2016, Tomoaki Yamaguchi |
pavleradojkovic | 0:ba7e439238ab | 3 | * |
pavleradojkovic | 0:ba7e439238ab | 4 | * All rights reserved. This program and the accompanying materials |
pavleradojkovic | 0:ba7e439238ab | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
pavleradojkovic | 0:ba7e439238ab | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
pavleradojkovic | 0:ba7e439238ab | 7 | * |
pavleradojkovic | 0:ba7e439238ab | 8 | * The Eclipse Public License is available at |
pavleradojkovic | 0:ba7e439238ab | 9 | * http://www.eclipse.org/legal/epl-v10.html |
pavleradojkovic | 0:ba7e439238ab | 10 | * and the Eclipse Distribution License is available at |
pavleradojkovic | 0:ba7e439238ab | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
pavleradojkovic | 0:ba7e439238ab | 12 | * |
pavleradojkovic | 0:ba7e439238ab | 13 | * Contributors: |
pavleradojkovic | 0:ba7e439238ab | 14 | * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation |
pavleradojkovic | 0:ba7e439238ab | 15 | * Tieto Poland Sp. z o.o. - Gateway improvements |
pavleradojkovic | 0:ba7e439238ab | 16 | **************************************************************************************/ |
pavleradojkovic | 0:ba7e439238ab | 17 | |
pavleradojkovic | 0:ba7e439238ab | 18 | #include "MQTTGWPublishHandler.h" |
pavleradojkovic | 0:ba7e439238ab | 19 | #include "MQTTSNGateway.h" |
pavleradojkovic | 0:ba7e439238ab | 20 | #include "MQTTSNPacket.h" |
pavleradojkovic | 0:ba7e439238ab | 21 | #include <string> |
pavleradojkovic | 0:ba7e439238ab | 22 | |
pavleradojkovic | 0:ba7e439238ab | 23 | using namespace std; |
pavleradojkovic | 0:ba7e439238ab | 24 | using namespace MQTTSNGW; |
pavleradojkovic | 0:ba7e439238ab | 25 | |
pavleradojkovic | 0:ba7e439238ab | 26 | char* currentDateTime(void); |
pavleradojkovic | 0:ba7e439238ab | 27 | |
pavleradojkovic | 0:ba7e439238ab | 28 | MQTTGWPublishHandler::MQTTGWPublishHandler(Gateway* gateway) |
pavleradojkovic | 0:ba7e439238ab | 29 | { |
pavleradojkovic | 0:ba7e439238ab | 30 | _gateway = gateway; |
pavleradojkovic | 0:ba7e439238ab | 31 | } |
pavleradojkovic | 0:ba7e439238ab | 32 | |
pavleradojkovic | 0:ba7e439238ab | 33 | MQTTGWPublishHandler::~MQTTGWPublishHandler() |
pavleradojkovic | 0:ba7e439238ab | 34 | { |
pavleradojkovic | 0:ba7e439238ab | 35 | |
pavleradojkovic | 0:ba7e439238ab | 36 | } |
pavleradojkovic | 0:ba7e439238ab | 37 | |
pavleradojkovic | 0:ba7e439238ab | 38 | void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) |
pavleradojkovic | 0:ba7e439238ab | 39 | { |
pavleradojkovic | 0:ba7e439238ab | 40 | if ( !client->isActive() && !client->isSleep() && !client->isAwake()) |
pavleradojkovic | 0:ba7e439238ab | 41 | { |
pavleradojkovic | 0:ba7e439238ab | 42 | WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER); |
pavleradojkovic | 0:ba7e439238ab | 43 | return; |
pavleradojkovic | 0:ba7e439238ab | 44 | } |
pavleradojkovic | 0:ba7e439238ab | 45 | |
pavleradojkovic | 0:ba7e439238ab | 46 | /* client is sleeping. save PUBLISH */ |
pavleradojkovic | 0:ba7e439238ab | 47 | if ( client->isSleep() ) |
pavleradojkovic | 0:ba7e439238ab | 48 | { |
pavleradojkovic | 0:ba7e439238ab | 49 | Publish pub; |
pavleradojkovic | 0:ba7e439238ab | 50 | packet->getPUBLISH(&pub); |
pavleradojkovic | 0:ba7e439238ab | 51 | |
pavleradojkovic | 0:ba7e439238ab | 52 | WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), |
pavleradojkovic | 0:ba7e439238ab | 53 | RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); |
pavleradojkovic | 0:ba7e439238ab | 54 | |
pavleradojkovic | 0:ba7e439238ab | 55 | if (pub.header.bits.qos == 1) |
pavleradojkovic | 0:ba7e439238ab | 56 | { |
pavleradojkovic | 0:ba7e439238ab | 57 | replyACK(client, &pub, PUBACK); |
pavleradojkovic | 0:ba7e439238ab | 58 | } |
pavleradojkovic | 0:ba7e439238ab | 59 | else if ( pub.header.bits.qos == 2) |
pavleradojkovic | 0:ba7e439238ab | 60 | { |
pavleradojkovic | 0:ba7e439238ab | 61 | replyACK(client, &pub, PUBREC); |
pavleradojkovic | 0:ba7e439238ab | 62 | } |
pavleradojkovic | 0:ba7e439238ab | 63 | |
pavleradojkovic | 0:ba7e439238ab | 64 | MQTTGWPacket* msg = new MQTTGWPacket(); |
pavleradojkovic | 0:ba7e439238ab | 65 | *msg = *packet; |
pavleradojkovic | 0:ba7e439238ab | 66 | if ( msg->getType() == 0 ) |
pavleradojkovic | 0:ba7e439238ab | 67 | { |
pavleradojkovic | 0:ba7e439238ab | 68 | WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); |
pavleradojkovic | 0:ba7e439238ab | 69 | delete msg; |
pavleradojkovic | 0:ba7e439238ab | 70 | return; |
pavleradojkovic | 0:ba7e439238ab | 71 | } |
pavleradojkovic | 0:ba7e439238ab | 72 | client->setClientSleepPacket(msg); |
pavleradojkovic | 0:ba7e439238ab | 73 | return; |
pavleradojkovic | 0:ba7e439238ab | 74 | } |
pavleradojkovic | 0:ba7e439238ab | 75 | |
pavleradojkovic | 0:ba7e439238ab | 76 | Publish pub; |
pavleradojkovic | 0:ba7e439238ab | 77 | packet->getPUBLISH(&pub); |
pavleradojkovic | 0:ba7e439238ab | 78 | |
pavleradojkovic | 0:ba7e439238ab | 79 | MQTTSNPacket* snPacket = new MQTTSNPacket(); |
pavleradojkovic | 0:ba7e439238ab | 80 | |
pavleradojkovic | 0:ba7e439238ab | 81 | /* create MQTTSN_topicid */ |
pavleradojkovic | 0:ba7e439238ab | 82 | MQTTSN_topicid topicId; |
pavleradojkovic | 0:ba7e439238ab | 83 | uint16_t id = 0; |
pavleradojkovic | 0:ba7e439238ab | 84 | |
pavleradojkovic | 0:ba7e439238ab | 85 | if (pub.topiclen <= 2) |
pavleradojkovic | 0:ba7e439238ab | 86 | { |
pavleradojkovic | 0:ba7e439238ab | 87 | topicId.type = MQTTSN_TOPIC_TYPE_SHORT; |
pavleradojkovic | 0:ba7e439238ab | 88 | *(topicId.data.short_name) = *pub.topic; |
pavleradojkovic | 0:ba7e439238ab | 89 | *(topicId.data.short_name + 1) = *(pub.topic + 1); |
pavleradojkovic | 0:ba7e439238ab | 90 | } |
pavleradojkovic | 0:ba7e439238ab | 91 | else |
pavleradojkovic | 0:ba7e439238ab | 92 | { |
pavleradojkovic | 0:ba7e439238ab | 93 | topicId.data.long_.len = pub.topiclen; |
pavleradojkovic | 0:ba7e439238ab | 94 | topicId.data.long_.name = pub.topic; |
pavleradojkovic | 0:ba7e439238ab | 95 | Topic* tp = client->getTopics()->getTopicByName(&topicId); |
pavleradojkovic | 0:ba7e439238ab | 96 | |
pavleradojkovic | 0:ba7e439238ab | 97 | if ( tp ) |
pavleradojkovic | 0:ba7e439238ab | 98 | { |
pavleradojkovic | 0:ba7e439238ab | 99 | topicId.type = tp->getType(); |
pavleradojkovic | 0:ba7e439238ab | 100 | topicId.data.long_.len = pub.topiclen; |
pavleradojkovic | 0:ba7e439238ab | 101 | topicId.data.long_.name = pub.topic; |
pavleradojkovic | 0:ba7e439238ab | 102 | topicId.data.id = tp->getTopicId(); |
pavleradojkovic | 0:ba7e439238ab | 103 | } |
pavleradojkovic | 0:ba7e439238ab | 104 | else |
pavleradojkovic | 0:ba7e439238ab | 105 | { |
pavleradojkovic | 0:ba7e439238ab | 106 | /* This message might be subscribed with wild card. */ |
pavleradojkovic | 0:ba7e439238ab | 107 | topicId.type = MQTTSN_TOPIC_TYPE_NORMAL; |
pavleradojkovic | 0:ba7e439238ab | 108 | Topic* topic = client->getTopics()->match(&topicId); |
pavleradojkovic | 0:ba7e439238ab | 109 | if (topic == nullptr) |
pavleradojkovic | 0:ba7e439238ab | 110 | { |
pavleradojkovic | 0:ba7e439238ab | 111 | WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n"); |
pavleradojkovic | 0:ba7e439238ab | 112 | if (pub.header.bits.qos == 1) |
pavleradojkovic | 0:ba7e439238ab | 113 | { |
pavleradojkovic | 0:ba7e439238ab | 114 | replyACK(client, &pub, PUBACK); |
pavleradojkovic | 0:ba7e439238ab | 115 | } |
pavleradojkovic | 0:ba7e439238ab | 116 | else if ( pub.header.bits.qos == 2 ) |
pavleradojkovic | 0:ba7e439238ab | 117 | { |
pavleradojkovic | 0:ba7e439238ab | 118 | replyACK(client, &pub, PUBREC); |
pavleradojkovic | 0:ba7e439238ab | 119 | } |
pavleradojkovic | 0:ba7e439238ab | 120 | |
pavleradojkovic | 0:ba7e439238ab | 121 | delete snPacket; |
pavleradojkovic | 0:ba7e439238ab | 122 | return; |
pavleradojkovic | 0:ba7e439238ab | 123 | } |
pavleradojkovic | 0:ba7e439238ab | 124 | |
pavleradojkovic | 0:ba7e439238ab | 125 | /* add the Topic and get a TopicId */ |
pavleradojkovic | 0:ba7e439238ab | 126 | topic = client->getTopics()->add(&topicId); |
pavleradojkovic | 0:ba7e439238ab | 127 | id = topic->getTopicId(); |
pavleradojkovic | 0:ba7e439238ab | 128 | |
pavleradojkovic | 0:ba7e439238ab | 129 | if (id > 0) |
pavleradojkovic | 0:ba7e439238ab | 130 | { |
pavleradojkovic | 0:ba7e439238ab | 131 | /* create REGISTER */ |
pavleradojkovic | 0:ba7e439238ab | 132 | MQTTSNPacket* regPacket = new MQTTSNPacket(); |
pavleradojkovic | 0:ba7e439238ab | 133 | |
pavleradojkovic | 0:ba7e439238ab | 134 | MQTTSNString topicName = MQTTSNString_initializer; |
pavleradojkovic | 0:ba7e439238ab | 135 | topicName.lenstring.len = topicId.data.long_.len; |
pavleradojkovic | 0:ba7e439238ab | 136 | topicName.lenstring.data = topicId.data.long_.name; |
pavleradojkovic | 0:ba7e439238ab | 137 | |
pavleradojkovic | 0:ba7e439238ab | 138 | uint16_t regackMsgId = client->getNextSnMsgId(); |
pavleradojkovic | 0:ba7e439238ab | 139 | regPacket->setREGISTER(id, regackMsgId, &topicName); |
pavleradojkovic | 0:ba7e439238ab | 140 | |
pavleradojkovic | 0:ba7e439238ab | 141 | /* send REGISTER */ |
pavleradojkovic | 0:ba7e439238ab | 142 | Event* evrg = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 143 | evrg->setClientSendEvent(client, regPacket); |
pavleradojkovic | 0:ba7e439238ab | 144 | _gateway->getClientSendQue()->post(evrg); |
pavleradojkovic | 0:ba7e439238ab | 145 | |
pavleradojkovic | 0:ba7e439238ab | 146 | /* send PUBLISH */ |
pavleradojkovic | 0:ba7e439238ab | 147 | topicId.data.id = id; |
pavleradojkovic | 0:ba7e439238ab | 148 | snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, |
pavleradojkovic | 0:ba7e439238ab | 149 | (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, |
pavleradojkovic | 0:ba7e439238ab | 150 | pub.payloadlen); |
pavleradojkovic | 0:ba7e439238ab | 151 | client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId); |
pavleradojkovic | 0:ba7e439238ab | 152 | return; |
pavleradojkovic | 0:ba7e439238ab | 153 | } |
pavleradojkovic | 0:ba7e439238ab | 154 | else |
pavleradojkovic | 0:ba7e439238ab | 155 | { |
pavleradojkovic | 0:ba7e439238ab | 156 | WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); |
pavleradojkovic | 0:ba7e439238ab | 157 | delete snPacket; |
pavleradojkovic | 0:ba7e439238ab | 158 | return; |
pavleradojkovic | 0:ba7e439238ab | 159 | } |
pavleradojkovic | 0:ba7e439238ab | 160 | } |
pavleradojkovic | 0:ba7e439238ab | 161 | } |
pavleradojkovic | 0:ba7e439238ab | 162 | |
pavleradojkovic | 0:ba7e439238ab | 163 | snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, |
pavleradojkovic | 0:ba7e439238ab | 164 | (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); |
pavleradojkovic | 0:ba7e439238ab | 165 | Event* ev1 = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 166 | ev1->setClientSendEvent(client, snPacket); |
pavleradojkovic | 0:ba7e439238ab | 167 | _gateway->getClientSendQue()->post(ev1); |
pavleradojkovic | 0:ba7e439238ab | 168 | |
pavleradojkovic | 0:ba7e439238ab | 169 | } |
pavleradojkovic | 0:ba7e439238ab | 170 | |
pavleradojkovic | 0:ba7e439238ab | 171 | void MQTTGWPublishHandler::replyACK(Client* client, Publish* pub, int type) |
pavleradojkovic | 0:ba7e439238ab | 172 | { |
pavleradojkovic | 0:ba7e439238ab | 173 | MQTTGWPacket* pubAck = new MQTTGWPacket(); |
pavleradojkovic | 0:ba7e439238ab | 174 | pubAck->setAck(type, (uint16_t)pub->msgId); |
pavleradojkovic | 0:ba7e439238ab | 175 | Event* ev1 = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 176 | ev1->setBrokerSendEvent(client, pubAck); |
pavleradojkovic | 0:ba7e439238ab | 177 | _gateway->getBrokerSendQue()->post(ev1); |
pavleradojkovic | 0:ba7e439238ab | 178 | } |
pavleradojkovic | 0:ba7e439238ab | 179 | |
pavleradojkovic | 0:ba7e439238ab | 180 | void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet) |
pavleradojkovic | 0:ba7e439238ab | 181 | { |
pavleradojkovic | 0:ba7e439238ab | 182 | Ack ack; |
pavleradojkovic | 0:ba7e439238ab | 183 | packet->getAck(&ack); |
pavleradojkovic | 0:ba7e439238ab | 184 | TopicIdMapElement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 185 | if (topicId) |
pavleradojkovic | 0:ba7e439238ab | 186 | { |
pavleradojkovic | 0:ba7e439238ab | 187 | MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); |
pavleradojkovic | 0:ba7e439238ab | 188 | mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0); |
pavleradojkovic | 0:ba7e439238ab | 189 | |
pavleradojkovic | 0:ba7e439238ab | 190 | client->eraseWaitedPubTopicId((uint16_t)ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 191 | Event* ev1 = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 192 | ev1->setClientSendEvent(client, mqttsnPacket); |
pavleradojkovic | 0:ba7e439238ab | 193 | _gateway->getClientSendQue()->post(ev1); |
pavleradojkovic | 0:ba7e439238ab | 194 | return; |
pavleradojkovic | 0:ba7e439238ab | 195 | } |
pavleradojkovic | 0:ba7e439238ab | 196 | WRITELOG(" PUBACK from the Broker is invalid. PacketID : %04X ClientID : %s \n", (uint16_t)ack.msgId, client->getClientId()); |
pavleradojkovic | 0:ba7e439238ab | 197 | } |
pavleradojkovic | 0:ba7e439238ab | 198 | |
pavleradojkovic | 0:ba7e439238ab | 199 | void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int type) |
pavleradojkovic | 0:ba7e439238ab | 200 | { |
pavleradojkovic | 0:ba7e439238ab | 201 | Ack ack; |
pavleradojkovic | 0:ba7e439238ab | 202 | packet->getAck(&ack); |
pavleradojkovic | 0:ba7e439238ab | 203 | |
pavleradojkovic | 0:ba7e439238ab | 204 | if ( client->isActive() || client->isAwake() ) |
pavleradojkovic | 0:ba7e439238ab | 205 | { |
pavleradojkovic | 0:ba7e439238ab | 206 | MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); |
pavleradojkovic | 0:ba7e439238ab | 207 | if (type == PUBREC) |
pavleradojkovic | 0:ba7e439238ab | 208 | { |
pavleradojkovic | 0:ba7e439238ab | 209 | mqttsnPacket->setPUBREC((uint16_t) ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 210 | } |
pavleradojkovic | 0:ba7e439238ab | 211 | else if (type == PUBREL) |
pavleradojkovic | 0:ba7e439238ab | 212 | { |
pavleradojkovic | 0:ba7e439238ab | 213 | mqttsnPacket->setPUBREL((uint16_t) ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 214 | } |
pavleradojkovic | 0:ba7e439238ab | 215 | else if (type == PUBCOMP) |
pavleradojkovic | 0:ba7e439238ab | 216 | { |
pavleradojkovic | 0:ba7e439238ab | 217 | mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 218 | } |
pavleradojkovic | 0:ba7e439238ab | 219 | |
pavleradojkovic | 0:ba7e439238ab | 220 | Event* ev1 = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 221 | ev1->setClientSendEvent(client, mqttsnPacket); |
pavleradojkovic | 0:ba7e439238ab | 222 | _gateway->getClientSendQue()->post(ev1); |
pavleradojkovic | 0:ba7e439238ab | 223 | } |
pavleradojkovic | 0:ba7e439238ab | 224 | else if ( client->isSleep() ) |
pavleradojkovic | 0:ba7e439238ab | 225 | { |
pavleradojkovic | 0:ba7e439238ab | 226 | if (type == PUBREL) |
pavleradojkovic | 0:ba7e439238ab | 227 | { |
pavleradojkovic | 0:ba7e439238ab | 228 | MQTTGWPacket* pubComp = new MQTTGWPacket(); |
pavleradojkovic | 0:ba7e439238ab | 229 | pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId); |
pavleradojkovic | 0:ba7e439238ab | 230 | Event* ev1 = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 231 | ev1->setBrokerSendEvent(client, pubComp); |
pavleradojkovic | 0:ba7e439238ab | 232 | _gateway->getBrokerSendQue()->post(ev1); |
pavleradojkovic | 0:ba7e439238ab | 233 | } |
pavleradojkovic | 0:ba7e439238ab | 234 | } |
pavleradojkovic | 0:ba7e439238ab | 235 | } |
pavleradojkovic | 0:ba7e439238ab | 236 | |
pavleradojkovic | 0:ba7e439238ab | 237 | |
pavleradojkovic | 0:ba7e439238ab | 238 | |
pavleradojkovic | 0:ba7e439238ab | 239 | void MQTTGWPublishHandler::handleAggregatePuback(Client* client, MQTTGWPacket* packet) |
pavleradojkovic | 0:ba7e439238ab | 240 | { |
pavleradojkovic | 0:ba7e439238ab | 241 | uint16_t msgId = packet->getMsgId(); |
pavleradojkovic | 0:ba7e439238ab | 242 | uint16_t clientMsgId = 0; |
pavleradojkovic | 0:ba7e439238ab | 243 | Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId); |
pavleradojkovic | 0:ba7e439238ab | 244 | if ( newClient != nullptr ) |
pavleradojkovic | 0:ba7e439238ab | 245 | { |
pavleradojkovic | 0:ba7e439238ab | 246 | packet->setMsgId((int)clientMsgId); |
pavleradojkovic | 0:ba7e439238ab | 247 | handlePuback(newClient, packet); |
pavleradojkovic | 0:ba7e439238ab | 248 | } |
pavleradojkovic | 0:ba7e439238ab | 249 | } |
pavleradojkovic | 0:ba7e439238ab | 250 | |
pavleradojkovic | 0:ba7e439238ab | 251 | void MQTTGWPublishHandler::handleAggregateAck(Client* client, MQTTGWPacket* packet, int type) |
pavleradojkovic | 0:ba7e439238ab | 252 | { |
pavleradojkovic | 0:ba7e439238ab | 253 | uint16_t msgId = packet->getMsgId(); |
pavleradojkovic | 0:ba7e439238ab | 254 | uint16_t clientMsgId = 0; |
pavleradojkovic | 0:ba7e439238ab | 255 | Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId); |
pavleradojkovic | 0:ba7e439238ab | 256 | if ( newClient != nullptr ) |
pavleradojkovic | 0:ba7e439238ab | 257 | { |
pavleradojkovic | 0:ba7e439238ab | 258 | packet->setMsgId((int)clientMsgId); |
pavleradojkovic | 0:ba7e439238ab | 259 | handleAck(newClient, packet,type); |
pavleradojkovic | 0:ba7e439238ab | 260 | } |
pavleradojkovic | 0:ba7e439238ab | 261 | } |
pavleradojkovic | 0:ba7e439238ab | 262 | |
pavleradojkovic | 0:ba7e439238ab | 263 | void MQTTGWPublishHandler::handleAggregatePubrel(Client* client, MQTTGWPacket* packet) |
pavleradojkovic | 0:ba7e439238ab | 264 | { |
pavleradojkovic | 0:ba7e439238ab | 265 | Publish pub; |
pavleradojkovic | 0:ba7e439238ab | 266 | packet->getPUBLISH(&pub); |
pavleradojkovic | 0:ba7e439238ab | 267 | replyACK(client, &pub, PUBCOMP); |
pavleradojkovic | 0:ba7e439238ab | 268 | } |
pavleradojkovic | 0:ba7e439238ab | 269 | |
pavleradojkovic | 0:ba7e439238ab | 270 | void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* packet) |
pavleradojkovic | 0:ba7e439238ab | 271 | { |
pavleradojkovic | 0:ba7e439238ab | 272 | Publish pub; |
pavleradojkovic | 0:ba7e439238ab | 273 | packet->getPUBLISH(&pub); |
pavleradojkovic | 0:ba7e439238ab | 274 | |
pavleradojkovic | 0:ba7e439238ab | 275 | WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), |
pavleradojkovic | 0:ba7e439238ab | 276 | RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); |
pavleradojkovic | 0:ba7e439238ab | 277 | |
pavleradojkovic | 0:ba7e439238ab | 278 | if (pub.header.bits.qos == 1) |
pavleradojkovic | 0:ba7e439238ab | 279 | { |
pavleradojkovic | 0:ba7e439238ab | 280 | replyACK(client, &pub, PUBACK); |
pavleradojkovic | 0:ba7e439238ab | 281 | } |
pavleradojkovic | 0:ba7e439238ab | 282 | else if ( pub.header.bits.qos == 2) |
pavleradojkovic | 0:ba7e439238ab | 283 | { |
pavleradojkovic | 0:ba7e439238ab | 284 | replyACK(client, &pub, PUBREC); |
pavleradojkovic | 0:ba7e439238ab | 285 | } |
pavleradojkovic | 0:ba7e439238ab | 286 | |
pavleradojkovic | 0:ba7e439238ab | 287 | |
pavleradojkovic | 0:ba7e439238ab | 288 | |
pavleradojkovic | 0:ba7e439238ab | 289 | string* topicName = new string(pub.topic, pub.topiclen); |
pavleradojkovic | 0:ba7e439238ab | 290 | Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); |
pavleradojkovic | 0:ba7e439238ab | 291 | AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic); |
pavleradojkovic | 0:ba7e439238ab | 292 | if ( list != nullptr ) |
pavleradojkovic | 0:ba7e439238ab | 293 | { |
pavleradojkovic | 0:ba7e439238ab | 294 | ClientTopicElement* p = list->getFirstElement(); |
pavleradojkovic | 0:ba7e439238ab | 295 | |
pavleradojkovic | 0:ba7e439238ab | 296 | while ( p ) |
pavleradojkovic | 0:ba7e439238ab | 297 | { |
pavleradojkovic | 0:ba7e439238ab | 298 | Client* devClient = p->getClient(); |
pavleradojkovic | 0:ba7e439238ab | 299 | if ( devClient != nullptr ) |
pavleradojkovic | 0:ba7e439238ab | 300 | { |
pavleradojkovic | 0:ba7e439238ab | 301 | MQTTGWPacket* msg = new MQTTGWPacket(); |
pavleradojkovic | 0:ba7e439238ab | 302 | *msg = *packet; |
pavleradojkovic | 0:ba7e439238ab | 303 | if ( msg->getType() == 0 ) |
pavleradojkovic | 0:ba7e439238ab | 304 | { |
pavleradojkovic | 0:ba7e439238ab | 305 | WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); |
pavleradojkovic | 0:ba7e439238ab | 306 | delete msg; |
pavleradojkovic | 0:ba7e439238ab | 307 | break; |
pavleradojkovic | 0:ba7e439238ab | 308 | } |
pavleradojkovic | 0:ba7e439238ab | 309 | Event* ev = new Event(); |
pavleradojkovic | 0:ba7e439238ab | 310 | ev->setBrokerRecvEvent(devClient, msg); |
pavleradojkovic | 0:ba7e439238ab | 311 | _gateway->getPacketEventQue()->post(ev); |
pavleradojkovic | 0:ba7e439238ab | 312 | } |
pavleradojkovic | 0:ba7e439238ab | 313 | else |
pavleradojkovic | 0:ba7e439238ab | 314 | { |
pavleradojkovic | 0:ba7e439238ab | 315 | break; |
pavleradojkovic | 0:ba7e439238ab | 316 | } |
pavleradojkovic | 0:ba7e439238ab | 317 | |
pavleradojkovic | 0:ba7e439238ab | 318 | p = list->getNextElement(p); |
pavleradojkovic | 0:ba7e439238ab | 319 | } |
pavleradojkovic | 0:ba7e439238ab | 320 | delete list; |
pavleradojkovic | 0:ba7e439238ab | 321 | } |
pavleradojkovic | 0:ba7e439238ab | 322 | } |
pavleradojkovic | 0:ba7e439238ab | 323 |