Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNAggregateConnectionHandler.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2018, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include "MQTTSNAggregateConnectionHandler.h" 00018 #include "MQTTSNGateway.h" 00019 #include "MQTTSNGWPacket.h" 00020 #include "MQTTGWPacket.h" 00021 #include <string.h> 00022 00023 using namespace std; 00024 using namespace MQTTSNGW; 00025 00026 /*===================================== 00027 Class MQTTSNAggregateConnectionHandler 00028 =====================================*/ 00029 MQTTSNAggregateConnectionHandler::MQTTSNAggregateConnectionHandler(Gateway* gateway) 00030 { 00031 _gateway = gateway; 00032 } 00033 00034 MQTTSNAggregateConnectionHandler::~MQTTSNAggregateConnectionHandler() 00035 { 00036 00037 } 00038 00039 00040 /* 00041 * CONNECT 00042 */ 00043 void MQTTSNAggregateConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet) 00044 { 00045 MQTTSNPacket_connectData data; 00046 if ( packet->getCONNECT(&data) == 0 ) 00047 { 00048 return; 00049 } 00050 00051 /* return CONNACK when the client is sleeping */ 00052 if ( client->isSleep() || client->isAwake() ) 00053 { 00054 MQTTSNPacket* packet = new MQTTSNPacket(); 00055 packet->setCONNACK(MQTTSN_RC_ACCEPTED); 00056 Event* ev = new Event(); 00057 ev->setClientSendEvent(client, 
packet); 00058 _gateway->getClientSendQue()->post(ev); 00059 sendStoredPublish(client); 00060 return; 00061 } 00062 00063 //* clear ConnectData of Client */ 00064 Connect* connectData = client->getConnectData(); 00065 memset(connectData, 0, sizeof(Connect)); 00066 00067 client->disconnected(); 00068 00069 Topics* topics = client->getTopics(); 00070 00071 /* CONNECT was not sent yet. prepare Connect data */ 00072 00073 00074 client->setSessionStatus(false); 00075 if (data.cleansession) 00076 { 00077 /* reset the table of msgNo and TopicId pare */ 00078 client->clearWaitedPubTopicId(); 00079 client->clearWaitedSubTopicId(); 00080 00081 /* renew the TopicList */ 00082 if (topics) 00083 { 00084 _gateway->getAdapterManager()->removeAggregateTopicList(topics, client); 00085 topics->eraseNormal(); 00086 } 00087 client->setSessionStatus(true); 00088 } 00089 00090 if (data.willFlag) 00091 { 00092 /* create & send WILLTOPICREQ message to the client */ 00093 MQTTSNPacket* reqTopic = new MQTTSNPacket(); 00094 reqTopic->setWILLTOPICREQ(); 00095 Event* evwr = new Event(); 00096 evwr->setClientSendEvent(client, reqTopic); 00097 00098 /* Send WILLTOPICREQ to the client */ 00099 _gateway->getClientSendQue()->post(evwr); 00100 } 00101 else 00102 { 00103 /* create CONNACK & send it to the client */ 00104 MQTTSNPacket* packet = new MQTTSNPacket(); 00105 packet->setCONNACK(MQTTSN_RC_ACCEPTED); 00106 Event* ev = new Event(); 00107 ev->setClientSendEvent(client, packet); 00108 _gateway->getClientSendQue()->post(ev); 00109 client->connackSended(MQTTSN_RC_ACCEPTED); 00110 sendStoredPublish(client); 00111 return; 00112 } 00113 } 00114 00115 00116 /* 00117 * WILLMSG 00118 */ 00119 void MQTTSNAggregateConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet) 00120 { 00121 if ( !client->isWaitWillMsg() ) 00122 { 00123 DEBUGLOG(" MQTTSNConnectionHandler::handleWillmsg WaitWillMsgFlg is off.\n"); 00124 return; 00125 } 00126 00127 MQTTSNString willmsg = MQTTSNString_initializer; 
00128 //Connect* connectData = client->getConnectData(); 00129 00130 if( client->isConnectSendable() ) 00131 { 00132 /* save WillMsg in the client */ 00133 if ( packet->getWILLMSG(&willmsg) == 0 ) 00134 { 00135 return; 00136 } 00137 client->setWillMsg(willmsg); 00138 00139 /* Send CONNACK to the client */ 00140 MQTTSNPacket* packet = new MQTTSNPacket(); 00141 packet->setCONNACK(MQTTSN_RC_ACCEPTED); 00142 Event* ev = new Event(); 00143 ev->setClientSendEvent(client, packet); 00144 _gateway->getClientSendQue()->post(ev); 00145 00146 sendStoredPublish(client); 00147 return; 00148 } 00149 } 00150 00151 /* 00152 * DISCONNECT 00153 */ 00154 void MQTTSNAggregateConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) 00155 { 00156 MQTTSNPacket* snMsg = new MQTTSNPacket(); 00157 snMsg->setDISCONNECT(0); 00158 Event* evt = new Event(); 00159 evt->setClientSendEvent(client, snMsg); 00160 _gateway->getClientSendQue()->post(evt); 00161 } 00162 00163 /* 00164 * PINGREQ 00165 */ 00166 void MQTTSNAggregateConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) 00167 { 00168 if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) 00169 { 00170 sendStoredPublish(client); 00171 client->holdPingRequest(); 00172 } 00173 else 00174 { 00175 /* create and send PINGRESP to the PacketHandler */ 00176 client->resetPingRequest(); 00177 00178 MQTTGWPacket* pingresp = new MQTTGWPacket(); 00179 00180 pingresp->setHeader(PINGRESP); 00181 00182 Event* evt = new Event(); 00183 evt->setBrokerRecvEvent(client, pingresp); 00184 _gateway->getPacketEventQue()->post(evt); 00185 } 00186 } 00187 00188 void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client) 00189 { 00190 MQTTGWPacket* msg = nullptr; 00191 00192 while ( ( msg = client->getClientSleepPacket() ) != nullptr ) 00193 { 00194 // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. 
00195 client->deleteFirstClientSleepPacket(); // pop the que to delete element. 00196 00197 Event* ev = new Event(); 00198 ev->setBrokerRecvEvent(client, msg); 00199 _gateway->getPacketEventQue()->post(ev); 00200 } 00201 } 00202
Generated on Wed Jul 13 2022 10:46:02 by Doxygen 1.7.2