Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of mbed-os-example-mbed5-blinky by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 00029 #if !defined(MQTTCLIENT_QOS1) 00030 #define MQTTCLIENT_QOS1 1 00031 #endif 00032 #if !defined(MQTTCLIENT_QOS2) 00033 #define MQTTCLIENT_QOS2 0 00034 #endif 00035 00036 namespace MQTT 00037 { 00038 00039 00040 enum QoS { QOS0, QOS1, QOS2 }; 00041 00042 // all failure return codes must be negative 00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00044 00045 00046 struct Message 00047 { 00048 enum QoS qos; 00049 bool retained; 00050 bool dup; 00051 unsigned short id; 00052 void *payload; 00053 size_t payloadlen; 00054 }; 00055 00056 00057 struct MessageData 00058 { 00059 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00060 { } 00061 00062 struct 
Message &message; 00063 MQTTString &topicName; 00064 }; 00065 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 /** 00087 * @class Client 00088 * @brief blocking, non-threaded MQTT client API 00089 * 00090 * This version of the API blocks on all method calls, until they are complete. This means that only one 00091 * MQTT request can be in process at any one time. 00092 * @param Network a network class which supports send, receive 00093 * @param Timer a timer class with the methods: 00094 */ 00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00096 class Client 00097 { 00098 00099 public: 00100 00101 typedef void (*messageHandler)(MessageData&); 00102 00103 /** Construct the client 00104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00105 * before calling MQTT connect 00106 * @param limits an instance of the Limit class - to alter limits as required 00107 */ 00108 Client(Network& network, unsigned int command_timeout_ms = 30000); 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00119 * The nework object must be connected to the network endpoint before calling this 00120 * Default connect options are used 00121 * @return success code - 00122 */ 00123 int connect(); 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a 
Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * @param options - connect options 00128 * @return success code - 00129 */ 00130 int connect(MQTTPacket_connectData& options); 00131 00132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00133 * @param topic - the topic to publish to 00134 * @param message - the message to send 00135 * @return success code - 00136 */ 00137 int publish(const char* topicName, Message& message); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param payload - the data to send 00142 * @param payloadlen - the length of the data 00143 * @param qos - the QoS to send the publish at 00144 * @param retained - whether the message should be retained 00145 * @return success code - 00146 */ 00147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00148 00149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00150 * @param topic - the topic to publish to 00151 * @param payload - the data to send 00152 * @param payloadlen - the length of the data 00153 * @param id - the packet id used - returned 00154 * @param qos - the QoS to send the publish at 00155 * @param retained - whether the message should be retained 00156 * @return success code - 00157 */ 00158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00159 00160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00161 * @param topicFilter - a topic pattern which can include wildcards 00162 * @param qos - the MQTT QoS to subscribe at 00163 * @param mh - the callback function to be invoked when a message is received for this subscription 00164 * @return success code - 00165 */ 
00166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00167 00168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00169 * @param topicFilter - a topic pattern which can include wildcards 00170 * @return success code - 00171 */ 00172 int unsubscribe(const char* topicFilter); 00173 00174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00175 * @return success code - 00176 */ 00177 int disconnect(); 00178 00179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00181 * received. 00182 * @param timeout_ms the time to wait, in milliseconds 00183 * @return success code - on failure, this means the client has disconnected 00184 */ 00185 int yield(unsigned long timeout_ms = 1000L); 00186 00187 /** Is the client connected? 00188 * @return flag - is the client connected or not? 
00189 */ 00190 bool isConnected() 00191 { 00192 return isconnected; 00193 } 00194 00195 private: 00196 00197 void cleanSession(); 00198 int cycle(Timer& timer); 00199 int waitfor(int packet_type, Timer& timer); 00200 int keepalive(); 00201 int publish(int len, Timer& timer, enum QoS qos); 00202 00203 int decodePacket(int* value, int timeout); 00204 int readPacket(Timer& timer); 00205 int sendPacket(int length, Timer& timer); 00206 int deliverMessage(MQTTString& topicName, Message& message); 00207 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00208 00209 Network& ipstack; 00210 unsigned long command_timeout_ms; 00211 00212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00214 00215 Timer last_sent, last_received; 00216 unsigned int keepAliveInterval; 00217 bool ping_outstanding; 00218 bool cleansession; 00219 00220 PacketId packetid; 00221 00222 struct MessageHandlers 00223 { 00224 const char* topicFilter; 00225 FP<void, MessageData&> fp; 00226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00227 00228 FP<void, MessageData&> defaultMessageHandler; 00229 00230 bool isconnected; 00231 00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00234 int inflightLen; 00235 unsigned short inflightMsgid; 00236 enum QoS inflightQoS; 00237 #endif 00238 00239 #if MQTTCLIENT_QOS2 00240 bool pubrel; 00241 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00242 #define MAX_INCOMING_QOS2_MESSAGES 10 00243 #endif 00244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00245 bool isQoS2msgidFree(unsigned short id); 00246 bool useQoS2msgid(unsigned short id); 00247 void freeQoS2msgid(unsigned short id); 00248 #endif 00249 00250 }; 00251 00252 } 00253 00254 00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00256 void MQTT::Client<Network, Timer, a, 
MAX_MESSAGE_HANDLERS>::cleanSession() 00257 { 00258 ping_outstanding = false; 00259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00260 messageHandlers[i].topicFilter = 0; 00261 isconnected = false; 00262 00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00264 inflightMsgid = 0; 00265 inflightQoS = QOS0; 00266 #endif 00267 00268 #if MQTTCLIENT_QOS2 00269 pubrel = false; 00270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00271 incomingQoS2messages[i] = 0; 00272 #endif 00273 } 00274 00275 00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00278 { 00279 last_sent = Timer(); 00280 last_received = Timer(); 00281 this->command_timeout_ms = command_timeout_ms; 00282 cleanSession(); 00283 } 00284 00285 00286 #if MQTTCLIENT_QOS2 00287 template<class Network, class Timer, int a, int b> 00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00289 { 00290 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00291 { 00292 if (incomingQoS2messages[i] == id) 00293 return false; 00294 } 00295 return true; 00296 } 00297 00298 00299 template<class Network, class Timer, int a, int b> 00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00301 { 00302 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00303 { 00304 if (incomingQoS2messages[i] == 0) 00305 { 00306 incomingQoS2messages[i] = id; 00307 return true; 00308 } 00309 } 00310 return false; 00311 } 00312 00313 00314 template<class Network, class Timer, int a, int b> 00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00316 { 00317 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00318 { 00319 if (incomingQoS2messages[i] == id) 00320 { 00321 incomingQoS2messages[i] = 0; 00322 return; 00323 } 00324 } 00325 } 00326 #endif 00327 00328 00329 template<class Network, class 
Timer, int a, int b> 00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00331 { 00332 int rc = FAILURE, 00333 sent = 0; 00334 00335 while (sent < length && !timer.expired()) 00336 { 00337 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00338 if (rc < 0) // there was an error writing the data 00339 break; 00340 sent += rc; 00341 } 00342 if (sent == length) 00343 { 00344 if (this->keepAliveInterval > 0) 00345 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00346 rc = SUCCESS; 00347 } 00348 else 00349 rc = FAILURE; 00350 00351 #if defined(MQTT_DEBUG) 00352 char printbuf[150]; 00353 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00354 #endif 00355 return rc; 00356 } 00357 00358 00359 template<class Network, class Timer, int a, int b> 00360 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00361 { 00362 unsigned char c; 00363 int multiplier = 1; 00364 int len = 0; 00365 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00366 00367 *value = 0; 00368 do 00369 { 00370 int rc = MQTTPACKET_READ_ERROR; 00371 00372 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00373 { 00374 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00375 goto exit; 00376 } 00377 rc = ipstack.read(&c, 1, timeout); 00378 if (rc != 1) 00379 goto exit; 00380 *value += (c & 127) * multiplier; 00381 multiplier *= 128; 00382 } while ((c & 128) != 0); 00383 exit: 00384 return len; 00385 } 00386 00387 00388 /** 00389 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00390 * the packets can be retried. 
00391 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00392 * @return the MQTT packet type, or -1 if none 00393 */ 00394 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00395 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00396 { 00397 int rc = FAILURE; 00398 MQTTHeader header = {0}; 00399 int len = 0; 00400 int rem_len = 0; 00401 00402 /* 1. read the header byte. This has the packet type in it */ 00403 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00404 goto exit; 00405 00406 len = 1; 00407 /* 2. read the remaining length. This is variable in itself */ 00408 decodePacket(&rem_len, timer.left_ms()); 00409 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00410 00411 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00412 { 00413 rc = BUFFER_OVERFLOW; 00414 goto exit; 00415 } 00416 00417 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00418 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00419 goto exit; 00420 00421 header.byte = readbuf[0]; 00422 rc = header.bits.type; 00423 if (this->keepAliveInterval > 0) 00424 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00425 exit: 00426 00427 #if defined(MQTT_DEBUG) 00428 if (rc >= 0) 00429 { 00430 char printbuf[50]; 00431 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00432 } 00433 #endif 00434 return rc; 00435 } 00436 00437 00438 // assume topic filter and name is in correct format 00439 // # can only be at end 00440 // + and # can only be next to separator 00441 template<class Network, class Timer, int a, int b> 00442 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00443 { 00444 char* curf = topicFilter; 00445 char* 
curn = topicName.lenstring.data; 00446 char* curn_end = curn + topicName.lenstring.len; 00447 00448 while (*curf && curn < curn_end) 00449 { 00450 if (*curn == '/' && *curf != '/') 00451 break; 00452 if (*curf != '+' && *curf != '#' && *curf != *curn) 00453 break; 00454 if (*curf == '+') 00455 { // skip until we meet the next separator, or end of string 00456 char* nextpos = curn + 1; 00457 while (nextpos < curn_end && *nextpos != '/') 00458 nextpos = ++curn + 1; 00459 } 00460 else if (*curf == '#') 00461 curn = curn_end - 1; // skip until end of string 00462 curf++; 00463 curn++; 00464 }; 00465 00466 return (curn == curn_end) && (*curf == '\0'); 00467 } 00468 00469 00470 00471 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00472 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00473 { 00474 int rc = FAILURE; 00475 00476 // we have to find the right message handler - indexed by topic 00477 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00478 { 00479 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00480 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00481 { 00482 if (messageHandlers[i].fp.attached()) 00483 { 00484 MessageData md(topicName, message); 00485 messageHandlers[i].fp(md); 00486 rc = SUCCESS; 00487 } 00488 } 00489 } 00490 00491 if (rc == FAILURE && defaultMessageHandler.attached()) 00492 { 00493 MessageData md(topicName, message); 00494 defaultMessageHandler(md); 00495 rc = SUCCESS; 00496 } 00497 00498 return rc; 00499 } 00500 00501 00502 00503 template<class Network, class Timer, int a, int b> 00504 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00505 { 00506 int rc = SUCCESS; 00507 Timer timer = Timer(); 00508 00509 timer.countdown_ms(timeout_ms); 00510 while (!timer.expired()) 00511 { 00512 if (cycle(timer) < 0) 00513 { 00514 rc = FAILURE; 00515 break; 
00516 } 00517 Thread::wait(10); 00518 } 00519 00520 return rc; 00521 } 00522 00523 00524 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00525 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00526 { 00527 /* get one piece of work off the wire and one pass through */ 00528 00529 // read the socket, see what work is due 00530 int packet_type = readPacket(timer); 00531 00532 int len = 0, 00533 rc = SUCCESS; 00534 00535 switch (packet_type) 00536 { 00537 case FAILURE: 00538 case BUFFER_OVERFLOW: 00539 rc = packet_type; 00540 break; 00541 case CONNACK: 00542 case PUBACK: 00543 case SUBACK: 00544 break; 00545 case PUBLISH: 00546 { 00547 MQTTString topicName = MQTTString_initializer; 00548 Message msg; 00549 int intQoS; 00550 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00551 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00552 goto exit; 00553 msg.qos = (enum QoS)intQoS; 00554 #if MQTTCLIENT_QOS2 00555 if (msg.qos != QOS2) 00556 #endif 00557 deliverMessage(topicName, msg); 00558 #if MQTTCLIENT_QOS2 00559 else if (isQoS2msgidFree(msg.id)) 00560 { 00561 if (useQoS2msgid(msg.id)) 00562 deliverMessage(topicName, msg); 00563 else 00564 WARN("Maximum number of incoming QoS2 messages exceeded"); 00565 } 00566 #endif 00567 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00568 if (msg.qos != QOS0) 00569 { 00570 if (msg.qos == QOS1) 00571 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00572 else if (msg.qos == QOS2) 00573 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00574 if (len <= 0) 00575 rc = FAILURE; 00576 else 00577 rc = sendPacket(len, timer); 00578 if (rc == FAILURE) 00579 goto exit; // there was a problem 00580 } 00581 break; 00582 #endif 00583 } 00584 #if MQTTCLIENT_QOS2 00585 case PUBREC: 00586 case PUBREL: 00587 unsigned short mypacketid; 
00588 unsigned char dup, type; 00589 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00590 rc = FAILURE; 00591 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00592 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00593 rc = FAILURE; 00594 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00595 rc = FAILURE; // there was a problem 00596 if (rc == FAILURE) 00597 goto exit; // there was a problem 00598 if (packet_type == PUBREL) 00599 freeQoS2msgid(mypacketid); 00600 break; 00601 00602 case PUBCOMP: 00603 break; 00604 #endif 00605 case PINGRESP: 00606 ping_outstanding = false; 00607 break; 00608 } 00609 keepalive(); 00610 exit: 00611 if (rc == SUCCESS) 00612 rc = packet_type; 00613 return rc; 00614 } 00615 00616 00617 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00618 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00619 { 00620 int rc = FAILURE; 00621 00622 if (keepAliveInterval == 0) 00623 { 00624 rc = SUCCESS; 00625 goto exit; 00626 } 00627 00628 if (last_sent.expired() || last_received.expired()) 00629 { 00630 if (!ping_outstanding) 00631 { 00632 Timer timer(1000); 00633 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00634 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00635 ping_outstanding = true; 00636 } 00637 } 00638 00639 exit: 00640 return rc; 00641 } 00642 00643 00644 // only used in single-threaded mode where one command at a time is in process 00645 template<class Network, class Timer, int a, int b> 00646 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00647 { 00648 int rc = FAILURE; 00649 00650 do 00651 { 00652 if (timer.expired()) 00653 break; // we timed out 00654 } 00655 while ((rc = cycle(timer)) != packet_type); 00656 00657 return rc; 00658 } 00659 00660 00661 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, 
int b> 00662 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00663 { 00664 Timer connect_timer(command_timeout_ms); 00665 int rc = FAILURE; 00666 int len = 0; 00667 00668 if (isconnected) // don't send connect packet again if we are already connected 00669 goto exit; 00670 00671 this->keepAliveInterval = options.keepAliveInterval; 00672 this->cleansession = options.cleansession; 00673 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00674 goto exit; 00675 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00676 goto exit; // there was a problem 00677 00678 if (this->keepAliveInterval > 0) 00679 last_received.countdown(this->keepAliveInterval); 00680 // this will be a blocking call, wait for the connack 00681 if (waitfor(CONNACK, connect_timer) == CONNACK) 00682 { 00683 unsigned char connack_rc = 255; 00684 bool sessionPresent = false; 00685 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00686 rc = connack_rc; 00687 else 00688 rc = FAILURE; 00689 } 00690 else 00691 rc = FAILURE; 00692 00693 #if MQTTCLIENT_QOS2 00694 // resend any inflight publish 00695 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00696 { 00697 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00698 rc = FAILURE; 00699 else 00700 rc = publish(len, connect_timer, inflightQoS); 00701 } 00702 else 00703 #endif 00704 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00705 if (inflightMsgid > 0) 00706 { 00707 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00708 rc = publish(inflightLen, connect_timer, inflightQoS); 00709 } 00710 #endif 00711 00712 exit: 00713 if (rc == SUCCESS) 00714 isconnected = true; 00715 return rc; 00716 } 00717 00718 00719 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00720 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 
00721 { 00722 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00723 return connect(default_options); 00724 } 00725 00726 00727 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00728 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00729 { 00730 int rc = FAILURE; 00731 Timer timer(command_timeout_ms); 00732 int len = 0; 00733 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00734 00735 if (!isconnected) 00736 goto exit; 00737 00738 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00739 if (len <= 0) 00740 goto exit; 00741 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00742 goto exit; // there was a problem 00743 00744 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00745 { 00746 int count = 0, grantedQoS = -1; 00747 unsigned short mypacketid; 00748 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00749 rc = grantedQoS; // 0, 1, 2 or 0x80 00750 if (rc != 0x80) 00751 { 00752 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00753 { 00754 if (messageHandlers[i].topicFilter == 0) 00755 { 00756 messageHandlers[i].topicFilter = topicFilter; 00757 messageHandlers[i].fp.attach(messageHandler); 00758 rc = 0; 00759 break; 00760 } 00761 } 00762 } 00763 } 00764 else 00765 rc = FAILURE; 00766 00767 exit: 00768 if (rc != SUCCESS) 00769 cleanSession(); 00770 return rc; 00771 } 00772 00773 00774 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00775 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00776 { 00777 int rc = FAILURE; 00778 Timer timer(command_timeout_ms); 00779 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00780 int len = 0; 00781 00782 if 
(!isconnected) 00783 goto exit; 00784 00785 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00786 goto exit; 00787 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00788 goto exit; // there was a problem 00789 00790 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00791 { 00792 unsigned short mypacketid; // should be the same as the packetid above 00793 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00794 { 00795 rc = 0; 00796 00797 // remove the subscription message handler associated with this topic, if there is one 00798 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00799 { 00800 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00801 { 00802 messageHandlers[i].topicFilter = 0; 00803 break; 00804 } 00805 } 00806 } 00807 } 00808 else 00809 rc = FAILURE; 00810 00811 exit: 00812 if (rc != SUCCESS) 00813 cleanSession(); 00814 return rc; 00815 } 00816 00817 00818 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00819 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00820 { 00821 int rc; 00822 00823 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00824 goto exit; // there was a problem 00825 00826 #if MQTTCLIENT_QOS1 00827 if (qos == QOS1) 00828 { 00829 if (waitfor(PUBACK, timer) == PUBACK) 00830 { 00831 unsigned short mypacketid; 00832 unsigned char dup, type; 00833 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00834 rc = FAILURE; 00835 else if (inflightMsgid == mypacketid) 00836 inflightMsgid = 0; 00837 } 00838 else 00839 rc = FAILURE; 00840 } 00841 #elif MQTTCLIENT_QOS2 00842 else if (qos == QOS2) 00843 { 00844 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00845 { 00846 unsigned short mypacketid; 00847 unsigned char dup, type; 00848 if (MQTTDeserialize_ack(&type, &dup, 
&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00849 rc = FAILURE; 00850 else if (inflightMsgid == mypacketid) 00851 inflightMsgid = 0; 00852 } 00853 else 00854 rc = FAILURE; 00855 } 00856 #endif 00857 00858 exit: 00859 if (rc != SUCCESS) 00860 cleanSession(); 00861 return rc; 00862 } 00863 00864 00865 00866 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00867 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00868 { 00869 int rc = FAILURE; 00870 Timer timer(command_timeout_ms); 00871 MQTTString topicString = MQTTString_initializer; 00872 int len = 0; 00873 00874 if (!isconnected) 00875 goto exit; 00876 00877 topicString.cstring = (char*)topicName; 00878 00879 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00880 if (qos == QOS1 || qos == QOS2) 00881 id = packetid.getNext(); 00882 #endif 00883 00884 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00885 topicString, (unsigned char*)payload, payloadlen); 00886 if (len <= 0) 00887 goto exit; 00888 00889 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00890 if (!cleansession) 00891 { 00892 memcpy(pubbuf, sendbuf, len); 00893 inflightMsgid = id; 00894 inflightLen = len; 00895 inflightQoS = qos; 00896 #if MQTTCLIENT_QOS2 00897 pubrel = false; 00898 #endif 00899 } 00900 #endif 00901 00902 rc = publish(len, timer, qos); 00903 exit: 00904 return rc; 00905 } 00906 00907 00908 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00909 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00910 { 00911 unsigned short id = 0; // dummy - not used for anything 00912 return publish(topicName, payload, payloadlen, id, qos, retained); 00913 } 00914 00915 00916 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00917 int 
MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00918 { 00919 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00920 } 00921 00922 00923 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00924 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00925 { 00926 int rc = FAILURE; 00927 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00928 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00929 if (len > 0) 00930 rc = sendPacket(len, timer); // send the disconnect packet 00931 00932 if (cleansession) 00933 cleanSession(); 00934 else 00935 isconnected = false; 00936 return rc; 00937 } 00938 00939 00940 #endif
Generated on Wed Jul 13 2022 21:16:17 by
