Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of mbed-os-example-mbed5-blinky by
MQTTAsync.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTASYNC_H) 00018 #define MQTTASYNC_H 00019 00020 #include "FP.h" 00021 #include "MQTTPacket.h" 00022 #include "stdio.h" 00023 00024 namespace MQTT 00025 { 00026 00027 00028 enum QoS { QOS0, QOS1, QOS2 }; 00029 00030 00031 struct Message 00032 { 00033 enum QoS qos; 00034 bool retained; 00035 bool dup; 00036 unsigned short id; 00037 void *payload; 00038 size_t payloadlen; 00039 }; 00040 00041 00042 class PacketId 00043 { 00044 public: 00045 PacketId(); 00046 00047 int getNext(); 00048 00049 private: 00050 static const int MAX_PACKET_ID = 65535; 00051 int next; 00052 }; 00053 00054 typedef void (*messageHandler)(Message*); 00055 00056 typedef struct limits 00057 { 00058 int MAX_MQTT_PACKET_SIZE; // 00059 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler 00060 int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode 00061 int command_timeout_ms; 00062 00063 limits() 00064 { 00065 MAX_MQTT_PACKET_SIZE = 100; 00066 MAX_MESSAGE_HANDLERS = 5; 00067 MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode 00068 command_timeout_ms 
= 30000; 00069 } 00070 } Limits; 00071 00072 00073 /** 00074 * @class Async 00075 * @brief non-blocking, threaded MQTT client API 00076 * @param Network a network class which supports send, receive 00077 * @param Timer a timer class with the methods: 00078 */ 00079 template<class Network, class Timer, class Thread, class Mutex> class Async 00080 { 00081 00082 public: 00083 00084 struct Result 00085 { 00086 /* success or failure result data */ 00087 Async<Network, Timer, Thread, Mutex>* client; 00088 int rc; 00089 }; 00090 00091 typedef void (*resultHandler)(Result*); 00092 00093 Async(Network* network, const Limits limits = Limits()); 00094 00095 typedef struct 00096 { 00097 Async* client; 00098 Network* network; 00099 } connectionLostInfo; 00100 00101 typedef int (*connectionLostHandlers)(connectionLostInfo*); 00102 00103 /** Set the connection lost callback - called whenever the connection is lost and we should be connected 00104 * @param clh - pointer to the callback function 00105 */ 00106 void setConnectionLostHandler(connectionLostHandlers clh) 00107 { 00108 connectionLostHandler.attach(clh); 00109 } 00110 00111 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00112 * @param mh - pointer to the callback function 00113 */ 00114 void setDefaultMessageHandler(messageHandler mh) 00115 { 00116 defaultMessageHandler.attach(mh); 00117 } 00118 00119 int connect(resultHandler fn, MQTTPacket_connectData* options = 0); 00120 00121 template<class T> 00122 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function 00123 00124 int publish(resultHandler rh, const char* topic, Message* message); 00125 00126 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh); 00127 00128 int unsubscribe(resultHandler rh, const char* topicFilter); 00129 00130 int disconnect(resultHandler rh); 00131 
00132 private: 00133 00134 void run(void const *argument); 00135 int cycle(int timeout); 00136 int waitfor(int packet_type, Timer& atimer); 00137 int keepalive(); 00138 int findFreeOperation(); 00139 00140 int decodePacket(int* value, int timeout); 00141 int readPacket(int timeout); 00142 int sendPacket(int length, int timeout); 00143 int deliverMessage(MQTTString* topic, Message* message); 00144 00145 Thread* thread; 00146 Network* ipstack; 00147 00148 Limits limits; 00149 00150 char* buf; 00151 char* readbuf; 00152 00153 Timer ping_timer, connect_timer; 00154 unsigned int keepAliveInterval; 00155 bool ping_outstanding; 00156 00157 PacketId packetid; 00158 00159 typedef FP<void, Result*> resultHandlerFP; 00160 resultHandlerFP connectHandler; 00161 00162 typedef FP<void, Message*> messageHandlerFP; 00163 struct MessageHandlers 00164 { 00165 const char* topic; 00166 messageHandlerFP fp; 00167 } *messageHandlers; // Message handlers are indexed by subscription topic 00168 00169 // how many concurrent operations should we allow? 
Each one will require a function pointer 00170 struct Operations 00171 { 00172 unsigned short id; 00173 resultHandlerFP fp; 00174 const char* topic; // if this is a publish, store topic name in case republishing is required 00175 Message* message; // for publish, 00176 Timer timer; // to check if the command has timed out 00177 } *operations; // result handlers are indexed by packet ids 00178 00179 static void threadfn(void* arg); 00180 00181 messageHandlerFP defaultMessageHandler; 00182 00183 typedef FP<int, connectionLostInfo*> connectionLostFP; 00184 00185 connectionLostFP connectionLostHandler; 00186 00187 }; 00188 00189 } 00190 00191 00192 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg) 00193 { 00194 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL); 00195 } 00196 00197 00198 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid() 00199 { 00200 this->thread = 0; 00201 this->ipstack = network; 00202 this->ping_timer = Timer(); 00203 this->ping_outstanding = 0; 00204 00205 // How to make these memory allocations portable? 
I was hoping to avoid the heap 00206 buf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00207 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00208 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS]; 00209 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00210 operations[i].id = 0; 00211 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; 00212 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00213 messageHandlers[i].topic = 0; 00214 } 00215 00216 00217 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) 00218 { 00219 int sent = 0; 00220 00221 while (sent < length) 00222 sent += ipstack->write(&buf[sent], length, timeout); 00223 if (sent == length) 00224 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00225 return sent; 00226 } 00227 00228 00229 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) 00230 { 00231 char c; 00232 int multiplier = 1; 00233 int len = 0; 00234 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00235 00236 *value = 0; 00237 do 00238 { 00239 int rc = MQTTPACKET_READ_ERROR; 00240 00241 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00242 { 00243 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00244 goto exit; 00245 } 00246 rc = ipstack->read(&c, 1, timeout); 00247 if (rc != 1) 00248 goto exit; 00249 *value += (c & 127) * multiplier; 00250 multiplier *= 128; 00251 } while ((c & 128) != 0); 00252 exit: 00253 return len; 00254 } 00255 00256 00257 /** 00258 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00259 * the packets can be retried. 
00260 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00261 * @return the MQTT packet type, or -1 if none 00262 */ 00263 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 00264 { 00265 int rc = -1; 00266 MQTTHeader header = {0}; 00267 int len = 0; 00268 int rem_len = 0; 00269 00270 /* 1. read the header byte. This has the packet type in it */ 00271 if (ipstack->read(readbuf, 1, timeout) != 1) 00272 goto exit; 00273 00274 len = 1; 00275 /* 2. read the remaining length. This is variable in itself */ 00276 decodePacket(&rem_len, timeout); 00277 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ 00278 00279 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00280 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) 00281 goto exit; 00282 00283 header.byte = readbuf[0]; 00284 rc = header.bits.type; 00285 exit: 00286 return rc; 00287 } 00288 00289 00290 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) 00291 { 00292 int rc = -1; 00293 00294 // we have to find the right message handler - indexed by topic 00295 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00296 { 00297 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) 00298 { 00299 messageHandlers[i].fp(message); 00300 rc = 0; 00301 break; 00302 } 00303 } 00304 00305 return rc; 00306 } 00307 00308 00309 00310 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout) 00311 { 00312 /* get one piece of work off the wire and one pass through */ 00313 00314 // read the socket, see what work is due 00315 int packet_type = readPacket(timeout); 00316 00317 int len, rc; 
00318 switch (packet_type) 00319 { 00320 case CONNACK: 00321 if (this->thread) 00322 { 00323 Result res = {this, 0}; 00324 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00325 ; 00326 connectHandler(&res); 00327 connectHandler.detach(); // only invoke the callback once 00328 } 00329 break; 00330 case PUBACK: 00331 if (this->thread) 00332 ; //call resultHandler 00333 case SUBACK: 00334 break; 00335 case PUBLISH: 00336 MQTTString topicName; 00337 Message msg; 00338 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, 00339 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; 00340 if (msg.qos == QOS0) 00341 deliverMessage(&topicName, &msg); 00342 break; 00343 case PUBREC: 00344 int type, dup, mypacketid; 00345 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00346 ; 00347 // must lock this access against the application thread, if we are multi-threaded 00348 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); 00349 rc = sendPacket(len, timeout); // send the PUBREL packet 00350 if (rc != len) 00351 goto exit; // there was a problem 00352 00353 break; 00354 case PUBCOMP: 00355 break; 00356 case PINGRESP: 00357 ping_outstanding = false; 00358 break; 00359 } 00360 keepalive(); 00361 exit: 00362 return packet_type; 00363 } 00364 00365 00366 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive() 00367 { 00368 int rc = 0; 00369 00370 if (keepAliveInterval == 0) 00371 goto exit; 00372 00373 if (ping_timer.expired()) 00374 { 00375 if (ping_outstanding) 00376 rc = -1; 00377 else 00378 { 00379 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); 00380 rc = sendPacket(len, 1000); // send the ping packet 00381 if (rc != len) 00382 rc = -1; // indicate there's a problem 00383 else 00384 ping_outstanding = 
true; 00385 } 00386 } 00387 00388 exit: 00389 return rc; 00390 } 00391 00392 00393 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument) 00394 { 00395 while (true) 00396 cycle(ping_timer.left_ms()); 00397 } 00398 00399 00400 // only used in single-threaded mode where one command at a time is in process 00401 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) 00402 { 00403 int rc = -1; 00404 00405 do 00406 { 00407 if (atimer.expired()) 00408 break; // we timed out 00409 } 00410 while ((rc = cycle(atimer.left_ms())) != packet_type); 00411 00412 return rc; 00413 } 00414 00415 00416 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options) 00417 { 00418 connect_timer.countdown(limits.command_timeout_ms); 00419 00420 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00421 if (options == 0) 00422 options = &default_options; // set default options if none were supplied 00423 00424 this->keepAliveInterval = options->keepAliveInterval; 00425 ping_timer.countdown(this->keepAliveInterval); 00426 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); 00427 int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet 00428 if (rc != len) 00429 goto exit; // there was a problem 00430 00431 if (resultHandler == 0) // wait until the connack is received 00432 { 00433 // this will be a blocking call, wait for the connack 00434 if (waitfor(CONNACK, connect_timer) == CONNACK) 00435 { 00436 int connack_rc = -1; 00437 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00438 rc = connack_rc; 00439 } 00440 } 00441 else 00442 { 00443 // set connect response callback function 00444 
connectHandler.attach(resultHandler); 00445 00446 // start background thread 00447 this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this); 00448 } 00449 00450 exit: 00451 return rc; 00452 } 00453 00454 00455 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation() 00456 { 00457 int found = -1; 00458 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00459 { 00460 if (operations[i].id == 0) 00461 { 00462 found = i; 00463 break; 00464 } 00465 } 00466 return found; 00467 } 00468 00469 00470 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00471 { 00472 int index = 0; 00473 if (this->thread) 00474 index = findFreeOperation(); 00475 Timer& atimer = operations[index].timer; 00476 00477 atimer.countdown(limits.command_timeout_ms); 00478 MQTTString topic = {(char*)topicFilter, 0, 0}; 00479 00480 int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00481 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00482 if (rc != len) 00483 goto exit; // there was a problem 00484 00485 /* wait for suback */ 00486 if (resultHandler == 0) 00487 { 00488 // this will block 00489 if (waitfor(SUBACK, atimer) == SUBACK) 00490 { 00491 int count = 0, grantedQoS = -1, mypacketid; 00492 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00493 rc = grantedQoS; // 0, 1, 2 or 0x80 00494 if (rc != 0x80) 00495 { 00496 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00497 { 00498 if (messageHandlers[i].topic == 0) 00499 { 00500 messageHandlers[i].topic = topicFilter; 00501 messageHandlers[i].fp.attach(messageHandler); 00502 rc = 0; 
00503 break; 00504 } 00505 } 00506 } 00507 } 00508 } 00509 else 00510 { 00511 // set subscribe response callback function 00512 00513 } 00514 00515 exit: 00516 return rc; 00517 } 00518 00519 00520 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter) 00521 { 00522 int index = 0; 00523 if (this->thread) 00524 index = findFreeOperation(); 00525 Timer& atimer = operations[index].timer; 00526 00527 atimer.countdown(limits.command_timeout_ms); 00528 MQTTString topic = {(char*)topicFilter, 0, 0}; 00529 00530 int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); 00531 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00532 if (rc != len) 00533 goto exit; // there was a problem 00534 00535 // set unsubscribe response callback function 00536 00537 00538 exit: 00539 return rc; 00540 } 00541 00542 00543 00544 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message) 00545 { 00546 int index = 0; 00547 if (this->thread) 00548 index = findFreeOperation(); 00549 Timer& atimer = operations[index].timer; 00550 00551 atimer.countdown(limits.command_timeout_ms); 00552 MQTTString topic = {(char*)topicName, 0, 0}; 00553 00554 if (message->qos == QOS1 || message->qos == QOS2) 00555 message->id = packetid.getNext(); 00556 00557 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); 00558 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00559 if (rc != len) 00560 goto exit; // there was a problem 00561 00562 /* wait for acks */ 00563 if (resultHandler == 0) 00564 { 00565 if (message->qos == QOS1) 00566 { 00567 if (waitfor(PUBACK, 
atimer) == PUBACK) 00568 { 00569 int type, dup, mypacketid; 00570 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00571 rc = 0; 00572 } 00573 } 00574 else if (message->qos == QOS2) 00575 { 00576 if (waitfor(PUBCOMP, atimer) == PUBCOMP) 00577 { 00578 int type, dup, mypacketid; 00579 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00580 rc = 0; 00581 } 00582 00583 } 00584 } 00585 else 00586 { 00587 // set publish response callback function 00588 00589 } 00590 00591 exit: 00592 return rc; 00593 } 00594 00595 00596 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler) 00597 { 00598 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00599 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); 00600 int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet 00601 00602 return (rc == len) ? 0 : -1; 00603 } 00604 00605 00606 00607 #endif
Generated on Wed Jul 13 2022 21:16:17 by
