Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGateway.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 #include "MQTTSNGWDefines.h" 00017 #include "MQTTSNGateway.h" 00018 #include "SensorNetwork.h" 00019 #include "MQTTSNGWProcess.h" 00020 #include "MQTTSNGWVersion.h" 00021 #include "MQTTSNGWQoSm1Proxy.h" 00022 #include "MQTTSNGWClient.h" 00023 #include <string.h> 00024 using namespace MQTTSNGW; 00025 00026 char* currentDateTime(void); 00027 00028 /*===================================== 00029 Class Gateway 00030 =====================================*/ 00031 MQTTSNGW::Gateway* theGateway = nullptr; 00032 00033 Gateway::Gateway(void) 00034 { 00035 theMultiTaskProcess = this; 00036 theProcess = this; 00037 _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); 00038 _clientList = new ClientList(); 00039 _adapterManager = new AdapterManager(this); 00040 _topics = new Topics(); 00041 } 00042 00043 Gateway::~Gateway() 00044 { 00045 if ( _params.loginId ) 00046 { 00047 free(_params.loginId); 00048 } 00049 if ( _params.password ) 00050 { 00051 free(_params.password); 00052 } 00053 if ( _params.gatewayName ) 00054 { 00055 free(_params.gatewayName); 00056 } 00057 if ( _params.brokerName ) 00058 { 00059 free(_params.brokerName); 00060 } 00061 if ( 
_params.port ) 00062 { 00063 free(_params.port); 00064 } 00065 if ( _params.portSecure ) 00066 { 00067 free(_params.portSecure); 00068 } 00069 if ( _params.certKey ) 00070 { 00071 free(_params.certKey); 00072 } 00073 if ( _params.privateKey ) 00074 { 00075 free(_params.privateKey); 00076 } 00077 if ( _params.rootCApath ) 00078 { 00079 free(_params.rootCApath); 00080 } 00081 if ( _params.rootCAfile ) 00082 { 00083 free(_params.rootCAfile); 00084 } 00085 if ( _params.clientListName ) 00086 { 00087 free(_params.clientListName); 00088 } 00089 if ( _params.configName ) 00090 { 00091 free(_params.configName); 00092 } 00093 00094 if ( _params.qosMinusClientListName ) 00095 { 00096 free(_params.qosMinusClientListName); 00097 } 00098 00099 if ( _adapterManager ) 00100 { 00101 delete _adapterManager; 00102 } 00103 if ( _clientList ) 00104 { 00105 delete _clientList; 00106 } 00107 00108 if ( _topics ) 00109 { 00110 delete _topics; 00111 } 00112 } 00113 00114 int Gateway::getParam(const char* parameter, char* value) 00115 { 00116 return MultiTaskProcess::getParam(parameter, value); 00117 } 00118 00119 void Gateway::initialize(int argc, char** argv) 00120 { 00121 char param[MQTTSNGW_PARAM_MAX]; 00122 string fileName; 00123 theGateway = this; 00124 00125 MultiTaskProcess::initialize(argc, argv); 00126 resetRingBuffer(); 00127 00128 _params.configDir = *getConfigDirName(); 00129 fileName = _params.configDir + *getConfigFileName(); 00130 _params.configName = strdup(fileName.c_str()); 00131 00132 if (getParam("BrokerName", param) == 0) 00133 { 00134 _params.brokerName = strdup(param); 00135 } 00136 if (getParam("BrokerPortNo", param) == 0) 00137 { 00138 _params.port = strdup(param); 00139 } 00140 if (getParam("BrokerSecurePortNo", param) == 0) 00141 { 00142 _params.portSecure = strdup(param); 00143 } 00144 00145 if (getParam("CertKey", param) == 0) 00146 { 00147 _params.certKey = strdup(param); 00148 } 00149 if (getParam("PrivateKey", param) == 0) 00150 { 00151 _params.privateKey = 
strdup(param); 00152 } 00153 if (getParam("RootCApath", param) == 0) 00154 { 00155 _params.rootCApath = strdup(param); 00156 } 00157 if (getParam("RootCAfile", param) == 0) 00158 { 00159 _params.rootCAfile = strdup(param); 00160 } 00161 00162 if (getParam("GatewayID", param) == 0) 00163 { 00164 _params.gatewayId = atoi(param); 00165 } 00166 00167 if (_params.gatewayId == 0 || _params.gatewayId > 255) 00168 { 00169 throw Exception( "Gateway::initialize: invalid Gateway Id"); 00170 } 00171 00172 if (getParam("GatewayName", param) == 0) 00173 { 00174 _params.gatewayName = strdup(param); 00175 } 00176 00177 if (_params.gatewayName == 0 ) 00178 { 00179 throw Exception( "Gateway::initialize: Gateway Name is missing."); 00180 } 00181 00182 _params.mqttVersion = DEFAULT_MQTT_VERSION; 00183 if (getParam("MQTTVersion", param) == 0) 00184 { 00185 _params.mqttVersion = atoi(param); 00186 } 00187 00188 _params.maxInflightMsgs = DEFAULT_MQTT_VERSION; 00189 if (getParam("MaxInflightMsgs", param) == 0) 00190 { 00191 _params.maxInflightMsgs = atoi(param); 00192 } 00193 00194 _params.keepAlive = DEFAULT_KEEP_ALIVE_TIME; 00195 if (getParam("KeepAlive", param) == 0) 00196 { 00197 _params.keepAlive = atoi(param); 00198 } 00199 00200 if (getParam("LoginID", param) == 0) 00201 { 00202 _params.loginId = strdup(param); 00203 } 00204 00205 if (getParam("Password", param) == 0) 00206 { 00207 _params.password = strdup(param); 00208 } 00209 00210 if (getParam("ClientAuthentication", param) == 0) 00211 { 00212 if (!strcasecmp(param, "YES")) 00213 { 00214 _params.clientAuthentication = true; 00215 } 00216 } 00217 00218 /* ClientList and Adapters Initialize */ 00219 _adapterManager->initialize(); 00220 00221 bool aggregate = _adapterManager->isAggregaterActive(); 00222 _clientList->initialize(aggregate); 00223 00224 /* Setup predefined topics */ 00225 _clientList->setPredefinedTopics(aggregate); 00226 } 00227 00228 void Gateway::run(void) 00229 { 00230 /* write prompts */ 00231 
_lightIndicator.redLight(true); 00232 WRITELOG("\n%s", PAHO_COPYRIGHT4); 00233 WRITELOG("\n%s\n", PAHO_COPYRIGHT0); 00234 WRITELOG("%s\n", PAHO_COPYRIGHT1); 00235 WRITELOG("%s\n", PAHO_COPYRIGHT2); 00236 WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3); 00237 WRITELOG(" * Version: %s\n", PAHO_GATEWAY_VERSION); 00238 WRITELOG("%s\n", PAHO_COPYRIGHT4); 00239 WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName); 00240 WRITELOG(" ConfigFile: %s\n", _params.configName); 00241 00242 if ( _params.clientListName ) 00243 { 00244 WRITELOG(" ClientList: %s\n", _params.clientListName); 00245 } 00246 00247 if ( _params.predefinedTopicFileName ) 00248 { 00249 WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName); 00250 } 00251 00252 WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription()); 00253 WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure); 00254 WRITELOG(" RootCApath: %s\n", _params.rootCApath); 00255 WRITELOG(" RootCAfile: %s\n", _params.rootCAfile); 00256 WRITELOG(" CertKey: %s\n", _params.certKey); 00257 WRITELOG(" PrivateKey: %s\n\n\n", _params.privateKey); 00258 00259 00260 /* Run Tasks until CTRL+C entred */ 00261 MultiTaskProcess::run(); 00262 00263 /* stop Tasks */ 00264 Event* ev = new Event(); 00265 ev->setStop(); 00266 _packetEventQue.post(ev); 00267 ev = new Event(); 00268 ev->setStop(); 00269 _brokerSendQue.post(ev); 00270 ev = new Event(); 00271 ev->setStop(); 00272 _clientSendQue.post(ev); 00273 00274 /* wait until all Task stop */ 00275 MultiTaskProcess::waitStop(); 00276 00277 WRITELOG("\n%s MQTT-SN Gateway stoped\n\n", currentDateTime()); 00278 _lightIndicator.allLightOff(); 00279 } 00280 00281 EventQue* Gateway::getPacketEventQue() 00282 { 00283 return &_packetEventQue; 00284 } 00285 00286 EventQue* Gateway::getClientSendQue() 00287 { 00288 return &_clientSendQue; 00289 } 00290 00291 EventQue* Gateway::getBrokerSendQue() 00292 { 00293 return &_brokerSendQue; 00294 } 00295 00296 
ClientList* Gateway::getClientList() 00297 { 00298 return _clientList; 00299 } 00300 00301 SensorNetwork* Gateway::getSensorNetwork() 00302 { 00303 return &_sensorNetwork; 00304 } 00305 00306 LightIndicator* Gateway::getLightIndicator() 00307 { 00308 return &_lightIndicator; 00309 } 00310 00311 GatewayParams* Gateway::getGWParams(void) 00312 { 00313 return &_params; 00314 } 00315 00316 AdapterManager* Gateway::getAdapterManager(void) 00317 { 00318 return _adapterManager; 00319 } 00320 00321 Topics* Gateway::getTopics(void) 00322 { 00323 return _topics; 00324 } 00325 00326 bool Gateway::hasSecureConnection(void) 00327 { 00328 return ( _params.certKey 00329 && _params.privateKey 00330 && _params.rootCApath 00331 && _params.rootCAfile ); 00332 } 00333 /*===================================== 00334 Class EventQue 00335 =====================================*/ 00336 EventQue::EventQue() 00337 { 00338 00339 } 00340 00341 EventQue::~EventQue() 00342 { 00343 _mutex.lock(); 00344 while (_que.size() > 0) 00345 { 00346 delete _que.front(); 00347 _que.pop(); 00348 } 00349 _mutex.unlock(); 00350 } 00351 00352 void EventQue::setMaxSize(uint16_t maxSize) 00353 { 00354 _que.setMaxSize((int)maxSize); 00355 } 00356 00357 Event* EventQue::wait(void) 00358 { 00359 Event* ev = nullptr; 00360 00361 while(ev == nullptr) 00362 { 00363 if ( _que.size() == 0 ) 00364 { 00365 _sem.wait(); 00366 } 00367 _mutex.lock(); 00368 ev = _que.front(); 00369 _que.pop(); 00370 _mutex.unlock(); 00371 } 00372 return ev; 00373 } 00374 00375 Event* EventQue::timedwait(uint16_t millsec) 00376 { 00377 Event* ev; 00378 if ( _que.size() == 0 ) 00379 { 00380 _sem.timedwait(millsec); 00381 } 00382 _mutex.lock(); 00383 00384 if (_que.size() == 0) 00385 { 00386 ev = new Event(); 00387 ev->setTimeout(); 00388 } 00389 else 00390 { 00391 ev = _que.front(); 00392 _que.pop(); 00393 } 00394 _mutex.unlock(); 00395 return ev; 00396 } 00397 00398 void EventQue::post(Event* ev) 00399 { 00400 if ( ev ) 00401 { 00402 
_mutex.lock(); 00403 if ( _que.post(ev) ) 00404 { 00405 _sem.post(); 00406 } 00407 else 00408 { 00409 delete ev; 00410 } 00411 _mutex.unlock(); 00412 } 00413 } 00414 00415 int EventQue::size() 00416 { 00417 _mutex.lock(); 00418 int sz = _que.size(); 00419 _mutex.unlock(); 00420 return sz; 00421 } 00422 00423 00424 /*===================================== 00425 Class Event 00426 =====================================*/ 00427 Event::Event() 00428 { 00429 00430 } 00431 00432 Event::~Event() 00433 { 00434 if (_sensorNetAddr) 00435 { 00436 delete _sensorNetAddr; 00437 } 00438 00439 if (_mqttSNPacket) 00440 { 00441 delete _mqttSNPacket; 00442 } 00443 00444 if (_mqttGWPacket) 00445 { 00446 delete _mqttGWPacket; 00447 } 00448 } 00449 00450 EventType Event::getEventType() 00451 { 00452 return _eventType; 00453 } 00454 00455 void Event::setClientSendEvent(Client* client, MQTTSNPacket* packet) 00456 { 00457 _client = client; 00458 _eventType = EtClientSend; 00459 _mqttSNPacket = packet; 00460 } 00461 00462 void Event::setBrokerSendEvent(Client* client, MQTTGWPacket* packet) 00463 { 00464 _client = client; 00465 _eventType = EtBrokerSend; 00466 _mqttGWPacket = packet; 00467 } 00468 00469 void Event::setClientRecvEvent(Client* client, MQTTSNPacket* packet) 00470 { 00471 _client = client; 00472 _eventType = EtClientRecv; 00473 _mqttSNPacket = packet; 00474 } 00475 00476 void Event::setBrokerRecvEvent(Client* client, MQTTGWPacket* packet) 00477 { 00478 _client = client; 00479 _eventType = EtBrokerRecv; 00480 _mqttGWPacket = packet; 00481 } 00482 00483 void Event::setTimeout(void) 00484 { 00485 _eventType = EtTimeout; 00486 } 00487 00488 void Event::setStop(void) 00489 { 00490 _eventType = EtStop; 00491 } 00492 00493 void Event::setBrodcastEvent(MQTTSNPacket* msg) 00494 { 00495 _mqttSNPacket = msg; 00496 _eventType = EtBroadcast; 00497 } 00498 00499 void Event::setClientSendEvent(SensorNetAddress* addr, MQTTSNPacket* msg) 00500 { 00501 _eventType = EtSensornetSend; 00502 
_sensorNetAddr = addr; 00503 _mqttSNPacket = msg; 00504 } 00505 00506 Client* Event::getClient(void) 00507 { 00508 return _client; 00509 } 00510 00511 SensorNetAddress* Event::getSensorNetAddress(void) 00512 { 00513 return _sensorNetAddr; 00514 } 00515 00516 MQTTSNPacket* Event::getMQTTSNPacket() 00517 { 00518 return _mqttSNPacket; 00519 } 00520 00521 MQTTGWPacket* Event::getMQTTGWPacket(void) 00522 { 00523 return _mqttGWPacket; 00524 } 00525 00526
Generated on Wed Jul 13 2022 10:46:02 by Doxygen 1.7.2