Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers LGwProxy.cpp Source File

LGwProxy.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016-2018, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include <string.h>
00018 #include <stdio.h>
00019 
00020 #include "LMqttsnClientApp.h"
00021 #include "LGwProxy.h"
00022 #include "LMqttsnClient.h"
00023 #include "LScreen.h"
00024 
00025 using namespace std;
00026 using namespace linuxAsyncClient;
00027 
00028 extern void setUint16(uint8_t* pos, uint16_t val);
00029 extern uint16_t getUint16(const uint8_t* pos);
00030 extern LMqttsnClient* theClient;
00031 extern LScreen* theScreen;
00032 
/*=====================================
 Class LGwProxy
 ======================================*/
// Printable names used by the send/recv trace output; the array index is the
// value of the message-type octet (entries named RESERVED fill unused codes).
static const char* packet_names[] = {
    "ADVERTISE",      //  0
    "SEARCHGW",       //  1
    "GWINFO",         //  2
    "RESERVED",       //  3
    "CONNECT",        //  4
    "CONNACK",        //  5
    "WILLTOPICREQ",   //  6
    "WILLTOPIC",      //  7
    "WILLMSGREQ",     //  8
    "WILLMSG",        //  9
    "REGISTER",       // 10
    "REGACK",         // 11
    "PUBLISH",        // 12
    "PUBACK",         // 13
    "PUBCOMP",        // 14
    "PUBREC",         // 15
    "PUBREL",         // 16
    "RESERVED",       // 17
    "SUBSCRIBE",      // 18
    "SUBACK",         // 19
    "UNSUBSCRIBE",    // 20
    "UNSUBACK",       // 21
    "PINGREQ",        // 22
    "PINGRESP",       // 23
    "DISCONNECT",     // 24
    "RESERVED",       // 25
    "WILLTOPICUPD",   // 26
    "WILLTOPICRESP",  // 27
    "WILLMSGUPD",     // 28
    "WILLMSGRESP"     // 29
};
00040 
00041 LGwProxy::LGwProxy()
00042 {
00043     _nextMsgId = 0;
00044     _status = GW_LOST;
00045     _gwId = 0;
00046     _willTopic = 0;
00047     _willMsg = 0;
00048     _qosWill = 0;
00049     _retainWill = 0;
00050     _tkeepAlive = MQTTSN_DEFAULT_KEEPALIVE;
00051     _tAdv = MQTTSN_DEFAULT_DURATION;
00052     _cleanSession = 0;
00053     _pingStatus = 0;
00054     _connectRetry = MQTTSN_RETRY_COUNT;
00055     _tSleep = 0;
00056     _tWake = 0;
00057     _initialized = 0;
00058     _isForwarderMode = false;
00059     _isQoSMinus1Mode = false;
00060 }
00061 
00062 LGwProxy::~LGwProxy()
00063 {
00064     _topicTbl.clearTopic();
00065 }
00066 
00067 void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf)
00068 {
00069     _network.initialize(netconf);
00070     _clientId = netconf.clientId;
00071     _willTopic = mqconf.willTopic;
00072     _willMsg = mqconf.willMsg;
00073     _qosWill = mqconf.willQos;
00074     _retainWill = mqconf.willRetain;
00075     _cleanSession = mqconf.cleanSession;
00076     _tkeepAlive = mqconf.keepAlive;
00077     _initialized = 1;
00078 }
00079 
00080 void LGwProxy::connect()
00081 {
00082     char* pos;
00083 
00084     while (_status != GW_CONNECTED)
00085     {
00086         pos = _msg;
00087 
00088         if (_status == GW_LOST)
00089         {
00090 
00091             *pos++ = 3;
00092             *pos++ = MQTTSN_TYPE_SEARCHGW;
00093             *pos = 0;                        // SERCHGW
00094             _status = GW_SEARCHING;
00095             writeGwMsg();
00096         }
00097         else if (_status == GW_SEND_WILLMSG)
00098         {
00099             *pos++ = 2 + (uint8_t) strlen(_willMsg);
00100             *pos++ = MQTTSN_TYPE_WILLMSG;
00101             strcpy(pos, _willMsg);          // WILLMSG
00102             _status = GW_WAIT_CONNACK;
00103             writeGwMsg();
00104         }
00105         else if (_status == GW_SEND_WILLTOPIC)
00106         {
00107             *pos++ = 3 + (uint8_t) strlen(_willTopic);
00108             *pos++ = MQTTSN_TYPE_WILLTOPIC;
00109             *pos++ = _qosWill | _retainWill;
00110             strcpy(pos, _willTopic);        // WILLTOPIC
00111             _status = GW_WAIT_WILLMSGREQ;
00112             writeGwMsg();
00113         }
00114         else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT)
00115         {
00116             uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
00117             if (_isQoSMinus1Mode)
00118             {
00119                 _status = GW_CONNECTED;
00120             }
00121             else
00122             {
00123                 *pos++ = 6 + clientIdLen;
00124                 *pos++ = MQTTSN_TYPE_CONNECT;
00125                 pos++;
00126                 if (_cleanSession)
00127                 {
00128                     _msg[2] = MQTTSN_FLAG_CLEAN;
00129                 }
00130                 *pos++ = MQTTSN_PROTOCOL_ID;
00131                 setUint16((uint8_t*) pos, _tkeepAlive);
00132                 pos += 2;
00133                 strncpy(pos, _clientId, clientIdLen);
00134                 _msg[6 + clientIdLen] = 0;
00135                 _status = GW_WAIT_CONNACK;
00136                 if (_willMsg && _willTopic && _status != GW_SLEPT)
00137                 {
00138                     if (strlen(_willMsg) && strlen(_willTopic))
00139                     {
00140                         _msg[2] = _msg[2] | MQTTSN_FLAG_WILL;   // CONNECT
00141                         _status = GW_WAIT_WILLTOPICREQ;
00142                     }
00143                 }
00144                 writeGwMsg();
00145                 _connectRetry = MQTTSN_RETRY_COUNT;
00146             }
00147         }
00148         getConnectResponce();
00149     }
00150     return;
00151 }
00152 
00153 int LGwProxy::getConnectResponce(void)
00154 {
00155     int len = readMsg();
00156 
00157     if (len == 0)
00158     {
00159         if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL))
00160         {
00161             if (_msg[1] == MQTTSN_TYPE_CONNECT)
00162             {
00163                 _connectRetry--;
00164             }
00165             if (--_retryCount > 0)
00166             {
00167                 writeMsg((const uint8_t*) _msg);  // Not writeGwMsg() : not to reset the counter.
00168                 _sendUTC = time(NULL);
00169             }
00170             else
00171             {
00172                 _sendUTC = 0;
00173                 if (_status > GW_SEARCHING && _connectRetry > 0)
00174                 {
00175                     _status = GW_CONNECTING;
00176                 }
00177                 else
00178                 {
00179                     _status = GW_LOST;
00180                     _gwId = 0;
00181                 }
00182                 return -1;
00183             }
00184         }
00185         return 0;
00186     }
00187     else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING)
00188     {
00189         _network.setGwAddress();
00190         _gwId = _mqttsnMsg[1];
00191         _status = GW_CONNECTING;
00192     }
00193     else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ)
00194     {
00195         _status = GW_SEND_WILLTOPIC;
00196     }
00197     else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ)
00198     {
00199         _status = GW_SEND_WILLMSG;
00200     }
00201     else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK)
00202     {
00203         if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED)
00204         {
00205             _status = GW_CONNECTED;
00206             _connectRetry = MQTTSN_RETRY_COUNT;
00207             setPingReqTimer();
00208             if (_tSleep)
00209             {
00210                 _tSleep = 0;
00211             }
00212             else
00213             {
00214                 DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
00215 
00216                 if (_cleanSession || _initialized == 1)
00217                 {
00218                     _topicTbl.clearTopic();
00219                     _initialized = 0;
00220                     theClient->onConnect();  // SUBSCRIBEs are conducted
00221                 }
00222             }
00223         }
00224         else
00225         {
00226             _status = GW_CONNECTING;
00227         }
00228     }
00229     return 1;
00230 }
00231 
00232 void LGwProxy::reconnect(void)
00233 {
00234     D_MQTTLOG("...Gateway reconnect\r\n");
00235     _status = GW_DISCONNECTED;
00236     connect();
00237 }
00238 
00239 void LGwProxy::disconnect(uint16_t secs)
00240 {
00241     _tSleep = secs;
00242     _tWake = 0;
00243 
00244     _msg[1] = MQTTSN_TYPE_DISCONNECT;
00245 
00246     if (secs)
00247     {
00248         _msg[0] = 4;
00249         setUint16((uint8_t*) _msg + 2, secs);
00250         _status = GW_SLEEPING;
00251     }
00252     else
00253     {
00254         _msg[0] = 2;
00255         _keepAliveTimer.stop();
00256         _status = GW_DISCONNECTING;
00257     }
00258 
00259     _retryCount = MQTTSN_RETRY_COUNT;
00260     writeMsg((const uint8_t*) _msg);
00261     _sendUTC = time(NULL);
00262 
00263     while (_status != GW_DISCONNECTED && _status != GW_SLEPT)
00264     {
00265         if (getDisconnectResponce() < 0)
00266         {
00267             _status = GW_LOST;
00268             DISPLAY("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT  Error !!!!!\033[0m\033[0;37m \n\n");
00269             return;
00270         }
00271     }
00272 }
00273 
00274 int LGwProxy::getDisconnectResponce(void)
00275 {
00276     int len = readMsg();
00277 
00278     if (len == 0)
00279     {
00280         if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL))
00281         {
00282             if (--_retryCount >= 0)
00283             {
00284                 writeMsg((const uint8_t*) _msg);
00285                 _sendUTC = time(NULL);
00286             }
00287             else
00288             {
00289                 _status = GW_LOST;
00290                 _gwId = 0;
00291                 return -1;
00292             }
00293         }
00294         return 0;
00295     }
00296     else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT)
00297     {
00298         if (_status == GW_SLEEPING)
00299         {
00300             _status = GW_SLEPT;
00301             uint32_t remain = _keepAliveTimer.getRemain();
00302             theClient->setSleepMode(remain);
00303 
00304             /* Wake up and starts from this point. */
00305 
00306         }
00307         else
00308         {
00309             _status = GW_DISCONNECTED;
00310         }
00311     }
00312     return 0;
00313 }
00314 
00315 int LGwProxy::getMessage(void)
00316 {
00317     int len = readMsg();
00318     if (len < 0)
00319     {
00320         return len;   //error
00321     }
00322 #ifdef DEBUG_MQTTSN
00323     if (len)
00324     {
00325         D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]);
00326     }
00327 #endif
00328 
00329     if (len == 0)
00330     {
00331         // Check PINGREQ required
00332         checkPingReq();
00333 
00334         // Check ADVERTISE valid
00335         checkAdvertise();
00336 
00337         // Check Timeout of REGISTERs
00338         _regMgr.checkTimeout();
00339 
00340         // Check Timeout of PUBLISHes,
00341         theClient->getPublishManager()->checkTimeout();
00342 
00343         // Check Timeout of SUBSCRIBEs,
00344         theClient->getSubscribeManager()->checkTimeout();
00345 
00346     }
00347     else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH)
00348     {
00349         theClient->getPublishManager()->published(_mqttsnMsg, len);
00350 
00351     }
00352     else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP
00353             || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL)
00354     {
00355         theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t) len);
00356 
00357     }
00358     else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK)
00359     {
00360         theClient->getSubscribeManager()->responce(_mqttsnMsg);
00361 
00362     }
00363     else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER)
00364     {
00365         _regMgr.responceRegister(_mqttsnMsg, len);
00366 
00367     }
00368     else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK)
00369     {
00370         _regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1));
00371 
00372     }
00373     else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP)
00374     {
00375         if (_pingStatus == GW_WAIT_PINGRESP)
00376         {
00377             _pingStatus = 0;
00378             setPingReqTimer();
00379 
00380             if (_tSleep > 0)
00381             {
00382                 _tWake += _tkeepAlive;
00383                 if (_tWake < _tSleep)
00384                 {
00385                     theClient->setSleepMode(_tkeepAlive * 1000UL);
00386                 }
00387                 else
00388                 {
00389                     DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n");
00390                     _tWake = 0;
00391                     connect();
00392                 }
00393             }
00394         }
00395     }
00396     else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT)
00397     {
00398         _status = GW_LOST;
00399         _gwAliveTimer.stop();
00400         _keepAliveTimer.stop();
00401     }
00402     else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE)
00403     {
00404         if (getUint16((const uint8_t*) (_mqttsnMsg + 2)) < 61)
00405         {
00406             _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1500;
00407         }
00408         else
00409         {
00410             _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1100;
00411         }
00412         _gwAliveTimer.start(_tAdv);
00413     }
00414     return 0;
00415 }
00416 
00417 uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId)
00418 {
00419     uint16_t id = topicId;
00420     if (id == 0)
00421     {
00422         id = _topicTbl.getTopicId(topicName);
00423         _regMgr.registerTopic(topicName);
00424     }
00425     return id;
00426 }
00427 
00428 int LGwProxy::writeMsg(const uint8_t* msg)
00429 {
00430     uint16_t len;
00431     uint8_t pos;
00432     uint8_t rc = 0;
00433 
00434     if (msg[0] == 0x01)
00435     {
00436         len = getUint16(msg + 1);
00437         pos = 2;
00438     }
00439     else
00440     {
00441         len = msg[0];
00442         pos = 1;
00443     }
00444 
00445     if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW)
00446     {
00447         rc = _network.broadcast(msg, len);
00448     }
00449     else
00450     {
00451         if (_isForwarderMode)
00452         {
00453             // create a forwarder encapsulation message  WirelessNodeId is a 4bytes fake data
00454             uint8_t* buf = (uint8_t*) malloc(len + 7);
00455             buf[0] = 7;
00456             buf[1] = MQTTSN_TYPE_ENCAPSULATED;
00457             buf[2] = 1;
00458             buf[3] = 'w';
00459             buf[4] = 'n';
00460             buf[5] = 'I';
00461             buf[6] = 'd';
00462             memcpy(buf + 7, msg, len);
00463             if (buf)
00464                 rc = _network.unicast(buf, len + 7);
00465             free(buf);
00466             DISPLAY("  Encapsulated\n ");
00467         }
00468         else
00469         {
00470             rc = _network.unicast(msg, len);
00471         }
00472 
00473         if (rc > 0)
00474         {
00475             if (msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP)
00476             {
00477                 DISPLAY("  send %s\n", packet_names[msg[pos]]);
00478             }
00479         }
00480     }
00481     return rc;
00482 }
00483 
00484 void LGwProxy::writeGwMsg(void)
00485 {
00486     _retryCount = MQTTSN_RETRY_COUNT;
00487     writeMsg((const uint8_t*) _msg);
00488     _sendUTC = time(NULL);
00489 }
00490 
00491 int LGwProxy::readMsg(void)
00492 {
00493     int len = 0;
00494     uint8_t* msg = _network.getMessage(&len);
00495     _mqttsnMsg = msg;
00496 
00497     if (len == 0)
00498     {
00499         return 0;
00500     }
00501 
00502     if (_mqttsnMsg[0] == 0x01)
00503     {
00504         int msgLen = (int) getUint16((const uint8_t*) _mqttsnMsg + 1);
00505         if (len != msgLen)
00506         {
00507             _mqttsnMsg += 3;
00508             len = msgLen - 3;
00509         }
00510     }
00511     else
00512     {
00513         _mqttsnMsg += 1;
00514         len -= 1;
00515     }
00516 
00517     if (*_mqttsnMsg == MQTTSN_TYPE_ENCAPSULATED)
00518     {
00519         int lenEncap = len + 1;
00520 
00521         if (msg[lenEncap] == 0x01)
00522         {
00523             int msgLen = (int) getUint16((const uint8_t*) (msg + lenEncap + 1));
00524             msg += (lenEncap + 3);
00525             len = msgLen - 3;
00526         }
00527         else
00528         {
00529             msg += (lenEncap + 1);
00530             len = *(msg - 1);
00531         }
00532         _mqttsnMsg = msg;
00533         DISPLAY("  recv encapslated message\n");
00534     }
00535 
00536     if (*_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP)
00537     {
00538         DISPLAY("  recv %s\n", packet_names[*_mqttsnMsg]);
00539     }
00540     return len;
00541 }
00542 
00543 void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain)
00544 {
00545     _willTopic = willTopic;
00546     _retainWill = _qosWill = 0;
00547     if (qos == 1)
00548     {
00549         _qosWill = MQTTSN_FLAG_QOS_1;
00550     }
00551     else if (qos == 2)
00552     {
00553         _qosWill = MQTTSN_FLAG_QOS_2;
00554     }
00555     if (retain)
00556     {
00557         _retainWill = MQTTSN_FLAG_RETAIN;
00558     }
00559 }
00560 void LGwProxy::setWillMsg(const char* willMsg)
00561 {
00562     _willMsg = willMsg;
00563 }
00564 
00565 void LGwProxy::setCleanSession(bool flg)
00566 {
00567     if (flg)
00568     {
00569         _cleanSession = MQTTSN_FLAG_CLEAN;
00570     }
00571     else
00572     {
00573         _cleanSession = 0;
00574     }
00575 }
00576 
00577 uint16_t LGwProxy::getNextMsgId(void)
00578 {
00579     _nextMsgId++;
00580     if (_nextMsgId == 0)
00581     {
00582         _nextMsgId = 1;
00583     }
00584     return _nextMsgId;
00585 }
00586 
00587 void LGwProxy::checkPingReq(void)
00588 {
00589     if ( _isQoSMinus1Mode )
00590     {
00591         return;
00592     }
00593 
00594     uint8_t msg[2];
00595     msg[0] = 0x02;
00596     msg[1] = MQTTSN_TYPE_PINGREQ;
00597 
00598     if ((_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP)
00599     {
00600         _pingStatus = GW_WAIT_PINGRESP;
00601         _pingRetryCount = MQTTSN_RETRY_COUNT;
00602 
00603         writeMsg((const uint8_t*) msg);
00604         _pingSendUTC = time(NULL);
00605     }
00606     else if (_pingStatus == GW_WAIT_PINGRESP)
00607     {
00608         if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL))
00609         {
00610             if (--_pingRetryCount > 0)
00611             {
00612                 writeMsg((const uint8_t*) msg);
00613                 _pingSendUTC = time(NULL);
00614             }
00615             else
00616             {
00617                 _status = GW_LOST;
00618                 _gwId = 0;
00619                 _pingStatus = 0;
00620                 _keepAliveTimer.stop();
00621                 D_MQTTLOG("   !!! PINGREQ Timeout\n");
00622             }
00623         }
00624     }
00625 }
00626 
00627 void LGwProxy::checkAdvertise(void)
00628 {
00629     if (_gwAliveTimer.isTimeUp())
00630     {
00631         _status = GW_LOST;
00632         _gwId = 0;
00633         _pingStatus = 0;
00634         _gwAliveTimer.stop();
00635         _keepAliveTimer.stop();
00636         D_MQTTLOG("   !!! ADVERTISE Timeout\n");
00637     }
00638 }
00639 
00640 LTopicTable* LGwProxy::getTopicTable(void)
00641 {
00642     return &_topicTbl;
00643 }
00644 
00645 LRegisterManager* LGwProxy::getRegisterManager(void)
00646 {
00647     return &_regMgr;
00648 }
00649 
00650 bool LGwProxy::isPingReqRequired(void)
00651 {
00652     return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL);
00653 }
00654 
00655 void LGwProxy::setPingReqTimer(void)
00656 {
00657     _keepAliveTimer.start(_tkeepAlive * 1000UL);
00658 }
00659 
00660 const char* LGwProxy::getClientId(void)
00661 {
00662     return _clientId;
00663 }
00664 
00665 void LGwProxy::setForwarderMode(bool valid)
00666 {
00667     _isForwarderMode = valid;
00668 }
00669 
00670 void LGwProxy::setQoSMinus1Mode(bool valid)
00671 {
00672     _isQoSMinus1Mode = valid;
00673 }