Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTAsync.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTASYNC_H) 00018 #define MQTTASYNC_H 00019 00020 #include "MQTTPacket.h" 00021 #include "stdio.h" 00022 00023 namespace MQTT 00024 { 00025 00026 00027 enum QoS { QOS0, QOS1, QOS2 }; 00028 00029 00030 struct Message 00031 { 00032 enum QoS qos; 00033 bool retained; 00034 bool dup; 00035 unsigned short id; 00036 void *payload; 00037 size_t payloadlen; 00038 }; 00039 00040 00041 class PacketId 00042 { 00043 public: 00044 PacketId(); 00045 00046 int getNext(); 00047 00048 private: 00049 static const int MAX_PACKET_ID = 65535; 00050 int next; 00051 }; 00052 00053 typedef void (*messageHandler)(Message*); 00054 00055 typedef struct limits 00056 { 00057 int MAX_MQTT_PACKET_SIZE; // 00058 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler 00059 int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode 00060 int command_timeout_ms; 00061 00062 limits() 00063 { 00064 MAX_MQTT_PACKET_SIZE = 100; 00065 MAX_MESSAGE_HANDLERS = 5; 00066 MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode 00067 command_timeout_ms = 30000; 00068 } 00069 } Limits; 00070 00071 00072 /** 00073 * @class Async 00074 * @brief non-blocking, threaded MQTT client API 00075 * @param Network a network class which supports send, receive 00076 * @param Timer a timer class with the methods: 00077 */ 00078 template<class Network, class Timer, class Thread, class Mutex> class Async 00079 { 00080 00081 public: 00082 00083 struct Result 00084 { 00085 /* success or failure result data */ 00086 Async<Network, Timer, Thread, Mutex>* client; 00087 int rc; 00088 }; 00089 00090 typedef void (*resultHandler)(Result*); 00091 00092 Async(Network* network, const Limits limits = Limits()); 00093 00094 typedef struct 00095 { 00096 Async* client; 00097 Network* network; 00098 } connectionLostInfo; 00099 00100 typedef int (*connectionLostHandlers)(connectionLostInfo*); 00101 00102 /** Set the connection lost callback - called whenever the connection is lost and we should be connected 00103 * @param clh - pointer to the callback function 00104 */ 00105 void setConnectionLostHandler(connectionLostHandlers clh) 00106 { 00107 connectionLostHandler.attach(clh); 00108 } 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 int connect(resultHandler fn, MQTTPacket_connectData* options = 0); 00119 00120 template<class T> 00121 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function 00122 00123 int publish(resultHandler rh, const char* topic, Message* message); 00124 00125 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh); 00126 00127 int unsubscribe(resultHandler rh, const char* topicFilter); 00128 00129 int disconnect(resultHandler rh); 00130 00131 private: 00132 00133 void run(void const *argument); 00134 int cycle(int timeout); 00135 int waitfor(int packet_type, Timer& atimer); 00136 int keepalive(); 00137 int findFreeOperation(); 00138 00139 int decodePacket(int* value, int timeout); 00140 int readPacket(int timeout); 00141 int sendPacket(int length, int timeout); 00142 int deliverMessage(MQTTString* topic, Message* message); 00143 00144 Thread* thread; 00145 Network* ipstack; 00146 00147 Limits limits; 00148 00149 char* buf; 00150 char* readbuf; 00151 00152 Timer ping_timer, connect_timer; 00153 unsigned int keepAliveInterval; 00154 bool ping_outstanding; 00155 00156 PacketId packetid; 00157 00158 typedef Callback<void(Result*)> resultHandlerFP; 00159 resultHandlerFP connectHandler; 00160 00161 typedef Callback<void(Message*)> messageHandlerFP; 00162 struct MessageHandlers 00163 { 00164 const char* topic; 00165 messageHandlerFP fp; 00166 } *messageHandlers; // Message handlers are indexed by subscription topic 00167 00168 // how many concurrent operations should we allow? Each one will require a function pointer 00169 struct Operations 00170 { 00171 unsigned short id; 00172 resultHandlerFP fp; 00173 const char* topic; // if this is a publish, store topic name in case republishing is required 00174 Message* message; // for publish, 00175 Timer timer; // to check if the command has timed out 00176 } *operations; // result handlers are indexed by packet ids 00177 00178 static void threadfn(void* arg); 00179 00180 messageHandlerFP defaultMessageHandler; 00181 00182 typedef Callback<int(connectionLostInfo*)> connectionLostFP; 00183 00184 connectionLostFP connectionLostHandler; 00185 00186 }; 00187 00188 } 00189 00190 00191 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg) 00192 { 00193 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL); 00194 } 00195 00196 00197 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid() 00198 { 00199 this->thread = 0; 00200 this->ipstack = network; 00201 this->ping_timer = Timer(); 00202 this->ping_outstanding = 0; 00203 00204 // How to make these memory allocations portable? I was hoping to avoid the heap 00205 buf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00206 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; 00207 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS]; 00208 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00209 operations[i].id = 0; 00210 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; 00211 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00212 messageHandlers[i].topic = 0; 00213 } 00214 00215 00216 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) 00217 { 00218 int sent = 0; 00219 00220 while (sent < length) 00221 sent += ipstack->write(&buf[sent], length, timeout); 00222 if (sent == length) 00223 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00224 return sent; 00225 } 00226 00227 00228 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) 00229 { 00230 char c; 00231 int multiplier = 1; 00232 int len = 0; 00233 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00234 00235 *value = 0; 00236 do 00237 { 00238 int rc = MQTTPACKET_READ_ERROR; 00239 00240 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00241 { 00242 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00243 goto exit; 00244 } 00245 rc = ipstack->read(&c, 1, timeout); 00246 if (rc != 1) 00247 goto exit; 00248 *value += (c & 127) * multiplier; 00249 multiplier *= 128; 00250 } while ((c & 128) != 0); 00251 exit: 00252 return len; 00253 } 00254 00255 00256 /** 00257 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00258 * the packets can be retried. 00259 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00260 * @return the MQTT packet type, or -1 if none 00261 */ 00262 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 00263 { 00264 int rc = -1; 00265 MQTTHeader header = {0}; 00266 int len = 0; 00267 int rem_len = 0; 00268 00269 /* 1. read the header byte. This has the packet type in it */ 00270 if (ipstack->read(readbuf, 1, timeout) != 1) 00271 goto exit; 00272 00273 len = 1; 00274 /* 2. read the remaining length. This is variable in itself */ 00275 decodePacket(&rem_len, timeout); 00276 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ 00277 00278 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00279 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) 00280 goto exit; 00281 00282 header.byte = readbuf[0]; 00283 rc = header.bits.type; 00284 exit: 00285 return rc; 00286 } 00287 00288 00289 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) 00290 { 00291 int rc = -1; 00292 00293 // we have to find the right message handler - indexed by topic 00294 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00295 { 00296 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) 00297 { 00298 messageHandlers[i].fp(message); 00299 rc = 0; 00300 break; 00301 } 00302 } 00303 00304 return rc; 00305 } 00306 00307 00308 00309 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout) 00310 { 00311 /* get one piece of work off the wire and one pass through */ 00312 00313 // read the socket, see what work is due 00314 int packet_type = readPacket(timeout); 00315 00316 int len, rc; 00317 switch (packet_type) 00318 { 00319 case CONNACK: 00320 if (this->thread) 00321 { 00322 Result res = {this, 0}; 00323 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00324 ; 00325 connectHandler(&res); 00326 connectHandler.detach(); // only invoke the callback once 00327 } 00328 break; 00329 case PUBACK: 00330 if (this->thread) 00331 ; //call resultHandler 00332 case SUBACK: 00333 break; 00334 case PUBLISH: 00335 MQTTString topicName; 00336 Message msg; 00337 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, 00338 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; 00339 if (msg.qos == QOS0) 00340 deliverMessage(&topicName, &msg); 00341 break; 00342 case PUBREC: 00343 int type, dup, mypacketid; 00344 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00345 ; 00346 // must lock this access against the application thread, if we are multi-threaded 00347 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); 00348 rc = sendPacket(len, timeout); // send the PUBREL packet 00349 if (rc != len) 00350 goto exit; // there was a problem 00351 00352 break; 00353 case PUBCOMP: 00354 break; 00355 case PINGRESP: 00356 ping_outstanding = false; 00357 break; 00358 } 00359 keepalive(); 00360 exit: 00361 return packet_type; 00362 } 00363 00364 00365 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive() 00366 { 00367 int rc = 0; 00368 00369 if (keepAliveInterval == 0) 00370 goto exit; 00371 00372 if (ping_timer.expired()) 00373 { 00374 if (ping_outstanding) 00375 rc = -1; 00376 else 00377 { 00378 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); 00379 rc = sendPacket(len, 1000); // send the ping packet 00380 if (rc != len) 00381 rc = -1; // indicate there's a problem 00382 else 00383 ping_outstanding = true; 00384 } 00385 } 00386 00387 exit: 00388 return rc; 00389 } 00390 00391 00392 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument) 00393 { 00394 while (true) 00395 cycle(ping_timer.left_ms()); 00396 } 00397 00398 00399 // only used in single-threaded mode where one command at a time is in process 00400 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) 00401 { 00402 int rc = -1; 00403 00404 do 00405 { 00406 if (atimer.expired()) 00407 break; // we timed out 00408 } 00409 while ((rc = cycle(atimer.left_ms())) != packet_type); 00410 00411 return rc; 00412 } 00413 00414 00415 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options) 00416 { 00417 connect_timer.countdown(limits.command_timeout_ms); 00418 00419 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00420 if (options == 0) 00421 options = &default_options; // set default options if none were supplied 00422 00423 this->keepAliveInterval = options->keepAliveInterval; 00424 ping_timer.countdown(this->keepAliveInterval); 00425 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); 00426 int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet 00427 if (rc != len) 00428 goto exit; // there was a problem 00429 00430 if (resultHandler == 0) // wait until the connack is received 00431 { 00432 // this will be a blocking call, wait for the connack 00433 if (waitfor(CONNACK, connect_timer) == CONNACK) 00434 { 00435 int connack_rc = -1; 00436 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00437 rc = connack_rc; 00438 } 00439 } 00440 else 00441 { 00442 // set connect response callback function 00443 connectHandler.attach(resultHandler); 00444 00445 // start background thread 00446 this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this); 00447 } 00448 00449 exit: 00450 return rc; 00451 } 00452 00453 00454 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation() 00455 { 00456 int found = -1; 00457 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) 00458 { 00459 if (operations[i].id == 0) 00460 { 00461 found = i; 00462 break; 00463 } 00464 } 00465 return found; 00466 } 00467 00468 00469 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00470 { 00471 int index = 0; 00472 if (this->thread) 00473 index = findFreeOperation(); 00474 Timer& atimer = operations[index].timer; 00475 00476 atimer.countdown(limits.command_timeout_ms); 00477 MQTTString topic = {(char*)topicFilter, 0, 0}; 00478 00479 int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00480 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00481 if (rc != len) 00482 goto exit; // there was a problem 00483 00484 /* wait for suback */ 00485 if (resultHandler == 0) 00486 { 00487 // this will block 00488 if (waitfor(SUBACK, atimer) == SUBACK) 00489 { 00490 int count = 0, grantedQoS = -1, mypacketid; 00491 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00492 rc = grantedQoS; // 0, 1, 2 or 0x80 00493 if (rc != 0x80) 00494 { 00495 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) 00496 { 00497 if (messageHandlers[i].topic == 0) 00498 { 00499 messageHandlers[i].topic = topicFilter; 00500 messageHandlers[i].fp.attach(messageHandler); 00501 rc = 0; 00502 break; 00503 } 00504 } 00505 } 00506 } 00507 } 00508 else 00509 { 00510 // set subscribe response callback function 00511 00512 } 00513 00514 exit: 00515 return rc; 00516 } 00517 00518 00519 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter) 00520 { 00521 int index = 0; 00522 if (this->thread) 00523 index = findFreeOperation(); 00524 Timer& atimer = operations[index].timer; 00525 00526 atimer.countdown(limits.command_timeout_ms); 00527 MQTTString topic = {(char*)topicFilter, 0, 0}; 00528 00529 int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); 00530 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00531 if (rc != len) 00532 goto exit; // there was a problem 00533 00534 // set unsubscribe response callback function 00535 00536 00537 exit: 00538 return rc; 00539 } 00540 00541 00542 00543 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message) 00544 { 00545 int index = 0; 00546 if (this->thread) 00547 index = findFreeOperation(); 00548 Timer& atimer = operations[index].timer; 00549 00550 atimer.countdown(limits.command_timeout_ms); 00551 MQTTString topic = {(char*)topicName, 0, 0}; 00552 00553 if (message->qos == QOS1 || message->qos == QOS2) 00554 message->id = packetid.getNext(); 00555 00556 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); 00557 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet 00558 if (rc != len) 00559 goto exit; // there was a problem 00560 00561 /* wait for acks */ 00562 if (resultHandler == 0) 00563 { 00564 if (message->qos == QOS1) 00565 { 00566 if (waitfor(PUBACK, atimer) == PUBACK) 00567 { 00568 int type, dup, mypacketid; 00569 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00570 rc = 0; 00571 } 00572 } 00573 else if (message->qos == QOS2) 00574 { 00575 if (waitfor(PUBCOMP, atimer) == PUBCOMP) 00576 { 00577 int type, dup, mypacketid; 00578 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) 00579 rc = 0; 00580 } 00581 00582 } 00583 } 00584 else 00585 { 00586 // set publish response callback function 00587 00588 } 00589 00590 exit: 00591 return rc; 00592 } 00593 00594 00595 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler) 00596 { 00597 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00598 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); 00599 int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet 00600 00601 return (rc == len) ? 0 : -1; 00602 } 00603 00604 00605 00606 #endif
Generated on Mon Jul 18 2022 07:40:17 by
1.7.2