Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWConnectionHandler.cpp Source File

MQTTSNGWConnectionHandler.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include "MQTTSNGWConnectionHandler.h"
00018 #include "MQTTSNGateway.h"
00019 #include "MQTTSNGWPacket.h"
00020 #include "MQTTGWPacket.h"
00021 #include <string.h>
00022 
00023 using namespace std;
00024 using namespace MQTTSNGW;
00025 
00026 /*=====================================
00027  Class MQTTSNConnectionHandler
00028  =====================================*/
00029 MQTTSNConnectionHandler::MQTTSNConnectionHandler(Gateway* gateway)
00030 {
00031     _gateway = gateway;
00032 }
00033 
00034 MQTTSNConnectionHandler::~MQTTSNConnectionHandler()
00035 {
00036 
00037 }
00038 
00039 /*
00040  *  ADVERTISE
00041  */
00042 void MQTTSNConnectionHandler::sendADVERTISE()
00043 {
00044     MQTTSNPacket* adv = new MQTTSNPacket();
00045     adv->setADVERTISE(_gateway->getGWParams()->gatewayId, _gateway->getGWParams()->keepAlive);
00046     Event* ev1 = new Event();
00047     ev1->setBrodcastEvent(adv);  //broadcast
00048     _gateway->getClientSendQue()->post(ev1);
00049 }
00050 
00051 /*
00052  *  SEARCHGW
00053  */
00054 void MQTTSNConnectionHandler::handleSearchgw(MQTTSNPacket* packet)
00055 {
00056     if (packet->getType() == MQTTSN_SEARCHGW)
00057     {
00058         MQTTSNPacket* gwinfo = new MQTTSNPacket();
00059         gwinfo->setGWINFO(_gateway->getGWParams()->gatewayId);
00060         Event* ev1 = new Event();
00061         ev1->setBrodcastEvent(gwinfo);
00062         _gateway->getClientSendQue()->post(ev1);
00063     }
00064 }
00065 
00066 /*
00067  *  CONNECT
00068  */
00069 void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet)
00070 {
00071     MQTTSNPacket_connectData data;
00072     if ( packet->getCONNECT(&data) == 0 )
00073     {
00074         return;
00075     }
00076 
00077     /* return CONNACK when the client is sleeping */
00078     if ( client->isSleep() || client->isAwake() )
00079     {
00080         MQTTSNPacket* packet = new MQTTSNPacket();
00081         packet->setCONNACK(MQTTSN_RC_ACCEPTED);
00082         Event* ev = new Event();
00083         ev->setClientSendEvent(client, packet);
00084         _gateway->getClientSendQue()->post(ev);
00085 
00086         sendStoredPublish(client);
00087         return;
00088     }
00089 
00090     //* clear ConnectData of Client */
00091     Connect* connectData = client->getConnectData();
00092     memset(connectData, 0, sizeof(Connect));
00093     if ( !client->isAdapter() )
00094     {
00095         client->disconnected();
00096     }
00097 
00098     Topics* topics = client->getTopics();
00099 
00100     /* CONNECT was not sent yet. prepare Connect data */
00101     connectData->header.bits.type = CONNECT;
00102     connectData->clientID = client->getClientId();
00103     connectData->version = _gateway->getGWParams()->mqttVersion;
00104     connectData->keepAliveTimer = data.duration;
00105     connectData->flags.bits.will = data.willFlag;
00106 
00107     if ((const char*) _gateway->getGWParams()->loginId != nullptr && (const char*) _gateway->getGWParams()->password != 0)
00108     {
00109         connectData->flags.bits.password = 1;
00110         connectData->flags.bits.username = 1;
00111     }
00112 
00113     client->setSessionStatus(false);
00114     if (data.cleansession)
00115     {
00116         connectData->flags.bits.cleanstart = 1;
00117         /* reset the table of msgNo and TopicId pare */
00118         client->clearWaitedPubTopicId();
00119         client->clearWaitedSubTopicId();
00120 
00121         /* renew the TopicList */
00122         if (topics)
00123         {
00124             topics->eraseNormal();;
00125         }
00126         client->setSessionStatus(true);
00127     }
00128 
00129     if (data.willFlag)
00130     {
00131         /* create & send WILLTOPICREQ message to the client */
00132         MQTTSNPacket* reqTopic = new MQTTSNPacket();
00133         reqTopic->setWILLTOPICREQ();
00134         Event* evwr = new Event();
00135         evwr->setClientSendEvent(client, reqTopic);
00136 
00137         /* Send WILLTOPICREQ to the client */
00138         _gateway->getClientSendQue()->post(evwr);
00139     }
00140     else
00141     {
00142         /* CONNECT message was not qued in.
00143          * create CONNECT message & send it to the broker */
00144         MQTTGWPacket* mqMsg = new MQTTGWPacket();
00145         mqMsg->setCONNECT(client->getConnectData(), (unsigned char*)_gateway->getGWParams()->loginId, (unsigned char*)_gateway->getGWParams()->password);
00146         Event* ev1 = new Event();
00147         ev1->setBrokerSendEvent(client, mqMsg);
00148         _gateway->getBrokerSendQue()->post(ev1);
00149     }
00150 }
00151 
00152 /*
00153  *  WILLTOPIC
00154  */
00155 void MQTTSNConnectionHandler::handleWilltopic(Client* client, MQTTSNPacket* packet)
00156 {
00157     int willQos;
00158     uint8_t willRetain;
00159     MQTTSNString willTopic = MQTTSNString_initializer;
00160 
00161     if ( packet->getWILLTOPIC(&willQos, &willRetain, &willTopic) == 0 )
00162     {
00163         return;
00164     }
00165     client->setWillTopic(willTopic);
00166     Connect* connectData = client->getConnectData();
00167 
00168     /* add the connectData for MQTT CONNECT message */
00169     connectData->willTopic = client->getWillTopic();
00170     connectData->flags.bits.willQoS = willQos;
00171     connectData->flags.bits.willRetain = willRetain;
00172 
00173     /* Send WILLMSGREQ to the client */
00174     client->setWaitWillMsgFlg(true);
00175     MQTTSNPacket* reqMsg = new MQTTSNPacket();
00176     reqMsg->setWILLMSGREQ();
00177     Event* evt = new Event();
00178     evt->setClientSendEvent(client, reqMsg);
00179     _gateway->getClientSendQue()->post(evt);
00180 }
00181 
00182 /*
00183  *  WILLMSG
00184  */
00185 void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet)
00186 {
00187     if ( !client->isWaitWillMsg() )
00188     {
00189         DEBUGLOG("     MQTTSNConnectionHandler::handleWillmsg  WaitWillMsgFlg is off.\n");
00190         return;
00191     }
00192 
00193     MQTTSNString willmsg  = MQTTSNString_initializer;
00194     Connect* connectData = client->getConnectData();
00195 
00196     if( client->isConnectSendable() )
00197     {
00198         /* save WillMsg in the client */
00199         if ( packet->getWILLMSG(&willmsg) == 0 )
00200         {
00201             return;
00202         }
00203         client->setWillMsg(willmsg);
00204 
00205         /* create CONNECT message */
00206         MQTTGWPacket* mqttPacket =  new MQTTGWPacket();
00207         connectData->willMsg = client->getWillMsg();
00208         mqttPacket->setCONNECT(connectData, (unsigned char*)_gateway->getGWParams()->loginId, (unsigned char*)_gateway->getGWParams()->password);
00209 
00210         /* Send CONNECT to the broker */
00211         Event* evt = new Event();
00212         evt->setBrokerSendEvent(client, mqttPacket);
00213         client->setWaitWillMsgFlg(false);   
00214         _gateway->getBrokerSendQue()->post(evt);
00215     }
00216 }
00217 
00218 /*
00219  *  DISCONNECT
00220  */
00221 void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet)
00222 {
00223     uint16_t duration = 0;
00224 
00225     if ( packet->getDISCONNECT(&duration) != 0 )
00226     {
00227         if ( duration == 0 )
00228         {
00229             MQTTGWPacket* mqMsg = new MQTTGWPacket();
00230             mqMsg->setHeader(DISCONNECT);
00231             Event* ev = new Event();
00232             ev->setBrokerSendEvent(client, mqMsg);
00233             _gateway->getBrokerSendQue()->post(ev);
00234         }
00235     }
00236 
00237     MQTTSNPacket* snMsg = new MQTTSNPacket();
00238     snMsg->setDISCONNECT(0);
00239     Event* evt = new Event();
00240     evt->setClientSendEvent(client, snMsg);
00241     _gateway->getClientSendQue()->post(evt);
00242 }
00243 
00244 /*
00245  *  WILLTOPICUPD
00246  */
00247 void MQTTSNConnectionHandler::handleWilltopicupd(Client* client, MQTTSNPacket* packet)
00248 {
00249     /* send NOT_SUPPORTED responce to the client */
00250     MQTTSNPacket* respMsg = new MQTTSNPacket();
00251     respMsg->setWILLTOPICRESP(MQTTSN_RC_NOT_SUPPORTED);
00252     Event* evt = new Event();
00253     evt->setClientSendEvent(client, respMsg);
00254     _gateway->getClientSendQue()->post(evt);
00255 }
00256 
00257 /*
00258  *  WILLMSGUPD
00259  */
00260 void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* packet)
00261 {
00262     /* send NOT_SUPPORTED responce to the client */
00263     MQTTSNPacket* respMsg = new MQTTSNPacket();
00264     respMsg->setWILLMSGRESP(MQTTSN_RC_NOT_SUPPORTED);
00265     Event* evt = new Event();
00266     evt->setClientSendEvent(client, respMsg);
00267     _gateway->getClientSendQue()->post(evt);
00268 }
00269 
00270 /*
00271  *  PINGREQ
00272  */
00273 void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet)
00274 {
00275     if ( ( client->isSleep() || client->isAwake() ) &&  client->getClientSleepPacket() )
00276     {
00277         sendStoredPublish(client);
00278         client->holdPingRequest();
00279     }
00280     else
00281     {
00282         /* send PINGREQ to the broker */
00283         client->resetPingRequest();
00284         MQTTGWPacket* pingreq = new MQTTGWPacket();
00285         pingreq->setHeader(PINGREQ);
00286         Event* evt = new Event();
00287         evt->setBrokerSendEvent(client, pingreq);
00288         _gateway->getBrokerSendQue()->post(evt);
00289     }
00290 }
00291 
00292 void MQTTSNConnectionHandler::sendStoredPublish(Client* client)
00293 {
00294     MQTTGWPacket* msg = nullptr;
00295 
00296     while  ( ( msg = client->getClientSleepPacket() ) != nullptr )
00297     {
00298         // ToDo:  This version can't re-send PUBLISH when PUBACK is not returned.
00299         client->deleteFirstClientSleepPacket();  // pop the que to delete element.
00300 
00301         Event* ev = new Event();
00302         ev->setBrokerRecvEvent(client, msg);
00303         _gateway->getPacketEventQue()->post(ev);
00304     }
00305 }