Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWBrokerRecvTask.cpp Source File

MQTTSNGWBrokerRecvTask.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include "MQTTSNGWBrokerRecvTask.h"
00018 #include "MQTTSNGWClient.h"
00019 #include "MQTTSNGWClientList.h"
00020 #include <unistd.h>
00021 
00022 using namespace std;
00023 using namespace MQTTSNGW;
00024 
00025 char* currentDateTime(void);
00026 
00027 /*=====================================
00028  Class BrokerRecvTask
00029  =====================================*/
00030 BrokerRecvTask::BrokerRecvTask(Gateway* gateway)
00031 {
00032     _gateway = gateway;
00033     _gateway->attach((Thread*)this);
00034     _light = nullptr;
00035 }
00036 
00037 BrokerRecvTask::~BrokerRecvTask()
00038 {
00039 
00040 }
00041 
00042 /**
00043  *  Initialize attributs of this class
00044  */
00045 void BrokerRecvTask::initialize(int argc, char** argv)
00046 {
00047     _light = _gateway->getLightIndicator();
00048 }
00049 
00050 /**
00051  *  receive a MQTT messge from the broker and post a event.
00052  */
00053 void BrokerRecvTask::run(void)
00054 {
00055     struct timeval timeout;
00056     MQTTGWPacket* packet = nullptr;
00057     int rc;
00058     Event* ev = nullptr;
00059     fd_set rset;
00060     fd_set wset;
00061 
00062     while (true)
00063     {
00064         _light->blueLight(false);
00065         if (CHK_SIGINT)
00066         {
00067             WRITELOG("%s BrokerRecvTask   stopped.\n", currentDateTime());
00068             return;
00069         }
00070         timeout.tv_sec = 0;
00071         timeout.tv_usec = 500000;    // 500 msec
00072         FD_ZERO(&rset);
00073         FD_ZERO(&wset);
00074         int maxSock = 0;
00075         int sockfd = 0;
00076 
00077         /* Prepare sockets list to read */
00078         Client* client = _gateway->getClientList()->getClient(0);
00079 
00080         while ( client )
00081         {
00082             if (client->getNetwork()->isValid())
00083             {
00084                 sockfd = client->getNetwork()->getSock();
00085                 FD_SET(sockfd, &rset);
00086                 FD_SET(sockfd, &wset);
00087                 if (sockfd > maxSock)
00088                 {
00089                     maxSock = sockfd;
00090                 }
00091             }
00092             client = client->getNextClient();
00093         }
00094 
00095         if (maxSock == 0)
00096         {
00097             usleep(500 * 1000);
00098         }
00099         else
00100         {
00101             /* Check sockets is ready to read */
00102             int activity = select(maxSock + 1, &rset, 0, 0, &timeout);
00103             if (activity > 0)
00104             {
00105                 client = _gateway->getClientList()->getClient(0);
00106 
00107                 while ( client )
00108                 {
00109                     _light->blueLight(false);
00110                     if (client->getNetwork()->isValid())
00111                     {
00112                         int sockfd = client->getNetwork()->getSock();
00113                         if (FD_ISSET(sockfd, &rset))
00114                         {
00115                             packet = new MQTTGWPacket();
00116                             rc = 0;
00117                             /* read sockets */
00118                             _light->blueLight(true);
00119                             rc = packet->recv(client->getNetwork());
00120                             if ( rc > 0 )
00121                             {
00122                                 if ( log(client, packet) == -1 )
00123                                 {
00124                                     delete packet;
00125                                     goto nextClient;
00126                                 }
00127 
00128                                 /* post a BrokerRecvEvent */
00129                                 ev = new Event();
00130                                 ev->setBrokerRecvEvent(client, packet);
00131                                 _gateway->getPacketEventQue()->post(ev);
00132                             }
00133                             else
00134                             {
00135                                 if ( rc == 0 )  // Disconnected
00136                                 {
00137                                     client->getNetwork()->close();
00138                                     delete packet;
00139 
00140                                     /* delete client when the client is not authorized & session is clean */
00141                                     _gateway->getClientList()->erase(client);
00142 
00143                                     if ( client )
00144                                     {
00145                                         client = client->getNextClient();
00146                                     }
00147                                     continue;
00148                                 }
00149                                 else if (rc == -1)
00150                                 {
00151                                     WRITELOG("%s BrokerRecvTask can't receive a packet from the broker errno=%d %s%s\n", ERRMSG_HEADER, errno, client->getClientId(), ERRMSG_FOOTER);
00152                                 }
00153                                 else if ( rc == -2 )
00154                                 {
00155                                     WRITELOG("%s BrokerRecvTask receive invalid length of packet from the broker.  DISCONNECT  %s %s\n", ERRMSG_HEADER, client->getClientId(),ERRMSG_FOOTER);
00156                                 }
00157                                 else if ( rc == -3 )
00158                                 {
00159                                     WRITELOG("%s BrokerRecvTask can't get memories for the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
00160                                 }
00161 
00162                                 delete packet;
00163 
00164                                 if ( (rc == -1 || rc == -2) && client->isActive() )
00165                                 {
00166                                     /* disconnect the client */
00167                                     packet = new MQTTGWPacket();
00168                                     packet->setHeader(DISCONNECT);
00169                                     ev = new Event();
00170                                     ev->setBrokerRecvEvent(client, packet);
00171                                     _gateway->getPacketEventQue()->post(ev);
00172                                 }
00173                             }
00174                         }
00175                     }
00176                     nextClient:
00177                     client = client->getNextClient();
00178                 }
00179             }
00180         }
00181     }
00182 }
00183 
00184 /**
00185  *  write message content into stdout or Ringbuffer
00186  */
00187 int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet)
00188 {
00189     char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3];
00190     char msgId[6];
00191     int rc = 0;
00192 
00193     switch (packet->getType())
00194     {
00195     case CONNACK:
00196         WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROWB, client->getClientId(), packet->print(pbuf));
00197         break;
00198     case PUBLISH:
00199         WRITELOG(FORMAT_W_MSGID_Y_W_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf));
00200         break;
00201     case PUBACK:
00202     case PUBREC:
00203     case PUBREL:
00204     case PUBCOMP:
00205         WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf));
00206         break;
00207     case SUBACK:
00208     case UNSUBACK:
00209         WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf));
00210         break;
00211     case PINGRESP:
00212         WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROWB, client->getClientId(), packet->print(pbuf));
00213         break;
00214     default:
00215         WRITELOG("Type=%x\n", packet->getType());
00216         rc = -1;
00217         break;
00218     }
00219     return rc;
00220 }