Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWClient.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 * Tieto Poland Sp. z o.o. - Gateway improvements 00016 **************************************************************************************/ 00017 00018 #include "MQTTSNGWDefines.h" 00019 #include "MQTTSNGWClientList.h" 00020 #include "MQTTSNGateway.h" 00021 #include "SensorNetwork.h" 00022 #include <string> 00023 #include <string.h> 00024 #include <stdio.h> 00025 00026 #include "MQTTSNGWForwarder.h" 00027 00028 using namespace MQTTSNGW; 00029 char* currentDateTime(void); 00030 00031 00032 /*===================================== 00033 Class Client 00034 =====================================*/ 00035 static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Asleep", "Awake", "Lost" }; 00036 00037 Client::Client(bool secure) 00038 { 00039 _packetId = 0; 00040 _snMsgId = 0; 00041 _status = Cstat_Disconnected; 00042 _keepAliveMsec = 0; 00043 _topics = new Topics(); 00044 _clientId = nullptr; 00045 _willTopic = nullptr; 00046 _willMsg = nullptr; 00047 _connectData = MQTTPacket_Connect_Initializer; 00048 _network = new Network(secure); 00049 _secureNetwork = secure; 00050 _sensorNetype = true; 00051 _connAck = nullptr; 00052 _waitWillMsgFlg = false; 00053 _sessionStatus = false; 00054 _prevClient = nullptr; 00055 _nextClient = nullptr; 00056 _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); 00057 _proxyPacketQue.setMaxSize(MAX_SAVED_PUBLISH); 00058 _hasPredefTopic = false; 00059 _holdPingRequest = false; 00060 _forwarder = nullptr; 00061 _clientType = Ctype_Regular; 00062 } 00063 00064 Client::~Client() 00065 { 00066 if ( _topics ) 00067 { 00068 delete _topics; 00069 } 00070 00071 if ( _clientId ) 00072 { 00073 free(_clientId); 00074 } 00075 00076 if ( _willTopic ) 00077 { 00078 free(_willTopic); 00079 } 00080 00081 if ( _willMsg ) 00082 { 00083 free(_willMsg); 00084 } 00085 00086 if (_connAck) 00087 { 00088 delete _connAck; 00089 } 00090 00091 if (_network) 00092 { 00093 delete _network; 00094 } 00095 } 00096 00097 TopicIdMapElement* Client::getWaitedPubTopicId(uint16_t msgId) 00098 { 00099 return _waitedPubTopicIdMap.getElement(msgId); 00100 } 00101 00102 TopicIdMapElement* Client::getWaitedSubTopicId(uint16_t msgId) 00103 { 00104 return _waitedSubTopicIdMap.getElement(msgId); 00105 } 00106 00107 MQTTGWPacket* Client::getClientSleepPacket() 00108 { 00109 return _clientSleepPacketQue.getPacket(); 00110 } 00111 00112 void Client::deleteFirstClientSleepPacket() 00113 { 00114 _clientSleepPacketQue.pop(); 00115 } 00116 00117 int Client::setClientSleepPacket(MQTTGWPacket* packet) 00118 { 00119 int rc = _clientSleepPacketQue.post(packet); 00120 if ( rc ) 00121 { 00122 WRITELOG("%s %s is sleeping. the packet was saved.\n", currentDateTime(), _clientId); 00123 } 00124 else 00125 { 00126 WRITELOG("%s %s is sleeping but discard the packet.\n", currentDateTime(), _clientId); 00127 } 00128 return rc; 00129 } 00130 00131 MQTTSNPacket* Client::getProxyPacket(void) 00132 { 00133 return _proxyPacketQue.getPacket(); 00134 } 00135 00136 void Client::deleteFirstProxyPacket() 00137 { 00138 _proxyPacketQue.pop(); 00139 } 00140 00141 int Client::setProxyPacket(MQTTSNPacket* packet) 00142 { 00143 int rc = _proxyPacketQue.post(packet); 00144 if ( rc ) 00145 { 00146 WRITELOG("%s %s is Disconnected. the packet was saved.\n", currentDateTime(), _clientId); 00147 } 00148 else 00149 { 00150 WRITELOG("%s %s is Disconnected and discard the packet.\n", currentDateTime(), _clientId); 00151 } 00152 return rc; 00153 } 00154 00155 Connect* Client::getConnectData(void) 00156 { 00157 return &_connectData; 00158 } 00159 00160 void Client::eraseWaitedPubTopicId(uint16_t msgId) 00161 { 00162 _waitedPubTopicIdMap.erase(msgId); 00163 } 00164 00165 void Client::eraseWaitedSubTopicId(uint16_t msgId) 00166 { 00167 _waitedSubTopicIdMap.erase(msgId); 00168 } 00169 00170 void Client::clearWaitedPubTopicId(void) 00171 { 00172 _waitedPubTopicIdMap.clear(); 00173 } 00174 00175 void Client::clearWaitedSubTopicId(void) 00176 { 00177 _waitedSubTopicIdMap.clear(); 00178 } 00179 00180 void Client::setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) 00181 { 00182 _waitedPubTopicIdMap.add(msgId, topicId, type); 00183 } 00184 void Client::setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) 00185 { 00186 _waitedSubTopicIdMap.add(msgId, topicId, type); 00187 } 00188 00189 bool Client::checkTimeover(void) 00190 { 00191 return (_status == Cstat_Active && _keepAliveTimer.isTimeup()); 00192 } 00193 00194 void Client::setKeepAlive(MQTTSNPacket* packet) 00195 { 00196 MQTTSNPacket_connectData param; 00197 if (packet->getCONNECT(¶m)) 00198 { 00199 _keepAliveMsec = param.duration * 1000UL; 00200 _keepAliveTimer.start(_keepAliveMsec * 1.5); 00201 } 00202 } 00203 00204 void Client::setForwarder(Forwarder* forwarder) 00205 { 00206 _forwarder = forwarder; 00207 _clientType = Ctype_Forwarded; 00208 } 00209 00210 Forwarder* Client::getForwarder(void) 00211 { 00212 return _forwarder; 00213 } 00214 00215 void Client::setSessionStatus(bool status) 00216 { 00217 _sessionStatus = status; 00218 } 00219 00220 bool Client::erasable(void) 00221 { 00222 return _sessionStatus && !_hasPredefTopic && _forwarder == nullptr; 00223 } 00224 00225 void Client::updateStatus(MQTTSNPacket* packet) 00226 { 00227 if (((_status == Cstat_Disconnected) || (_status == Cstat_Lost)) && packet->getType() == MQTTSN_CONNECT) 00228 { 00229 setKeepAlive(packet); 00230 } 00231 else if (_status == Cstat_Active) 00232 { 00233 switch (packet->getType()) 00234 { 00235 case MQTTSN_PINGREQ: 00236 case MQTTSN_PUBLISH: 00237 case MQTTSN_SUBSCRIBE: 00238 case MQTTSN_UNSUBSCRIBE: 00239 case MQTTSN_PUBACK: 00240 case MQTTSN_PUBCOMP: 00241 case MQTTSN_PUBREL: 00242 case MQTTSN_PUBREC: 00243 if ( _clientType != Ctype_Proxy ) 00244 { 00245 _keepAliveTimer.start(_keepAliveMsec * 1.5); 00246 } 00247 break; 00248 case MQTTSN_DISCONNECT: 00249 uint16_t duration; 00250 packet->getDISCONNECT(&duration); 00251 if (duration) 00252 { 00253 _status = Cstat_Asleep; 00254 } 00255 else 00256 { 00257 disconnected(); 00258 } 00259 break; 00260 default: 00261 break; 00262 } 00263 } 00264 else if (_status == Cstat_Awake || _status == Cstat_Asleep) 00265 { 00266 switch (packet->getType()) 00267 { 00268 case MQTTSN_CONNECT: 00269 _status = Cstat_Active; 00270 break; 00271 case MQTTSN_DISCONNECT: 00272 disconnected(); 00273 break; 00274 case MQTTSN_PINGREQ: 00275 _status = Cstat_Awake; 00276 break; 00277 case MQTTSN_PINGRESP: 00278 _status = Cstat_Asleep; 00279 break; 00280 default: 00281 break; 00282 } 00283 } 00284 DEBUGLOG("Client Status = %s\n", theClientStatus[_status]); 00285 } 00286 00287 void Client::updateStatus(ClientStatus stat) 00288 { 00289 _status = stat; 00290 } 00291 00292 void Client::connectSended() 00293 { 00294 _status = Cstat_Connecting; 00295 } 00296 00297 void Client::connackSended(int rc) 00298 { 00299 if (rc == MQTTSN_RC_ACCEPTED) 00300 { 00301 _status = Cstat_Active; 00302 } 00303 else 00304 { 00305 disconnected(); 00306 } 00307 } 00308 00309 void Client::disconnected(void) 00310 { 00311 _status = Cstat_Disconnected; 00312 _waitWillMsgFlg = false; 00313 } 00314 00315 void Client::tryConnect(void) 00316 { 00317 _status = Cstat_TryConnecting; 00318 } 00319 00320 bool Client::isConnectSendable(void) 00321 { 00322 if ( _status == Cstat_Lost || _status == Cstat_TryConnecting ) 00323 { 00324 return false; 00325 } 00326 else 00327 { 00328 return true; 00329 } 00330 } 00331 00332 uint16_t Client::getNextPacketId(void) 00333 { 00334 _packetId++; 00335 if ( _packetId == 0xffff ) 00336 { 00337 _packetId = 1; 00338 } 00339 return _packetId; 00340 } 00341 00342 uint8_t Client::getNextSnMsgId(void) 00343 { 00344 _snMsgId++; 00345 if (_snMsgId == 0) 00346 { 00347 _snMsgId++; 00348 } 00349 return _snMsgId; 00350 } 00351 00352 Topics* Client::getTopics(void) 00353 { 00354 return _topics; 00355 } 00356 00357 Network* Client::getNetwork(void) 00358 { 00359 return _network; 00360 } 00361 00362 void Client::setClientAddress(SensorNetAddress* sensorNetAddr) 00363 { 00364 _sensorNetAddr = *sensorNetAddr; 00365 } 00366 00367 SensorNetAddress* Client::getSensorNetAddress(void) 00368 { 00369 return &_sensorNetAddr; 00370 } 00371 00372 void Client::setSensorNetType(bool stable) 00373 { 00374 _sensorNetype = stable; 00375 } 00376 00377 void Client::setTopics(Topics* topics) 00378 { 00379 _topics = topics; 00380 } 00381 00382 ClientStatus Client::getClientStatus(void) 00383 { 00384 return _status; 00385 } 00386 00387 void Client::setWaitWillMsgFlg(bool flg) 00388 { 00389 _waitWillMsgFlg = flg; 00390 } 00391 00392 bool Client::isWaitWillMsg(void) 00393 { 00394 return _waitWillMsgFlg; 00395 } 00396 00397 bool Client::isDisconnect(void) 00398 { 00399 return (_status == Cstat_Disconnected); 00400 } 00401 00402 bool Client::isActive(void) 00403 { 00404 return (_status == Cstat_Active); 00405 } 00406 00407 bool Client::isSleep(void) 00408 { 00409 return (_status == Cstat_Asleep); 00410 } 00411 00412 bool Client::isAwake(void) 00413 { 00414 return (_status == Cstat_Awake); 00415 } 00416 00417 bool Client::isConnecting(void) 00418 { 00419 return (_status == Cstat_Connecting); 00420 } 00421 00422 bool Client::isSecureNetwork(void) 00423 { 00424 return _secureNetwork; 00425 } 00426 00427 bool Client::isSensorNetStable(void) 00428 { 00429 return _sensorNetype; 00430 } 00431 00432 WaitREGACKPacketList* Client::getWaitREGACKPacketList() 00433 { 00434 return &_waitREGACKList; 00435 } 00436 00437 Client* Client::getNextClient(void) 00438 { 00439 return _nextClient; 00440 } 00441 00442 void Client::setClientId(MQTTSNString id) 00443 { 00444 if ( _clientId ) 00445 { 00446 free(_clientId); 00447 } 00448 00449 if ( id.cstring ) 00450 { 00451 _clientId = (char*)calloc(strlen(id.cstring) + 1, 1); 00452 memcpy(_clientId, id.cstring, strlen(id.cstring)); 00453 } 00454 else 00455 { 00456 /* save clientId into (char*)_clientId NULL terminated */ 00457 _clientId = (char*)calloc(MQTTSNstrlen(id) + 1, 1); 00458 unsigned char* ptr = (unsigned char*)_clientId; 00459 writeMQTTSNString((unsigned char**)&ptr, id); 00460 } 00461 } 00462 00463 void Client::setWillTopic(MQTTSNString willTopic) 00464 { 00465 if ( _willTopic ) 00466 { 00467 free(_willTopic); 00468 } 00469 00470 _willTopic = (char*)calloc(MQTTSNstrlen(willTopic) + 1, 1); 00471 /* save willTopic into (char*)_willTopic with NULL termination */ 00472 unsigned char* ptr = (unsigned char*)_willTopic; 00473 writeMQTTSNString((unsigned char**)&ptr, willTopic); 00474 } 00475 00476 void Client::setWillMsg(MQTTSNString willMsg) 00477 { 00478 if ( _willMsg) 00479 { 00480 free(_willMsg); 00481 } 00482 00483 _willMsg = (char*)calloc(MQTTSNstrlen(willMsg) + 1, 1); 00484 /* save willMsg into (char*)_willMsg with NULL termination */ 00485 unsigned char* ptr = (unsigned char*)_willMsg; 00486 writeMQTTSNString((unsigned char**)&ptr, willMsg); 00487 } 00488 00489 char* Client::getClientId(void) 00490 { 00491 return _clientId; 00492 } 00493 00494 char* Client::getWillTopic(void) 00495 { 00496 return _willTopic; 00497 } 00498 00499 char* Client::getWillMsg(void) 00500 { 00501 return _willMsg; 00502 } 00503 00504 const char* Client::getStatus(void) 00505 { 00506 return theClientStatus[_status]; 00507 } 00508 00509 bool Client::isQoSm1Proxy(void) 00510 { 00511 return _clientType == Ctype_Proxy; 00512 } 00513 00514 bool Client::isForwarded(void) 00515 { 00516 return _clientType == Ctype_Forwarded; 00517 } 00518 00519 bool Client::isAggregated(void) 00520 { 00521 return _clientType == Ctype_Aggregated; 00522 } 00523 00524 bool Client::isAggregater(void) 00525 { 00526 return _clientType == Ctype_Aggregater; 00527 } 00528 00529 void Client::setAdapterType(AdapterType type) 00530 { 00531 switch ( type ) 00532 { 00533 case Atype_QoSm1Proxy: 00534 _clientType = Ctype_Proxy; 00535 break; 00536 case Atype_Aggregater: 00537 _clientType = Ctype_Aggregater; 00538 break; 00539 default: 00540 throw Exception("Client::setAdapterType(): Invalid Type."); 00541 break; 00542 } 00543 } 00544 00545 bool Client::isAdapter(void) 00546 { 00547 return _clientType == Ctype_Proxy || _clientType == Ctype_Aggregater; 00548 } 00549 00550 bool Client::isQoSm1(void) 00551 { 00552 return _clientType == Ctype_QoS_1; 00553 } 00554 00555 void Client::setQoSm1(void) 00556 { 00557 _clientType = Ctype_QoS_1; 00558 } 00559 00560 void Client::setAggregated(void) 00561 { 00562 _clientType = Ctype_Aggregated; 00563 } 00564 00565 void Client::holdPingRequest(void) 00566 { 00567 _holdPingRequest = true; 00568 } 00569 00570 void Client::resetPingRequest(void) 00571 { 00572 _holdPingRequest = false; 00573 } 00574 00575 bool Client::isHoldPringReqest(void) 00576 { 00577 return _holdPingRequest; 00578 } 00579 00580 00581 00582 /*===================================== 00583 Class WaitREGACKPacket 00584 =====================================*/ 00585 waitREGACKPacket::waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId) 00586 { 00587 _packet = packet; 00588 _msgId = REGACKMsgId; 00589 _next = nullptr; 00590 _prev = nullptr; 00591 } 00592 00593 waitREGACKPacket::~waitREGACKPacket() 00594 { 00595 delete _packet; 00596 } 00597 00598 /*===================================== 00599 Class WaitREGACKPacketList 00600 =====================================*/ 00601 00602 WaitREGACKPacketList::WaitREGACKPacketList() 00603 { 00604 _first = nullptr; 00605 _end = nullptr; 00606 _cnt = 0; 00607 } 00608 00609 WaitREGACKPacketList::~WaitREGACKPacketList() 00610 { 00611 waitREGACKPacket* p = _first; 00612 while (p) 00613 { 00614 waitREGACKPacket* q = p->_next; 00615 delete p; 00616 p = q; 00617 } 00618 } 00619 00620 int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId) 00621 { 00622 waitREGACKPacket* elm = new waitREGACKPacket(packet, REGACKMsgId); 00623 if (elm == nullptr) 00624 { 00625 return 0; 00626 } 00627 00628 if (_first == nullptr) 00629 { 00630 _first = elm; 00631 _end = elm; 00632 } 00633 else 00634 { 00635 _end->_next = elm; 00636 elm->_prev = _end; 00637 _end = elm; 00638 } 00639 _cnt++; 00640 return 1; 00641 } 00642 00643 MQTTSNPacket* WaitREGACKPacketList::getPacket(uint16_t REGACKMsgId) 00644 { 00645 waitREGACKPacket* p = _first; 00646 while (p) 00647 { 00648 if (p->_msgId == REGACKMsgId) 00649 { 00650 return p->_packet; 00651 } 00652 p = p->_next; 00653 } 00654 return nullptr; 00655 } 00656 00657 void WaitREGACKPacketList::erase(uint16_t REGACKMsgId) 00658 { 00659 waitREGACKPacket* p = _first; 00660 while (p) 00661 { 00662 if (p->_msgId == REGACKMsgId) 00663 { 00664 if (p->_prev == nullptr) 00665 { 00666 _first = p->_next; 00667 00668 } 00669 else 00670 { 00671 p->_prev->_next = p->_next; 00672 } 00673 if (p->_next == nullptr) 00674 { 00675 _end = p->_prev; 00676 } 00677 else 00678 { 00679 p->_next->_prev = p->_prev; 00680 } 00681 _cnt--; 00682 break; 00683 // Do not delete element. Element is deleted after sending to Client. 00684 } 00685 p = p->_next; 00686 } 00687 } 00688 00689 uint8_t WaitREGACKPacketList::getCount(void) 00690 { 00691 return _cnt; 00692 } 00693 00694
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2