Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWBrokerSendTask.cpp Source File

MQTTSNGWBrokerSendTask.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include <MQTTSNGWAdapterManager.h>
00018 #include "MQTTSNGWBrokerSendTask.h"
00019 #include "MQTTSNGWDefines.h"
00020 #include "MQTTSNGateway.h"
00021 #include "MQTTSNGWClient.h"
00022 #include "MQTTGWPacket.h"
00023 #include <string.h>
00024 
00025 using namespace std;
00026 using namespace MQTTSNGW;
00027 
00028 char* currentDateTime();
00029 #define ERRMSG_FORMAT "\n%s   \x1b[0m\x1b[31merror:\x1b[0m\x1b[37m Can't Xmit to the Broker. errno=%d\n"
00030 
00031 /*=====================================
00032  Class BrokerSendTask
00033  =====================================*/
00034 BrokerSendTask::BrokerSendTask(Gateway* gateway)
00035 {
00036     _gateway = gateway;
00037     _gateway->attach((Thread*)this);
00038     _gwparams = nullptr;
00039     _light = nullptr;
00040 }
00041 
00042 BrokerSendTask::~BrokerSendTask()
00043 {
00044 
00045 }
00046 
00047 /**
00048  *  Initialize attributs of this class
00049  */
00050 void BrokerSendTask::initialize(int argc, char** argv)
00051 {
00052     _gwparams = _gateway->getGWParams();
00053     _light = _gateway->getLightIndicator();
00054 }
00055 
00056 /**
00057  *  connect to the broker and send MQTT messges
00058  */
00059 void BrokerSendTask::run()
00060 {
00061     Event* ev = nullptr;
00062     MQTTGWPacket* packet = nullptr;
00063     Client* client = nullptr;
00064     AdapterManager* adpMgr = _gateway->getAdapterManager();
00065     int rc = 0;
00066 
00067     while (true)
00068     {
00069         ev = _gateway->getBrokerSendQue()->wait();
00070 
00071         if ( ev->getEventType() == EtStop )
00072         {
00073             WRITELOG("%s BrokerSendTask   stopped.\n", currentDateTime());
00074             delete ev;
00075             return;
00076         }
00077 
00078         if ( ev->getEventType() == EtBrokerSend)
00079         {
00080             client = ev->getClient();
00081             packet = ev->getMQTTGWPacket();
00082 
00083             /* Check Client is managed by Adapters */
00084             client = adpMgr->getClient(*client);
00085 
00086             if ( packet->getType() == CONNECT && client->getNetwork()->isValid() )
00087             {
00088                 client->getNetwork()->close();
00089             }
00090 
00091             if ( !client->getNetwork()->isValid() )
00092             {
00093                 /* connect to the broker and send a packet */
00094 
00095                 if (client->isSecureNetwork())
00096                 {
00097                     rc = client->getNetwork()->connect((const char*)_gwparams->brokerName, (const char*)_gwparams->portSecure, (const char*)_gwparams->rootCApath,
00098                             (const char*)_gwparams->rootCAfile, (const char*)_gwparams->certKey, (const char*)_gwparams->privateKey);
00099                 }
00100                 else
00101                 {
00102                     rc = client->getNetwork()->connect((const char*)_gwparams->brokerName, (const char*)_gwparams->port);
00103                 }
00104 
00105                 if ( !rc )
00106                 {
00107                     /* disconnect the broker and the client */
00108                     WRITELOG("%s BrokerSendTask: %s can't connect to the broker. errno=%d %s %s\n",
00109                             ERRMSG_HEADER, client->getClientId(), errno, strerror(errno), ERRMSG_FOOTER);
00110                     delete ev;
00111                     client->getNetwork()->close();
00112                     continue;
00113                 }
00114             }
00115 
00116             /* send a packet */
00117             _light->blueLight(true);
00118             if ( (rc = packet->send(client->getNetwork())) > 0 )
00119             {
00120                 if ( packet->getType() == CONNECT )
00121                 {
00122                     client->connectSended();
00123                 }
00124                 log(client, packet);
00125             }
00126             else
00127             {
00128                 WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n",
00129                         ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER);
00130                 client->getNetwork()->close();
00131 
00132                 /* Disconnect the client */
00133                 packet = new MQTTGWPacket();
00134                 packet->setHeader(DISCONNECT);
00135                 Event* ev1 = new Event();
00136                 ev1->setBrokerRecvEvent(client, packet);
00137                 _gateway->getPacketEventQue()->post(ev1);
00138             }
00139 
00140             _light->blueLight(false);
00141         }
00142         delete ev;
00143     }
00144 }
00145 
00146 
00147 /**
00148  *  write message content into stdout or Ringbuffer
00149  */
00150 void BrokerSendTask::log(Client* client, MQTTGWPacket* packet)
00151 {
00152     char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3];
00153     char msgId[6];
00154 
00155     switch (packet->getType())
00156     {
00157     case CONNECT:
00158         WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf));
00159         break;
00160     case PUBLISH:
00161         WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf));
00162         break;
00163     case SUBSCRIBE:
00164     case UNSUBSCRIBE:
00165     case PUBACK:
00166     case PUBREC:
00167     case PUBREL:
00168     case PUBCOMP:
00169         WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf));
00170         break;
00171     case PINGREQ:
00172         WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf));
00173         break;
00174     case DISCONNECT:
00175         WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf));
00176         break;
00177     default:
00178         break;
00179     }
00180 }
00181