Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTGWPublishHandler.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 * Tieto Poland Sp. z o.o. - Gateway improvements 00016 **************************************************************************************/ 00017 00018 #include "MQTTGWPublishHandler.h" 00019 #include "MQTTSNGateway.h" 00020 #include "MQTTSNPacket.h" 00021 #include <string> 00022 00023 using namespace std; 00024 using namespace MQTTSNGW; 00025 00026 char* currentDateTime(void); 00027 00028 MQTTGWPublishHandler::MQTTGWPublishHandler(Gateway* gateway) 00029 { 00030 _gateway = gateway; 00031 } 00032 00033 MQTTGWPublishHandler::~MQTTGWPublishHandler() 00034 { 00035 00036 } 00037 00038 void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) 00039 { 00040 if ( !client->isActive() && !client->isSleep() && !client->isAwake()) 00041 { 00042 WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER); 00043 return; 00044 } 00045 00046 /* client is sleeping. save PUBLISH */ 00047 if ( client->isSleep() ) 00048 { 00049 Publish pub; 00050 packet->getPUBLISH(&pub); 00051 00052 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), 00053 RIGHTARROW, client->getClientId(), "is sleeping. 
a message was saved."); 00054 00055 if (pub.header.bits.qos == 1) 00056 { 00057 replyACK(client, &pub, PUBACK); 00058 } 00059 else if ( pub.header.bits.qos == 2) 00060 { 00061 replyACK(client, &pub, PUBREC); 00062 } 00063 00064 MQTTGWPacket* msg = new MQTTGWPacket(); 00065 *msg = *packet; 00066 if ( msg->getType() == 0 ) 00067 { 00068 WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); 00069 delete msg; 00070 return; 00071 } 00072 client->setClientSleepPacket(msg); 00073 return; 00074 } 00075 00076 Publish pub; 00077 packet->getPUBLISH(&pub); 00078 00079 MQTTSNPacket* snPacket = new MQTTSNPacket(); 00080 00081 /* create MQTTSN_topicid */ 00082 MQTTSN_topicid topicId; 00083 uint16_t id = 0; 00084 00085 if (pub.topiclen <= 2) 00086 { 00087 topicId.type = MQTTSN_TOPIC_TYPE_SHORT; 00088 *(topicId.data.short_name) = *pub.topic; 00089 *(topicId.data.short_name + 1) = *(pub.topic + 1); 00090 } 00091 else 00092 { 00093 topicId.data.long_.len = pub.topiclen; 00094 topicId.data.long_.name = pub.topic; 00095 Topic* tp = client->getTopics()->getTopicByName(&topicId); 00096 00097 if ( tp ) 00098 { 00099 topicId.type = tp->getType(); 00100 topicId.data.long_.len = pub.topiclen; 00101 topicId.data.long_.name = pub.topic; 00102 topicId.data.id = tp->getTopicId(); 00103 } 00104 else 00105 { 00106 /* This message might be subscribed with wild card. */ 00107 topicId.type = MQTTSN_TOPIC_TYPE_NORMAL; 00108 Topic* topic = client->getTopics()->match(&topicId); 00109 if (topic == nullptr) 00110 { 00111 WRITELOG(" Invalid Topic. 
PUBLISH message is canceled.\n"); 00112 if (pub.header.bits.qos == 1) 00113 { 00114 replyACK(client, &pub, PUBACK); 00115 } 00116 else if ( pub.header.bits.qos == 2 ) 00117 { 00118 replyACK(client, &pub, PUBREC); 00119 } 00120 00121 delete snPacket; 00122 return; 00123 } 00124 00125 /* add the Topic and get a TopicId */ 00126 topic = client->getTopics()->add(&topicId); 00127 id = topic->getTopicId(); 00128 00129 if (id > 0) 00130 { 00131 /* create REGISTER */ 00132 MQTTSNPacket* regPacket = new MQTTSNPacket(); 00133 00134 MQTTSNString topicName = MQTTSNString_initializer; 00135 topicName.lenstring.len = topicId.data.long_.len; 00136 topicName.lenstring.data = topicId.data.long_.name; 00137 00138 uint16_t regackMsgId = client->getNextSnMsgId(); 00139 regPacket->setREGISTER(id, regackMsgId, &topicName); 00140 00141 /* send REGISTER */ 00142 Event* evrg = new Event(); 00143 evrg->setClientSendEvent(client, regPacket); 00144 _gateway->getClientSendQue()->post(evrg); 00145 00146 /* send PUBLISH */ 00147 topicId.data.id = id; 00148 snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, 00149 (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, 00150 pub.payloadlen); 00151 client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId); 00152 return; 00153 } 00154 else 00155 { 00156 WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); 00157 delete snPacket; 00158 return; 00159 } 00160 } 00161 } 00162 00163 snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, 00164 (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); 00165 Event* ev1 = new Event(); 00166 ev1->setClientSendEvent(client, snPacket); 00167 _gateway->getClientSendQue()->post(ev1); 00168 00169 } 00170 00171 void MQTTGWPublishHandler::replyACK(Client* client, Publish* pub, int type) 00172 { 00173 MQTTGWPacket* pubAck = new 
MQTTGWPacket(); 00174 pubAck->setAck(type, (uint16_t)pub->msgId); 00175 Event* ev1 = new Event(); 00176 ev1->setBrokerSendEvent(client, pubAck); 00177 _gateway->getBrokerSendQue()->post(ev1); 00178 } 00179 00180 void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet) 00181 { 00182 Ack ack; 00183 packet->getAck(&ack); 00184 TopicIdMapElement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId); 00185 if (topicId) 00186 { 00187 MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); 00188 mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0); 00189 00190 client->eraseWaitedPubTopicId((uint16_t)ack.msgId); 00191 Event* ev1 = new Event(); 00192 ev1->setClientSendEvent(client, mqttsnPacket); 00193 _gateway->getClientSendQue()->post(ev1); 00194 return; 00195 } 00196 WRITELOG(" PUBACK from the Broker is invalid. PacketID : %04X ClientID : %s \n", (uint16_t)ack.msgId, client->getClientId()); 00197 } 00198 00199 void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int type) 00200 { 00201 Ack ack; 00202 packet->getAck(&ack); 00203 00204 if ( client->isActive() || client->isAwake() ) 00205 { 00206 MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); 00207 if (type == PUBREC) 00208 { 00209 mqttsnPacket->setPUBREC((uint16_t) ack.msgId); 00210 } 00211 else if (type == PUBREL) 00212 { 00213 mqttsnPacket->setPUBREL((uint16_t) ack.msgId); 00214 } 00215 else if (type == PUBCOMP) 00216 { 00217 mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); 00218 } 00219 00220 Event* ev1 = new Event(); 00221 ev1->setClientSendEvent(client, mqttsnPacket); 00222 _gateway->getClientSendQue()->post(ev1); 00223 } 00224 else if ( client->isSleep() ) 00225 { 00226 if (type == PUBREL) 00227 { 00228 MQTTGWPacket* pubComp = new MQTTGWPacket(); 00229 pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId); 00230 Event* ev1 = new Event(); 00231 ev1->setBrokerSendEvent(client, pubComp); 00232 _gateway->getBrokerSendQue()->post(ev1); 00233 } 00234 } 00235 } 00236 
00237 00238 00239 void MQTTGWPublishHandler::handleAggregatePuback(Client* client, MQTTGWPacket* packet) 00240 { 00241 uint16_t msgId = packet->getMsgId(); 00242 uint16_t clientMsgId = 0; 00243 Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId); 00244 if ( newClient != nullptr ) 00245 { 00246 packet->setMsgId((int)clientMsgId); 00247 handlePuback(newClient, packet); 00248 } 00249 } 00250 00251 void MQTTGWPublishHandler::handleAggregateAck(Client* client, MQTTGWPacket* packet, int type) 00252 { 00253 uint16_t msgId = packet->getMsgId(); 00254 uint16_t clientMsgId = 0; 00255 Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId); 00256 if ( newClient != nullptr ) 00257 { 00258 packet->setMsgId((int)clientMsgId); 00259 handleAck(newClient, packet,type); 00260 } 00261 } 00262 00263 void MQTTGWPublishHandler::handleAggregatePubrel(Client* client, MQTTGWPacket* packet) 00264 { 00265 Publish pub; 00266 packet->getPUBLISH(&pub); 00267 replyACK(client, &pub, PUBCOMP); 00268 } 00269 00270 void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* packet) 00271 { 00272 Publish pub; 00273 packet->getPUBLISH(&pub); 00274 00275 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), 00276 RIGHTARROW, client->getClientId(), "is sleeping. 
a message was saved."); 00277 00278 if (pub.header.bits.qos == 1) 00279 { 00280 replyACK(client, &pub, PUBACK); 00281 } 00282 else if ( pub.header.bits.qos == 2) 00283 { 00284 replyACK(client, &pub, PUBREC); 00285 } 00286 00287 00288 00289 string* topicName = new string(pub.topic, pub.topiclen); 00290 Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); 00291 AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic); 00292 if ( list != nullptr ) 00293 { 00294 ClientTopicElement* p = list->getFirstElement(); 00295 00296 while ( p ) 00297 { 00298 Client* devClient = p->getClient(); 00299 if ( devClient != nullptr ) 00300 { 00301 MQTTGWPacket* msg = new MQTTGWPacket(); 00302 *msg = *packet; 00303 if ( msg->getType() == 0 ) 00304 { 00305 WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); 00306 delete msg; 00307 break; 00308 } 00309 Event* ev = new Event(); 00310 ev->setBrokerRecvEvent(devClient, msg); 00311 _gateway->getPacketEventQue()->post(ev); 00312 } 00313 else 00314 { 00315 break; 00316 } 00317 00318 p = list->getNextElement(p); 00319 } 00320 delete list; 00321 } 00322 } 00323
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2