Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C027 C027_Support C12832 mbed-rtos mbed FP MQTTPacket
Fork of HelloMQTT by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 /* 00018 00019 TODO: 00020 00021 ensure publish packets are retried on reconnect 00022 00023 updating usage of FP. Try to remove inclusion of FP.cpp in main. sg- 00024 00025 */ 00026 00027 #if !defined(MQTTCLIENT_H) 00028 #define MQTTCLIENT_H 00029 00030 #include "FP.h" 00031 #include "MQTTPacket.h" 00032 #include "stdio.h" 00033 00034 namespace MQTT 00035 { 00036 00037 00038 enum QoS { QOS0, QOS1, QOS2 }; 00039 00040 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00041 00042 00043 struct Message 00044 { 00045 enum QoS qos; 00046 bool retained; 00047 bool dup; 00048 unsigned short id; 00049 void *payload; 00050 size_t payloadlen; 00051 }; 00052 00053 00054 struct MessageData 00055 { 00056 struct Message message; 00057 char* topicName; 00058 }; 00059 00060 00061 class PacketId 00062 { 00063 public: 00064 PacketId() 00065 { 00066 next = 0; 00067 } 00068 00069 int getNext() 00070 { 00071 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00072 } 00073 00074 private: 00075 static const int MAX_PACKET_ID = 65535; 00076 int next; 00077 }; 00078 00079 00080 /** 00081 * @class Client 00082 * @brief blocking, non-threaded MQTT client API 00083 * 00084 * This version of the API blocks on all method calls, until they are complete. This means that only one 00085 * MQTT request can be in process at any one time. 00086 * @param Network a network class which supports send, receive 00087 * @param Timer a timer class with the methods: 00088 */ 00089 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client 00090 { 00091 00092 public: 00093 00094 typedef void (*messageHandler)(Message*); 00095 00096 /** Construct the client 00097 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00098 * before calling MQTT connect 00099 * @param limits an instance of the Limit class - to alter limits as required 00100 */ 00101 Client(Network& network, unsigned int command_timeout_ms = 30000); 00102 00103 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00104 * @param mh - pointer to the callback function 00105 */ 00106 void setDefaultMessageHandler(messageHandler mh) 00107 { 00108 defaultMessageHandler.attach(mh); 00109 } 00110 00111 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00112 * The nework object must be connected to the network endpoint before calling this 00113 * @param options - connect options 00114 * @return success code - 00115 */ 00116 int connect(MQTTPacket_connectData* options = 0); 00117 00118 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00119 * @param topic - the topic to publish to 00120 * @param message - the message to send 00121 * @return success code - 00122 */ 00123 int publish(const char* topicName, Message* message); 00124 00125 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00126 * @param topicFilter - a topic pattern which can include wildcards 00127 * @param qos - the MQTT QoS to subscribe at 00128 * @param mh - the callback function to be invoked when a message is received for this subscription 00129 * @return success code - 00130 */ 00131 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00132 00133 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00134 * @param topicFilter - a topic pattern which can include wildcards 00135 * @return success code - 00136 */ 00137 int unsubscribe(const char* topicFilter); 00138 00139 /** MQTT Disconnect - send an MQTT disconnect packet 00140 * @return success code - 00141 */ 00142 int disconnect(); 00143 00144 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00145 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00146 * received. 00147 * @param timeout_ms the time to wait, in milliseconds 00148 * @return success code - on failure, this means the client has disconnected 00149 */ 00150 int yield(int timeout_ms = 1000); 00151 00152 private: 00153 00154 int cycle(Timer& timer); 00155 int waitfor(int packet_type, Timer& timer); 00156 int keepalive(); 00157 00158 int decodePacket(int* value, int timeout); 00159 int readPacket(Timer& timer); 00160 int sendPacket(int length, Timer& timer); 00161 int deliverMessage(MQTTString& topicName, Message& message); 00162 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00163 00164 Network& ipstack; 00165 unsigned int command_timeout_ms; 00166 00167 char buf[MAX_MQTT_PACKET_SIZE]; 00168 char readbuf[MAX_MQTT_PACKET_SIZE]; 00169 00170 Timer ping_timer; 00171 unsigned int keepAliveInterval; 00172 bool ping_outstanding; 00173 00174 PacketId packetid; 00175 00176 // typedef FP<void, Message*> messageHandlerFP; 00177 struct MessageHandlers 00178 { 00179 const char* topicFilter; 00180 //messageHandlerFP fp; typedefs not liked? 00181 FP<void, Message*> fp; 00182 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00183 00184 FP<void, Message*> defaultMessageHandler; 00185 00186 bool isconnected; 00187 00188 }; 00189 00190 } 00191 00192 00193 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00194 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00195 { 00196 ping_timer = Timer(); 00197 ping_outstanding = 0; 00198 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00199 messageHandlers[i].topicFilter = 0; 00200 this->command_timeout_ms = command_timeout_ms; 00201 isconnected = false; 00202 } 00203 00204 00205 template<class Network, class Timer, int a, int b> 00206 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00207 { 00208 int rc = FAILURE, 00209 sent = 0; 00210 00211 while (sent < length && !timer.expired()) 00212 { 00213 rc = ipstack.write(&buf[sent], length, timer.left_ms()); 00214 if (rc < 0) // there was an error writing the data 00215 break; 00216 sent += rc; 00217 } 00218 if (sent == length) 00219 { 00220 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00221 rc = SUCCESS; 00222 } 00223 else 00224 rc = FAILURE; 00225 return rc; 00226 } 00227 00228 00229 template<class Network, class Timer, int a, int b> 00230 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00231 { 00232 char c; 00233 int multiplier = 1; 00234 int len = 0; 00235 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00236 00237 *value = 0; 00238 do 00239 { 00240 int rc = MQTTPACKET_READ_ERROR; 00241 00242 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00243 { 00244 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00245 goto exit; 00246 } 00247 rc = ipstack.read(&c, 1, timeout); 00248 if (rc != 1) 00249 goto exit; 00250 *value += (c & 127) * multiplier; 00251 multiplier *= 128; 00252 } while ((c & 128) != 0); 00253 exit: 00254 return len; 00255 } 00256 00257 00258 /** 00259 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00260 * the packets can be retried. 00261 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00262 * @return the MQTT packet type, or -1 if none 00263 */ 00264 template<class Network, class Timer, int a, int b> 00265 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 00266 { 00267 int rc = FAILURE; 00268 MQTTHeader header = {0}; 00269 int len = 0; 00270 int rem_len = 0; 00271 00272 /* 1. read the header byte. This has the packet type in it */ 00273 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00274 goto exit; 00275 00276 len = 1; 00277 /* 2. read the remaining length. This is variable in itself */ 00278 decodePacket(&rem_len, timer.left_ms()); 00279 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ 00280 00281 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00282 if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len) 00283 goto exit; 00284 00285 header.byte = readbuf[0]; 00286 rc = header.bits.type; 00287 exit: 00288 return rc; 00289 } 00290 00291 00292 // assume topic filter and name is in correct format 00293 // # can only be at end 00294 // + and # can only be next to separator 00295 template<class Network, class Timer, int a, int b> 00296 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00297 { 00298 char* curf = topicFilter; 00299 char* curn = topicName.lenstring.data; 00300 char* curn_end = curn + topicName.lenstring.len; 00301 00302 while (*curf && curn < curn_end) 00303 { 00304 if (*curn == '/' && *curf != '/') 00305 break; 00306 if (*curf != '+' && *curf != '#' && *curf != *curn) 00307 break; 00308 if (*curf == '+') 00309 { // skip until we meet the next separator, or end of string 00310 char* nextpos = curn + 1; 00311 while (nextpos < curn_end && *nextpos != '/') 00312 nextpos = ++curn + 1; 00313 } 00314 else if (*curf == '#') 00315 curn = curn_end - 1; // skip until end of string 00316 curf++; 00317 curn++; 00318 }; 00319 00320 return (curn == curn_end) && (*curf == '\0'); 00321 } 00322 00323 00324 00325 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00326 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00327 { 00328 int rc = FAILURE; 00329 00330 // we have to find the right message handler - indexed by topic 00331 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00332 { 00333 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00334 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00335 { 00336 if (messageHandlers[i].fp.attached()) 00337 { 00338 messageHandlers[i].fp(&message); 00339 rc = SUCCESS; 00340 } 00341 } 00342 } 00343 00344 if (rc == FAILURE && defaultMessageHandler.attached()) 00345 { 00346 defaultMessageHandler(&message); 00347 rc = SUCCESS; 00348 } 00349 00350 return rc; 00351 } 00352 00353 00354 00355 template<class Network, class Timer, int a, int b> 00356 int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms) 00357 { 00358 int rc = SUCCESS; 00359 Timer timer = Timer(); 00360 00361 timer.countdown_ms(timeout_ms); 00362 while (!timer.expired()) 00363 { 00364 if (cycle(timer) == FAILURE) 00365 { 00366 rc = FAILURE; 00367 break; 00368 } 00369 } 00370 00371 return rc; 00372 } 00373 00374 00375 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00376 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00377 { 00378 /* get one piece of work off the wire and one pass through */ 00379 00380 // read the socket, see what work is due 00381 int packet_type = readPacket(timer); 00382 00383 int len = 0, 00384 rc = SUCCESS; 00385 00386 switch (packet_type) 00387 { 00388 case CONNACK: 00389 case PUBACK: 00390 case SUBACK: 00391 break; 00392 case PUBLISH: 00393 MQTTString topicName; 00394 Message msg; 00395 if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, 00396 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00397 goto exit; 00398 deliverMessage(topicName, msg); 00399 if (msg.qos != QOS0) 00400 { 00401 if (msg.qos == QOS1) 00402 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00403 else if (msg.qos == QOS2) 00404 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00405 if (len <= 0) 00406 rc = FAILURE; 00407 else 00408 rc = sendPacket(len, timer); 00409 if (rc == FAILURE) 00410 goto exit; // there was a problem 00411 } 00412 break; 00413 case PUBREC: 00414 int type, dup, mypacketid; 00415 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00416 rc = FAILURE; 00417 else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0) 00418 rc = FAILURE; 00419 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00420 rc = FAILURE; // there was a problem 00421 if (rc == FAILURE) 00422 goto exit; // there was a problem 00423 break; 00424 case PUBCOMP: 00425 break; 00426 case PINGRESP: 00427 ping_outstanding = false; 00428 break; 00429 } 00430 keepalive(); 00431 exit: 00432 if (rc == SUCCESS) 00433 rc = packet_type; 00434 return rc; 00435 } 00436 00437 00438 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00439 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00440 { 00441 int rc = FAILURE; 00442 00443 if (keepAliveInterval == 0) 00444 { 00445 rc = SUCCESS; 00446 goto exit; 00447 } 00448 00449 if (ping_timer.expired()) 00450 { 00451 if (!ping_outstanding) 00452 { 00453 Timer timer = Timer(1000); 00454 int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE); 00455 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00456 ping_outstanding = true; 00457 } 00458 } 00459 00460 exit: 00461 return rc; 00462 } 00463 00464 00465 // only used in single-threaded mode where one command at a time is in process 00466 template<class Network, class Timer, int a, int b> 00467 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00468 { 00469 int rc = FAILURE; 00470 00471 do 00472 { 00473 if (timer.expired()) 00474 break; // we timed out 00475 } 00476 while ((rc = cycle(timer)) != packet_type); 00477 00478 return rc; 00479 } 00480 00481 00482 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00483 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options) 00484 { 00485 Timer connect_timer = Timer(command_timeout_ms); 00486 int rc = FAILURE; 00487 00488 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00489 if (options == 0) 00490 options = &default_options; // set default options if none were supplied 00491 00492 this->keepAliveInterval = options->keepAliveInterval; 00493 ping_timer.countdown(this->keepAliveInterval); 00494 int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options); 00495 if (len <= 0) 00496 goto exit; 00497 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00498 goto exit; // there was a problem 00499 00500 // this will be a blocking call, wait for the connack 00501 if (waitfor(CONNACK, connect_timer) == CONNACK) 00502 { 00503 int connack_rc = -1; 00504 if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00505 rc = connack_rc; 00506 else 00507 rc = FAILURE; 00508 } 00509 else 00510 rc = FAILURE; 00511 00512 exit: 00513 if (rc == SUCCESS) 00514 isconnected = true; 00515 return rc; 00516 } 00517 00518 00519 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00520 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00521 { 00522 int rc = FAILURE; 00523 Timer timer = Timer(command_timeout_ms); 00524 int len = 0; 00525 00526 MQTTString topic = {(char*)topicFilter, 0, 0}; 00527 if (!isconnected) 00528 goto exit; 00529 00530 len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00531 if (len <= 0) 00532 goto exit; 00533 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00534 goto exit; // there was a problem 00535 00536 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00537 { 00538 int count = 0, grantedQoS = -1, mypacketid; 00539 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00540 rc = grantedQoS; // 0, 1, 2 or 0x80 00541 if (rc != 0x80) 00542 { 00543 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00544 { 00545 if (messageHandlers[i].topicFilter == 0) 00546 { 00547 messageHandlers[i].topicFilter = topicFilter; 00548 messageHandlers[i].fp.attach(messageHandler); 00549 rc = 0; 00550 break; 00551 } 00552 } 00553 } 00554 } 00555 else 00556 rc = FAILURE; 00557 00558 exit: 00559 //if (rc == FAILURE) 00560 // closesession(); 00561 return rc; 00562 } 00563 00564 00565 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00566 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00567 { 00568 int rc = FAILURE; 00569 Timer timer = Timer(command_timeout_ms); 00570 00571 MQTTString topic = {(char*)topicFilter, 0, 0}; 00572 00573 int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); 00574 if (len <= 0) 00575 goto exit; 00576 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00577 goto exit; // there was a problem 00578 00579 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00580 { 00581 int mypacketid; // should be the same as the packetid above 00582 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00583 rc = 0; 00584 } 00585 else 00586 rc = FAILURE; 00587 00588 exit: 00589 return rc; 00590 } 00591 00592 00593 00594 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00595 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message) 00596 { 00597 int rc = FAILURE; 00598 Timer timer = Timer(command_timeout_ms); 00599 00600 MQTTString topicString = {(char*)topicName, 0, 0}; 00601 00602 if (message->qos == QOS1 || message->qos == QOS2) 00603 message->id = packetid.getNext(); 00604 00605 int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 00606 topicString, (char*)message->payload, message->payloadlen); 00607 if (len <= 0) 00608 goto exit; 00609 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00610 goto exit; // there was a problem 00611 00612 if (message->qos == QOS1) 00613 { 00614 if (waitfor(PUBACK, timer) == PUBACK) 00615 { 00616 int type, dup, mypacketid; 00617 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00618 rc = FAILURE; 00619 } 00620 else 00621 rc = FAILURE; 00622 } 00623 else if (message->qos == QOS2) 00624 { 00625 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00626 { 00627 int type, dup, mypacketid; 00628 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00629 rc = FAILURE; 00630 } 00631 else 00632 rc = FAILURE; 00633 } 00634 00635 exit: 00636 return rc; 00637 } 00638 00639 00640 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00641 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00642 { 00643 int rc = FAILURE; 00644 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00645 int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE); 00646 if (len > 0) 00647 rc = sendPacket(len, timer); // send the disconnect packet 00648 00649 return rc; 00650 } 00651 00652 00653 #endif
Generated on Thu Jul 14 2022 01:31:03 by
1.7.2
