Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of Cayenne-MQTT-mbed by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 *******************************************************************************/ 00022 00023 #if !defined(MQTTCLIENT_H) 00024 #define MQTTCLIENT_H 00025 00026 #include "FP.h" 00027 #include "MQTTPacket.h" 00028 #include "stdio.h" 00029 #include "MQTTLogging.h" 00030 00031 #if !defined(MQTTCLIENT_QOS1) 00032 #define MQTTCLIENT_QOS1 1 00033 #endif 00034 #if !defined(MQTTCLIENT_QOS2) 00035 #define MQTTCLIENT_QOS2 0 00036 #endif 00037 00038 namespace MQTT 00039 { 00040 00041 00042 enum QoS { QOS0, QOS1, QOS2 }; 00043 00044 // all failure return codes must be negative 00045 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00046 00047 00048 struct Message 00049 { 00050 enum QoS qos; 00051 bool retained; 00052 bool dup; 00053 unsigned short id; 00054 void *payload; 00055 size_t payloadlen; 00056 }; 00057 00058 00059 struct MessageData 00060 
{ 00061 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00062 { } 00063 00064 struct Message &message; 00065 MQTTString &topicName; 00066 }; 00067 00068 00069 class PacketId 00070 { 00071 public: 00072 PacketId() 00073 { 00074 next = 0; 00075 } 00076 00077 int getNext() 00078 { 00079 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00080 } 00081 00082 private: 00083 static const int MAX_PACKET_ID = 65535; 00084 int next; 00085 }; 00086 00087 00088 /** 00089 * @class Client 00090 * @brief blocking, non-threaded MQTT client API 00091 * 00092 * This version of the API blocks on all method calls, until they are complete. This means that only one 00093 * MQTT request can be in process at any one time. 00094 * @param Network a network class with the methods: read, write. See NetworkInterface.h for function definitions. 00095 * @param Timer a timer class with the methods: countdown_ms, countdown, left_ms, expired. See TimerInterface.h for function definitions. 
00096 */ 00097 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00098 class Client 00099 { 00100 00101 public: 00102 00103 typedef void (*messageHandler)(MessageData&); 00104 00105 /** Construct the client 00106 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00107 * before calling MQTT connect 00108 * @param limits an instance of the Limit class - to alter limits as required 00109 */ 00110 Client(Network& network, unsigned int command_timeout_ms = 30000); 00111 00112 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00113 * @param mh - pointer to the callback function 00114 */ 00115 void setDefaultMessageHandler(messageHandler mh) 00116 { 00117 defaultMessageHandler.attach(mh); 00118 } 00119 00120 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00121 * @param time - address of initialized object 00122 * @param mh - pointer to the callback function 00123 */ 00124 template<class T> 00125 void setDefaultMessageHandler(T *item, void (T::*method)(MessageData&)) 00126 { 00127 defaultMessageHandler.attach(item, method); 00128 } 00129 00130 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00131 * The nework object must be connected to the network endpoint before calling this 00132 * Default connect options are used 00133 * @return success code - 00134 */ 00135 int connect(); 00136 00137 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00138 * The nework object must be connected to the network endpoint before calling this 00139 * @param options - connect options 00140 * @return success code - 00141 */ 00142 int connect(MQTTPacket_connectData& options); 00143 00144 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 
00145 * @param topic - the topic to publish to 00146 * @param message - the message to send 00147 * @return success code - 00148 */ 00149 int publish(const char* topicName, Message& message); 00150 00151 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00152 * @param topic - the topic to publish to 00153 * @param payload - the data to send 00154 * @param payloadlen - the length of the data 00155 * @param qos - the QoS to send the publish at 00156 * @param retained - whether the message should be retained 00157 * @return success code - 00158 */ 00159 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00160 00161 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00162 * @param topic - the topic to publish to 00163 * @param payload - the data to send 00164 * @param payloadlen - the length of the data 00165 * @param id - the packet id used - returned 00166 * @param qos - the QoS to send the publish at 00167 * @param retained - whether the message should be retained 00168 * @return success code - 00169 */ 00170 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00171 00172 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00173 * @param topicFilter - a topic pattern which can include wildcards 00174 * @param qos - the MQTT QoS to subscribe at 00175 * @param mh - the callback function to be invoked when a message is received for this subscription 00176 * @return success code - 00177 */ 00178 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00179 00180 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00181 * @param topicFilter - a topic pattern which can include wildcards 00182 * @return success code - 00183 */ 00184 int unsubscribe(const char* topicFilter); 
00185 00186 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00187 * @return success code - 00188 */ 00189 int disconnect(); 00190 00191 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00192 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00193 * received. 00194 * @param timeout_ms the time to wait, in milliseconds 00195 * @return success code - on failure, this means the client has disconnected 00196 */ 00197 int yield(unsigned long timeout_ms = 1000L); 00198 00199 /** Is the client connected? 00200 * @return flag - is the client connected or not? 00201 */ 00202 bool isConnected() 00203 { 00204 return isconnected; 00205 } 00206 00207 private: 00208 00209 void cleanSession(); 00210 int cycle(Timer& timer); 00211 int waitfor(int packet_type, Timer& timer); 00212 int keepalive(); 00213 int publish(int len, Timer& timer, enum QoS qos); 00214 00215 int decodePacket(int* value, int timeout); 00216 int readPacket(Timer& timer); 00217 int sendPacket(int length, Timer& timer); 00218 int deliverMessage(MQTTString& topicName, Message& message); 00219 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00220 00221 Network& ipstack; 00222 unsigned long command_timeout_ms; 00223 00224 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00225 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00226 00227 Timer last_sent, last_received, ping_response; 00228 unsigned int keepAliveInterval; 00229 bool ping_outstanding; 00230 bool cleansession; 00231 00232 PacketId packetid; 00233 00234 struct MessageHandlers 00235 { 00236 const char* topicFilter; 00237 FP<void, MessageData&> fp; 00238 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00239 00240 FP<void, MessageData&> defaultMessageHandler; 00241 00242 bool isconnected; 00243 00244 bool connAckReceived; 00245 bool subAckReceived; 00246 bool unsubAckReceived; 00247 
bool pubAckReceived; 00248 bool pubCompReceived; 00249 00250 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00251 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00252 int inflightLen; 00253 unsigned short inflightMsgid; 00254 enum QoS inflightQoS; 00255 #endif 00256 00257 #if MQTTCLIENT_QOS2 00258 bool pubrel; 00259 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00260 #define MAX_INCOMING_QOS2_MESSAGES 10 00261 #endif 00262 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00263 bool isQoS2msgidFree(unsigned short id); 00264 bool useQoS2msgid(unsigned short id); 00265 void freeQoS2msgid(unsigned short id); 00266 #endif 00267 00268 }; 00269 00270 } 00271 00272 00273 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00274 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00275 { 00276 ping_outstanding = false; 00277 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00278 messageHandlers[i].topicFilter = 0; 00279 isconnected = false; 00280 00281 connAckReceived = false; 00282 subAckReceived = false; 00283 unsubAckReceived = false; 00284 pubAckReceived = false; 00285 pubCompReceived = false; 00286 00287 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00288 inflightMsgid = 0; 00289 inflightQoS = QOS0; 00290 #endif 00291 00292 #if MQTTCLIENT_QOS2 00293 pubrel = false; 00294 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00295 incomingQoS2messages[i] = 0; 00296 #endif 00297 } 00298 00299 00300 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00301 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00302 { 00303 this->command_timeout_ms = command_timeout_ms; 00304 cleanSession(); 00305 } 00306 00307 00308 #if MQTTCLIENT_QOS2 00309 template<class Network, class Timer, int a, int b> 00310 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00311 { 00312 
for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00313 { 00314 if (incomingQoS2messages[i] == id) 00315 return false; 00316 } 00317 return true; 00318 } 00319 00320 00321 template<class Network, class Timer, int a, int b> 00322 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00323 { 00324 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00325 { 00326 if (incomingQoS2messages[i] == 0) 00327 { 00328 incomingQoS2messages[i] = id; 00329 return true; 00330 } 00331 } 00332 return false; 00333 } 00334 00335 00336 template<class Network, class Timer, int a, int b> 00337 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00338 { 00339 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00340 { 00341 if (incomingQoS2messages[i] == id) 00342 { 00343 incomingQoS2messages[i] = 0; 00344 return; 00345 } 00346 } 00347 } 00348 #endif 00349 00350 00351 template<class Network, class Timer, int a, int b> 00352 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00353 { 00354 int rc = FAILURE, 00355 sent = 0; 00356 00357 while (sent < length && !timer.expired()) 00358 { 00359 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00360 if (rc < 0) // there was an error writing the data 00361 break; 00362 sent += rc; 00363 } 00364 if (sent == length) 00365 { 00366 if (this->keepAliveInterval > 0) 00367 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00368 rc = SUCCESS; 00369 } 00370 else 00371 rc = FAILURE; 00372 00373 #if defined(MQTT_DEBUG) 00374 char printbuf[150]; 00375 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00376 #endif 00377 return rc; 00378 } 00379 00380 00381 template<class Network, class Timer, int a, int b> 00382 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00383 { 00384 unsigned char c; 00385 int multiplier = 1; 
00386 int len = 0; 00387 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00388 00389 *value = 0; 00390 do 00391 { 00392 int rc = MQTTPACKET_READ_ERROR; 00393 00394 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00395 { 00396 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00397 goto exit; 00398 } 00399 rc = ipstack.read(&c, 1, timeout); 00400 if (rc != 1) 00401 goto exit; 00402 *value += (c & 127) * multiplier; 00403 multiplier *= 128; 00404 } while ((c & 128) != 0); 00405 exit: 00406 return len; 00407 } 00408 00409 00410 /** 00411 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00412 * the packets can be retried. 00413 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00414 * @return the MQTT packet type, or -1 if none 00415 */ 00416 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00417 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00418 { 00419 int rc = FAILURE; 00420 MQTTHeader header = {0}; 00421 int len = 0; 00422 int rem_len = 0; 00423 00424 /* 1. read the header byte. This has the packet type in it */ 00425 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00426 goto exit; 00427 00428 len = 1; 00429 /* 2. read the remaining length. This is variable in itself */ 00430 decodePacket(&rem_len, timer.left_ms()); 00431 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00432 00433 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00434 { 00435 rc = BUFFER_OVERFLOW; 00436 goto exit; 00437 } 00438 00439 /* 3. 
read the rest of the buffer using a callback to supply the rest of the data */ 00440 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00441 goto exit; 00442 00443 header.byte = readbuf[0]; 00444 rc = header.bits.type; 00445 if (this->keepAliveInterval > 0) 00446 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00447 exit: 00448 00449 #if defined(MQTT_DEBUG) 00450 if (rc >= 0) 00451 { 00452 char printbuf[50]; 00453 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00454 } 00455 #endif 00456 return rc; 00457 } 00458 00459 00460 // assume topic filter and name is in correct format 00461 // # can only be at end 00462 // + and # can only be next to separator 00463 template<class Network, class Timer, int a, int b> 00464 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00465 { 00466 char* curf = topicFilter; 00467 char* curn = topicName.lenstring.data; 00468 char* curn_end = curn + topicName.lenstring.len; 00469 00470 while (*curf && curn < curn_end) 00471 { 00472 if (*curn == '/' && *curf != '/') 00473 break; 00474 if (*curf != '+' && *curf != '#' && *curf != *curn) 00475 break; 00476 if (*curf == '+') 00477 { // skip until we meet the next separator, or end of string 00478 char* nextpos = curn + 1; 00479 while (nextpos < curn_end && *nextpos != '/') 00480 nextpos = ++curn + 1; 00481 } 00482 else if (*curf == '#') 00483 curn = curn_end - 1; // skip until end of string 00484 curf++; 00485 curn++; 00486 }; 00487 00488 return (curn == curn_end) && (*curf == '\0'); 00489 } 00490 00491 00492 00493 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00494 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00495 { 00496 int rc = FAILURE; 00497 // we have to find the right message 
handler - indexed by topic 00498 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00499 { 00500 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00501 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00502 { 00503 if (messageHandlers[i].fp.attached()) 00504 { 00505 MessageData md(topicName, message); 00506 messageHandlers[i].fp(md); 00507 rc = SUCCESS; 00508 } 00509 } 00510 } 00511 00512 if (rc == FAILURE && defaultMessageHandler.attached()) 00513 { 00514 MessageData md(topicName, message); 00515 defaultMessageHandler(md); 00516 rc = SUCCESS; 00517 } 00518 00519 return rc; 00520 } 00521 00522 00523 00524 template<class Network, class Timer, int a, int b> 00525 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00526 { 00527 int rc = SUCCESS; 00528 Timer timer; 00529 00530 timer.countdown_ms(timeout_ms); 00531 while (!timer.expired()) 00532 { 00533 if (cycle(timer) < 0) 00534 { 00535 rc = FAILURE; 00536 break; 00537 } 00538 } 00539 00540 return rc; 00541 } 00542 00543 00544 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00545 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00546 { 00547 /* get one piece of work off the wire and one pass through */ 00548 00549 // read the socket, see what work is due 00550 int packet_type = readPacket(timer); 00551 00552 int len = 0, 00553 rc = SUCCESS; 00554 00555 switch (packet_type) 00556 { 00557 case FAILURE: 00558 case BUFFER_OVERFLOW: 00559 rc = packet_type; 00560 break; 00561 case CONNACK_MSG: 00562 connAckReceived = true; 00563 break; 00564 case PUBACK_MSG: 00565 pubAckReceived = true; 00566 break; 00567 case SUBACK_MSG: 00568 subAckReceived = true; 00569 break; 00570 case UNSUBACK_MSG: 00571 unsubAckReceived = true; 00572 break; 00573 case PUBLISH_MSG: 00574 { 00575 MQTTString topicName = MQTTString_initializer; 00576 Message msg; 00577 int intQoS; 00578 if 
(MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00579 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00580 goto exit; 00581 msg.qos = (enum QoS)intQoS; 00582 #if MQTTCLIENT_QOS2 00583 if (msg.qos != QOS2) 00584 #endif 00585 deliverMessage(topicName, msg); 00586 #if MQTTCLIENT_QOS2 00587 else if (isQoS2msgidFree(msg.id)) 00588 { 00589 if (useQoS2msgid(msg.id)) 00590 deliverMessage(topicName, msg); 00591 else 00592 WARN("Maximum number of incoming QoS2 messages exceeded"); 00593 } 00594 #endif 00595 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00596 if (msg.qos != QOS0) 00597 { 00598 if (msg.qos == QOS1) 00599 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK_MSG, 0, msg.id); 00600 else if (msg.qos == QOS2) 00601 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC_MSG, 0, msg.id); 00602 if (len <= 0) 00603 rc = FAILURE; 00604 else 00605 rc = sendPacket(len, timer); 00606 if (rc == FAILURE) 00607 goto exit; // there was a problem 00608 } 00609 break; 00610 #endif 00611 } 00612 #if MQTTCLIENT_QOS2 00613 case PUBREC_MSG: 00614 case PUBREL_MSG: 00615 unsigned short mypacketid; 00616 unsigned char dup, type; 00617 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00618 rc = FAILURE; 00619 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00620 (packet_type == PUBREC_MSG) ? 
PUBREL_MSG : PUBCOMP_MSG, 0, mypacketid)) <= 0) 00621 rc = FAILURE; 00622 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL_MSG packet 00623 rc = FAILURE; // there was a problem 00624 if (rc == FAILURE) 00625 goto exit; // there was a problem 00626 if (packet_type == PUBREL_MSG) 00627 freeQoS2msgid(mypacketid); 00628 break; 00629 00630 case PUBCOMP_MSG: 00631 pubCompReceived = true; 00632 break; 00633 #endif 00634 case PINGRESP_MSG: 00635 ping_outstanding = false; 00636 break; 00637 } 00638 keepalive(); 00639 exit: 00640 if (rc == SUCCESS) 00641 rc = packet_type; 00642 return rc; 00643 } 00644 00645 00646 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00647 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00648 { 00649 int rc = FAILURE; 00650 00651 if (keepAliveInterval == 0) 00652 { 00653 rc = SUCCESS; 00654 goto exit; 00655 } 00656 00657 if (last_sent.expired() || last_received.expired()) 00658 { 00659 if (!ping_outstanding) 00660 { 00661 Timer timer(1000); 00662 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00663 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00664 { 00665 ping_response.countdown(this->keepAliveInterval); 00666 ping_outstanding = true; 00667 } 00668 } 00669 else if(ping_response.expired()) 00670 { 00671 isconnected = false; 00672 } 00673 } 00674 00675 exit: 00676 return rc; 00677 } 00678 00679 00680 // only used in single-threaded mode where one command at a time is in process 00681 template<class Network, class Timer, int a, int b> 00682 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00683 { 00684 int rc = FAILURE; 00685 00686 // Use bool values to determine if a packet type has been received. This only works if waitfor is 00687 // called once at a time per type. 
However, it can be called with a different type at the same 00688 // time, for instance, while waiting for a subscription acknowledgement (SUBACK_MSG) we could 00689 // publish a QoS1 message and wait for the acknowledgement (PUBACK_MSG). 00690 switch(packet_type) 00691 { 00692 case CONNACK_MSG: 00693 connAckReceived = false; 00694 break; 00695 case SUBACK_MSG: 00696 subAckReceived = false; 00697 break; 00698 case UNSUBACK_MSG: 00699 unsubAckReceived = false; 00700 break; 00701 case PUBACK_MSG: 00702 pubAckReceived = false; 00703 break; 00704 case PUBCOMP_MSG: 00705 pubCompReceived = false; 00706 break; 00707 } 00708 00709 do 00710 { 00711 switch(packet_type) 00712 { 00713 case CONNACK_MSG: 00714 if(connAckReceived) { 00715 connAckReceived = false; 00716 return packet_type; 00717 } 00718 break; 00719 case SUBACK_MSG: 00720 if(subAckReceived) { 00721 subAckReceived = false; 00722 return packet_type; 00723 } 00724 break; 00725 case UNSUBACK_MSG: 00726 if(unsubAckReceived) { 00727 unsubAckReceived = false; 00728 return packet_type; 00729 } 00730 break; 00731 case PUBACK_MSG: 00732 if(pubAckReceived) { 00733 pubAckReceived = false; 00734 return packet_type; 00735 } 00736 break; 00737 case PUBCOMP_MSG: 00738 if(pubCompReceived) { 00739 pubCompReceived = false; 00740 return packet_type; 00741 } 00742 break; 00743 } 00744 if (timer.expired()) 00745 break; // we timed out 00746 cycle(timer); 00747 } 00748 while (true); 00749 00750 return rc; 00751 } 00752 00753 00754 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00755 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00756 { 00757 Timer connect_timer(command_timeout_ms); 00758 int rc = FAILURE; 00759 int len = 0; 00760 00761 if (isconnected) // don't send connect packet again if we are already connected 00762 goto exit; 00763 00764 this->keepAliveInterval = options.keepAliveInterval; 00765 this->cleansession = options.cleansession; 00766 if ((len = 
MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00767 goto exit; 00768 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00769 goto exit; // there was a problem 00770 00771 if (this->keepAliveInterval > 0) 00772 last_received.countdown(this->keepAliveInterval); 00773 // this will be a blocking call, wait for the connack 00774 if (waitfor(CONNACK_MSG, connect_timer) == CONNACK_MSG) 00775 { 00776 unsigned char connack_rc = 255; 00777 bool sessionPresent = false; 00778 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00779 rc = connack_rc; 00780 else 00781 rc = FAILURE; 00782 } 00783 else 00784 rc = FAILURE; 00785 00786 #if MQTTCLIENT_QOS2 00787 // resend any inflight publish 00788 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00789 { 00790 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL_MSG, 0, inflightMsgid)) <= 0) 00791 rc = FAILURE; 00792 else 00793 rc = publish(len, connect_timer, inflightQoS); 00794 } 00795 else 00796 #endif 00797 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00798 if (inflightMsgid > 0) 00799 { 00800 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00801 rc = publish(inflightLen, connect_timer, inflightQoS); 00802 } 00803 #endif 00804 00805 exit: 00806 if (rc == SUCCESS) 00807 isconnected = true; 00808 return rc; 00809 } 00810 00811 00812 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00813 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00814 { 00815 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00816 return connect(default_options); 00817 } 00818 00819 00820 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00821 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00822 { 00823 int rc = 
FAILURE; 00824 Timer timer(command_timeout_ms); 00825 int len = 0; 00826 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00827 00828 if (!isconnected) 00829 goto exit; 00830 00831 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00832 if (len <= 0) 00833 goto exit; 00834 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00835 goto exit; // there was a problem 00836 00837 if (waitfor(SUBACK_MSG, timer) == SUBACK_MSG) // wait for suback 00838 { 00839 int count = 0, grantedQoS = -1; 00840 unsigned short mypacketid; 00841 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00842 rc = grantedQoS; // 0, 1, 2 or 0x80 00843 if (rc != 0x80) 00844 { 00845 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00846 { 00847 if (messageHandlers[i].topicFilter == 0) 00848 { 00849 messageHandlers[i].topicFilter = topicFilter; 00850 messageHandlers[i].fp.attach(messageHandler); 00851 rc = 0; 00852 break; 00853 } 00854 } 00855 } 00856 } 00857 else 00858 rc = FAILURE; 00859 00860 exit: 00861 if (rc != SUCCESS) 00862 cleanSession(); 00863 return rc; 00864 } 00865 00866 00867 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00868 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00869 { 00870 int rc = FAILURE; 00871 Timer timer(command_timeout_ms); 00872 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00873 int len = 0; 00874 00875 if (!isconnected) 00876 goto exit; 00877 00878 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00879 goto exit; 00880 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00881 goto exit; // there was a problem 00882 00883 if (waitfor(UNSUBACK_MSG, timer) == UNSUBACK_MSG) 00884 { 00885 unsigned short mypacketid; // should be the same as the 
packetid above 00886 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00887 { 00888 rc = 0; 00889 00890 // remove the subscription message handler associated with this topic, if there is one 00891 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00892 { 00893 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00894 { 00895 messageHandlers[i].topicFilter = 0; 00896 break; 00897 } 00898 } 00899 } 00900 } 00901 else 00902 rc = FAILURE; 00903 00904 exit: 00905 if (rc != SUCCESS) 00906 cleanSession(); 00907 return rc; 00908 } 00909 00910 00911 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00912 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00913 { 00914 int rc; 00915 00916 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00917 goto exit; // there was a problem 00918 00919 #if MQTTCLIENT_QOS1 00920 if (qos == QOS1) 00921 { 00922 if (waitfor(PUBACK_MSG, timer) == PUBACK_MSG) 00923 { 00924 unsigned short mypacketid; 00925 unsigned char dup, type; 00926 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00927 rc = FAILURE; 00928 else if (inflightMsgid == mypacketid) 00929 inflightMsgid = 0; 00930 } 00931 else 00932 rc = FAILURE; 00933 } 00934 #endif 00935 #if MQTTCLIENT_QOS2 00936 if (qos == QOS2) 00937 { 00938 if (waitfor(PUBCOMP_MSG, timer) == PUBCOMP_MSG) 00939 { 00940 unsigned short mypacketid; 00941 unsigned char dup, type; 00942 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00943 rc = FAILURE; 00944 else if (inflightMsgid == mypacketid) 00945 inflightMsgid = 0; 00946 } 00947 else 00948 rc = FAILURE; 00949 } 00950 #endif 00951 00952 exit: 00953 if (rc != SUCCESS) 00954 cleanSession(); 00955 return rc; 00956 } 00957 00958 00959 00960 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00961 int 
MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00962 { 00963 int rc = FAILURE; 00964 Timer timer(command_timeout_ms); 00965 MQTTString topicString = MQTTString_initializer; 00966 int len = 0; 00967 00968 if (!isconnected) 00969 goto exit; 00970 00971 topicString.cstring = (char*)topicName; 00972 00973 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00974 if (qos == QOS1 || qos == QOS2) 00975 id = packetid.getNext(); 00976 #endif 00977 00978 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00979 topicString, (unsigned char*)payload, payloadlen); 00980 if (len <= 0) 00981 goto exit; 00982 00983 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00984 if (!cleansession) 00985 { 00986 memcpy(pubbuf, sendbuf, len); 00987 inflightMsgid = id; 00988 inflightLen = len; 00989 inflightQoS = qos; 00990 #if MQTTCLIENT_QOS2 00991 pubrel = false; 00992 #endif 00993 } 00994 #endif 00995 00996 rc = publish(len, timer, qos); 00997 exit: 00998 return rc; 00999 } 01000 01001 01002 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01003 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01004 { 01005 unsigned short id = 0; // dummy - not used for anything 01006 return publish(topicName, payload, payloadlen, id, qos, retained); 01007 } 01008 01009 01010 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01011 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01012 { 01013 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01014 } 01015 01016 01017 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01018 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01019 { 01020 int rc = 
FAILURE; 01021 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01022 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01023 if (len > 0) 01024 rc = sendPacket(len, timer); // send the disconnect packet 01025 01026 if (cleansession) 01027 cleanSession(); 01028 else 01029 isconnected = false; 01030 return rc; 01031 } 01032 01033 01034 #endif
Generated on Tue Jul 12 2022 21:31:38 by Doxygen 1.7.2
