Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
LGwProxy.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016-2018, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include <string.h> 00018 #include <stdio.h> 00019 00020 #include "LMqttsnClientApp.h" 00021 #include "LGwProxy.h" 00022 #include "LMqttsnClient.h" 00023 #include "LScreen.h" 00024 00025 using namespace std; 00026 using namespace linuxAsyncClient; 00027 00028 extern void setUint16(uint8_t* pos, uint16_t val); 00029 extern uint16_t getUint16(const uint8_t* pos); 00030 extern LMqttsnClient* theClient; 00031 extern LScreen* theScreen; 00032 00033 /*===================================== 00034 Class GwProxy 00035 ======================================*/ 00036 static const char* packet_names[] = { "ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK", 00037 "WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK", "PUBLISH", "PUBACK", "PUBCOMP", 00038 "PUBREC", "PUBREL", "RESERVED", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", 00039 "DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD", "WILLMSGRESP" }; 00040 00041 LGwProxy::LGwProxy() 00042 { 00043 _nextMsgId = 0; 00044 _status = GW_LOST; 00045 _gwId = 0; 00046 _willTopic = 0; 00047 _willMsg = 0; 00048 _qosWill = 0; 00049 
_retainWill = 0; 00050 _tkeepAlive = MQTTSN_DEFAULT_KEEPALIVE; 00051 _tAdv = MQTTSN_DEFAULT_DURATION; 00052 _cleanSession = 0; 00053 _pingStatus = 0; 00054 _connectRetry = MQTTSN_RETRY_COUNT; 00055 _tSleep = 0; 00056 _tWake = 0; 00057 _initialized = 0; 00058 _isForwarderMode = false; 00059 _isQoSMinus1Mode = false; 00060 } 00061 00062 LGwProxy::~LGwProxy() 00063 { 00064 _topicTbl.clearTopic(); 00065 } 00066 00067 void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf) 00068 { 00069 _network.initialize(netconf); 00070 _clientId = netconf.clientId; 00071 _willTopic = mqconf.willTopic; 00072 _willMsg = mqconf.willMsg; 00073 _qosWill = mqconf.willQos; 00074 _retainWill = mqconf.willRetain; 00075 _cleanSession = mqconf.cleanSession; 00076 _tkeepAlive = mqconf.keepAlive; 00077 _initialized = 1; 00078 } 00079 00080 void LGwProxy::connect() 00081 { 00082 char* pos; 00083 00084 while (_status != GW_CONNECTED) 00085 { 00086 pos = _msg; 00087 00088 if (_status == GW_LOST) 00089 { 00090 00091 *pos++ = 3; 00092 *pos++ = MQTTSN_TYPE_SEARCHGW; 00093 *pos = 0; // SERCHGW 00094 _status = GW_SEARCHING; 00095 writeGwMsg(); 00096 } 00097 else if (_status == GW_SEND_WILLMSG) 00098 { 00099 *pos++ = 2 + (uint8_t) strlen(_willMsg); 00100 *pos++ = MQTTSN_TYPE_WILLMSG; 00101 strcpy(pos, _willMsg); // WILLMSG 00102 _status = GW_WAIT_CONNACK; 00103 writeGwMsg(); 00104 } 00105 else if (_status == GW_SEND_WILLTOPIC) 00106 { 00107 *pos++ = 3 + (uint8_t) strlen(_willTopic); 00108 *pos++ = MQTTSN_TYPE_WILLTOPIC; 00109 *pos++ = _qosWill | _retainWill; 00110 strcpy(pos, _willTopic); // WILLTOPIC 00111 _status = GW_WAIT_WILLMSGREQ; 00112 writeGwMsg(); 00113 } 00114 else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT) 00115 { 00116 uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 
23 : strlen(_clientId)); 00117 if (_isQoSMinus1Mode) 00118 { 00119 _status = GW_CONNECTED; 00120 } 00121 else 00122 { 00123 *pos++ = 6 + clientIdLen; 00124 *pos++ = MQTTSN_TYPE_CONNECT; 00125 pos++; 00126 if (_cleanSession) 00127 { 00128 _msg[2] = MQTTSN_FLAG_CLEAN; 00129 } 00130 *pos++ = MQTTSN_PROTOCOL_ID; 00131 setUint16((uint8_t*) pos, _tkeepAlive); 00132 pos += 2; 00133 strncpy(pos, _clientId, clientIdLen); 00134 _msg[6 + clientIdLen] = 0; 00135 _status = GW_WAIT_CONNACK; 00136 if (_willMsg && _willTopic && _status != GW_SLEPT) 00137 { 00138 if (strlen(_willMsg) && strlen(_willTopic)) 00139 { 00140 _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT 00141 _status = GW_WAIT_WILLTOPICREQ; 00142 } 00143 } 00144 writeGwMsg(); 00145 _connectRetry = MQTTSN_RETRY_COUNT; 00146 } 00147 } 00148 getConnectResponce(); 00149 } 00150 return; 00151 } 00152 00153 int LGwProxy::getConnectResponce(void) 00154 { 00155 int len = readMsg(); 00156 00157 if (len == 0) 00158 { 00159 if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)) 00160 { 00161 if (_msg[1] == MQTTSN_TYPE_CONNECT) 00162 { 00163 _connectRetry--; 00164 } 00165 if (--_retryCount > 0) 00166 { 00167 writeMsg((const uint8_t*) _msg); // Not writeGwMsg() : not to reset the counter. 
00168 _sendUTC = time(NULL); 00169 } 00170 else 00171 { 00172 _sendUTC = 0; 00173 if (_status > GW_SEARCHING && _connectRetry > 0) 00174 { 00175 _status = GW_CONNECTING; 00176 } 00177 else 00178 { 00179 _status = GW_LOST; 00180 _gwId = 0; 00181 } 00182 return -1; 00183 } 00184 } 00185 return 0; 00186 } 00187 else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING) 00188 { 00189 _network.setGwAddress(); 00190 _gwId = _mqttsnMsg[1]; 00191 _status = GW_CONNECTING; 00192 } 00193 else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ) 00194 { 00195 _status = GW_SEND_WILLTOPIC; 00196 } 00197 else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ) 00198 { 00199 _status = GW_SEND_WILLMSG; 00200 } 00201 else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK) 00202 { 00203 if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED) 00204 { 00205 _status = GW_CONNECTED; 00206 _connectRetry = MQTTSN_RETRY_COUNT; 00207 setPingReqTimer(); 00208 if (_tSleep) 00209 { 00210 _tSleep = 0; 00211 } 00212 else 00213 { 00214 DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); 00215 00216 if (_cleanSession || _initialized == 1) 00217 { 00218 _topicTbl.clearTopic(); 00219 _initialized = 0; 00220 theClient->onConnect(); // SUBSCRIBEs are conducted 00221 } 00222 } 00223 } 00224 else 00225 { 00226 _status = GW_CONNECTING; 00227 } 00228 } 00229 return 1; 00230 } 00231 00232 void LGwProxy::reconnect(void) 00233 { 00234 D_MQTTLOG("...Gateway reconnect\r\n"); 00235 _status = GW_DISCONNECTED; 00236 connect(); 00237 } 00238 00239 void LGwProxy::disconnect(uint16_t secs) 00240 { 00241 _tSleep = secs; 00242 _tWake = 0; 00243 00244 _msg[1] = MQTTSN_TYPE_DISCONNECT; 00245 00246 if (secs) 00247 { 00248 _msg[0] = 4; 00249 setUint16((uint8_t*) _msg + 2, secs); 00250 _status = GW_SLEEPING; 00251 } 00252 else 00253 { 00254 _msg[0] = 2; 00255 _keepAliveTimer.stop(); 00256 _status = GW_DISCONNECTING; 00257 } 
00258 00259 _retryCount = MQTTSN_RETRY_COUNT; 00260 writeMsg((const uint8_t*) _msg); 00261 _sendUTC = time(NULL); 00262 00263 while (_status != GW_DISCONNECTED && _status != GW_SLEPT) 00264 { 00265 if (getDisconnectResponce() < 0) 00266 { 00267 _status = GW_LOST; 00268 DISPLAY("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT Error !!!!!\033[0m\033[0;37m \n\n"); 00269 return; 00270 } 00271 } 00272 } 00273 00274 int LGwProxy::getDisconnectResponce(void) 00275 { 00276 int len = readMsg(); 00277 00278 if (len == 0) 00279 { 00280 if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)) 00281 { 00282 if (--_retryCount >= 0) 00283 { 00284 writeMsg((const uint8_t*) _msg); 00285 _sendUTC = time(NULL); 00286 } 00287 else 00288 { 00289 _status = GW_LOST; 00290 _gwId = 0; 00291 return -1; 00292 } 00293 } 00294 return 0; 00295 } 00296 else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT) 00297 { 00298 if (_status == GW_SLEEPING) 00299 { 00300 _status = GW_SLEPT; 00301 uint32_t remain = _keepAliveTimer.getRemain(); 00302 theClient->setSleepMode(remain); 00303 00304 /* Wake up and starts from this point. 
*/ 00305 00306 } 00307 else 00308 { 00309 _status = GW_DISCONNECTED; 00310 } 00311 } 00312 return 0; 00313 } 00314 00315 int LGwProxy::getMessage(void) 00316 { 00317 int len = readMsg(); 00318 if (len < 0) 00319 { 00320 return len; //error 00321 } 00322 #ifdef DEBUG_MQTTSN 00323 if (len) 00324 { 00325 D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]); 00326 } 00327 #endif 00328 00329 if (len == 0) 00330 { 00331 // Check PINGREQ required 00332 checkPingReq(); 00333 00334 // Check ADVERTISE valid 00335 checkAdvertise(); 00336 00337 // Check Timeout of REGISTERs 00338 _regMgr.checkTimeout(); 00339 00340 // Check Timeout of PUBLISHes, 00341 theClient->getPublishManager()->checkTimeout(); 00342 00343 // Check Timeout of SUBSCRIBEs, 00344 theClient->getSubscribeManager()->checkTimeout(); 00345 00346 } 00347 else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH) 00348 { 00349 theClient->getPublishManager()->published(_mqttsnMsg, len); 00350 00351 } 00352 else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP 00353 || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL) 00354 { 00355 theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t) len); 00356 00357 } 00358 else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK) 00359 { 00360 theClient->getSubscribeManager()->responce(_mqttsnMsg); 00361 00362 } 00363 else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER) 00364 { 00365 _regMgr.responceRegister(_mqttsnMsg, len); 00366 00367 } 00368 else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK) 00369 { 00370 _regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1)); 00371 00372 } 00373 else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP) 00374 { 00375 if (_pingStatus == GW_WAIT_PINGRESP) 00376 { 00377 _pingStatus = 0; 00378 setPingReqTimer(); 00379 00380 if (_tSleep > 0) 00381 { 00382 _tWake += _tkeepAlive; 00383 if (_tWake < _tSleep) 00384 { 00385 theClient->setSleepMode(_tkeepAlive * 1000UL); 
00386 } 00387 else 00388 { 00389 DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n"); 00390 _tWake = 0; 00391 connect(); 00392 } 00393 } 00394 } 00395 } 00396 else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT) 00397 { 00398 _status = GW_LOST; 00399 _gwAliveTimer.stop(); 00400 _keepAliveTimer.stop(); 00401 } 00402 else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE) 00403 { 00404 if (getUint16((const uint8_t*) (_mqttsnMsg + 2)) < 61) 00405 { 00406 _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1500; 00407 } 00408 else 00409 { 00410 _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1100; 00411 } 00412 _gwAliveTimer.start(_tAdv); 00413 } 00414 return 0; 00415 } 00416 00417 uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId) 00418 { 00419 uint16_t id = topicId; 00420 if (id == 0) 00421 { 00422 id = _topicTbl.getTopicId(topicName); 00423 _regMgr.registerTopic(topicName); 00424 } 00425 return id; 00426 } 00427 00428 int LGwProxy::writeMsg(const uint8_t* msg) 00429 { 00430 uint16_t len; 00431 uint8_t pos; 00432 uint8_t rc = 0; 00433 00434 if (msg[0] == 0x01) 00435 { 00436 len = getUint16(msg + 1); 00437 pos = 2; 00438 } 00439 else 00440 { 00441 len = msg[0]; 00442 pos = 1; 00443 } 00444 00445 if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW) 00446 { 00447 rc = _network.broadcast(msg, len); 00448 } 00449 else 00450 { 00451 if (_isForwarderMode) 00452 { 00453 // create a forwarder encapsulation message WirelessNodeId is a 4bytes fake data 00454 uint8_t* buf = (uint8_t*) malloc(len + 7); 00455 buf[0] = 7; 00456 buf[1] = MQTTSN_TYPE_ENCAPSULATED; 00457 buf[2] = 1; 00458 buf[3] = 'w'; 00459 buf[4] = 'n'; 00460 buf[5] = 'I'; 00461 buf[6] = 'd'; 00462 memcpy(buf + 7, msg, len); 00463 if (buf) 00464 rc = _network.unicast(buf, len + 7); 00465 free(buf); 00466 DISPLAY(" Encapsulated\n "); 00467 } 00468 else 00469 { 00470 rc = _network.unicast(msg, len); 00471 } 00472 00473 if (rc > 0) 00474 { 00475 if (msg[pos] >= MQTTSN_TYPE_ADVERTISE 
&& msg[pos] <= MQTTSN_TYPE_WILLMSGRESP) 00476 { 00477 DISPLAY(" send %s\n", packet_names[msg[pos]]); 00478 } 00479 } 00480 } 00481 return rc; 00482 } 00483 00484 void LGwProxy::writeGwMsg(void) 00485 { 00486 _retryCount = MQTTSN_RETRY_COUNT; 00487 writeMsg((const uint8_t*) _msg); 00488 _sendUTC = time(NULL); 00489 } 00490 00491 int LGwProxy::readMsg(void) 00492 { 00493 int len = 0; 00494 uint8_t* msg = _network.getMessage(&len); 00495 _mqttsnMsg = msg; 00496 00497 if (len == 0) 00498 { 00499 return 0; 00500 } 00501 00502 if (_mqttsnMsg[0] == 0x01) 00503 { 00504 int msgLen = (int) getUint16((const uint8_t*) _mqttsnMsg + 1); 00505 if (len != msgLen) 00506 { 00507 _mqttsnMsg += 3; 00508 len = msgLen - 3; 00509 } 00510 } 00511 else 00512 { 00513 _mqttsnMsg += 1; 00514 len -= 1; 00515 } 00516 00517 if (*_mqttsnMsg == MQTTSN_TYPE_ENCAPSULATED) 00518 { 00519 int lenEncap = len + 1; 00520 00521 if (msg[lenEncap] == 0x01) 00522 { 00523 int msgLen = (int) getUint16((const uint8_t*) (msg + lenEncap + 1)); 00524 msg += (lenEncap + 3); 00525 len = msgLen - 3; 00526 } 00527 else 00528 { 00529 msg += (lenEncap + 1); 00530 len = *(msg - 1); 00531 } 00532 _mqttsnMsg = msg; 00533 DISPLAY(" recv encapslated message\n"); 00534 } 00535 00536 if (*_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP) 00537 { 00538 DISPLAY(" recv %s\n", packet_names[*_mqttsnMsg]); 00539 } 00540 return len; 00541 } 00542 00543 void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain) 00544 { 00545 _willTopic = willTopic; 00546 _retainWill = _qosWill = 0; 00547 if (qos == 1) 00548 { 00549 _qosWill = MQTTSN_FLAG_QOS_1; 00550 } 00551 else if (qos == 2) 00552 { 00553 _qosWill = MQTTSN_FLAG_QOS_2; 00554 } 00555 if (retain) 00556 { 00557 _retainWill = MQTTSN_FLAG_RETAIN; 00558 } 00559 } 00560 void LGwProxy::setWillMsg(const char* willMsg) 00561 { 00562 _willMsg = willMsg; 00563 } 00564 00565 void LGwProxy::setCleanSession(bool flg) 00566 { 00567 if (flg) 00568 { 00569 
_cleanSession = MQTTSN_FLAG_CLEAN; 00570 } 00571 else 00572 { 00573 _cleanSession = 0; 00574 } 00575 } 00576 00577 uint16_t LGwProxy::getNextMsgId(void) 00578 { 00579 _nextMsgId++; 00580 if (_nextMsgId == 0) 00581 { 00582 _nextMsgId = 1; 00583 } 00584 return _nextMsgId; 00585 } 00586 00587 void LGwProxy::checkPingReq(void) 00588 { 00589 if ( _isQoSMinus1Mode ) 00590 { 00591 return; 00592 } 00593 00594 uint8_t msg[2]; 00595 msg[0] = 0x02; 00596 msg[1] = MQTTSN_TYPE_PINGREQ; 00597 00598 if ((_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP) 00599 { 00600 _pingStatus = GW_WAIT_PINGRESP; 00601 _pingRetryCount = MQTTSN_RETRY_COUNT; 00602 00603 writeMsg((const uint8_t*) msg); 00604 _pingSendUTC = time(NULL); 00605 } 00606 else if (_pingStatus == GW_WAIT_PINGRESP) 00607 { 00608 if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL)) 00609 { 00610 if (--_pingRetryCount > 0) 00611 { 00612 writeMsg((const uint8_t*) msg); 00613 _pingSendUTC = time(NULL); 00614 } 00615 else 00616 { 00617 _status = GW_LOST; 00618 _gwId = 0; 00619 _pingStatus = 0; 00620 _keepAliveTimer.stop(); 00621 D_MQTTLOG(" !!! PINGREQ Timeout\n"); 00622 } 00623 } 00624 } 00625 } 00626 00627 void LGwProxy::checkAdvertise(void) 00628 { 00629 if (_gwAliveTimer.isTimeUp()) 00630 { 00631 _status = GW_LOST; 00632 _gwId = 0; 00633 _pingStatus = 0; 00634 _gwAliveTimer.stop(); 00635 _keepAliveTimer.stop(); 00636 D_MQTTLOG(" !!! 
ADVERTISE Timeout\n"); 00637 } 00638 } 00639 00640 LTopicTable* LGwProxy::getTopicTable(void) 00641 { 00642 return &_topicTbl; 00643 } 00644 00645 LRegisterManager* LGwProxy::getRegisterManager(void) 00646 { 00647 return &_regMgr; 00648 } 00649 00650 bool LGwProxy::isPingReqRequired(void) 00651 { 00652 return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL); 00653 } 00654 00655 void LGwProxy::setPingReqTimer(void) 00656 { 00657 _keepAliveTimer.start(_tkeepAlive * 1000UL); 00658 } 00659 00660 const char* LGwProxy::getClientId(void) 00661 { 00662 return _clientId; 00663 } 00664 00665 void LGwProxy::setForwarderMode(bool valid) 00666 { 00667 _isForwarderMode = valid; 00668 } 00669 00670 void LGwProxy::setQoSMinus1Mode(bool valid) 00671 { 00672 _isQoSMinus1Mode = valid; 00673 }
Generated on Wed Jul 13 2022 10:46:02 by
Doxygen 1.7.2