Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 00029 #if !defined(MQTTCLIENT_QOS1) 00030 #define MQTTCLIENT_QOS1 1 00031 #endif 00032 #if !defined(MQTTCLIENT_QOS2) 00033 #define MQTTCLIENT_QOS2 0 00034 #endif 00035 00036 namespace MQTT 00037 { 00038 00039 00040 enum QoS { QOS0, QOS1, QOS2 }; 00041 00042 // all failure return codes must be negative 00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00044 00045 00046 struct Message 00047 { 00048 enum QoS qos; 00049 bool retained; 00050 bool dup; 00051 unsigned short id; 00052 void *payload; 00053 size_t payloadlen; 00054 }; 00055 00056 00057 struct MessageData 00058 { 00059 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00060 { } 00061 00062 struct Message &message; 00063 MQTTString &topicName; 00064 }; 00065 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 /** 00087 * @class Client 00088 * @brief blocking, non-threaded MQTT client API 00089 * 00090 * This version of the API blocks on all method calls, until they are complete. This means that only one 00091 * MQTT request can be in process at any one time. 00092 * @param Network a network class which supports send, receive 00093 * @param Timer a timer class with the methods: 00094 */ 00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 512, int MAX_MESSAGE_HANDLERS = 5> 00096 class Client 00097 { 00098 00099 public: 00100 00101 typedef void (*messageHandler)(MessageData&); 00102 00103 /** Construct the client 00104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00105 * before calling MQTT connect 00106 * @param limits an instance of the Limit class - to alter limits as required 00107 */ 00108 Client(Network& network, unsigned int command_timeout_ms = 30000); 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00119 * The nework object must be connected to the network endpoint before calling this 00120 * Default connect options are used 00121 * @return success code - 00122 */ 00123 int connect(); 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * @param options - connect options 00128 * @return success code - 00129 */ 00130 int connect(MQTTPacket_connectData& options); 00131 00132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00133 * @param topic - the topic to publish to 00134 * @param message - the message to send 00135 * @return success code - 00136 */ 00137 int publish(const char* topicName, Message& message); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param payload - the data to send 00142 * @param payloadlen - the length of the data 00143 * @param qos - the QoS to send the publish at 00144 * @param retained - whether the message should be retained 00145 * @return success code - 00146 */ 00147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00148 00149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00150 * @param topic - the topic to publish to 00151 * @param payload - the data to send 00152 * @param payloadlen - the length of the data 00153 * @param id - the packet id used - returned 00154 * @param qos - the QoS to send the publish at 00155 * @param retained - whether the message should be retained 00156 * @return success code - 00157 */ 00158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00159 00160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00161 * @param topicFilter - a topic pattern which can include wildcards 00162 * @param qos - the MQTT QoS to subscribe at 00163 * @param mh - the callback function to be invoked when a message is received for this subscription 00164 * @return success code - 00165 */ 00166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00167 00168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00169 * @param topicFilter - a topic pattern which can include wildcards 00170 * @return success code - 00171 */ 00172 int unsubscribe(const char* topicFilter); 00173 00174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00175 * @return success code - 00176 */ 00177 int disconnect(); 00178 00179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00181 * received. 00182 * @param timeout_ms the time to wait, in milliseconds 00183 * @return success code - on failure, this means the client has disconnected 00184 */ 00185 int yield(unsigned long timeout_ms = 1000L); 00186 00187 /** Is the client connected? 00188 * @return flag - is the client connected or not? 00189 */ 00190 bool isConnected() 00191 { 00192 return isconnected; 00193 } 00194 00195 private: 00196 00197 void cleanSession(); 00198 int cycle(Timer& timer); 00199 int waitfor(int packet_type, Timer& timer); 00200 int keepalive(); 00201 int publish(int len, Timer& timer, enum QoS qos); 00202 00203 int decodePacket(int* value, int timeout); 00204 int readPacket(Timer& timer); 00205 int sendPacket(int length, Timer& timer); 00206 int deliverMessage(MQTTString& topicName, Message& message); 00207 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00208 00209 Network& ipstack; 00210 unsigned long command_timeout_ms; 00211 00212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00214 00215 Timer last_sent, last_received; 00216 unsigned int keepAliveInterval; 00217 bool ping_outstanding; 00218 bool cleansession; 00219 00220 PacketId packetid; 00221 00222 struct MessageHandlers 00223 { 00224 const char* topicFilter; 00225 FP<void, MessageData&> fp; 00226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00227 00228 FP<void, MessageData&> defaultMessageHandler; 00229 00230 bool isconnected; 00231 00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00234 int inflightLen; 00235 unsigned short inflightMsgid; 00236 enum QoS inflightQoS; 00237 #endif 00238 00239 #if MQTTCLIENT_QOS2 00240 bool pubrel; 00241 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00242 #define MAX_INCOMING_QOS2_MESSAGES 10 00243 #endif 00244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00245 bool isQoS2msgidFree(unsigned short id); 00246 bool useQoS2msgid(unsigned short id); 00247 void freeQoS2msgid(unsigned short id); 00248 #endif 00249 00250 }; 00251 00252 } 00253 00254 00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00257 { 00258 ping_outstanding = false; 00259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00260 messageHandlers[i].topicFilter = 0; 00261 isconnected = false; 00262 00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00264 inflightMsgid = 0; 00265 inflightQoS = QOS0; 00266 #endif 00267 00268 #if MQTTCLIENT_QOS2 00269 pubrel = false; 00270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00271 incomingQoS2messages[i] = 0; 00272 #endif 00273 } 00274 00275 00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00278 { 00279 last_sent = Timer(); 00280 last_received = Timer(); 00281 this->command_timeout_ms = command_timeout_ms; 00282 cleanSession(); 00283 } 00284 00285 00286 #if MQTTCLIENT_QOS2 00287 template<class Network, class Timer, int a, int b> 00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00289 { 00290 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00291 { 00292 if (incomingQoS2messages[i] == id) 00293 return false; 00294 } 00295 return true; 00296 } 00297 00298 00299 template<class Network, class Timer, int a, int b> 00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00301 { 00302 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00303 { 00304 if (incomingQoS2messages[i] == 0) 00305 { 00306 incomingQoS2messages[i] = id; 00307 return true; 00308 } 00309 } 00310 return false; 00311 } 00312 00313 00314 template<class Network, class Timer, int a, int b> 00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00316 { 00317 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00318 { 00319 if (incomingQoS2messages[i] == id) 00320 { 00321 incomingQoS2messages[i] = 0; 00322 return; 00323 } 00324 } 00325 } 00326 #endif 00327 00328 00329 template<class Network, class Timer, int a, int b> 00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00331 { 00332 int rc = FAILURE, 00333 sent = 0; 00334 00335 while (sent < length && !timer.expired()) 00336 { 00337 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00338 if (rc < 0) // there was an error writing the data 00339 break; 00340 sent += rc; 00341 } 00342 if (sent == length) 00343 { 00344 if (this->keepAliveInterval > 0) 00345 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00346 rc = SUCCESS; 00347 } 00348 else 00349 rc = FAILURE; 00350 00351 #if defined(MQTT_DEBUG) 00352 char printbuf[150]; 00353 char * MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); 00354 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00355 #endif 00356 return rc; 00357 } 00358 00359 00360 template<class Network, class Timer, int a, int b> 00361 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00362 { 00363 unsigned char c; 00364 int multiplier = 1; 00365 int len = 0; 00366 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00367 00368 *value = 0; 00369 do 00370 { 00371 int rc = MQTTPACKET_READ_ERROR; 00372 00373 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00374 { 00375 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00376 goto exit; 00377 } 00378 rc = ipstack.read(&c, 1, timeout); 00379 if (rc != 1) 00380 goto exit; 00381 *value += (c & 127) * multiplier; 00382 multiplier *= 128; 00383 } while ((c & 128) != 0); 00384 exit: 00385 return len; 00386 } 00387 00388 00389 /** 00390 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00391 * the packets can be retried. 00392 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00393 * @return the MQTT packet type, or -1 if none 00394 */ 00395 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00396 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00397 { 00398 int rc = FAILURE; 00399 MQTTHeader header = {0}; 00400 int len = 0; 00401 int rem_len = 0; 00402 00403 /* 1. read the header byte. This has the packet type in it */ 00404 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00405 goto exit; 00406 00407 len = 1; 00408 /* 2. read the remaining length. This is variable in itself */ 00409 decodePacket(&rem_len, timer.left_ms()); 00410 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00411 00412 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00413 { 00414 rc = BUFFER_OVERFLOW; 00415 goto exit; 00416 } 00417 00418 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00419 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00420 goto exit; 00421 00422 header.byte = readbuf[0]; 00423 rc = header.bits.type; 00424 if (this->keepAliveInterval > 0) 00425 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00426 exit: 00427 00428 #if defined(MQTT_DEBUG) 00429 if (rc >= 0) 00430 { 00431 char printbuf[50]; 00432 char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); 00433 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00434 } 00435 #endif 00436 return rc; 00437 } 00438 00439 00440 // assume topic filter and name is in correct format 00441 // # can only be at end 00442 // + and # can only be next to separator 00443 template<class Network, class Timer, int a, int b> 00444 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00445 { 00446 char* curf = topicFilter; 00447 char* curn = topicName.lenstring.data; 00448 char* curn_end = curn + topicName.lenstring.len; 00449 00450 while (*curf && curn < curn_end) 00451 { 00452 if (*curn == '/' && *curf != '/') 00453 break; 00454 if (*curf != '+' && *curf != '#' && *curf != *curn) 00455 break; 00456 if (*curf == '+') 00457 { // skip until we meet the next separator, or end of string 00458 char* nextpos = curn + 1; 00459 while (nextpos < curn_end && *nextpos != '/') 00460 nextpos = ++curn + 1; 00461 } 00462 else if (*curf == '#') 00463 curn = curn_end - 1; // skip until end of string 00464 curf++; 00465 curn++; 00466 }; 00467 00468 return (curn == curn_end) && (*curf == '\0'); 00469 } 00470 00471 00472 00473 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00474 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00475 { 00476 int rc = FAILURE; 00477 00478 // we have to find the right message handler - indexed by topic 00479 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00480 { 00481 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00482 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00483 { 00484 if (messageHandlers[i].fp.attached()) 00485 { 00486 MessageData md(topicName, message); 00487 messageHandlers[i].fp(md); 00488 rc = SUCCESS; 00489 } 00490 } 00491 } 00492 00493 if (rc == FAILURE && defaultMessageHandler.attached()) 00494 { 00495 MessageData md(topicName, message); 00496 defaultMessageHandler(md); 00497 rc = SUCCESS; 00498 } 00499 00500 return rc; 00501 } 00502 00503 00504 00505 template<class Network, class Timer, int a, int b> 00506 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00507 { 00508 int rc = SUCCESS; 00509 Timer timer = Timer(); 00510 00511 timer.countdown_ms(timeout_ms); 00512 while (!timer.expired()) 00513 { 00514 if (cycle(timer) < 0) 00515 { 00516 rc = FAILURE; 00517 break; 00518 } 00519 } 00520 00521 return rc; 00522 } 00523 00524 00525 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00526 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00527 { 00528 /* get one piece of work off the wire and one pass through */ 00529 00530 // read the socket, see what work is due 00531 int packet_type = readPacket(timer); 00532 00533 int len = 0, 00534 rc = SUCCESS; 00535 00536 switch (packet_type) 00537 { 00538 case FAILURE: 00539 case BUFFER_OVERFLOW: 00540 rc = packet_type; 00541 break; 00542 case CONNACK: 00543 case PUBACK: 00544 case SUBACK: 00545 break; 00546 case PUBLISH: 00547 { 00548 MQTTString topicName = MQTTString_initializer; 00549 Message msg; 00550 int intQoS; 00551 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00552 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00553 goto exit; 00554 msg.qos = (enum QoS)intQoS; 00555 #if MQTTCLIENT_QOS2 00556 if (msg.qos != QOS2) 00557 #endif 00558 deliverMessage(topicName, msg); 00559 #if MQTTCLIENT_QOS2 00560 else if (isQoS2msgidFree(msg.id)) 00561 { 00562 if (useQoS2msgid(msg.id)) 00563 deliverMessage(topicName, msg); 00564 else 00565 WARN("Maximum number of incoming QoS2 messages exceeded"); 00566 } 00567 #endif 00568 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00569 if (msg.qos != QOS0) 00570 { 00571 if (msg.qos == QOS1) 00572 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00573 else if (msg.qos == QOS2) 00574 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00575 if (len <= 0) 00576 rc = FAILURE; 00577 else 00578 rc = sendPacket(len, timer); 00579 if (rc == FAILURE) 00580 goto exit; // there was a problem 00581 } 00582 break; 00583 #endif 00584 } 00585 #if MQTTCLIENT_QOS2 00586 case PUBREC: 00587 case PUBREL: 00588 unsigned short mypacketid; 00589 unsigned char dup, type; 00590 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00591 rc = FAILURE; 00592 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00593 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00594 rc = FAILURE; 00595 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00596 rc = FAILURE; // there was a problem 00597 if (rc == FAILURE) 00598 goto exit; // there was a problem 00599 if (packet_type == PUBREL) 00600 freeQoS2msgid(mypacketid); 00601 break; 00602 00603 case PUBCOMP: 00604 break; 00605 #endif 00606 case PINGRESP: 00607 ping_outstanding = false; 00608 break; 00609 } 00610 keepalive(); 00611 exit: 00612 if (rc == SUCCESS) 00613 rc = packet_type; 00614 return rc; 00615 } 00616 00617 00618 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00619 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00620 { 00621 int rc = FAILURE; 00622 00623 if (keepAliveInterval == 0) 00624 { 00625 rc = SUCCESS; 00626 goto exit; 00627 } 00628 00629 if (last_sent.expired() || last_received.expired()) 00630 { 00631 if (!ping_outstanding) 00632 { 00633 Timer timer(1000); 00634 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00635 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00636 ping_outstanding = true; 00637 } 00638 } 00639 00640 exit: 00641 return rc; 00642 } 00643 00644 00645 // only used in single-threaded mode where one command at a time is in process 00646 template<class Network, class Timer, int a, int b> 00647 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00648 { 00649 int rc = FAILURE; 00650 00651 do 00652 { 00653 if (timer.expired()) 00654 break; // we timed out 00655 } 00656 while ((rc = cycle(timer)) != packet_type); 00657 00658 return rc; 00659 } 00660 00661 00662 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00663 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00664 { 00665 Timer connect_timer(command_timeout_ms); 00666 int rc = FAILURE; 00667 int len = 0; 00668 00669 if (isconnected) // don't send connect packet again if we are already connected 00670 goto exit; 00671 00672 this->keepAliveInterval = options.keepAliveInterval; 00673 this->cleansession = options.cleansession; 00674 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00675 goto exit; 00676 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00677 goto exit; // there was a problem 00678 00679 if (this->keepAliveInterval > 0) 00680 last_received.countdown(this->keepAliveInterval); 00681 // this will be a blocking call, wait for the connack 00682 if (waitfor(CONNACK, connect_timer) == CONNACK) 00683 { 00684 unsigned char connack_rc = 255; 00685 bool sessionPresent = false; 00686 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00687 rc = connack_rc; 00688 else 00689 rc = FAILURE; 00690 } 00691 else 00692 rc = FAILURE; 00693 00694 #if MQTTCLIENT_QOS2 00695 // resend any inflight publish 00696 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00697 { 00698 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00699 rc = FAILURE; 00700 else 00701 rc = publish(len, connect_timer, inflightQoS); 00702 } 00703 else 00704 #endif 00705 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00706 if (inflightMsgid > 0) 00707 { 00708 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00709 rc = publish(inflightLen, connect_timer, inflightQoS); 00710 } 00711 #endif 00712 00713 exit: 00714 if (rc == SUCCESS) 00715 isconnected = true; 00716 return rc; 00717 } 00718 00719 00720 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00721 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00722 { 00723 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00724 return connect(default_options); 00725 } 00726 00727 00728 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00729 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00730 { 00731 int rc = FAILURE; 00732 Timer timer(command_timeout_ms); 00733 int len = 0; 00734 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00735 00736 if (!isconnected) 00737 goto exit; 00738 00739 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00740 if (len <= 0) 00741 goto exit; 00742 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00743 goto exit; // there was a problem 00744 00745 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00746 { 00747 int count = 0, grantedQoS = -1; 00748 unsigned short mypacketid; 00749 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00750 rc = grantedQoS; // 0, 1, 2 or 0x80 00751 if (rc != 0x80) 00752 { 00753 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00754 { 00755 if (messageHandlers[i].topicFilter == 0) 00756 { 00757 messageHandlers[i].topicFilter = topicFilter; 00758 messageHandlers[i].fp.attach(messageHandler); 00759 rc = 0; 00760 break; 00761 } 00762 } 00763 } 00764 } 00765 else 00766 rc = FAILURE; 00767 00768 exit: 00769 if (rc != SUCCESS) 00770 cleanSession(); 00771 return rc; 00772 } 00773 00774 00775 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00776 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00777 { 00778 int rc = FAILURE; 00779 Timer timer(command_timeout_ms); 00780 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00781 int len = 0; 00782 00783 if (!isconnected) 00784 goto exit; 00785 00786 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00787 goto exit; 00788 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00789 goto exit; // there was a problem 00790 00791 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00792 { 00793 unsigned short mypacketid; // should be the same as the packetid above 00794 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00795 { 00796 rc = 0; 00797 00798 // remove the subscription message handler associated with this topic, if there is one 00799 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00800 { 00801 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00802 { 00803 messageHandlers[i].topicFilter = 0; 00804 break; 00805 } 00806 } 00807 } 00808 } 00809 else 00810 rc = FAILURE; 00811 00812 exit: 00813 if (rc != SUCCESS) 00814 cleanSession(); 00815 return rc; 00816 } 00817 00818 00819 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00820 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00821 { 00822 int rc; 00823 00824 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00825 goto exit; // there was a problem 00826 00827 #if MQTTCLIENT_QOS1 00828 if (qos == QOS1) 00829 { 00830 if (waitfor(PUBACK, timer) == PUBACK) 00831 { 00832 unsigned short mypacketid; 00833 unsigned char dup, type; 00834 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00835 rc = FAILURE; 00836 else if (inflightMsgid == mypacketid) 00837 inflightMsgid = 0; 00838 } 00839 else 00840 rc = FAILURE; 00841 } 00842 #elif MQTTCLIENT_QOS2 00843 else if (qos == QOS2) 00844 { 00845 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00846 { 00847 unsigned short mypacketid; 00848 unsigned char dup, type; 00849 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00850 rc = FAILURE; 00851 else if (inflightMsgid == mypacketid) 00852 inflightMsgid = 0; 00853 } 00854 else 00855 rc = FAILURE; 00856 } 00857 #endif 00858 00859 exit: 00860 if (rc != SUCCESS) 00861 cleanSession(); 00862 return rc; 00863 } 00864 00865 00866 00867 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00868 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00869 { 00870 int rc = FAILURE; 00871 Timer timer(command_timeout_ms); 00872 MQTTString topicString = MQTTString_initializer; 00873 int len = 0; 00874 00875 if (!isconnected) 00876 goto exit; 00877 00878 topicString.cstring = (char*)topicName; 00879 00880 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00881 if (qos == QOS1 || qos == QOS2) 00882 id = packetid.getNext(); 00883 #endif 00884 00885 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00886 topicString, (unsigned char*)payload, payloadlen); 00887 if (len <= 0) 00888 goto exit; 00889 00890 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00891 if (!cleansession) 00892 { 00893 memcpy(pubbuf, sendbuf, len); 00894 inflightMsgid = id; 00895 inflightLen = len; 00896 inflightQoS = qos; 00897 #if MQTTCLIENT_QOS2 00898 pubrel = false; 00899 #endif 00900 } 00901 #endif 00902 00903 rc = publish(len, timer, qos); 00904 exit: 00905 return rc; 00906 } 00907 00908 00909 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00910 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00911 { 00912 unsigned short id = 0; // dummy - not used for anything 00913 return publish(topicName, payload, payloadlen, id, qos, retained); 00914 } 00915 00916 00917 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00918 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00919 { 00920 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00921 } 00922 00923 00924 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00925 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00926 { 00927 int rc = FAILURE; 00928 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00929 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00930 if (len > 0) 00931 rc = sendPacket(len, timer); // send the disconnect packet 00932 00933 if (cleansession) 00934 cleanSession(); 00935 else 00936 isconnected = false; 00937 return rc; 00938 } 00939 00940 00941 #endif
Generated on Wed Jul 13 2022 01:56:50 by
1.7.2