Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGateway.cpp Source File

MQTTSNGateway.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 #include "MQTTSNGWDefines.h"
00017 #include "MQTTSNGateway.h"
00018 #include "SensorNetwork.h"
00019 #include "MQTTSNGWProcess.h"
00020 #include "MQTTSNGWVersion.h"
00021 #include "MQTTSNGWQoSm1Proxy.h"
00022 #include "MQTTSNGWClient.h"
00023 #include <string.h>
00024 using namespace MQTTSNGW;
00025 
00026 char* currentDateTime(void);
00027 
00028 /*=====================================
00029  Class Gateway
00030  =====================================*/
00031 MQTTSNGW::Gateway* theGateway = nullptr;
00032 
00033 Gateway::Gateway(void)
00034 {
00035     theMultiTaskProcess = this;
00036     theProcess = this;
00037     _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
00038     _clientList = new ClientList();
00039     _adapterManager = new AdapterManager(this);
00040     _topics = new Topics();
00041 }
00042 
00043 Gateway::~Gateway()
00044 {
00045     if ( _params.loginId )
00046     {
00047         free(_params.loginId);
00048     }
00049     if ( _params.password )
00050     {
00051         free(_params.password);
00052     }
00053     if ( _params.gatewayName )
00054     {
00055         free(_params.gatewayName);
00056     }
00057     if ( _params.brokerName )
00058     {
00059         free(_params.brokerName);
00060     }
00061     if ( _params.port )
00062     {
00063         free(_params.port);
00064     }
00065     if ( _params.portSecure )
00066     {
00067         free(_params.portSecure);
00068     }
00069     if ( _params.certKey )
00070     {
00071         free(_params.certKey);
00072     }
00073     if ( _params.privateKey )
00074     {
00075         free(_params.privateKey);
00076     }
00077     if ( _params.rootCApath )
00078     {
00079         free(_params.rootCApath);
00080     }
00081     if ( _params.rootCAfile )
00082     {
00083         free(_params.rootCAfile);
00084     }
00085     if ( _params.clientListName )
00086     {
00087         free(_params.clientListName);
00088     }
00089     if ( _params.configName )
00090     {
00091         free(_params.configName);
00092     }
00093 
00094     if ( _params.qosMinusClientListName )
00095     {
00096         free(_params.qosMinusClientListName);
00097     }
00098 
00099     if ( _adapterManager )
00100     {
00101         delete _adapterManager;
00102     }
00103     if ( _clientList )
00104     {
00105         delete _clientList;
00106     }
00107 
00108     if ( _topics )
00109     {
00110         delete _topics;
00111     }
00112 }
00113 
00114 int Gateway::getParam(const char* parameter, char* value)
00115 {
00116     return MultiTaskProcess::getParam(parameter, value);
00117 }
00118 
00119 void Gateway::initialize(int argc, char** argv)
00120 {
00121     char param[MQTTSNGW_PARAM_MAX];
00122     string fileName;
00123     theGateway = this;
00124 
00125     MultiTaskProcess::initialize(argc, argv);
00126     resetRingBuffer();
00127 
00128     _params.configDir = *getConfigDirName();
00129     fileName = _params.configDir + *getConfigFileName();
00130     _params.configName = strdup(fileName.c_str());
00131 
00132     if (getParam("BrokerName", param) == 0)
00133     {
00134         _params.brokerName = strdup(param);
00135     }
00136     if (getParam("BrokerPortNo", param) == 0)
00137     {
00138         _params.port = strdup(param);
00139     }
00140     if (getParam("BrokerSecurePortNo", param) == 0)
00141     {
00142         _params.portSecure = strdup(param);
00143     }
00144 
00145     if (getParam("CertKey", param) == 0)
00146     {
00147         _params.certKey = strdup(param);
00148     }
00149     if (getParam("PrivateKey", param) == 0)
00150         {
00151             _params.privateKey = strdup(param);
00152         }
00153     if (getParam("RootCApath", param) == 0)
00154     {
00155         _params.rootCApath = strdup(param);
00156     }
00157     if (getParam("RootCAfile", param) == 0)
00158     {
00159         _params.rootCAfile = strdup(param);
00160     }
00161 
00162     if (getParam("GatewayID", param) == 0)
00163     {
00164         _params.gatewayId = atoi(param);
00165     }
00166 
00167     if (_params.gatewayId == 0 || _params.gatewayId > 255)
00168     {
00169         throw Exception( "Gateway::initialize: invalid Gateway Id");
00170     }
00171 
00172     if (getParam("GatewayName", param) == 0)
00173     {
00174         _params.gatewayName = strdup(param);
00175     }
00176 
00177     if (_params.gatewayName == 0 )
00178     {
00179         throw Exception( "Gateway::initialize: Gateway Name is missing.");
00180     }
00181 
00182     _params.mqttVersion = DEFAULT_MQTT_VERSION;
00183     if (getParam("MQTTVersion", param) == 0)
00184     {
00185         _params.mqttVersion = atoi(param);
00186     }
00187 
00188     _params.maxInflightMsgs = DEFAULT_MQTT_VERSION;
00189     if (getParam("MaxInflightMsgs", param) == 0)
00190     {
00191         _params.maxInflightMsgs = atoi(param);
00192     }
00193 
00194     _params.keepAlive = DEFAULT_KEEP_ALIVE_TIME;
00195     if (getParam("KeepAlive", param) == 0)
00196     {
00197         _params.keepAlive = atoi(param);
00198     }
00199 
00200     if (getParam("LoginID", param) == 0)
00201     {
00202         _params.loginId = strdup(param);
00203     }
00204 
00205     if (getParam("Password", param) == 0)
00206     {
00207         _params.password = strdup(param);
00208     }
00209 
00210     if (getParam("ClientAuthentication", param) == 0)
00211     {
00212         if (!strcasecmp(param, "YES"))
00213         {
00214             _params.clientAuthentication = true;
00215         }
00216     }
00217 
00218     /*  ClientList and Adapters  Initialize  */
00219     _adapterManager->initialize();
00220 
00221     bool aggregate = _adapterManager->isAggregaterActive();
00222     _clientList->initialize(aggregate);
00223 
00224     /*  Setup predefined topics  */
00225     _clientList->setPredefinedTopics(aggregate);
00226 }
00227 
00228 void Gateway::run(void)
00229 {
00230     /* write prompts */
00231     _lightIndicator.redLight(true);
00232     WRITELOG("\n%s", PAHO_COPYRIGHT4);
00233     WRITELOG("\n%s\n", PAHO_COPYRIGHT0);
00234     WRITELOG("%s\n", PAHO_COPYRIGHT1);
00235     WRITELOG("%s\n", PAHO_COPYRIGHT2);
00236     WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3);
00237     WRITELOG(" * Version: %s\n", PAHO_GATEWAY_VERSION);
00238     WRITELOG("%s\n", PAHO_COPYRIGHT4);
00239     WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName);
00240     WRITELOG(" ConfigFile: %s\n", _params.configName);
00241 
00242     if ( _params.clientListName )
00243     {
00244         WRITELOG(" ClientList: %s\n", _params.clientListName);
00245     }
00246 
00247     if (  _params.predefinedTopicFileName )
00248     {
00249         WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
00250     }
00251 
00252     WRITELOG(" SensorN/W:  %s\n", _sensorNetwork.getDescription());
00253     WRITELOG(" Broker:     %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
00254     WRITELOG(" RootCApath: %s\n", _params.rootCApath);
00255     WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
00256     WRITELOG(" CertKey:    %s\n", _params.certKey);
00257     WRITELOG(" PrivateKey: %s\n\n\n", _params.privateKey);
00258 
00259 
00260     /* Run Tasks until CTRL+C entred */
00261     MultiTaskProcess::run();
00262 
00263     /* stop Tasks */
00264     Event* ev = new Event();
00265     ev->setStop();
00266     _packetEventQue.post(ev);
00267     ev = new Event();
00268     ev->setStop();
00269     _brokerSendQue.post(ev);
00270     ev = new Event();
00271     ev->setStop();
00272     _clientSendQue.post(ev);
00273 
00274     /* wait until all Task stop */
00275     MultiTaskProcess::waitStop();
00276 
00277     WRITELOG("\n%s MQTT-SN Gateway  stoped\n\n", currentDateTime());
00278     _lightIndicator.allLightOff();
00279 }
00280 
00281 EventQue* Gateway::getPacketEventQue()
00282 {
00283     return &_packetEventQue;
00284 }
00285 
00286 EventQue* Gateway::getClientSendQue()
00287 {
00288     return &_clientSendQue;
00289 }
00290 
00291 EventQue* Gateway::getBrokerSendQue()
00292 {
00293     return &_brokerSendQue;
00294 }
00295 
00296 ClientList* Gateway::getClientList()
00297 {
00298     return _clientList;
00299 }
00300 
00301 SensorNetwork* Gateway::getSensorNetwork()
00302 {
00303     return &_sensorNetwork;
00304 }
00305 
00306 LightIndicator* Gateway::getLightIndicator()
00307 {
00308     return &_lightIndicator;
00309 }
00310 
00311 GatewayParams* Gateway::getGWParams(void)
00312 {
00313     return &_params;
00314 }
00315 
00316 AdapterManager* Gateway::getAdapterManager(void)
00317 {
00318     return _adapterManager;
00319 }
00320 
00321 Topics* Gateway::getTopics(void)
00322 {
00323     return _topics;
00324 }
00325 
00326 bool Gateway::hasSecureConnection(void)
00327 {
00328     return (  _params.certKey
00329             && _params.privateKey
00330             && _params.rootCApath
00331             && _params.rootCAfile );
00332 }
00333 /*=====================================
00334  Class EventQue
00335  =====================================*/
00336 EventQue::EventQue()
00337 {
00338 
00339 }
00340 
00341 EventQue::~EventQue()
00342 {
00343     _mutex.lock();
00344     while (_que.size() > 0)
00345     {
00346         delete _que.front();
00347         _que.pop();
00348     }
00349     _mutex.unlock();
00350 }
00351 
00352 void  EventQue::setMaxSize(uint16_t maxSize)
00353 {
00354     _que.setMaxSize((int)maxSize);
00355 }
00356 
00357 Event* EventQue::wait(void)
00358 {
00359     Event* ev = nullptr;
00360 
00361     while(ev == nullptr)
00362     {
00363         if ( _que.size() == 0 )
00364         {
00365             _sem.wait();
00366         }
00367         _mutex.lock();
00368         ev = _que.front();
00369         _que.pop();
00370         _mutex.unlock();
00371     }
00372     return ev;
00373 }
00374 
00375 Event* EventQue::timedwait(uint16_t millsec)
00376 {
00377     Event* ev;
00378     if ( _que.size() == 0 )
00379     {
00380         _sem.timedwait(millsec);
00381     }
00382     _mutex.lock();
00383 
00384     if (_que.size() == 0)
00385     {
00386         ev = new Event();
00387         ev->setTimeout();
00388     }
00389     else
00390     {
00391         ev = _que.front();
00392         _que.pop();
00393     }
00394     _mutex.unlock();
00395     return ev;
00396 }
00397 
00398 void EventQue::post(Event* ev)
00399 {
00400     if ( ev )
00401     {
00402         _mutex.lock();
00403         if ( _que.post(ev) )
00404         {
00405             _sem.post();
00406         }
00407         else
00408         {
00409             delete ev;
00410         }
00411         _mutex.unlock();
00412     }
00413 }
00414 
00415 int EventQue::size()
00416 {
00417     _mutex.lock();
00418     int sz = _que.size();
00419     _mutex.unlock();
00420     return sz;
00421 }
00422 
00423 
00424 /*=====================================
00425  Class Event
00426  =====================================*/
00427 Event::Event()
00428 {
00429 
00430 }
00431 
00432 Event::~Event()
00433 {
00434     if (_sensorNetAddr)
00435     {
00436         delete _sensorNetAddr;
00437     }
00438 
00439     if (_mqttSNPacket)
00440     {
00441         delete _mqttSNPacket;
00442     }
00443 
00444     if (_mqttGWPacket)
00445     {
00446         delete _mqttGWPacket;
00447     }
00448 }
00449 
00450 EventType Event::getEventType()
00451 {
00452     return _eventType;
00453 }
00454 
00455 void Event::setClientSendEvent(Client* client, MQTTSNPacket* packet)
00456 {
00457     _client = client;
00458     _eventType = EtClientSend;
00459     _mqttSNPacket = packet;
00460 }
00461 
00462 void Event::setBrokerSendEvent(Client* client, MQTTGWPacket* packet)
00463 {
00464     _client = client;
00465     _eventType = EtBrokerSend;
00466     _mqttGWPacket = packet;
00467 }
00468 
00469 void Event::setClientRecvEvent(Client* client, MQTTSNPacket* packet)
00470 {
00471     _client = client;
00472     _eventType = EtClientRecv;
00473     _mqttSNPacket = packet;
00474 }
00475 
00476 void Event::setBrokerRecvEvent(Client* client, MQTTGWPacket* packet)
00477 {
00478     _client = client;
00479     _eventType = EtBrokerRecv;
00480     _mqttGWPacket = packet;
00481 }
00482 
00483 void Event::setTimeout(void)
00484 {
00485     _eventType = EtTimeout;
00486 }
00487 
00488 void Event::setStop(void)
00489 {
00490     _eventType = EtStop;
00491 }
00492 
00493 void Event::setBrodcastEvent(MQTTSNPacket* msg)
00494 {
00495     _mqttSNPacket = msg;
00496     _eventType = EtBroadcast;
00497 }
00498 
00499 void Event::setClientSendEvent(SensorNetAddress* addr, MQTTSNPacket* msg)
00500 {
00501     _eventType = EtSensornetSend;
00502     _sensorNetAddr = addr;
00503     _mqttSNPacket = msg;
00504 }
00505 
00506 Client* Event::getClient(void)
00507 {
00508     return _client;
00509 }
00510 
00511 SensorNetAddress* Event::getSensorNetAddress(void)
00512 {
00513     return _sensorNetAddr;
00514 }
00515 
00516 MQTTSNPacket* Event::getMQTTSNPacket()
00517 {
00518     return _mqttSNPacket;
00519 }
00520 
00521 MQTTGWPacket* Event::getMQTTGWPacket(void)
00522 {
00523     return _mqttGWPacket;
00524 }
00525 
00526