Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNAggregateConnectionHandler.cpp Source File

MQTTSNAggregateConnectionHandler.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2018, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include "MQTTSNAggregateConnectionHandler.h"
00018 #include "MQTTSNGateway.h"
00019 #include "MQTTSNGWPacket.h"
00020 #include "MQTTGWPacket.h"
00021 #include <string.h>
00022 
00023 using namespace std;
00024 using namespace MQTTSNGW;
00025 
00026 /*=====================================
00027  Class MQTTSNAggregateConnectionHandler
00028  =====================================*/
00029 MQTTSNAggregateConnectionHandler::MQTTSNAggregateConnectionHandler(Gateway* gateway)
00030 {
00031     _gateway = gateway;
00032 }
00033 
00034 MQTTSNAggregateConnectionHandler::~MQTTSNAggregateConnectionHandler()
00035 {
00036 
00037 }
00038 
00039 
00040 /*
00041  *  CONNECT
00042  */
00043 void MQTTSNAggregateConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet)
00044 {
00045     MQTTSNPacket_connectData data;
00046     if ( packet->getCONNECT(&data) == 0 )
00047     {
00048         return;
00049     }
00050 
00051     /* return CONNACK when the client is sleeping */
00052     if ( client->isSleep() || client->isAwake() )
00053     {
00054         MQTTSNPacket* packet = new MQTTSNPacket();
00055         packet->setCONNACK(MQTTSN_RC_ACCEPTED);
00056         Event* ev = new Event();
00057         ev->setClientSendEvent(client, packet);
00058         _gateway->getClientSendQue()->post(ev);
00059         sendStoredPublish(client);
00060         return;
00061     }
00062 
00063     //* clear ConnectData of Client */
00064     Connect* connectData = client->getConnectData();
00065     memset(connectData, 0, sizeof(Connect));
00066 
00067     client->disconnected();
00068 
00069     Topics* topics = client->getTopics();
00070 
00071     /* CONNECT was not sent yet. prepare Connect data */
00072 
00073 
00074     client->setSessionStatus(false);
00075     if (data.cleansession)
00076     {
00077         /* reset the table of msgNo and TopicId pare */
00078         client->clearWaitedPubTopicId();
00079         client->clearWaitedSubTopicId();
00080 
00081         /* renew the TopicList */
00082         if (topics)
00083         {
00084             _gateway->getAdapterManager()->removeAggregateTopicList(topics, client);
00085             topics->eraseNormal();
00086         }
00087         client->setSessionStatus(true);
00088     }
00089 
00090     if (data.willFlag)
00091     {
00092         /* create & send WILLTOPICREQ message to the client */
00093         MQTTSNPacket* reqTopic = new MQTTSNPacket();
00094         reqTopic->setWILLTOPICREQ();
00095         Event* evwr = new Event();
00096         evwr->setClientSendEvent(client, reqTopic);
00097 
00098         /* Send WILLTOPICREQ to the client */
00099         _gateway->getClientSendQue()->post(evwr);
00100     }
00101     else
00102     {
00103         /* create CONNACK & send it to the client */
00104         MQTTSNPacket* packet = new MQTTSNPacket();
00105         packet->setCONNACK(MQTTSN_RC_ACCEPTED);
00106         Event* ev = new Event();
00107         ev->setClientSendEvent(client, packet);
00108         _gateway->getClientSendQue()->post(ev);
00109         client->connackSended(MQTTSN_RC_ACCEPTED);
00110         sendStoredPublish(client);
00111         return;
00112     }
00113 }
00114 
00115 
00116 /*
00117  *  WILLMSG
00118  */
00119 void MQTTSNAggregateConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet)
00120 {
00121     if ( !client->isWaitWillMsg() )
00122     {
00123         DEBUGLOG("     MQTTSNConnectionHandler::handleWillmsg  WaitWillMsgFlg is off.\n");
00124         return;
00125     }
00126 
00127     MQTTSNString willmsg  = MQTTSNString_initializer;
00128     //Connect* connectData = client->getConnectData();
00129 
00130     if( client->isConnectSendable() )
00131     {
00132         /* save WillMsg in the client */
00133         if ( packet->getWILLMSG(&willmsg) == 0 )
00134         {
00135             return;
00136         }
00137         client->setWillMsg(willmsg);
00138 
00139             /* Send CONNACK to the client */
00140         MQTTSNPacket* packet = new MQTTSNPacket();
00141         packet->setCONNACK(MQTTSN_RC_ACCEPTED);
00142         Event* ev = new Event();
00143         ev->setClientSendEvent(client, packet);
00144         _gateway->getClientSendQue()->post(ev);
00145 
00146         sendStoredPublish(client);
00147         return;
00148     }
00149 }
00150 
00151 /*
00152  *  DISCONNECT
00153  */
00154 void MQTTSNAggregateConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet)
00155 {
00156     MQTTSNPacket* snMsg = new MQTTSNPacket();
00157     snMsg->setDISCONNECT(0);
00158     Event* evt = new Event();
00159     evt->setClientSendEvent(client, snMsg);
00160     _gateway->getClientSendQue()->post(evt);
00161 }
00162 
00163 /*
00164  *  PINGREQ
00165  */
00166 void MQTTSNAggregateConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet)
00167 {
00168     if ( ( client->isSleep() || client->isAwake() ) &&  client->getClientSleepPacket() )
00169     {
00170         sendStoredPublish(client);
00171         client->holdPingRequest();
00172     }
00173     else
00174     {
00175         /* create and send PINGRESP to the PacketHandler */
00176         client->resetPingRequest();
00177 
00178         MQTTGWPacket* pingresp = new MQTTGWPacket();
00179 
00180         pingresp->setHeader(PINGRESP);
00181 
00182         Event* evt = new Event();
00183         evt->setBrokerRecvEvent(client, pingresp);
00184         _gateway->getPacketEventQue()->post(evt);
00185     }
00186 }
00187 
00188 void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client)
00189 {
00190     MQTTGWPacket* msg = nullptr;
00191 
00192     while  ( ( msg = client->getClientSleepPacket() ) != nullptr )
00193     {
00194         // ToDo:  This version can't re-send PUBLISH when PUBACK is not returned.
00195         client->deleteFirstClientSleepPacket();  // pop the que to delete element.
00196 
00197         Event* ev = new Event();
00198         ev->setBrokerRecvEvent(client, msg);
00199         _gateway->getPacketEventQue()->post(ev);
00200     }
00201 }
00202