Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers LSubscribeManager.cpp Source File

LSubscribeManager.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include <stdlib.h>
00018 #include <string.h>
00019 
00020 #include "LMqttsnClientApp.h"
00021 #include "LTimer.h"
00022 #include "LScreen.h"
00023 #include "LGwProxy.h"
00024 #include "LMqttsnClient.h"
00025 #include "LSubscribeManager.h"
00026 
00027 using namespace std;
00028 using namespace linuxAsyncClient;
00029 
00030 extern void setUint16(uint8_t* pos, uint16_t val);
00031 extern uint16_t getUint16(const uint8_t* pos);
00032 extern LMqttsnClient* theClient;
00033 extern SUBSCRIBE_LIST;
00034 extern LScreen* theScreen;
00035 #define SUB_DONE   1
00036 #define SUB_READY  0
00037 /*========================================
00038  Class SubscribeManager
00039  =======================================*/
00040 LSubscribeManager::LSubscribeManager()
00041 {
00042     _first = 0;
00043     _last = 0;
00044 }
00045 
00046 LSubscribeManager::~LSubscribeManager()
00047 {
00048     SubElement* elm = _first;
00049     SubElement* sav = 0;
00050     while (elm)
00051     {
00052         sav = elm->next;
00053         if (elm != 0)
00054         {
00055             free(elm);
00056         }
00057         elm = sav;
00058     }
00059 }
00060 
00061 void LSubscribeManager::onConnect(void)
00062 {
00063     DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n");
00064     if (_first == 0)
00065     {
00066         for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++)
00067         {
00068             if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED)
00069             {
00070                 subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
00071             }
00072             else
00073             {
00074                 subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
00075             }
00076         }
00077     }
00078     else
00079     {
00080         SubElement* elm = _first;
00081         SubElement* pelm;
00082         do
00083         {
00084             pelm = elm;
00085             if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
00086             {
00087                 elm->done = SUB_READY;
00088                 elm->retryCount = MQTTSN_RETRY_COUNT;
00089                 subscribe(elm->topicName, elm->callback, elm->qos);
00090             }
00091             elm = pelm->next;
00092         } while (pelm->next);
00093     }
00094 
00095     while (!theClient->getSubscribeManager()->isDone())
00096     {
00097         theClient->getGwProxy()->getMessage();
00098     }
00099     DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n");
00100     DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n");
00101 }
00102 
00103 bool LSubscribeManager::isDone(void)
00104 {
00105     SubElement* elm = _first;
00106     SubElement* prevelm;
00107     while (elm)
00108     {
00109         prevelm = elm;
00110         if (elm->done == SUB_READY)
00111         {
00112             return false;
00113         }
00114         elm = prevelm->next;
00115     }
00116     return true;
00117 }
00118 
00119 void LSubscribeManager::send(SubElement* elm)
00120 {
00121     if (elm->done == SUB_DONE)
00122     {
00123         return;
00124     }
00125     uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
00126     if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED)
00127     {
00128         msg[0] = 7;
00129         setUint16(msg + 5, elm->topicId);
00130     }
00131     else
00132     {
00133         msg[0] = 5 + strlen(elm->topicName);
00134         strcpy((char*) msg + 5, elm->topicName);
00135     }
00136     msg[1] = elm->msgType;
00137     msg[2] = elm->qos | elm->topicType;
00138     if (elm->retryCount == MQTTSN_RETRY_COUNT)
00139     {
00140         elm->msgId = theClient->getGwProxy()->getNextMsgId();
00141     }
00142 
00143     if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
00144     {
00145         msg[2] = msg[2] | MQTTSN_FLAG_DUP;
00146     }
00147 
00148     setUint16(msg + 3, elm->msgId);
00149 
00150     theClient->getGwProxy()->connect();
00151     theClient->getGwProxy()->writeMsg(msg);
00152     theClient->getGwProxy()->setPingReqTimer();
00153     elm->sendUTC = time(NULL);
00154     elm->retryCount--;
00155 }
00156 
00157 void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos)
00158 {
00159     MQTTSN_topicTypes topicType;
00160     if ( strlen(topicName) > 2 )
00161     {
00162         topicType = MQTTSN_TOPIC_TYPE_NORMAL;
00163     }
00164     else
00165     {
00166         topicType = MQTTSN_TOPIC_TYPE_SHORT;
00167     }
00168     SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, topicType, 0,  qos, onPublish);
00169     send(elm);
00170 }
00171 
00172 void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos)
00173 {
00174     SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId,  qos, onPublish);
00175     send(elm);
00176 }
00177 
00178 void LSubscribeManager::unsubscribe(const char* topicName)
00179 {
00180     MQTTSN_topicTypes topicType;
00181     if ( strlen(topicName) > 2 )
00182     {
00183         topicType = MQTTSN_TOPIC_TYPE_NORMAL;
00184     }
00185     else
00186     {
00187         topicType = MQTTSN_TOPIC_TYPE_SHORT;
00188     }
00189     SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0);
00190     send(elm);
00191 }
00192 
00193 void LSubscribeManager::unsubscribe( uint16_t topicId)
00194 {
00195     SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0);
00196     send(elm);
00197 }
00198 
00199 void LSubscribeManager::checkTimeout(void)
00200 {
00201     SubElement* elm = _first;
00202 
00203     while (elm)
00204     {
00205         if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
00206         {
00207             if (elm->retryCount >= 0)
00208             {
00209                 send(elm);
00210             }
00211             else
00212             {
00213                 if ( elm->done == SUB_READY )
00214                 {
00215                     if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
00216                     {
00217                         DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE  Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName);
00218                     }else{
00219                         DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE  Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName);
00220                     }
00221                     elm->done = SUB_DONE;
00222                 }
00223             }
00224         }
00225         elm = elm->next;
00226     }
00227 }
00228 
00229 void LSubscribeManager::responce(const uint8_t* msg)
00230 {
00231     if (msg[0] == MQTTSN_TYPE_SUBACK)
00232     {
00233         uint16_t topicId = getUint16(msg + 2);
00234         uint16_t msgId = getUint16(msg + 4);
00235         uint8_t rc = msg[6];
00236 
00237         SubElement* elm = getElement(msgId);
00238         if (elm)
00239         {
00240             if ( rc == MQTTSN_RC_ACCEPTED )
00241             {
00242                 theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback);
00243                 getElement(msgId)->done = SUB_DONE;
00244                 DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId);
00245             }
00246             else
00247             {
00248                 DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
00249                 remove(elm);
00250             }
00251         }
00252     }
00253     else if (msg[0] == MQTTSN_TYPE_UNSUBACK)
00254     {
00255         uint16_t msgId = getUint16(msg + 1);
00256         SubElement* elm = getElement(msgId);
00257         if (elm)
00258         {
00259             //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0);
00260             DISPLAY("\033[0m\033[0;32m Topic  \"%s\"  was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
00261             remove(elm);
00262         }
00263         else
00264         {
00265             DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n");
00266         }
00267     }
00268 }
00269 
00270 /* SubElement operations */
00271 
00272 SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId,
00273         uint8_t qos, TopicCallback callback)
00274 {
00275     SubElement* elm = 0;
00276     if (topicName )
00277     {
00278         elm = getElement(topicName, msgType);
00279     }
00280     else
00281     {
00282         elm = getElement(topicId, topicType);
00283     }
00284 
00285     if ( elm  == 0 )
00286     {
00287         elm = (SubElement*) calloc(1, sizeof(SubElement));
00288         if (elm == 0)
00289         {
00290             return 0;
00291         }
00292         if (_last == 0)
00293         {
00294             _first = elm;
00295             _last = elm;
00296         }
00297         else
00298         {
00299             elm->prev = _last;
00300             _last->next = elm;
00301             _last = elm;
00302         }
00303     }
00304 
00305     elm->msgType = msgType;
00306     elm->callback = callback;
00307     elm->topicName = topicName;
00308     elm->topicId = topicId;
00309     elm->topicType = topicType;
00310 
00311     if (qos == 1)
00312     {
00313         elm->qos = MQTTSN_FLAG_QOS_1;
00314     }
00315     else if (qos == 2)
00316     {
00317         elm->qos = MQTTSN_FLAG_QOS_2;
00318     }
00319     else
00320     {
00321         elm->qos = MQTTSN_FLAG_QOS_0;
00322     }
00323     elm->msgId = 0;
00324     elm->retryCount = MQTTSN_RETRY_COUNT;
00325     elm->done = SUB_READY;
00326     elm->sendUTC = 0;
00327 
00328     return elm;
00329 }
00330 
00331 void LSubscribeManager::remove(SubElement* elm)
00332 {
00333     if (elm)
00334     {
00335         if (elm->prev == 0)
00336         {
00337             _first = elm->next;
00338             if (elm->next != 0)
00339             {
00340                 elm->next->prev = 0;
00341                 _last = elm->next;
00342             }
00343             free(elm);
00344         }
00345         else
00346         {
00347             if ( elm->next == 0 )
00348             {
00349                 _last = elm->prev;
00350             }
00351             elm->prev->next = elm->next;
00352             free(elm);
00353         }
00354     }
00355 }
00356 
00357 SubElement* LSubscribeManager::getElement(uint16_t msgId)
00358 {
00359     SubElement* elm = _first;
00360     while (elm)
00361     {
00362         if (elm->msgId == msgId)
00363         {
00364             return elm;
00365         }
00366         else
00367         {
00368             elm = elm->next;
00369         }
00370     }
00371     return 0;
00372 }
00373 
00374 SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType)
00375 {
00376     SubElement* elm = _first;
00377     while (elm)
00378     {
00379         if ( elm->msgType == msgType &&  strncmp(elm->topicName, topicName, strlen(topicName)) == 0 )
00380         {
00381             return elm;
00382         }
00383         else
00384         {
00385             elm = elm->next;
00386         }
00387     }
00388     return 0;
00389 }
00390 
00391 SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType)
00392 {
00393     SubElement* elm = _first;
00394     while (elm)
00395     {
00396         if (elm->topicId == topicId && elm->topicType == topicType)
00397         {
00398             return elm;
00399         }
00400         else
00401         {
00402             elm = elm->next;
00403         }
00404     }
00405     return 0;
00406 }