Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWConnectionHandler.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include "MQTTSNGWConnectionHandler.h" 00018 #include "MQTTSNGateway.h" 00019 #include "MQTTSNGWPacket.h" 00020 #include "MQTTGWPacket.h" 00021 #include <string.h> 00022 00023 using namespace std; 00024 using namespace MQTTSNGW; 00025 00026 /*===================================== 00027 Class MQTTSNConnectionHandler 00028 =====================================*/ 00029 MQTTSNConnectionHandler::MQTTSNConnectionHandler(Gateway* gateway) 00030 { 00031 _gateway = gateway; 00032 } 00033 00034 MQTTSNConnectionHandler::~MQTTSNConnectionHandler() 00035 { 00036 00037 } 00038 00039 /* 00040 * ADVERTISE 00041 */ 00042 void MQTTSNConnectionHandler::sendADVERTISE() 00043 { 00044 MQTTSNPacket* adv = new MQTTSNPacket(); 00045 adv->setADVERTISE(_gateway->getGWParams()->gatewayId, _gateway->getGWParams()->keepAlive); 00046 Event* ev1 = new Event(); 00047 ev1->setBrodcastEvent(adv); //broadcast 00048 _gateway->getClientSendQue()->post(ev1); 00049 } 00050 00051 /* 00052 * SEARCHGW 00053 */ 00054 void MQTTSNConnectionHandler::handleSearchgw(MQTTSNPacket* packet) 00055 { 00056 if (packet->getType() == MQTTSN_SEARCHGW) 00057 { 00058 MQTTSNPacket* gwinfo = new 
MQTTSNPacket(); 00059 gwinfo->setGWINFO(_gateway->getGWParams()->gatewayId); 00060 Event* ev1 = new Event(); 00061 ev1->setBrodcastEvent(gwinfo); 00062 _gateway->getClientSendQue()->post(ev1); 00063 } 00064 } 00065 00066 /* 00067 * CONNECT 00068 */ 00069 void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet) 00070 { 00071 MQTTSNPacket_connectData data; 00072 if ( packet->getCONNECT(&data) == 0 ) 00073 { 00074 return; 00075 } 00076 00077 /* return CONNACK when the client is sleeping */ 00078 if ( client->isSleep() || client->isAwake() ) 00079 { 00080 MQTTSNPacket* packet = new MQTTSNPacket(); 00081 packet->setCONNACK(MQTTSN_RC_ACCEPTED); 00082 Event* ev = new Event(); 00083 ev->setClientSendEvent(client, packet); 00084 _gateway->getClientSendQue()->post(ev); 00085 00086 sendStoredPublish(client); 00087 return; 00088 } 00089 00090 //* clear ConnectData of Client */ 00091 Connect* connectData = client->getConnectData(); 00092 memset(connectData, 0, sizeof(Connect)); 00093 if ( !client->isAdapter() ) 00094 { 00095 client->disconnected(); 00096 } 00097 00098 Topics* topics = client->getTopics(); 00099 00100 /* CONNECT was not sent yet. 
prepare Connect data */ 00101 connectData->header.bits.type = CONNECT; 00102 connectData->clientID = client->getClientId(); 00103 connectData->version = _gateway->getGWParams()->mqttVersion; 00104 connectData->keepAliveTimer = data.duration; 00105 connectData->flags.bits.will = data.willFlag; 00106 00107 if ((const char*) _gateway->getGWParams()->loginId != nullptr && (const char*) _gateway->getGWParams()->password != 0) 00108 { 00109 connectData->flags.bits.password = 1; 00110 connectData->flags.bits.username = 1; 00111 } 00112 00113 client->setSessionStatus(false); 00114 if (data.cleansession) 00115 { 00116 connectData->flags.bits.cleanstart = 1; 00117 /* reset the table of msgNo and TopicId pare */ 00118 client->clearWaitedPubTopicId(); 00119 client->clearWaitedSubTopicId(); 00120 00121 /* renew the TopicList */ 00122 if (topics) 00123 { 00124 topics->eraseNormal();; 00125 } 00126 client->setSessionStatus(true); 00127 } 00128 00129 if (data.willFlag) 00130 { 00131 /* create & send WILLTOPICREQ message to the client */ 00132 MQTTSNPacket* reqTopic = new MQTTSNPacket(); 00133 reqTopic->setWILLTOPICREQ(); 00134 Event* evwr = new Event(); 00135 evwr->setClientSendEvent(client, reqTopic); 00136 00137 /* Send WILLTOPICREQ to the client */ 00138 _gateway->getClientSendQue()->post(evwr); 00139 } 00140 else 00141 { 00142 /* CONNECT message was not qued in. 
00143 * create CONNECT message & send it to the broker */ 00144 MQTTGWPacket* mqMsg = new MQTTGWPacket(); 00145 mqMsg->setCONNECT(client->getConnectData(), (unsigned char*)_gateway->getGWParams()->loginId, (unsigned char*)_gateway->getGWParams()->password); 00146 Event* ev1 = new Event(); 00147 ev1->setBrokerSendEvent(client, mqMsg); 00148 _gateway->getBrokerSendQue()->post(ev1); 00149 } 00150 } 00151 00152 /* 00153 * WILLTOPIC 00154 */ 00155 void MQTTSNConnectionHandler::handleWilltopic(Client* client, MQTTSNPacket* packet) 00156 { 00157 int willQos; 00158 uint8_t willRetain; 00159 MQTTSNString willTopic = MQTTSNString_initializer; 00160 00161 if ( packet->getWILLTOPIC(&willQos, &willRetain, &willTopic) == 0 ) 00162 { 00163 return; 00164 } 00165 client->setWillTopic(willTopic); 00166 Connect* connectData = client->getConnectData(); 00167 00168 /* add the connectData for MQTT CONNECT message */ 00169 connectData->willTopic = client->getWillTopic(); 00170 connectData->flags.bits.willQoS = willQos; 00171 connectData->flags.bits.willRetain = willRetain; 00172 00173 /* Send WILLMSGREQ to the client */ 00174 client->setWaitWillMsgFlg(true); 00175 MQTTSNPacket* reqMsg = new MQTTSNPacket(); 00176 reqMsg->setWILLMSGREQ(); 00177 Event* evt = new Event(); 00178 evt->setClientSendEvent(client, reqMsg); 00179 _gateway->getClientSendQue()->post(evt); 00180 } 00181 00182 /* 00183 * WILLMSG 00184 */ 00185 void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet) 00186 { 00187 if ( !client->isWaitWillMsg() ) 00188 { 00189 DEBUGLOG(" MQTTSNConnectionHandler::handleWillmsg WaitWillMsgFlg is off.\n"); 00190 return; 00191 } 00192 00193 MQTTSNString willmsg = MQTTSNString_initializer; 00194 Connect* connectData = client->getConnectData(); 00195 00196 if( client->isConnectSendable() ) 00197 { 00198 /* save WillMsg in the client */ 00199 if ( packet->getWILLMSG(&willmsg) == 0 ) 00200 { 00201 return; 00202 } 00203 client->setWillMsg(willmsg); 00204 00205 /* create 
CONNECT message */ 00206 MQTTGWPacket* mqttPacket = new MQTTGWPacket(); 00207 connectData->willMsg = client->getWillMsg(); 00208 mqttPacket->setCONNECT(connectData, (unsigned char*)_gateway->getGWParams()->loginId, (unsigned char*)_gateway->getGWParams()->password); 00209 00210 /* Send CONNECT to the broker */ 00211 Event* evt = new Event(); 00212 evt->setBrokerSendEvent(client, mqttPacket); 00213 client->setWaitWillMsgFlg(false); 00214 _gateway->getBrokerSendQue()->post(evt); 00215 } 00216 } 00217 00218 /* 00219 * DISCONNECT 00220 */ 00221 void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) 00222 { 00223 uint16_t duration = 0; 00224 00225 if ( packet->getDISCONNECT(&duration) != 0 ) 00226 { 00227 if ( duration == 0 ) 00228 { 00229 MQTTGWPacket* mqMsg = new MQTTGWPacket(); 00230 mqMsg->setHeader(DISCONNECT); 00231 Event* ev = new Event(); 00232 ev->setBrokerSendEvent(client, mqMsg); 00233 _gateway->getBrokerSendQue()->post(ev); 00234 } 00235 } 00236 00237 MQTTSNPacket* snMsg = new MQTTSNPacket(); 00238 snMsg->setDISCONNECT(0); 00239 Event* evt = new Event(); 00240 evt->setClientSendEvent(client, snMsg); 00241 _gateway->getClientSendQue()->post(evt); 00242 } 00243 00244 /* 00245 * WILLTOPICUPD 00246 */ 00247 void MQTTSNConnectionHandler::handleWilltopicupd(Client* client, MQTTSNPacket* packet) 00248 { 00249 /* send NOT_SUPPORTED responce to the client */ 00250 MQTTSNPacket* respMsg = new MQTTSNPacket(); 00251 respMsg->setWILLTOPICRESP(MQTTSN_RC_NOT_SUPPORTED); 00252 Event* evt = new Event(); 00253 evt->setClientSendEvent(client, respMsg); 00254 _gateway->getClientSendQue()->post(evt); 00255 } 00256 00257 /* 00258 * WILLMSGUPD 00259 */ 00260 void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* packet) 00261 { 00262 /* send NOT_SUPPORTED responce to the client */ 00263 MQTTSNPacket* respMsg = new MQTTSNPacket(); 00264 respMsg->setWILLMSGRESP(MQTTSN_RC_NOT_SUPPORTED); 00265 Event* evt = new Event(); 00266 
evt->setClientSendEvent(client, respMsg); 00267 _gateway->getClientSendQue()->post(evt); 00268 } 00269 00270 /* 00271 * PINGREQ 00272 */ 00273 void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) 00274 { 00275 if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) 00276 { 00277 sendStoredPublish(client); 00278 client->holdPingRequest(); 00279 } 00280 else 00281 { 00282 /* send PINGREQ to the broker */ 00283 client->resetPingRequest(); 00284 MQTTGWPacket* pingreq = new MQTTGWPacket(); 00285 pingreq->setHeader(PINGREQ); 00286 Event* evt = new Event(); 00287 evt->setBrokerSendEvent(client, pingreq); 00288 _gateway->getBrokerSendQue()->post(evt); 00289 } 00290 } 00291 00292 void MQTTSNConnectionHandler::sendStoredPublish(Client* client) 00293 { 00294 MQTTGWPacket* msg = nullptr; 00295 00296 while ( ( msg = client->getClientSleepPacket() ) != nullptr ) 00297 { 00298 // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. 00299 client->deleteFirstClientSleepPacket(); // pop the que to delete element. 00300 00301 Event* ev = new Event(); 00302 ev->setBrokerRecvEvent(client, msg); 00303 _gateway->getPacketEventQue()->post(ev); 00304 } 00305 }
Generated on Wed Jul 13 2022 10:46:03 by Doxygen 1.7.2