Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTAsync.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTASYNC_H) 00018 #define MQTTASYNC_H 00019 00020 #include "FP.h" 00021 #include "MQTTPacket.h" 00022 #include "stdio.h" 00023 00024 00025 00026 namespace MQTT 00027 { 00028 00029 00030 enum QoS { QOS0, QOS1, QOS2 }; 00031 00032 00033 struct Message 00034 { 00035 enum QoS qos; 00036 bool retained; 00037 bool dup; 00038 unsigned short id; 00039 void *payload; 00040 size_t payloadlen; 00041 }; 00042 00043 00044 class PacketId 00045 { 00046 public: 00047 PacketId(); 00048 00049 int getNext(); 00050 00051 private: 00052 static const int MAX_PACKET_ID = 65535; 00053 int next; 00054 }; 00055 00056 typedef void (*messageHandler)(Message*); 00057 00058 typedef struct limits 00059 { 00060 int MAX_MQTT_PACKET_SIZE; // 00061 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler 00062 int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode 00063 int command_timeout_ms; 00064 00065 limits() 00066 { 00067 MAX_MQTT_PACKET_SIZE = ADV_MAX_PACKET_SIZE; 00068 MAX_MESSAGE_HANDLERS = 5; 00069 MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded 
mode 00070 command_timeout_ms = 30000; 00071 } 00072 } Limits; 00073 00074 00075 /** 00076 * @class Async 00077 * @brief non-blocking, threaded MQTT client API 00078 * @param Network a network class which supports send, receive 00079 * @param Timer a timer class with the methods: 00080 */ 00081 template<class Network, class Timer, class Thread, class Mutex> class Async 00082 { 00083 00084 public: 00085 00086 struct Result 00087 { 00088 /* success or failure result data */ 00089 Async<Network, Timer, Thread, Mutex>* client; 00090 int rc; 00091 }; 00092 00093 typedef void (*resultHandler)(Result*); 00094 00095 Async(Network* network, const Limits limits = Limits()); 00096 00097 typedef struct 00098 { 00099 Async* client; 00100 Network* network; 00101 } connectionLostInfo; 00102 00103 typedef int (*connectionLostHandlers)(connectionLostInfo*); 00104 00105 /** Set the connection lost callback - called whenever the connection is lost and we should be connected 00106 * @param clh - pointer to the callback function 00107 */ 00108 void setConnectionLostHandler(connectionLostHandlers clh) 00109 { 00110 connectionLostHandler.attach(clh); 00111 } 00112 00113 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00114 * @param mh - pointer to the callback function 00115 */ 00116 void setDefaultMessageHandler(messageHandler mh) 00117 { 00118 defaultMessageHandler.attach(mh); 00119 } 00120 00121 int connect(resultHandler fn, MQTTPacket_connectData* options = 0); 00122 00123 template<class T> 00124 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function 00125 00126 int publish(resultHandler rh, const char* topic, Message* message); 00127 00128 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh); 00129 00130 int unsubscribe(resultHandler rh, const char* topicFilter); 00131 00132 int 
disconnect(resultHandler rh); 00133 00134 private: 00135 00136 void run(void const *argument); 00137 int cycle(int timeout); 00138 int waitfor(int packet_type, Timer& atimer); 00139 int keepalive(); 00140 int findFreeOperation(); 00141 00142 int decodePacket(int* value, int timeout); 00143 int readPacket(int timeout); 00144 int sendPacket(int length, int timeout); 00145 int deliverMessage(MQTTString* topic, Message* message); 00146 00147 Thread* thread; 00148 Network* ipstack; 00149 00150 Limits limits; 00151 00152 char* buf; 00153 char* readbuf; 00154 00155 Timer ping_timer, connect_timer; 00156 unsigned int keepAliveInterval; 00157 bool ping_outstanding; 00158 00159 PacketId packetid; 00160 00161 typedef FP<void, Result*> resultHandlerFP; 00162 resultHandlerFP connectHandler; 00163 00164 typedef FP<void, Message*> messageHandlerFP; 00165 struct MessageHandlers 00166 { 00167 const char* topic; 00168 messageHandlerFP fp; 00169 } *messageHandlers; // Message handlers are indexed by subscription topic 00170 00171 // how many concurrent operations should we allow? 
Each one will require a function pointer 00172 struct Operations 00173 { 00174 unsigned short id; 00175 resultHandlerFP fp; 00176 const char* topic; // if this is a publish, store topic name in case republishing is required 00177 Message* message; // for publish, 00178 Timer timer; // to check if the command has timed out 00179 } *operations; // result handlers are indexed by packet ids 00180 00181 static void threadfn(void* arg); 00182 00183 messageHandlerFP defaultMessageHandler; 00184 00185 typedef FP<int, connectionLostInfo*> connectionLostFP; 00186 00187 connectionLostFP connectionLostHandler; 00188 00189 }; 00190 00191 } 00192 00193 00194 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg) 00195 { 00196 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL); 00197 } 00198 00199 00200 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid() 00201 { 00202 this->thread = 0; 00203 this->ipstack = network; 00204 this->ping_timer = Timer(); 00205 this->ping_outstanding = 0; 00206 00207 // How to make these memory allocations portable? 
I was hoping to avoid the heap 00208 printf("size=%d\n",limits.MAX_MQTT_PACKET_SIZE); 00209 buf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00210 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00211 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS]; 00212 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00213 operations[i].id = 0; 00214 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; 00215 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00216 messageHandlers[i].topic = 0; 00217 } 00218 00219 00220 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) 00221 { 00222 int sent = 0; 00223 00224 while (sent < length) 00225 sent += ipstack->write(&buf[sent], length, timeout); 00226 if (sent == length) 00227 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00228 return sent; 00229 } 00230 00231 00232 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) 00233 { 00234 char c; 00235 int multiplier = 1; 00236 int len = 0; 00237 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00238 00239 *value = 0; 00240 do 00241 { 00242 int rc = MQTTPACKET_READ_ERROR; 00243 00244 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00245 { 00246 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00247 goto exit; 00248 } 00249 rc = ipstack->read(&c, 1, timeout); 00250 if (rc != 1) 00251 goto exit; 00252 *value += (c & 127) * multiplier; 00253 multiplier *= 128; 00254 } while ((c & 128) != 0); 00255 exit: 00256 return len; 00257 } 00258 00259 00260 /** 00261 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00262 * the packets can be retried. 
00263 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00264 * @return the MQTT packet type, or -1 if none 00265 */ 00266 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 00267 { 00268 int rc = -1; 00269 MQTTHeader header = {0}; 00270 int len = 0; 00271 int rem_len = 0; 00272 00273 /* 1. read the header byte. This has the packet type in it */ 00274 if (ipstack->read(readbuf, 1, timeout) != 1) 00275 goto exit; 00276 00277 len = 1; 00278 /* 2. read the remaining length. This is variable in itself */ 00279 decodePacket(&rem_len, timeout); 00280 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ 00281 00282 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00283 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) 00284 goto exit; 00285 00286 header.byte = readbuf[0]; 00287 rc = header.bits.type; 00288 exit: 00289 return rc; 00290 } 00291 00292 00293 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) 00294 { 00295 int rc = -1; 00296 00297 // we have to find the right message handler - indexed by topic 00298 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00299 { 00300 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) 00301 { 00302 messageHandlers[i].fp(message); 00303 rc = 0; 00304 break; 00305 } 00306 } 00307 00308 return rc; 00309 } 00310 00311 00312 00313 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout) 00314 { 00315 /* get one piece of work off the wire and one pass through */ 00316 00317 // read the socket, see what work is due 00318 int packet_type = readPacket(timeout); 00319 00320 int len, rc; 
00321 switch (packet_type) 00322 { 00323 case CONNACK: 00324 if (this->thread) 00325 { 00326 Result res = {this, 0}; 00327 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00328 ; 00329 connectHandler(&res); 00330 connectHandler.detach(); // only invoke the callback once 00331 } 00332 break; 00333 case PUBACK: 00334 if (this->thread) 00335 ; //call resultHandler 00336 case SUBACK: 00337 break; 00338 case PUBLISH: 00339 MQTTString topicName; 00340 Message msg; 00341 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, 00342 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; 00343 if (msg.qos == QOS0) 00344 deliverMessage(&topicName, &msg); 00345 break; 00346 case PUBREC: 00347 int type, dup, mypacketid; 00348 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00349 ; 00350 // must lock this access against the application thread, if we are multi-threaded 00351 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); 00352 rc = sendPacket(len, timeout); // send the PUBREL packet 00353 if (rc != len) 00354 goto exit; // there was a problem 00355 00356 break; 00357 case PUBCOMP: 00358 break; 00359 case PINGRESP: 00360 ping_outstanding = false; 00361 break; 00362 } 00363 keepalive(); 00364 exit: 00365 return packet_type; 00366 } 00367 00368 00369 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive() 00370 { 00371 int rc = 0; 00372 00373 if (keepAliveInterval == 0) 00374 goto exit; 00375 00376 if (ping_timer.expired()) 00377 { 00378 if (ping_outstanding) 00379 rc = -1; 00380 else 00381 { 00382 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); 00383 rc = sendPacket(len, 1000); // send the ping packet 00384 if (rc != len) 00385 rc = -1; // indicate there's a problem 00386 else 00387 ping_outstanding = 
true; 00388 } 00389 } 00390 00391 exit: 00392 return rc; 00393 } 00394 00395 00396 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument) 00397 { 00398 while (true) 00399 cycle(ping_timer.left_ms()); 00400 } 00401 00402 00403 // only used in single-threaded mode where one command at a time is in process 00404 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) 00405 { 00406 int rc = -1; 00407 00408 do 00409 { 00410 if (atimer.expired()) 00411 break; // we timed out 00412 } 00413 while ((rc = cycle(atimer.left_ms())) != packet_type); 00414 00415 return rc; 00416 } 00417 00418 00419 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options) 00420 { 00421 connect_timer.countdown(limits.command_timeout_ms); 00422 00423 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00424 if (options == 0) 00425 options = &default_options; // set default options if none were supplied 00426 00427 this->keepAliveInterval = options->keepAliveInterval; 00428 ping_timer.countdown(this->keepAliveInterval); 00429 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); 00430 int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet 00431 if (rc != len) 00432 goto exit; // there was a problem 00433 00434 if (resultHandler == 0) // wait until the connack is received 00435 { 00436 // this will be a blocking call, wait for the connack 00437 if (waitfor(CONNACK, connect_timer) == CONNACK) 00438 { 00439 int connack_rc = -1; 00440 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00441 rc = connack_rc; 00442 } 00443 } 00444 else 00445 { 00446 // set connect response callback function 00447 
connectHandler.attach(resultHandler); 00448 00449 // start background thread 00450 this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this); 00451 } 00452 00453 exit: 00454 return rc; 00455 } 00456 00457 00458 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation() 00459 { 00460 int found = -1; 00461 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00462 { 00463 if (operations[i].id == 0) 00464 { 00465 found = i; 00466 break; 00467 } 00468 } 00469 return found; 00470 } 00471 00472 00473 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00474 { 00475 int index = 0; 00476 if (this->thread) 00477 index = findFreeOperation(); 00478 Timer& atimer = operations[index].timer; 00479 00480 atimer.countdown(limits.command_timeout_ms); 00481 MQTTString topic = {(char*)topicFilter, 0, 0}; 00482 00483 int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00484 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00485 if (rc != len) 00486 goto exit; // there was a problem 00487 00488 /* wait for suback */ 00489 if (resultHandler == 0) 00490 { 00491 // this will block 00492 if (waitfor(SUBACK, atimer) == SUBACK) 00493 { 00494 int count = 0, grantedQoS = -1, mypacketid; 00495 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00496 rc = grantedQoS; // 0, 1, 2 or 0x80 00497 if (rc != 0x80) 00498 { 00499 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00500 { 00501 if (messageHandlers[i].topic == 0) 00502 { 00503 messageHandlers[i].topic = topicFilter; 00504 messageHandlers[i].fp.attach(messageHandler); 00505 rc = 0; 
00506 break; 00507 } 00508 } 00509 } 00510 } 00511 } 00512 else 00513 { 00514 // set subscribe response callback function 00515 00516 } 00517 00518 exit: 00519 return rc; 00520 } 00521 00522 00523 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter) 00524 { 00525 int index = 0; 00526 if (this->thread) 00527 index = findFreeOperation(); 00528 Timer& atimer = operations[index].timer; 00529 00530 atimer.countdown(limits.command_timeout_ms); 00531 MQTTString topic = {(char*)topicFilter, 0, 0}; 00532 00533 int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); 00534 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00535 if (rc != len) 00536 goto exit; // there was a problem 00537 00538 // set unsubscribe response callback function 00539 00540 00541 exit: 00542 return rc; 00543 } 00544 00545 00546 00547 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message) 00548 { 00549 int index = 0; 00550 if (this->thread) 00551 index = findFreeOperation(); 00552 Timer& atimer = operations[index].timer; 00553 00554 atimer.countdown(limits.command_timeout_ms); 00555 MQTTString topic = {(char*)topicName, 0, 0}; 00556 00557 if (message->qos == QOS1 || message->qos == QOS2) 00558 message->id = packetid.getNext(); 00559 00560 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); 00561 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00562 if (rc != len) 00563 goto exit; // there was a problem 00564 00565 /* wait for acks */ 00566 if (resultHandler == 0) 00567 { 00568 if (message->qos == QOS1) 00569 { 00570 if (waitfor(PUBACK, 
atimer) == PUBACK) 00571 { 00572 int type, dup, mypacketid; 00573 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00574 rc = 0; 00575 } 00576 } 00577 else if (message->qos == QOS2) 00578 { 00579 if (waitfor(PUBCOMP, atimer) == PUBCOMP) 00580 { 00581 int type, dup, mypacketid; 00582 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00583 rc = 0; 00584 } 00585 00586 } 00587 } 00588 else 00589 { 00590 // set publish response callback function 00591 00592 } 00593 00594 exit: 00595 return rc; 00596 } 00597 00598 00599 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler) 00600 { 00601 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00602 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); 00603 int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet 00604 00605 return (rc == len) ? 0 : -1; 00606 } 00607 00608 00609 00610 #endif
Generated on Fri Jul 15 2022 18:56:26 by
1.7.2
