Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWClientRecvTask.cpp Source File

MQTTSNGWClientRecvTask.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include "MQTTSNGWClientRecvTask.h"
00018 #include "MQTTSNPacket.h"
00019 #include "MQTTSNGWQoSm1Proxy.h"
00020 #include "MQTTSNGWEncapsulatedPacket.h"
00021 #include <cstring>
00022 
00023 //#include "MQTTSNGWForwarder.h"
00024 
00025 using namespace MQTTSNGW;
00026 char* currentDateTime(void);
00027 /*=====================================
00028  Class ClientRecvTask
00029  =====================================*/
00030 ClientRecvTask::ClientRecvTask(Gateway* gateway)
00031 {
00032     _gateway = gateway;
00033     _gateway->attach((Thread*)this);
00034     _sensorNetwork = _gateway->getSensorNetwork();
00035 }
00036 
00037 ClientRecvTask::~ClientRecvTask()
00038 {
00039 
00040 }
00041 
00042 /**
00043  * Initialize SensorNetwork
00044  */
00045 void ClientRecvTask::initialize(int argc, char** argv)
00046 {
00047     if ( _sensorNetwork->initialize() < 0 )
00048     {
00049         throw Exception(" Can't open the sensor network.\n");
00050     }
00051 }
00052 
00053 /*
00054  * Receive a packet from clients via sensor netwwork
00055  * and generate a event to execute the packet handling  procedure
00056  * of MQTTSNPacketHandlingTask.
00057  */
00058 void ClientRecvTask::run()
00059 {
00060     Event* ev = nullptr;
00061     AdapterManager* adpMgr = _gateway->getAdapterManager();
00062     QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy();
00063     bool isAggrActive = adpMgr->isAggregaterActive();
00064     ClientList* clientList = _gateway->getClientList();
00065     EventQue* packetEventQue = _gateway->getPacketEventQue();
00066 
00067     char buf[128];
00068 
00069     while (true)
00070     {
00071         Client* client = nullptr;
00072         Forwarder* fwd = nullptr;
00073         WirelessNodeId nodeId;
00074 
00075         MQTTSNPacket* packet = new MQTTSNPacket();
00076         int packetLen = packet->recv(_sensorNetwork);
00077 
00078         if (CHK_SIGINT)
00079         {
00080             WRITELOG("%s ClientRecvTask   stopped.\n", currentDateTime());
00081             delete packet;
00082             return;
00083         }
00084 
00085         if (packetLen < 2 )
00086         {
00087             delete packet;
00088             continue;
00089         }
00090 
00091         if ( packet->getType() <= MQTTSN_ADVERTISE || packet->getType() == MQTTSN_GWINFO )
00092         {
00093             delete packet;
00094             continue;
00095         }
00096 
00097         if ( packet->getType() == MQTTSN_SEARCHGW )
00098         {
00099             /* write log and post Event */
00100             log(0, packet, 0);
00101             ev = new Event();
00102             ev->setBrodcastEvent(packet);
00103             packetEventQue->post(ev);
00104             continue;
00105         }
00106 
00107 
00108         SensorNetAddress* senderAddr = _gateway->getSensorNetwork()->getSenderAddress();
00109 
00110         if ( packet->getType() == MQTTSN_ENCAPSULATED )
00111         {
00112             fwd = _gateway->getAdapterManager()->getForwarderList()->getForwarder(senderAddr);
00113 
00114             if ( fwd != nullptr )
00115             {
00116                 MQTTSNString fwdName = MQTTSNString_initializer;
00117                 fwdName.cstring = const_cast<char *>( fwd->getName() );
00118                 log(0, packet, &fwdName);
00119 
00120                 /* get the packet from the encapsulation message */
00121                 MQTTSNGWEncapsulatedPacket  encap;
00122                 encap.desirialize(packet->getPacketData(), packet->getPacketLength());
00123                 nodeId.setId( encap.getWirelessNodeId() );
00124                 client = fwd->getClient(&nodeId);
00125                 packet = encap.getMQTTSNPacket();
00126             }
00127         }
00128         else
00129         {
00130             /*   Check the client belonging to QoS-1Proxy  ?    */
00131 
00132             if ( qosm1Proxy->isActive() )
00133             {
00134                  const char* clientName = qosm1Proxy->getClientId(senderAddr);
00135 
00136                 if ( clientName )
00137                 {
00138                     if ( !packet->isQoSMinusPUBLISH() )
00139                     {
00140                         client = qosm1Proxy->getClient();
00141                         log(clientName, packet);
00142                         WRITELOG("%s %s  %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER);
00143                         delete packet;
00144                         continue;
00145                     }
00146                 }
00147             }
00148         }
00149 
00150         client = _gateway->getClientList()->getClient(senderAddr);
00151 
00152         if ( client )
00153         {
00154             /* write log and post Event */
00155             log(client, packet, 0);
00156             ev = new Event();
00157             ev->setClientRecvEvent(client,packet);
00158             packetEventQue->post(ev);
00159         }
00160         else
00161         {
00162             /* new client */
00163             if (packet->getType() == MQTTSN_CONNECT)
00164             {
00165                 MQTTSNPacket_connectData data;
00166                 memset(&data, 0, sizeof(MQTTSNPacket_connectData));
00167                 if ( !packet->getCONNECT(&data) )
00168                 {
00169                     log(0, packet, &data.clientID);
00170                     WRITELOG("%s CONNECT message form %s is incorrect.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
00171                     delete packet;
00172                     continue;
00173                 }
00174 
00175                 client = clientList->getClient(&data.clientID);
00176 
00177                 if ( fwd )
00178                 {
00179                     if ( client == nullptr )
00180                     {
00181                         /* create a new client */
00182                         client = clientList->createClient(0, &data.clientID, isAggrActive);
00183                     }
00184                     /* Add to af forwarded client list of forwarder. */
00185                     fwd->addClient(client, &nodeId);
00186                 }
00187                 else
00188                 {
00189                     if ( client )
00190                     {
00191                         /* Client exists. Set SensorNet Address of it. */
00192                         client->setClientAddress(senderAddr);
00193                     }
00194                     else
00195                     {
00196                         /* create a new client */
00197                         client = clientList->createClient(senderAddr, &data.clientID, isAggrActive);
00198                     }
00199                 }
00200 
00201                 log(client, packet, &data.clientID);
00202 
00203                 if (!client)
00204                 {
00205                     WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
00206                     delete packet;
00207                     continue;
00208                 }
00209 
00210                 /* post Client RecvEvent */
00211                 ev = new Event();
00212                 ev->setClientRecvEvent(client, packet);
00213                 packetEventQue->post(ev);
00214             }
00215             else
00216             {
00217                 log(client, packet, 0);
00218                 if ( packet->getType() == MQTTSN_ENCAPSULATED )
00219                 {
00220                     WRITELOG("%s Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
00221                 }
00222                 else
00223                 {
00224                     WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
00225                 }
00226                 delete packet;
00227             }
00228         }
00229     }
00230 }
00231 
00232 void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
00233 {
00234     const char* clientId;
00235     char cstr[MAX_CLIENTID_LENGTH + 1];
00236 
00237     if ( id )
00238     {
00239         if ( id->cstring )
00240         {
00241             strncpy(cstr, id->cstring, strlen(id->cstring) );
00242             clientId = cstr;
00243         }
00244         else
00245         {
00246             memset((void*)cstr, 0, id->lenstring.len + 1);
00247             strncpy(cstr, id->lenstring.data, id->lenstring.len );
00248             clientId = cstr;
00249         }
00250     }
00251     else if ( client )
00252     {
00253         clientId = client->getClientId();
00254     }
00255     else
00256     {
00257         clientId = UNKNOWNCL;
00258     }
00259 
00260     log(clientId,  packet);
00261 }
00262 
00263 void ClientRecvTask::log(const char* clientId, MQTTSNPacket* packet)
00264 {
00265     char pbuf[ SIZE_OF_LOG_PACKET * 3 + 1];
00266     char msgId[6];
00267 
00268     switch (packet->getType())
00269     {
00270     case MQTTSN_SEARCHGW:
00271         WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, CLIENT, packet->print(pbuf));
00272         break;
00273     case MQTTSN_CONNECT:
00274     case MQTTSN_PINGREQ:
00275         WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
00276         break;
00277     case MQTTSN_DISCONNECT:
00278     case MQTTSN_WILLTOPICUPD:
00279     case MQTTSN_WILLMSGUPD:
00280     case MQTTSN_WILLTOPIC:
00281     case MQTTSN_WILLMSG:
00282         WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
00283         break;
00284     case MQTTSN_PUBLISH:
00285     case MQTTSN_REGISTER:
00286     case MQTTSN_SUBSCRIBE:
00287     case MQTTSN_UNSUBSCRIBE:
00288         WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
00289         break;
00290     case MQTTSN_REGACK:
00291     case MQTTSN_PUBACK:
00292     case MQTTSN_PUBREC:
00293     case MQTTSN_PUBREL:
00294     case MQTTSN_PUBCOMP:
00295         WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
00296         break;
00297     case MQTTSN_ENCAPSULATED:
00298             WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
00299             break;
00300     default:
00301         WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
00302         break;
00303     }
00304 }