Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWPacketHandleTask.cpp Source File

MQTTSNGWPacketHandleTask.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  *    Tieto Poland Sp. z o.o. - Gateway improvements
00016  **************************************************************************************/
00017 
00018 #include "MQTTSNGWDefines.h"
00019 #include "MQTTSNGWPacketHandleTask.h"
00020 #include "MQTTSNGWProcess.h"
00021 #include "MQTTGWPacket.h"
00022 #include "MQTTSNGWClient.h"
00023 #include "MQTTSNGWProcess.h"
00024 #include "MQTTSNGWAdapterManager.h"
00025 #include "MQTTGWConnectionHandler.h"
00026 #include "MQTTGWPublishHandler.h"
00027 #include "MQTTGWSubscribeHandler.h"
00028 #include "MQTTSNGWConnectionHandler.h"
00029 #include "MQTTSNGWPublishHandler.h"
00030 #include "MQTTSNGWSubscribeHandler.h"
00031 #include "Timer.h"
00032 #include "MQTTSNAggregateConnectionHandler.h"
00033 
00034 #include <string.h>
00035 
00036 using namespace std;
00037 using namespace MQTTSNGW;
00038 
/* Maximum time, in milliseconds, that run() waits on the packet event queue. */
#define EVENT_QUE_TIME_OUT 2000

/* Declared here, not defined in this file; its result is used as the
 * timestamp string in log output. */
