Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWClient.cpp Source File

MQTTSNGWClient.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  *    Tieto Poland Sp. z o.o. - Gateway improvements
00016  **************************************************************************************/
00017 
00018 #include "MQTTSNGWDefines.h"
00019 #include "MQTTSNGWClientList.h"
00020 #include "MQTTSNGateway.h"
00021 #include "SensorNetwork.h"
00022 #include <string>
00023 #include <string.h>
00024 #include <stdio.h>
00025 
00026 #include "MQTTSNGWForwarder.h"
00027 
00028 using namespace MQTTSNGW;
00029 char* currentDateTime(void);
00030 
00031 
00032 /*=====================================
00033  Class Client
00034  =====================================*/
00035 static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Asleep", "Awake", "Lost" };
00036 
00037 Client::Client(bool secure)
00038 {
00039     _packetId = 0;
00040     _snMsgId = 0;
00041     _status = Cstat_Disconnected;
00042     _keepAliveMsec = 0;
00043     _topics = new Topics();
00044     _clientId = nullptr;
00045     _willTopic = nullptr;
00046     _willMsg = nullptr;
00047     _connectData = MQTTPacket_Connect_Initializer;
00048     _network = new Network(secure);
00049     _secureNetwork = secure;
00050     _sensorNetype = true;
00051     _connAck = nullptr;
00052     _waitWillMsgFlg = false;
00053     _sessionStatus = false;
00054     _prevClient = nullptr;
00055     _nextClient = nullptr;
00056     _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
00057     _proxyPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
00058     _hasPredefTopic = false;
00059     _holdPingRequest = false;
00060     _forwarder = nullptr;
00061     _clientType = Ctype_Regular;
00062 }
00063 
00064 Client::~Client()
00065 {
00066     if ( _topics )
00067     {
00068         delete _topics;
00069     }
00070 
00071     if ( _clientId )
00072     {
00073         free(_clientId);
00074     }
00075 
00076     if ( _willTopic )
00077     {
00078         free(_willTopic);
00079     }
00080 
00081     if ( _willMsg )
00082     {
00083         free(_willMsg);
00084     }
00085 
00086     if (_connAck)
00087     {
00088         delete _connAck;
00089     }
00090 
00091     if (_network)
00092     {
00093         delete _network;
00094     }
00095 }
00096 
00097 TopicIdMapElement* Client::getWaitedPubTopicId(uint16_t msgId)
00098 {
00099     return _waitedPubTopicIdMap.getElement(msgId);
00100 }
00101 
00102 TopicIdMapElement* Client::getWaitedSubTopicId(uint16_t msgId)
00103 {
00104     return _waitedSubTopicIdMap.getElement(msgId);
00105 }
00106 
00107 MQTTGWPacket* Client::getClientSleepPacket()
00108 {
00109     return _clientSleepPacketQue.getPacket();
00110 }
00111 
00112 void Client::deleteFirstClientSleepPacket()
00113 {
00114     _clientSleepPacketQue.pop();
00115 }
00116 
00117 int Client::setClientSleepPacket(MQTTGWPacket* packet)
00118 {
00119     int rc = _clientSleepPacketQue.post(packet);
00120     if ( rc )
00121     {
00122         WRITELOG("%s    %s is sleeping. the packet was saved.\n", currentDateTime(), _clientId);
00123     }
00124     else
00125     {
00126         WRITELOG("%s    %s is sleeping but discard the packet.\n", currentDateTime(), _clientId);
00127     }
00128     return rc;
00129 }
00130 
00131 MQTTSNPacket* Client::getProxyPacket(void)
00132 {
00133     return _proxyPacketQue.getPacket();
00134 }
00135 
00136 void Client::deleteFirstProxyPacket()
00137 {
00138     _proxyPacketQue.pop();
00139 }
00140 
00141 int Client::setProxyPacket(MQTTSNPacket* packet)
00142 {
00143     int rc = _proxyPacketQue.post(packet);
00144     if ( rc )
00145     {
00146         WRITELOG("%s    %s is Disconnected. the packet was saved.\n", currentDateTime(), _clientId);
00147     }
00148     else
00149     {
00150         WRITELOG("%s    %s is Disconnected and discard the packet.\n", currentDateTime(), _clientId);
00151     }
00152     return rc;
00153 }
00154 
00155 Connect* Client::getConnectData(void)
00156 {
00157     return &_connectData;
00158 }
00159 
00160 void Client::eraseWaitedPubTopicId(uint16_t msgId)
00161 {
00162     _waitedPubTopicIdMap.erase(msgId);
00163 }
00164 
00165 void Client::eraseWaitedSubTopicId(uint16_t msgId)
00166 {
00167     _waitedSubTopicIdMap.erase(msgId);
00168 }
00169 
00170 void Client::clearWaitedPubTopicId(void)
00171 {
00172     _waitedPubTopicIdMap.clear();
00173 }
00174 
00175 void Client::clearWaitedSubTopicId(void)
00176 {
00177     _waitedSubTopicIdMap.clear();
00178 }
00179 
00180 void Client::setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
00181 {
00182     _waitedPubTopicIdMap.add(msgId, topicId, type);
00183 }
00184 void Client::setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
00185 {
00186     _waitedSubTopicIdMap.add(msgId, topicId, type);
00187 }
00188 
00189 bool Client::checkTimeover(void)
00190 {
00191     return (_status == Cstat_Active && _keepAliveTimer.isTimeup());
00192 }
00193 
00194 void Client::setKeepAlive(MQTTSNPacket* packet)
00195 {
00196     MQTTSNPacket_connectData param;
00197     if (packet->getCONNECT(&param))
00198     {
00199         _keepAliveMsec = param.duration * 1000UL;
00200         _keepAliveTimer.start(_keepAliveMsec * 1.5);
00201     }
00202 }
00203 
00204 void Client::setForwarder(Forwarder* forwarder)
00205 {
00206     _forwarder = forwarder;
00207     _clientType = Ctype_Forwarded;
00208 }
00209 
00210 Forwarder* Client::getForwarder(void)
00211 {
00212     return _forwarder;
00213 }
00214 
00215 void Client::setSessionStatus(bool status)
00216 {
00217     _sessionStatus = status;
00218 }
00219 
00220 bool Client::erasable(void)
00221 {
00222     return _sessionStatus && !_hasPredefTopic  && _forwarder == nullptr;
00223 }
00224 
00225 void Client::updateStatus(MQTTSNPacket* packet)
00226 {
00227     if (((_status == Cstat_Disconnected) || (_status == Cstat_Lost)) && packet->getType() == MQTTSN_CONNECT)
00228     {
00229         setKeepAlive(packet);
00230     }
00231     else if (_status == Cstat_Active)
00232     {
00233         switch (packet->getType())
00234         {
00235         case MQTTSN_PINGREQ:
00236         case MQTTSN_PUBLISH:
00237         case MQTTSN_SUBSCRIBE:
00238         case MQTTSN_UNSUBSCRIBE:
00239         case MQTTSN_PUBACK:
00240         case MQTTSN_PUBCOMP:
00241         case MQTTSN_PUBREL:
00242         case MQTTSN_PUBREC:
00243             if ( _clientType != Ctype_Proxy )
00244             {
00245                 _keepAliveTimer.start(_keepAliveMsec * 1.5);
00246             }
00247             break;
00248         case MQTTSN_DISCONNECT:
00249             uint16_t duration;
00250             packet->getDISCONNECT(&duration);
00251             if (duration)
00252             {
00253                 _status = Cstat_Asleep;
00254             }
00255             else
00256             {
00257                 disconnected();
00258             }
00259             break;
00260         default:
00261             break;
00262         }
00263     }
00264     else if (_status == Cstat_Awake || _status == Cstat_Asleep)
00265     {
00266         switch (packet->getType())
00267         {
00268         case MQTTSN_CONNECT:
00269             _status = Cstat_Active;
00270             break;
00271         case MQTTSN_DISCONNECT:
00272             disconnected();
00273             break;
00274         case MQTTSN_PINGREQ:
00275             _status = Cstat_Awake;
00276             break;
00277         case MQTTSN_PINGRESP:
00278             _status = Cstat_Asleep;
00279             break;
00280         default:
00281             break;
00282         }
00283     }
00284     DEBUGLOG("Client Status = %s\n", theClientStatus[_status]);
00285 }
00286 
00287 void Client::updateStatus(ClientStatus stat)
00288 {
00289     _status = stat;
00290 }
00291 
00292 void Client::connectSended()
00293 {
00294     _status = Cstat_Connecting;
00295 }
00296 
00297 void Client::connackSended(int rc)
00298 {
00299     if (rc == MQTTSN_RC_ACCEPTED)
00300     {
00301         _status = Cstat_Active;
00302     }
00303     else
00304     {
00305         disconnected();
00306     }
00307 }
00308 
00309 void Client::disconnected(void)
00310 {
00311     _status = Cstat_Disconnected;
00312     _waitWillMsgFlg = false;
00313 }
00314 
00315 void Client::tryConnect(void)
00316 {
00317     _status = Cstat_TryConnecting;
00318 }
00319 
00320 bool Client::isConnectSendable(void)
00321 {
00322     if ( _status == Cstat_Lost || _status == Cstat_TryConnecting )
00323     {
00324         return false;
00325     }
00326     else
00327     {
00328         return true;
00329     }
00330 }
00331 
00332 uint16_t Client::getNextPacketId(void)
00333 {
00334     _packetId++;
00335     if ( _packetId == 0xffff )
00336     {
00337         _packetId = 1;
00338     }
00339     return _packetId;
00340 }
00341 
00342 uint8_t Client::getNextSnMsgId(void)
00343 {
00344     _snMsgId++;
00345     if (_snMsgId == 0)
00346     {
00347         _snMsgId++;
00348     }
00349     return _snMsgId;
00350 }
00351 
00352 Topics* Client::getTopics(void)
00353 {
00354     return _topics;
00355 }
00356 
00357 Network* Client::getNetwork(void)
00358 {
00359     return _network;
00360 }
00361 
00362 void Client::setClientAddress(SensorNetAddress* sensorNetAddr)
00363 {
00364     _sensorNetAddr = *sensorNetAddr;
00365 }
00366 
00367 SensorNetAddress* Client::getSensorNetAddress(void)
00368 {
00369     return &_sensorNetAddr;
00370 }
00371 
00372 void Client::setSensorNetType(bool stable)
00373 {
00374     _sensorNetype = stable;
00375 }
00376 
00377 void Client::setTopics(Topics* topics)
00378 {
00379   _topics = topics;
00380 }
00381 
00382 ClientStatus Client::getClientStatus(void)
00383 {
00384     return _status;
00385 }
00386 
00387 void Client::setWaitWillMsgFlg(bool flg)
00388 {
00389     _waitWillMsgFlg = flg;
00390 }
00391 
00392 bool Client::isWaitWillMsg(void)
00393 {
00394     return _waitWillMsgFlg;
00395 }
00396 
00397 bool Client::isDisconnect(void)
00398 {
00399     return (_status == Cstat_Disconnected);
00400 }
00401 
00402 bool Client::isActive(void)
00403 {
00404     return (_status == Cstat_Active);
00405 }
00406 
00407 bool Client::isSleep(void)
00408 {
00409     return (_status == Cstat_Asleep);
00410 }
00411 
00412 bool Client::isAwake(void)
00413 {
00414     return (_status == Cstat_Awake);
00415 }
00416 
00417 bool Client::isConnecting(void)
00418 {
00419     return (_status == Cstat_Connecting);
00420 }
00421 
00422 bool Client::isSecureNetwork(void)
00423 {
00424     return _secureNetwork;
00425 }
00426 
00427 bool Client::isSensorNetStable(void)
00428 {
00429     return _sensorNetype;
00430 }
00431 
00432 WaitREGACKPacketList* Client::getWaitREGACKPacketList()
00433 {
00434     return &_waitREGACKList;
00435 }
00436 
00437 Client* Client::getNextClient(void)
00438 {
00439     return _nextClient;
00440 }
00441 
00442 void Client::setClientId(MQTTSNString id)
00443 {
00444     if ( _clientId )
00445     {
00446         free(_clientId);
00447     }
00448 
00449     if ( id.cstring )
00450     {
00451         _clientId = (char*)calloc(strlen(id.cstring) + 1, 1);
00452         memcpy(_clientId, id.cstring, strlen(id.cstring));
00453     }
00454     else
00455     {
00456         /* save clientId into (char*)_clientId NULL terminated */
00457         _clientId = (char*)calloc(MQTTSNstrlen(id) + 1, 1);
00458         unsigned char* ptr = (unsigned char*)_clientId;
00459         writeMQTTSNString((unsigned char**)&ptr, id);
00460     }
00461 }
00462 
00463 void Client::setWillTopic(MQTTSNString willTopic)
00464 {
00465     if ( _willTopic )
00466     {
00467         free(_willTopic);
00468     }
00469 
00470     _willTopic = (char*)calloc(MQTTSNstrlen(willTopic) + 1, 1);
00471     /* save willTopic into (char*)_willTopic  with NULL termination */
00472     unsigned char* ptr = (unsigned char*)_willTopic;
00473     writeMQTTSNString((unsigned char**)&ptr, willTopic);
00474 }
00475 
00476 void Client::setWillMsg(MQTTSNString willMsg)
00477 {
00478     if ( _willMsg)
00479     {
00480         free(_willMsg);
00481     }
00482 
00483     _willMsg = (char*)calloc(MQTTSNstrlen(willMsg) + 1, 1);
00484     /* save willMsg into (char*)_willMsg  with NULL termination */
00485     unsigned char* ptr = (unsigned char*)_willMsg;
00486     writeMQTTSNString((unsigned char**)&ptr, willMsg);
00487 }
00488 
00489 char* Client::getClientId(void)
00490 {
00491     return _clientId;
00492 }
00493 
00494 char* Client::getWillTopic(void)
00495 {
00496     return _willTopic;
00497 }
00498 
00499 char* Client::getWillMsg(void)
00500 {
00501     return _willMsg;
00502 }
00503 
00504 const char* Client::getStatus(void)
00505 {
00506     return theClientStatus[_status];
00507 }
00508 
00509 bool Client::isQoSm1Proxy(void)
00510 {
00511     return _clientType == Ctype_Proxy;
00512 }
00513 
00514 bool Client::isForwarded(void)
00515 {
00516     return _clientType == Ctype_Forwarded;
00517 }
00518 
00519 bool Client::isAggregated(void)
00520 {
00521     return _clientType == Ctype_Aggregated;
00522 }
00523 
00524 bool Client::isAggregater(void)
00525 {
00526     return _clientType == Ctype_Aggregater;
00527 }
00528 
00529 void Client::setAdapterType(AdapterType type)
00530 {
00531     switch ( type )
00532     {
00533     case Atype_QoSm1Proxy:
00534         _clientType = Ctype_Proxy;
00535         break;
00536     case Atype_Aggregater:
00537         _clientType = Ctype_Aggregater;
00538         break;
00539     default:
00540         throw Exception("Client::setAdapterType(): Invalid Type.");
00541         break;
00542     }
00543 }
00544 
00545 bool Client::isAdapter(void)
00546 {
00547     return _clientType == Ctype_Proxy || _clientType == Ctype_Aggregater;
00548 }
00549 
00550 bool Client::isQoSm1(void)
00551 {
00552     return _clientType == Ctype_QoS_1;
00553 }
00554 
00555 void Client::setQoSm1(void)
00556 {
00557     _clientType =  Ctype_QoS_1;
00558 }
00559 
00560 void Client::setAggregated(void)
00561 {
00562     _clientType = Ctype_Aggregated;
00563 }
00564 
00565 void Client::holdPingRequest(void)
00566 {
00567     _holdPingRequest = true;
00568 }
00569 
00570 void Client::resetPingRequest(void)
00571 {
00572     _holdPingRequest = false;
00573 }
00574 
00575 bool Client::isHoldPringReqest(void)
00576 {
00577     return _holdPingRequest;
00578 }
00579 
00580 
00581 
00582 /*=====================================
00583  Class WaitREGACKPacket
00584  =====================================*/
00585 waitREGACKPacket::waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId)
00586 {
00587     _packet = packet;
00588     _msgId = REGACKMsgId;
00589     _next = nullptr;
00590     _prev = nullptr;
00591 }
00592 
00593 waitREGACKPacket::~waitREGACKPacket()
00594 {
00595     delete _packet;
00596 }
00597 
00598 /*=====================================
00599  Class WaitREGACKPacketList
00600  =====================================*/
00601 
00602 WaitREGACKPacketList::WaitREGACKPacketList()
00603 {
00604     _first = nullptr;
00605     _end = nullptr;
00606     _cnt = 0;
00607 }
00608 
00609 WaitREGACKPacketList::~WaitREGACKPacketList()
00610 {
00611     waitREGACKPacket* p = _first;
00612     while (p)
00613     {
00614         waitREGACKPacket* q = p->_next;
00615         delete p;
00616         p = q;
00617     }
00618 }
00619 
00620 int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId)
00621 {
00622     waitREGACKPacket* elm = new waitREGACKPacket(packet, REGACKMsgId);
00623     if (elm == nullptr)
00624     {
00625         return 0;
00626     }
00627 
00628     if (_first == nullptr)
00629     {
00630         _first = elm;
00631         _end = elm;
00632     }
00633     else
00634     {
00635         _end->_next = elm;
00636         elm->_prev = _end;
00637         _end = elm;
00638     }
00639     _cnt++;
00640     return 1;
00641 }
00642 
00643 MQTTSNPacket* WaitREGACKPacketList::getPacket(uint16_t REGACKMsgId)
00644 {
00645     waitREGACKPacket* p = _first;
00646     while (p)
00647     {
00648         if (p->_msgId == REGACKMsgId)
00649         {
00650             return p->_packet;
00651         }
00652         p = p->_next;
00653     }
00654     return nullptr;
00655 }
00656 
00657 void WaitREGACKPacketList::erase(uint16_t REGACKMsgId)
00658 {
00659     waitREGACKPacket* p = _first;
00660     while (p)
00661     {
00662         if (p->_msgId == REGACKMsgId)
00663         {
00664             if (p->_prev == nullptr)
00665             {
00666                 _first = p->_next;
00667 
00668             }
00669             else
00670             {
00671                 p->_prev->_next = p->_next;
00672             }
00673             if (p->_next == nullptr)
00674             {
00675                 _end = p->_prev;
00676             }
00677             else
00678             {
00679                 p->_next->_prev = p->_prev;
00680             }
00681             _cnt--;
00682             break;
00683             // Do not delete element. Element is deleted after sending to Client.
00684         }
00685         p = p->_next;
00686     }
00687 }
00688 
00689 uint8_t WaitREGACKPacketList::getCount(void)
00690 {
00691     return _cnt;
00692 }
00693 
00694