Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
LPublishManager.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include <stdlib.h> 00018 #include <string.h> 00019 #include <stdio.h> 00020 00021 #include "LMqttsnClientApp.h" 00022 #include "LTimer.h" 00023 #include "LGwProxy.h" 00024 #include "LMqttsnClient.h" 00025 #include "LPublishManager.h" 00026 #include "LScreen.h" 00027 00028 using namespace std; 00029 using namespace linuxAsyncClient; 00030 00031 extern void setUint16(uint8_t* pos, uint16_t val); 00032 extern uint16_t getUint16(const uint8_t* pos); 00033 extern LMqttsnClient* theClient; 00034 extern bool theOTAflag; 00035 extern LScreen* theScreen; 00036 /*======================================== 00037 Class PublishManager 00038 =======================================*/ 00039 const char* NULLCHAR = ""; 00040 00041 LPublishManager::LPublishManager() 00042 { 00043 _first = 0; 00044 _last = 0; 00045 _elmCnt = 0; 00046 _publishedFlg = SAVE_TASK_INDEX; 00047 } 00048 00049 LPublishManager::~LPublishManager() 00050 { 00051 PubElement* elm = _first; 00052 PubElement* sav = 0; 00053 while (elm) 00054 { 00055 sav = elm->next; 00056 if (elm != 0) 00057 { 00058 delElement(elm); 00059 } 00060 elm = sav; 00061 } 00062 } 00063 00064 void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t qos, bool retain) 00065 { 00066 publish(topicName, payload->getRowData(), payload->getLen(), qos, retain); 00067 } 00068 00069 void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain) 00070 { 00071 uint16_t msgId = 0; 00072 uint8_t topicType = MQTTSN_TOPIC_TYPE_SHORT; 00073 if ( strlen(topicName) > 2 ) 00074 { 00075 topicType = MQTTSN_TOPIC_TYPE_NORMAL; 00076 } 00077 00078 if ( qos > 0 && qos < 3 ) 00079 { 00080 msgId = theClient->getGwProxy()->getNextMsgId(); 00081 } 00082 00083 PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType); 00084 00085 if (elm->status == TOPICID_IS_READY) 00086 { 00087 sendPublish(elm); 00088 } 00089 else 00090 { 00091 theClient->getGwProxy()->registerTopic((char*) topicName, 0); 00092 } 00093 } 00094 00095 void LPublishManager::publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain) 00096 { 00097 publish(topicId, payload->getRowData(), payload->getLen(), qos, retain); 00098 } 00099 00100 void LPublishManager::publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain) 00101 { 00102 uint16_t msgId = 0; 00103 if ( qos > 0 && qos < 3 ) 00104 { 00105 msgId = theClient->getGwProxy()->getNextMsgId(); 00106 } 00107 PubElement* elm = add(NULLCHAR, topicId, payload, len, qos, retain, msgId, MQTTSN_TOPIC_TYPE_PREDEFINED); 00108 sendPublish(elm); 00109 } 00110 00111 void LPublishManager::sendPublish(PubElement* elm) 00112 { 00113 if (elm == 0) 00114 { 00115 return; 00116 } 00117 00118 theClient->getGwProxy()->connect(); 00119 00120 uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; 00121 uint8_t org = 0; 00122 if (elm->payloadlen > 128) 00123 { 00124 msg[0] = 0x01; 00125 setUint16(msg + 1, elm->payloadlen + 9); 00126 org = 2; 00127 } 00128 else 00129 { 00130 msg[0] = (uint8_t) elm->payloadlen + 7; 00131 } 00132 msg[org + 1] = MQTTSN_TYPE_PUBLISH; 00133 msg[org + 2] = elm->flag; 00134 if ((elm->retryCount < MQTTSN_RETRY_COUNT)) 00135 { 00136 msg[org + 2] = msg[org + 2] | MQTTSN_FLAG_DUP; 00137 } 00138 if ((elm->flag & 0x03) == MQTTSN_TOPIC_TYPE_SHORT ) 00139 { 00140 memcpy(msg + org + 3, elm->topicName, 2); 00141 } 00142 else 00143 { 00144 setUint16(msg + org + 3, elm->topicId); 00145 } 00146 setUint16(msg + org + 5, elm->msgId); 00147 memcpy(msg + org + 7, elm->payload, elm->payloadlen); 00148 00149 theClient->getGwProxy()->writeMsg(msg); 00150 theClient->getGwProxy()->setPingReqTimer(); 00151 if ( ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0 ) || ( (elm->flag & 0x60) == MQTTSN_FLAG_QOS_M1) ) 00152 { 00153 DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName); 00154 remove(elm); // PUBLISH Done 00155 return; 00156 } 00157 else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_1) 00158 { 00159 elm->status = WAIT_PUBACK; 00160 } 00161 else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_2) 00162 { 00163 elm->status = WAIT_PUBREC; 00164 } 00165 00166 elm->sendUTC = time(NULL); 00167 elm->retryCount--; 00168 } 00169 00170 void LPublishManager::sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType) 00171 { 00172 PubElement* elm = _first; 00173 while (elm) 00174 { 00175 if (strcmp(elm->topicName, topicName) == 0 && elm->status == TOPICID_IS_SUSPEND) 00176 { 00177 elm->topicId = topicId; 00178 elm->flag |= topicType; 00179 elm->status = TOPICID_IS_READY; 00180 sendPublish(elm); 00181 elm = 0; 00182 } 00183 else 00184 { 00185 elm = elm->next; 00186 } 00187 } 00188 } 00189 00190 void LPublishManager::sendPubAck(uint16_t topicId, uint16_t msgId, uint8_t rc) 00191 { 00192 uint8_t msg[7]; 00193 msg[0] = 7; 00194 msg[1] = MQTTSN_TYPE_PUBACK; 00195 setUint16(msg + 2, topicId); 00196 setUint16(msg + 4, msgId); 00197 msg[6] = rc; 00198 theClient->getGwProxy()->writeMsg(msg); 00199 } 00200 00201 void LPublishManager::sendPubRel(PubElement* elm) 00202 { 00203 uint8_t msg[4]; 00204 msg[0] = 4; 00205 msg[1] = MQTTSN_TYPE_PUBREL; 00206 setUint16(msg + 2, elm->msgId); 00207 theClient->getGwProxy()->writeMsg(msg); 00208 } 00209 00210 bool LPublishManager::isDone(void) 00211 { 00212 return (_first == 0); 00213 } 00214 00215 bool LPublishManager::isMaxFlight(void) 00216 { 00217 return (_elmCnt > MAX_INFLIGHT_MSG / 2); 00218 } 00219 00220 void LPublishManager::responce(const uint8_t* msg, uint16_t msglen) 00221 { 00222 if (msg[0] == MQTTSN_TYPE_PUBACK) 00223 { 00224 uint16_t msgId = getUint16(msg + 3); 00225 PubElement* elm = getElement(msgId); 00226 if (elm == 0) 00227 { 00228 return; 00229 } 00230 if (msg[5] == MQTTSN_RC_ACCEPTED) 00231 { 00232 if (elm->status == WAIT_PUBACK) 00233 { 00234 DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId); 00235 remove(elm); // PUBLISH Done 00236 } 00237 } 00238 else if (msg[5] == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) 00239 { 00240 elm->status = TOPICID_IS_SUSPEND; 00241 elm->topicId = 0; 00242 elm->retryCount = MQTTSN_RETRY_COUNT; 00243 elm->sendUTC = 0; 00244 theClient->getGwProxy()->registerTopic((char*) elm->topicName, 0); 00245 } 00246 } 00247 else if (msg[0] == MQTTSN_TYPE_PUBREC) 00248 { 00249 PubElement* elm = getElement(getUint16(msg + 1)); 00250 if (elm == 0) 00251 { 00252 return; 00253 } 00254 if (elm->status == WAIT_PUBREC || elm->status == WAIT_PUBCOMP) 00255 { 00256 sendPubRel(elm); 00257 elm->status = WAIT_PUBCOMP; 00258 elm->sendUTC = time(NULL); 00259 } 00260 } 00261 else if (msg[0] == MQTTSN_TYPE_PUBCOMP) 00262 { 00263 PubElement* elm = getElement(getUint16(msg + 1)); 00264 if (elm == 0) 00265 { 00266 return; 00267 } 00268 if (elm->status == WAIT_PUBCOMP) 00269 { 00270 DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId); 00271 remove(elm); // PUBLISH Done 00272 } 00273 } 00274 } 00275 00276 void LPublishManager::published(uint8_t* msg, uint16_t msglen) 00277 { 00278 uint16_t topicId = getUint16(msg + 2); 00279 00280 if (msg[1] & MQTTSN_FLAG_QOS_1) 00281 { 00282 sendPubAck(topicId, getUint16(msg + 4), MQTTSN_RC_ACCEPTED); 00283 } 00284 00285 _publishedFlg = NEG_TASK_INDEX; 00286 theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, (MQTTSN_topicTypes)(msg[1] & MQTTSN_TOPIC_TYPE)); 00287 _publishedFlg = SAVE_TASK_INDEX; 00288 } 00289 00290 void LPublishManager::checkTimeout(void) 00291 { 00292 PubElement* elm = _first; 00293 while (elm) 00294 { 00295 if (elm->sendUTC > 0 && elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) 00296 { 00297 if (elm->retryCount >= 0) 00298 { 00299 sendPublish(elm); 00300 D_MQTTLOG("...Timeout retry\r\n"); 00301 } 00302 else 00303 { 00304 theClient->getGwProxy()->reconnect(); 00305 elm->retryCount = MQTTSN_RETRY_COUNT; 00306 break; 00307 } 00308 } 00309 elm = elm->next; 00310 } 00311 } 00312 00313 PubElement* LPublishManager::getElement(uint16_t msgId) 00314 { 00315 PubElement* elm = _first; 00316 while (elm) 00317 { 00318 if (elm->msgId == msgId) 00319 { 00320 break; 00321 } 00322 else 00323 { 00324 elm = elm->next; 00325 } 00326 } 00327 return elm; 00328 } 00329 00330 PubElement* LPublishManager::getElement(const char* topicName) 00331 { 00332 PubElement* elm = _first; 00333 while (elm) 00334 { 00335 if (strcmp(elm->topicName, topicName) == 0) 00336 { 00337 break; 00338 } 00339 else 00340 { 00341 elm = elm->next; 00342 } 00343 } 00344 return elm; 00345 } 00346 00347 void LPublishManager::remove(PubElement* elm) 00348 { 00349 if (elm) 00350 { 00351 if (elm->prev == 0) 00352 { 00353 _first = elm->next; 00354 if (elm->next == 0) 00355 { 00356 _last = 0; 00357 } 00358 else 00359 { 00360 elm->next->prev = 0; 00361 _last = elm->next; 00362 } 00363 } 00364 else 00365 { 00366 if ( elm->next == 0 ) 00367 { 00368 _last = elm->prev; 00369 } 00370 elm->prev->next = elm->next; 00371 } 00372 delElement(elm); 00373 } 00374 } 00375 00376 void LPublishManager::delElement(PubElement* elm) 00377 { 00378 if (elm->taskIndex >= 0) 00379 { 00380 theClient->getTaskManager()->done(elm->taskIndex); 00381 } 00382 _elmCnt--; 00383 if ( elm->payload ) 00384 { 00385 free(elm->payload); 00386 } 00387 free(elm); 00388 } 00389 00390 /* 00391 PubElement* PublishManager::add(const char* topicName, uint16_t topicId, MQTTSNPayload* payload, uint8_t qos, uint8_t retain, uint16_t msgId){ 00392 return add(topicName, topicId, payload->getRowData(), payload->getLen(), qos, retain, msgId); 00393 }*/ 00394 00395 PubElement* LPublishManager::add(const char* topicName, uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, 00396 uint8_t retain, uint16_t msgId, uint8_t topicType) 00397 { 00398 PubElement* elm = (PubElement*) calloc(1, sizeof(PubElement)); 00399 00400 if (elm == 0) 00401 { 00402 return elm; 00403 } 00404 if (_last == 0) 00405 { 00406 _first = elm; 00407 _last = elm; 00408 } 00409 else 00410 { 00411 elm->prev = _last; 00412 _last->next = elm; 00413 _last = elm; 00414 } 00415 00416 elm->topicName = topicName; 00417 elm->flag |= topicType; 00418 00419 if (qos == 0) 00420 { 00421 elm->flag |= MQTTSN_FLAG_QOS_0; 00422 } 00423 else if (qos == 1) 00424 { 00425 elm->flag |= MQTTSN_FLAG_QOS_1; 00426 } 00427 else if (qos == 2) 00428 { 00429 elm->flag |= MQTTSN_FLAG_QOS_2; 00430 } 00431 else if (qos == 3) 00432 { 00433 elm->flag |= MQTTSN_FLAG_QOS_M1; 00434 } 00435 if (retain) 00436 { 00437 elm->flag |= MQTTSN_FLAG_RETAIN; 00438 } 00439 00440 if (topicId) 00441 { 00442 elm->status = TOPICID_IS_READY; 00443 elm->topicId = topicId; 00444 } 00445 else 00446 { 00447 uint16_t id = theClient->getTopicId(topicName); 00448 if ( id ) 00449 { 00450 elm->status = TOPICID_IS_READY; 00451 elm->topicId = id; 00452 } 00453 } 00454 00455 elm->payloadlen = len; 00456 elm->msgId = msgId; 00457 elm->retryCount = MQTTSN_RETRY_COUNT; 00458 elm->sendUTC = 0; 00459 00460 if (_publishedFlg == NEG_TASK_INDEX) 00461 { 00462 elm->taskIndex = -1; 00463 } 00464 else 00465 { 00466 elm->taskIndex = theClient->getTaskManager()->getIndex(); 00467 theClient->getTaskManager()->suspend(elm->taskIndex); 00468 } 00469 00470 elm->payload = (uint8_t*) malloc(len); 00471 if (elm->payload == 0) 00472 { 00473 delElement(elm); 00474 return 0; 00475 } 00476 memcpy(elm->payload, payload, len); 00477 00478 ++_elmCnt; 00479 return elm; 00480 }
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2