Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers LPublishManager.cpp Source File

LPublishManager.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include <stdlib.h>
00018 #include <string.h>
00019 #include <stdio.h>
00020 
00021 #include "LMqttsnClientApp.h"
00022 #include "LTimer.h"
00023 #include "LGwProxy.h"
00024 #include "LMqttsnClient.h"
00025 #include "LPublishManager.h"
00026 #include "LScreen.h"
00027 
00028 using namespace std;
00029 using namespace linuxAsyncClient;
00030 
00031 extern void setUint16(uint8_t* pos, uint16_t val);
00032 extern uint16_t getUint16(const uint8_t* pos);
00033 extern LMqttsnClient* theClient;
00034 extern bool theOTAflag;
00035 extern LScreen* theScreen;
00036 /*========================================
00037  Class PublishManager
00038  =======================================*/
/* Empty topic name; passed to add() when publishing by predefined topic id, where no name is available. */
const char* NULLCHAR = "";
00040 
00041 LPublishManager::LPublishManager()
00042 {
00043     _first = 0;
00044     _last = 0;
00045     _elmCnt = 0;
00046     _publishedFlg = SAVE_TASK_INDEX;
00047 }
00048 
00049 LPublishManager::~LPublishManager()
00050 {
00051     PubElement* elm = _first;
00052     PubElement* sav = 0;
00053     while (elm)
00054     {
00055         sav = elm->next;
00056         if (elm != 0)
00057         {
00058             delElement(elm);
00059         }
00060         elm = sav;
00061     }
00062 }
00063 
00064 void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t qos, bool retain)
00065 {
00066     publish(topicName, payload->getRowData(), payload->getLen(), qos, retain);
00067 }
00068 
00069 void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
00070 {
00071     uint16_t msgId = 0;
00072     uint8_t topicType = MQTTSN_TOPIC_TYPE_SHORT;
00073     if ( strlen(topicName) > 2 )
00074     {
00075         topicType = MQTTSN_TOPIC_TYPE_NORMAL;
00076     }
00077 
00078     if ( qos > 0  && qos < 3 )
00079     {
00080         msgId = theClient->getGwProxy()->getNextMsgId();
00081     }
00082 
00083     PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType);
00084 
00085     if (elm->status == TOPICID_IS_READY)
00086     {
00087         sendPublish(elm);
00088     }
00089     else
00090     {
00091         theClient->getGwProxy()->registerTopic((char*) topicName, 0);
00092     }
00093 }
00094 
00095 void LPublishManager::publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain)
00096 {
00097     publish(topicId, payload->getRowData(), payload->getLen(), qos, retain);
00098 }
00099 
00100 void LPublishManager::publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
00101 {
00102     uint16_t msgId = 0;
00103     if ( qos > 0 && qos < 3 )
00104     {
00105         msgId = theClient->getGwProxy()->getNextMsgId();
00106     }
00107     PubElement* elm = add(NULLCHAR, topicId, payload, len, qos, retain, msgId, MQTTSN_TOPIC_TYPE_PREDEFINED);
00108     sendPublish(elm);
00109 }
00110 
00111 void LPublishManager::sendPublish(PubElement* elm)
00112 {
00113     if (elm == 0)
00114     {
00115         return;
00116     }
00117 
00118     theClient->getGwProxy()->connect();
00119 
00120     uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
00121     uint8_t org = 0;
00122     if (elm->payloadlen > 128)
00123     {
00124         msg[0] = 0x01;
00125         setUint16(msg + 1, elm->payloadlen + 9);
00126         org = 2;
00127     }
00128     else
00129     {
00130         msg[0] = (uint8_t) elm->payloadlen + 7;
00131     }
00132     msg[org + 1] = MQTTSN_TYPE_PUBLISH;
00133     msg[org + 2] = elm->flag;
00134     if ((elm->retryCount < MQTTSN_RETRY_COUNT))
00135     {
00136         msg[org + 2] = msg[org + 2] | MQTTSN_FLAG_DUP;
00137     }
00138     if ((elm->flag & 0x03) == MQTTSN_TOPIC_TYPE_SHORT )
00139     {
00140         memcpy(msg + org + 3, elm->topicName, 2);
00141     }
00142     else
00143     {
00144         setUint16(msg + org + 3, elm->topicId);
00145     }
00146     setUint16(msg + org + 5, elm->msgId);
00147     memcpy(msg + org + 7, elm->payload, elm->payloadlen);
00148 
00149     theClient->getGwProxy()->writeMsg(msg);
00150     theClient->getGwProxy()->setPingReqTimer();
00151     if ( ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0 ) || ( (elm->flag & 0x60) == MQTTSN_FLAG_QOS_M1) )
00152     {
00153         DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName);
00154         remove(elm);  // PUBLISH Done
00155         return;
00156     }
00157     else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_1)
00158     {
00159         elm->status = WAIT_PUBACK;
00160     }
00161     else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_2)
00162     {
00163         elm->status = WAIT_PUBREC;
00164     }
00165 
00166     elm->sendUTC = time(NULL);
00167     elm->retryCount--;
00168 }
00169 
00170 void LPublishManager::sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType)
00171 {
00172     PubElement* elm = _first;
00173     while (elm)
00174     {
00175         if (strcmp(elm->topicName, topicName) == 0 && elm->status == TOPICID_IS_SUSPEND)
00176         {
00177             elm->topicId = topicId;
00178             elm->flag |= topicType;
00179             elm->status = TOPICID_IS_READY;
00180             sendPublish(elm);
00181             elm = 0;
00182         }
00183         else
00184         {
00185             elm = elm->next;
00186         }
00187     }
00188 }
00189 
00190 void LPublishManager::sendPubAck(uint16_t topicId, uint16_t msgId, uint8_t rc)
00191 {
00192     uint8_t msg[7];
00193     msg[0] = 7;
00194     msg[1] = MQTTSN_TYPE_PUBACK;
00195     setUint16(msg + 2, topicId);
00196     setUint16(msg + 4, msgId);
00197     msg[6] = rc;
00198     theClient->getGwProxy()->writeMsg(msg);
00199 }
00200 
00201 void LPublishManager::sendPubRel(PubElement* elm)
00202 {
00203     uint8_t msg[4];
00204     msg[0] = 4;
00205     msg[1] = MQTTSN_TYPE_PUBREL;
00206     setUint16(msg + 2, elm->msgId);
00207     theClient->getGwProxy()->writeMsg(msg);
00208 }
00209 
00210 bool LPublishManager::isDone(void)
00211 {
00212     return (_first == 0);
00213 }
00214 
00215 bool LPublishManager::isMaxFlight(void)
00216 {
00217     return (_elmCnt > MAX_INFLIGHT_MSG / 2);
00218 }
00219 
00220 void LPublishManager::responce(const uint8_t* msg, uint16_t msglen)
00221 {
00222     if (msg[0] == MQTTSN_TYPE_PUBACK)
00223     {
00224         uint16_t msgId = getUint16(msg + 3);
00225         PubElement* elm = getElement(msgId);
00226         if (elm == 0)
00227         {
00228             return;
00229         }
00230         if (msg[5] == MQTTSN_RC_ACCEPTED)
00231         {
00232             if (elm->status == WAIT_PUBACK)
00233             {
00234                 DISPLAY("\033[0m\033[0;32m Topic \"%s\"  Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
00235                 remove(elm); // PUBLISH Done
00236             }
00237         }
00238         else if (msg[5] == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID)
00239         {
00240             elm->status = TOPICID_IS_SUSPEND;
00241             elm->topicId = 0;
00242             elm->retryCount = MQTTSN_RETRY_COUNT;
00243             elm->sendUTC = 0;
00244             theClient->getGwProxy()->registerTopic((char*) elm->topicName, 0);
00245         }
00246     }
00247     else if (msg[0] == MQTTSN_TYPE_PUBREC)
00248     {
00249         PubElement* elm = getElement(getUint16(msg + 1));
00250         if (elm == 0)
00251         {
00252             return;
00253         }
00254         if (elm->status == WAIT_PUBREC || elm->status == WAIT_PUBCOMP)
00255         {
00256             sendPubRel(elm);
00257             elm->status = WAIT_PUBCOMP;
00258             elm->sendUTC = time(NULL);
00259         }
00260     }
00261     else if (msg[0] == MQTTSN_TYPE_PUBCOMP)
00262     {
00263         PubElement* elm = getElement(getUint16(msg + 1));
00264         if (elm == 0)
00265         {
00266             return;
00267         }
00268         if (elm->status == WAIT_PUBCOMP)
00269         {
00270             DISPLAY("\033[0m\033[0;32m Topic \"%s\"  Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
00271             remove(elm);  // PUBLISH Done
00272         }
00273     }
00274 }
00275 
00276 void LPublishManager::published(uint8_t* msg, uint16_t msglen)
00277 {
00278     uint16_t topicId = getUint16(msg + 2);
00279 
00280     if (msg[1] & MQTTSN_FLAG_QOS_1)
00281     {
00282         sendPubAck(topicId, getUint16(msg + 4), MQTTSN_RC_ACCEPTED);
00283     }
00284 
00285     _publishedFlg = NEG_TASK_INDEX;
00286     theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6,  (MQTTSN_topicTypes)(msg[1] & MQTTSN_TOPIC_TYPE));
00287     _publishedFlg = SAVE_TASK_INDEX;
00288 }
00289 
00290 void LPublishManager::checkTimeout(void)
00291 {
00292     PubElement* elm = _first;
00293     while (elm)
00294     {
00295         if (elm->sendUTC > 0 && elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
00296         {
00297             if (elm->retryCount >= 0)
00298             {
00299                 sendPublish(elm);
00300                 D_MQTTLOG("...Timeout retry\r\n");
00301             }
00302             else
00303             {
00304                 theClient->getGwProxy()->reconnect();
00305                 elm->retryCount = MQTTSN_RETRY_COUNT;
00306                 break;
00307             }
00308         }
00309         elm = elm->next;
00310     }
00311 }
00312 
00313 PubElement* LPublishManager::getElement(uint16_t msgId)
00314 {
00315     PubElement* elm = _first;
00316     while (elm)
00317     {
00318         if (elm->msgId == msgId)
00319         {
00320             break;
00321         }
00322         else
00323         {
00324             elm = elm->next;
00325         }
00326     }
00327     return elm;
00328 }
00329 
00330 PubElement* LPublishManager::getElement(const char* topicName)
00331 {
00332     PubElement* elm = _first;
00333     while (elm)
00334     {
00335         if (strcmp(elm->topicName, topicName) == 0)
00336         {
00337             break;
00338         }
00339         else
00340         {
00341             elm = elm->next;
00342         }
00343     }
00344     return elm;
00345 }
00346 
00347 void LPublishManager::remove(PubElement* elm)
00348 {
00349     if (elm)
00350         {
00351             if (elm->prev == 0)
00352             {
00353                 _first = elm->next;
00354                 if (elm->next == 0)
00355                 {
00356                     _last = 0;
00357                 }
00358                 else
00359                 {
00360                     elm->next->prev = 0;
00361                     _last = elm->next;
00362                 }
00363             }
00364             else
00365             {
00366                 if ( elm->next == 0 )
00367                 {
00368                     _last = elm->prev;
00369                 }
00370                 elm->prev->next = elm->next;
00371             }
00372             delElement(elm);
00373         }
00374 }
00375 
00376 void LPublishManager::delElement(PubElement* elm)
00377 {
00378     if (elm->taskIndex >= 0)
00379     {
00380         theClient->getTaskManager()->done(elm->taskIndex);
00381     }
00382     _elmCnt--;
00383     if ( elm->payload )
00384     {
00385         free(elm->payload);
00386     }
00387     free(elm);
00388 }
00389 
00390 /*
00391  PubElement* PublishManager::add(const char* topicName, uint16_t topicId, MQTTSNPayload* payload, uint8_t qos, uint8_t retain, uint16_t msgId){
00392  return add(topicName, topicId, payload->getRowData(), payload->getLen(), qos, retain, msgId);
00393  }*/
00394 
00395 PubElement* LPublishManager::add(const char* topicName, uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos,
00396         uint8_t retain, uint16_t msgId, uint8_t topicType)
00397 {
00398     PubElement* elm = (PubElement*) calloc(1, sizeof(PubElement));
00399 
00400     if (elm == 0)
00401     {
00402         return elm;
00403     }
00404     if (_last == 0)
00405     {
00406         _first = elm;
00407         _last = elm;
00408     }
00409     else
00410     {
00411         elm->prev = _last;
00412         _last->next = elm;
00413         _last = elm;
00414     }
00415 
00416     elm->topicName = topicName;
00417     elm->flag |= topicType;
00418 
00419     if (qos == 0)
00420     {
00421         elm->flag |= MQTTSN_FLAG_QOS_0;
00422     }
00423     else if (qos == 1)
00424     {
00425         elm->flag |= MQTTSN_FLAG_QOS_1;
00426     }
00427     else if (qos == 2)
00428     {
00429         elm->flag |= MQTTSN_FLAG_QOS_2;
00430     }
00431     else if (qos == 3)
00432     {
00433         elm->flag |= MQTTSN_FLAG_QOS_M1;
00434     }
00435     if (retain)
00436     {
00437         elm->flag |= MQTTSN_FLAG_RETAIN;
00438     }
00439 
00440     if (topicId)
00441     {
00442         elm->status = TOPICID_IS_READY;
00443         elm->topicId = topicId;
00444     }
00445     else
00446     {
00447         uint16_t id = theClient->getTopicId(topicName);
00448         if ( id )
00449         {
00450             elm->status = TOPICID_IS_READY;
00451             elm->topicId = id;
00452         }
00453     }
00454 
00455     elm->payloadlen = len;
00456     elm->msgId = msgId;
00457     elm->retryCount = MQTTSN_RETRY_COUNT;
00458     elm->sendUTC = 0;
00459 
00460     if (_publishedFlg == NEG_TASK_INDEX)
00461     {
00462         elm->taskIndex = -1;
00463     }
00464     else
00465     {
00466         elm->taskIndex = theClient->getTaskManager()->getIndex();
00467         theClient->getTaskManager()->suspend(elm->taskIndex);
00468     }
00469 
00470     elm->payload = (uint8_t*) malloc(len);
00471     if (elm->payload == 0)
00472     {
00473         delElement(elm);
00474         return 0;
00475     }
00476     memcpy(elm->payload, payload, len);
00477 
00478     ++_elmCnt;
00479     return elm;
00480 }