char* currentDateTime();
00041 /*=====================================
00042  Class PacketHandleTask
00043  =====================================*/
00044 
00045 PacketHandleTask::PacketHandleTask(Gateway* gateway)
00046 {
00047     _gateway = gateway;
00048     _gateway->attach((Thread*)this);
00049     _mqttConnection = new MQTTGWConnectionHandler(_gateway);
00050     _mqttPublish = new MQTTGWPublishHandler(_gateway);
00051     _mqttSubscribe = new MQTTGWSubscribeHandler(_gateway);
00052     _mqttsnConnection = new MQTTSNConnectionHandler(_gateway);
00053     _mqttsnPublish = new MQTTSNPublishHandler(_gateway);
00054     _mqttsnSubscribe = new MQTTSNSubscribeHandler(_gateway);
00055 
00056     _mqttsnAggrConnection = new MQTTSNAggregateConnectionHandler(_gateway);
00057 }
00058 
00059 /**
00060  *  Destructor is called by Gateway's destructor indirectly.
00061  */
00062 PacketHandleTask::~PacketHandleTask()
00063 {
00064     if ( _mqttConnection )
00065     {
00066         delete _mqttConnection;
00067     }
00068     if ( _mqttPublish )
00069     {
00070         delete _mqttPublish;
00071     }
00072     if ( _mqttSubscribe )
00073     {
00074         delete _mqttSubscribe;
00075     }
00076     if ( _mqttsnConnection )
00077     {
00078         delete _mqttsnConnection;
00079     }
00080     if ( _mqttsnPublish )
00081     {
00082         delete _mqttsnPublish;
00083     }
00084     if ( _mqttsnSubscribe )
00085     {
00086         delete _mqttsnSubscribe;
00087     }
00088 
00089     if ( _mqttsnAggrConnection )
00090     {
00091         delete _mqttsnAggrConnection;
00092     }
00093 }
00094 
00095 void PacketHandleTask::run()
00096 {
00097     Event* ev = nullptr;
00098     EventQue* eventQue = _gateway->getPacketEventQue();
00099     AdapterManager* adpMgr = _gateway->getAdapterManager();
00100 
00101     Client* client = nullptr;
00102     MQTTSNPacket* snPacket = nullptr;
00103     MQTTGWPacket* brPacket = nullptr;
00104     char msgId[6];
00105     memset(msgId, 0, 6);
00106 
00107     _advertiseTimer.start(_gateway->getGWParams()->keepAlive * 1000UL);
00108 
00109     while (true)
00110     {
00111         /* wait Event */
00112         ev = eventQue->timedwait(EVENT_QUE_TIME_OUT);
00113 
00114         if (ev->getEventType() == EtStop)
00115         {
00116             WRITELOG("%s PacketHandleTask stopped.\n", currentDateTime());
00117             delete ev;
00118             return;
00119         }
00120 
00121         if (ev->getEventType() == EtTimeout)
00122         {
00123             /*------ Check Keep Alive Timer & send Advertise ------*/
00124             if (_advertiseTimer.isTimeup())
00125             {
00126                 _mqttsnConnection->sendADVERTISE();
00127                 _advertiseTimer.start(_gateway->getGWParams()->keepAlive * 1000UL);
00128             }
00129 
00130             /*------ Check Adapters   Connect or PINGREQ ------*/
00131             adpMgr->checkConnection();
00132         }
00133 
00134         /*------    Handle SEARCHGW Message     ---------*/
00135         else if (ev->getEventType() == EtBroadcast)
00136         {
00137             snPacket = ev->getMQTTSNPacket();
00138             _mqttsnConnection->handleSearchgw(snPacket);
00139         }
00140 
00141         /*------    Handle Messages form Clients      ---------*/
00142         else if (ev->getEventType() == EtClientRecv)
00143         {
00144             client = ev->getClient();
00145             snPacket = ev->getMQTTSNPacket();
00146 
00147             DEBUGLOG("     PacketHandleTask gets %s %s from the client.\n", snPacket->getName(), snPacket->getMsgId(msgId));
00148 
00149             if ( adpMgr->isAggregatedClient(client) )
00150             {
00151                 aggregatePacketHandler(client, snPacket);
00152             }
00153             else
00154             {
00155                 transparentPacketHandler(client, snPacket);
00156             }
00157 
00158 
00159             /* Reset the Timer for PINGREQ. */
00160             client->updateStatus(snPacket);
00161         }
00162         /*------  Handle Messages form Broker      ---------*/
00163         else if ( ev->getEventType() == EtBrokerRecv )
00164         {
00165             client = ev->getClient();
00166             brPacket = ev->getMQTTGWPacket();
00167             DEBUGLOG("     PacketHandleTask gets %s %s from the broker.\n", brPacket->getName(), brPacket->getMsgId(msgId));
00168 
00169 
00170             if ( client->isAggregater() )
00171             {
00172                 aggregatePacketHandler(client, brPacket);
00173             }
00174             else
00175             {
00176                 transparentPacketHandler(client, brPacket);
00177             }
00178         }
00179         delete ev;
00180     }
00181 }
00182 
00183 
00184 
00185 void PacketHandleTask::aggregatePacketHandler(Client*client, MQTTSNPacket* packet)
00186 {
00187     switch (packet->getType())
00188     {
00189     case MQTTSN_CONNECT:
00190         _mqttsnAggrConnection->handleConnect(client, packet);
00191         break;
00192     case MQTTSN_WILLTOPIC:
00193         _mqttsnConnection->handleWilltopic(client, packet);
00194         break;
00195     case MQTTSN_WILLMSG:
00196         _mqttsnAggrConnection->handleWillmsg(client, packet);
00197         break;
00198     case MQTTSN_DISCONNECT:
00199         _mqttsnAggrConnection->handleDisconnect(client, packet);
00200         break;
00201     case MQTTSN_WILLMSGUPD:
00202         _mqttsnConnection->handleWillmsgupd(client, packet);
00203         break;
00204     case MQTTSN_PINGREQ:
00205         _mqttsnAggrConnection->handlePingreq(client, packet);
00206         break;
00207     case MQTTSN_PUBLISH:
00208         _mqttsnPublish->handleAggregatePublish(client, packet);
00209         break;
00210     case MQTTSN_PUBACK:
00211         _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBACK);
00212         break;
00213     case MQTTSN_PUBREC:
00214         _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBREC);
00215         break;
00216     case MQTTSN_PUBREL:
00217         _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBREL);
00218         break;
00219     case MQTTSN_PUBCOMP:
00220         _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBCOMP);
00221         break;
00222     case MQTTSN_REGISTER:
00223         _mqttsnPublish->handleRegister(client, packet);
00224         break;
00225     case MQTTSN_REGACK:
00226         _mqttsnPublish->handleRegAck(client, packet);
00227         break;
00228     case MQTTSN_SUBSCRIBE:
00229         _mqttsnSubscribe->handleAggregateSubscribe(client, packet);
00230         break;
00231     case MQTTSN_UNSUBSCRIBE:
00232         _mqttsnSubscribe->handleAggregateUnsubscribe(client, packet);
00233         break;
00234     default:
00235         break;
00236     }
00237 }
00238 
00239 
00240 void PacketHandleTask::aggregatePacketHandler(Client*client, MQTTGWPacket* packet)
00241 {
00242     switch (packet->getType())
00243     {
00244     case CONNACK:
00245         _mqttConnection->handleConnack(client, packet);
00246         break;
00247     case PINGRESP:
00248         _mqttConnection->handlePingresp(client, packet);
00249         break;
00250     case PUBLISH:
00251         _mqttPublish->handleAggregatePublish(client, packet);
00252         break;
00253     case PUBACK:
00254         _mqttPublish->handleAggregatePuback(client, packet);
00255         break;
00256     case PUBREC:
00257         _mqttPublish->handleAggregateAck(client, packet, PUBREC);
00258         break;
00259     case PUBREL:
00260         _mqttPublish->handleAggregatePubrel(client, packet);
00261         break;
00262     case PUBCOMP:
00263         _mqttPublish->handleAggregateAck(client, packet, PUBCOMP);
00264         break;
00265     case SUBACK:
00266         _mqttSubscribe->handleAggregateSuback(client, packet);
00267         break;
00268     case UNSUBACK:
00269         _mqttSubscribe->handleAggregateUnsuback(client, packet);
00270         break;
00271     default:
00272         break;
00273     }
00274 }
00275 
00276 void PacketHandleTask::transparentPacketHandler(Client*client, MQTTSNPacket* packet)
00277 {
00278     switch (packet->getType())
00279     {
00280     case MQTTSN_CONNECT:
00281         _mqttsnConnection->handleConnect(client, packet);
00282         break;
00283     case MQTTSN_WILLTOPIC:
00284         _mqttsnConnection->handleWilltopic(client, packet);
00285         break;
00286     case MQTTSN_WILLMSG:
00287         _mqttsnConnection->handleWillmsg(client, packet);
00288         break;
00289     case MQTTSN_DISCONNECT:
00290         _mqttsnConnection->handleDisconnect(client, packet);
00291         break;
00292     case MQTTSN_WILLMSGUPD:
00293         _mqttsnConnection->handleWillmsgupd(client, packet);
00294         break;
00295     case MQTTSN_PINGREQ:
00296         _mqttsnConnection->handlePingreq(client, packet);
00297         break;
00298     case MQTTSN_PUBLISH:
00299         _mqttsnPublish->handlePublish(client, packet);
00300         break;
00301     case MQTTSN_PUBACK:
00302         _mqttsnPublish->handlePuback(client, packet);
00303         break;
00304     case MQTTSN_PUBREC:
00305         _mqttsnPublish->handleAck(client, packet, PUBREC);
00306         break;
00307     case MQTTSN_PUBREL:
00308         _mqttsnPublish->handleAck(client, packet, PUBREL);
00309         break;
00310     case MQTTSN_PUBCOMP:
00311         _mqttsnPublish->handleAck(client, packet, PUBCOMP);
00312         break;
00313     case MQTTSN_REGISTER:
00314         _mqttsnPublish->handleRegister(client, packet);
00315         break;
00316     case MQTTSN_REGACK:
00317         _mqttsnPublish->handleRegAck(client, packet);
00318         break;
00319     case MQTTSN_SUBSCRIBE:
00320         _mqttsnSubscribe->handleSubscribe(client, packet);
00321         break;
00322     case MQTTSN_UNSUBSCRIBE:
00323         _mqttsnSubscribe->handleUnsubscribe(client, packet);
00324         break;
00325     default:
00326         break;
00327     }
00328 }
00329 
00330 
00331 void PacketHandleTask::transparentPacketHandler(Client*client, MQTTGWPacket* packet)
00332 {
00333     switch (packet->getType())
00334     {
00335     case CONNACK:
00336         _mqttConnection->handleConnack(client, packet);
00337         break;
00338     case PINGRESP:
00339         _mqttConnection->handlePingresp(client, packet);
00340         break;
00341     case PUBLISH:
00342         _mqttPublish->handlePublish(client, packet);
00343         break;
00344     case PUBACK:
00345         _mqttPublish->handlePuback(client, packet);
00346         break;
00347     case PUBREC:
00348         _mqttPublish->handleAck(client, packet, PUBREC);
00349         break;
00350     case PUBREL:
00351         _mqttPublish->handleAck(client, packet, PUBREL);
00352         break;
00353     case PUBCOMP:
00354         _mqttPublish->handleAck(client, packet, PUBCOMP);
00355         break;
00356     case SUBACK:
00357         _mqttSubscribe->handleSuback(client, packet);
00358         break;
00359     case UNSUBACK:
00360         _mqttSubscribe->handleUnsuback(client, packet);
00361         break;
00362     default:
00363         break;
00364     }
00365 }
00366