Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
LSubscribeManager.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include <stdlib.h> 00018 #include <string.h> 00019 00020 #include "LMqttsnClientApp.h" 00021 #include "LTimer.h" 00022 #include "LScreen.h" 00023 #include "LGwProxy.h" 00024 #include "LMqttsnClient.h" 00025 #include "LSubscribeManager.h" 00026 00027 using namespace std; 00028 using namespace linuxAsyncClient; 00029 00030 extern void setUint16(uint8_t* pos, uint16_t val); 00031 extern uint16_t getUint16(const uint8_t* pos); 00032 extern LMqttsnClient* theClient; 00033 extern SUBSCRIBE_LIST; 00034 extern LScreen* theScreen; 00035 #define SUB_DONE 1 00036 #define SUB_READY 0 00037 /*======================================== 00038 Class SubscribeManager 00039 =======================================*/ 00040 LSubscribeManager::LSubscribeManager() 00041 { 00042 _first = 0; 00043 _last = 0; 00044 } 00045 00046 LSubscribeManager::~LSubscribeManager() 00047 { 00048 SubElement* elm = _first; 00049 SubElement* sav = 0; 00050 while (elm) 00051 { 00052 sav = elm->next; 00053 if (elm != 0) 00054 { 00055 free(elm); 00056 } 00057 elm = sav; 00058 } 00059 } 00060 00061 void LSubscribeManager::onConnect(void) 00062 { 00063 DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); 00064 if (_first == 0) 00065 { 00066 for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) 00067 { 00068 if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) 00069 { 00070 subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); 00071 } 00072 else 00073 { 00074 subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); 00075 } 00076 } 00077 } 00078 else 00079 { 00080 SubElement* elm = _first; 00081 SubElement* pelm; 00082 do 00083 { 00084 pelm = elm; 00085 if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) 00086 { 00087 elm->done = SUB_READY; 00088 elm->retryCount = MQTTSN_RETRY_COUNT; 00089 subscribe(elm->topicName, elm->callback, elm->qos); 00090 } 00091 elm = pelm->next; 00092 } while (pelm->next); 00093 } 00094 00095 while (!theClient->getSubscribeManager()->isDone()) 00096 { 00097 theClient->getGwProxy()->getMessage(); 00098 } 00099 DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); 00100 DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); 00101 } 00102 00103 bool LSubscribeManager::isDone(void) 00104 { 00105 SubElement* elm = _first; 00106 SubElement* prevelm; 00107 while (elm) 00108 { 00109 prevelm = elm; 00110 if (elm->done == SUB_READY) 00111 { 00112 return false; 00113 } 00114 elm = prevelm->next; 00115 } 00116 return true; 00117 } 00118 00119 void LSubscribeManager::send(SubElement* elm) 00120 { 00121 if (elm->done == SUB_DONE) 00122 { 00123 return; 00124 } 00125 uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; 00126 if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED) 00127 { 00128 msg[0] = 7; 00129 setUint16(msg + 5, elm->topicId); 00130 } 00131 else 00132 { 00133 msg[0] = 5 + strlen(elm->topicName); 00134 strcpy((char*) msg + 5, elm->topicName); 00135 } 00136 msg[1] = elm->msgType; 00137 msg[2] = elm->qos | elm->topicType; 00138 if (elm->retryCount == MQTTSN_RETRY_COUNT) 00139 { 00140 elm->msgId = theClient->getGwProxy()->getNextMsgId(); 00141 } 00142 00143 if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) 00144 { 00145 msg[2] = msg[2] | MQTTSN_FLAG_DUP; 00146 } 00147 00148 setUint16(msg + 3, elm->msgId); 00149 00150 theClient->getGwProxy()->connect(); 00151 theClient->getGwProxy()->writeMsg(msg); 00152 theClient->getGwProxy()->setPingReqTimer(); 00153 elm->sendUTC = time(NULL); 00154 elm->retryCount--; 00155 } 00156 00157 void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos) 00158 { 00159 MQTTSN_topicTypes topicType; 00160 if ( strlen(topicName) > 2 ) 00161 { 00162 topicType = MQTTSN_TOPIC_TYPE_NORMAL; 00163 } 00164 else 00165 { 00166 topicType = MQTTSN_TOPIC_TYPE_SHORT; 00167 } 00168 SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, topicType, 0, qos, onPublish); 00169 send(elm); 00170 } 00171 00172 void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos) 00173 { 00174 SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); 00175 send(elm); 00176 } 00177 00178 void LSubscribeManager::unsubscribe(const char* topicName) 00179 { 00180 MQTTSN_topicTypes topicType; 00181 if ( strlen(topicName) > 2 ) 00182 { 00183 topicType = MQTTSN_TOPIC_TYPE_NORMAL; 00184 } 00185 else 00186 { 00187 topicType = MQTTSN_TOPIC_TYPE_SHORT; 00188 } 00189 SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); 00190 send(elm); 00191 } 00192 00193 void LSubscribeManager::unsubscribe( uint16_t topicId) 00194 { 00195 SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); 00196 send(elm); 00197 } 00198 00199 void LSubscribeManager::checkTimeout(void) 00200 { 00201 SubElement* elm = _first; 00202 00203 while (elm) 00204 { 00205 if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) 00206 { 00207 if (elm->retryCount >= 0) 00208 { 00209 send(elm); 00210 } 00211 else 00212 { 00213 if ( elm->done == SUB_READY ) 00214 { 00215 if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) 00216 { 00217 DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); 00218 }else{ 00219 DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); 00220 } 00221 elm->done = SUB_DONE; 00222 } 00223 } 00224 } 00225 elm = elm->next; 00226 } 00227 } 00228 00229 void LSubscribeManager::responce(const uint8_t* msg) 00230 { 00231 if (msg[0] == MQTTSN_TYPE_SUBACK) 00232 { 00233 uint16_t topicId = getUint16(msg + 2); 00234 uint16_t msgId = getUint16(msg + 4); 00235 uint8_t rc = msg[6]; 00236 00237 SubElement* elm = getElement(msgId); 00238 if (elm) 00239 { 00240 if ( rc == MQTTSN_RC_ACCEPTED ) 00241 { 00242 theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); 00243 getElement(msgId)->done = SUB_DONE; 00244 DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); 00245 } 00246 else 00247 { 00248 DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); 00249 remove(elm); 00250 } 00251 } 00252 } 00253 else if (msg[0] == MQTTSN_TYPE_UNSUBACK) 00254 { 00255 uint16_t msgId = getUint16(msg + 1); 00256 SubElement* elm = getElement(msgId); 00257 if (elm) 00258 { 00259 //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); 00260 DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); 00261 remove(elm); 00262 } 00263 else 00264 { 00265 DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); 00266 } 00267 } 00268 } 00269 00270 /* SubElement operations */ 00271 00272 SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, 00273 uint8_t qos, TopicCallback callback) 00274 { 00275 SubElement* elm = 0; 00276 if (topicName ) 00277 { 00278 elm = getElement(topicName, msgType); 00279 } 00280 else 00281 { 00282 elm = getElement(topicId, topicType); 00283 } 00284 00285 if ( elm == 0 ) 00286 { 00287 elm = (SubElement*) calloc(1, sizeof(SubElement)); 00288 if (elm == 0) 00289 { 00290 return 0; 00291 } 00292 if (_last == 0) 00293 { 00294 _first = elm; 00295 _last = elm; 00296 } 00297 else 00298 { 00299 elm->prev = _last; 00300 _last->next = elm; 00301 _last = elm; 00302 } 00303 } 00304 00305 elm->msgType = msgType; 00306 elm->callback = callback; 00307 elm->topicName = topicName; 00308 elm->topicId = topicId; 00309 elm->topicType = topicType; 00310 00311 if (qos == 1) 00312 { 00313 elm->qos = MQTTSN_FLAG_QOS_1; 00314 } 00315 else if (qos == 2) 00316 { 00317 elm->qos = MQTTSN_FLAG_QOS_2; 00318 } 00319 else 00320 { 00321 elm->qos = MQTTSN_FLAG_QOS_0; 00322 } 00323 elm->msgId = 0; 00324 elm->retryCount = MQTTSN_RETRY_COUNT; 00325 elm->done = SUB_READY; 00326 elm->sendUTC = 0; 00327 00328 return elm; 00329 } 00330 00331 void LSubscribeManager::remove(SubElement* elm) 00332 { 00333 if (elm) 00334 { 00335 if (elm->prev == 0) 00336 { 00337 _first = elm->next; 00338 if (elm->next != 0) 00339 { 00340 elm->next->prev = 0; 00341 _last = elm->next; 00342 } 00343 free(elm); 00344 } 00345 else 00346 { 00347 if ( elm->next == 0 ) 00348 { 00349 _last = elm->prev; 00350 } 00351 elm->prev->next = elm->next; 00352 free(elm); 00353 } 00354 } 00355 } 00356 00357 SubElement* LSubscribeManager::getElement(uint16_t msgId) 00358 { 00359 SubElement* elm = _first; 00360 while (elm) 00361 { 00362 if (elm->msgId == msgId) 00363 { 00364 return elm; 00365 } 00366 else 00367 { 00368 elm = elm->next; 00369 } 00370 } 00371 return 0; 00372 } 00373 00374 SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType) 00375 { 00376 SubElement* elm = _first; 00377 while (elm) 00378 { 00379 if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) 00380 { 00381 return elm; 00382 } 00383 else 00384 { 00385 elm = elm->next; 00386 } 00387 } 00388 return 0; 00389 } 00390 00391 SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType) 00392 { 00393 SubElement* elm = _first; 00394 while (elm) 00395 { 00396 if (elm->topicId == topicId && elm->topicType == topicType) 00397 { 00398 return elm; 00399 } 00400 else 00401 { 00402 elm = elm->next; 00403 } 00404 } 00405 return 0; 00406 }
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2