Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-os-example-wifi-milkcocoa MilkcocoaOsSample_Eth MilkcocoaOsSample_ESP8266 MilkcocoaOsSample_Eth_DigitalIn
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 #include "OldTimer.h" 00029 00030 #if !defined(MQTTCLIENT_QOS1) 00031 #define MQTTCLIENT_QOS1 1 00032 #endif 00033 #if !defined(MQTTCLIENT_QOS2) 00034 #define MQTTCLIENT_QOS2 0 00035 #endif 00036 00037 #if 1 00038 extern RawSerial pc; 00039 00040 #define DBG(x) x 00041 #else 00042 #define DBG(x) 00043 #endif 00044 00045 namespace MQTT 00046 { 00047 00048 00049 enum QoS { QOS0, QOS1, QOS2 }; 00050 00051 // all failure return codes must be negative 00052 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00053 00054 00055 struct Message 00056 { 00057 enum QoS qos; 00058 bool retained; 00059 bool dup; 00060 unsigned short id; 00061 void *payload; 00062 size_t payloadlen; 00063 }; 00064 00065 00066 struct MessageData 00067 { 00068 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00069 { } 00070 00071 struct Message &message; 00072 MQTTString &topicName; 00073 }; 00074 00075 00076 class PacketId 00077 { 00078 public: 00079 PacketId() 00080 { 00081 next = 0; 00082 } 00083 00084 int getNext() 00085 { 00086 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00087 } 00088 00089 private: 00090 static const int MAX_PACKET_ID = 65535; 00091 int next; 00092 }; 00093 00094 00095 /** 00096 * @class Client 00097 * @brief blocking, non-threaded MQTT client API 00098 * 00099 * This version of the API blocks on all method calls, until they are complete. This means that only one 00100 * MQTT request can be in process at any one time. 00101 * @param Network a network class which supports send, receive 00102 * @param Timer a timer class with the methods: 00103 */ 00104 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE = 256, int MAX_MESSAGE_HANDLERS = 5> 00105 class Client 00106 { 00107 00108 public: 00109 00110 typedef void (*messageHandler)(MessageData&); 00111 00112 /** Construct the client 00113 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00114 * before calling MQTT connect 00115 * @param limits an instance of the Limit class - to alter limits as required 00116 */ 00117 Client(Network& network, unsigned int command_timeout_ms = 30000); 00118 00119 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00120 * @param mh - pointer to the callback function 00121 */ 00122 void setDefaultMessageHandler(messageHandler mh) 00123 { 00124 defaultMessageHandler.attach(mh); 00125 } 00126 00127 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00128 * The nework object must be connected to the network endpoint before calling this 00129 * Default connect options are used 00130 * @return success code - 00131 */ 00132 int connect(); 00133 00134 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00135 * The nework object must be connected to the network endpoint before calling this 00136 * @param options - connect options 00137 * @return success code - 00138 */ 00139 int connect(MQTTPacket_connectData& options); 00140 00141 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00142 * @param topic - the topic to publish to 00143 * @param message - the message to send 00144 * @return success code - 00145 */ 00146 int publish(const char* topicName, Message& message); 00147 00148 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00149 * @param topic - the topic to publish to 00150 * @param payload - the data to send 00151 * @param payloadlen - the length of the data 00152 * @param qos - the QoS to send the publish at 00153 * @param retained - whether the message should be retained 00154 * @return success code - 00155 */ 00156 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00157 00158 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00159 * @param topic - the topic to publish to 00160 * @param payload - the data to send 00161 * @param payloadlen - the length of the data 00162 * @param id - the packet id used - returned 00163 * @param qos - the QoS to send the publish at 00164 * @param retained - whether the message should be retained 00165 * @return success code - 00166 */ 00167 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00168 00169 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00170 * @param topicFilter - a topic pattern which can include wildcards 00171 * @param qos - the MQTT QoS to subscribe at 00172 * @param mh - the callback function to be invoked when a message is received for this subscription 00173 * @return success code - 00174 */ 00175 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00176 00177 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00178 * @param topicFilter - a topic pattern which can include wildcards 00179 * @return success code - 00180 */ 00181 int unsubscribe(const char* topicFilter); 00182 00183 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00184 * @return success code - 00185 */ 00186 int disconnect(); 00187 00188 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00189 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00190 * received. 00191 * @param timeout_ms the time to wait, in milliseconds 00192 * @return success code - on failure, this means the client has disconnected 00193 */ 00194 int yield(unsigned long timeout_ms = 1000L); 00195 00196 /** Is the client connected? 00197 * @return flag - is the client connected or not? 00198 */ 00199 bool isConnected() 00200 { 00201 return isconnected; 00202 } 00203 00204 private: 00205 00206 void cleanSession(); 00207 int cycle(OldTimer& timer); 00208 int waitfor(int packet_type, OldTimer& timer); 00209 int keepalive(); 00210 int publish(int len, OldTimer& timer, enum QoS qos); 00211 00212 int decodePacket(int* value, int timeout); 00213 int readPacket(OldTimer& timer); 00214 int sendPacket(int length, OldTimer& timer); 00215 int deliverMessage(MQTTString& topicName, Message& message); 00216 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00217 00218 Network& ipstack; 00219 unsigned long command_timeout_ms; 00220 00221 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00222 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00223 00224 OldTimer last_sent, last_received; 00225 unsigned int keepAliveInterval; 00226 bool ping_outstanding; 00227 bool cleansession; 00228 00229 PacketId packetid; 00230 00231 struct MessageHandlers 00232 { 00233 const char* topicFilter; 00234 _FP<void, MessageData&> fp; 00235 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00236 00237 _FP<void, MessageData&> defaultMessageHandler; 00238 00239 bool isconnected; 00240 00241 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00242 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00243 int inflightLen; 00244 unsigned short inflightMsgid; 00245 enum QoS inflightQoS; 00246 #endif 00247 00248 #if MQTTCLIENT_QOS2 00249 bool pubrel; 00250 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00251 #define MAX_INCOMING_QOS2_MESSAGES 10 00252 #endif 00253 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00254 bool isQoS2msgidFree(unsigned short id); 00255 bool useQoS2msgid(unsigned short id); 00256 void freeQoS2msgid(unsigned short id); 00257 #endif 00258 00259 }; 00260 00261 } 00262 00263 00264 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS> 00265 void MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00266 { 00267 ping_outstanding = false; 00268 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00269 messageHandlers[i].topicFilter = 0; 00270 isconnected = false; 00271 00272 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00273 inflightMsgid = 0; 00274 inflightQoS = QOS0; 00275 #endif 00276 00277 #if MQTTCLIENT_QOS2 00278 pubrel = false; 00279 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00280 incomingQoS2messages[i] = 0; 00281 #endif 00282 } 00283 00284 00285 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS> 00286 MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00287 { 00288 last_sent = OldTimer(); 00289 last_received = OldTimer(); 00290 this->command_timeout_ms = command_timeout_ms; 00291 cleanSession(); 00292 } 00293 00294 00295 #if MQTTCLIENT_QOS2 00296 template<class Network, class OldTimer, int a, int b> 00297 bool MQTT::Client<Network, OldTimer, a, b>::isQoS2msgidFree(unsigned short id) 00298 { 00299 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00300 { 00301 if (incomingQoS2messages[i] == id) 00302 return false; 00303 } 00304 return true; 00305 } 00306 00307 00308 template<class Network, class OldTimer, int a, int b> 00309 bool MQTT::Client<Network, OldTimer, a, b>::useQoS2msgid(unsigned short id) 00310 { 00311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00312 { 00313 if (incomingQoS2messages[i] == 0) 00314 { 00315 incomingQoS2messages[i] = id; 00316 return true; 00317 } 00318 } 00319 return false; 00320 } 00321 00322 00323 template<class Network, class OldTimer, int a, int b> 00324 void MQTT::Client<Network, OldTimer, a, b>::freeQoS2msgid(unsigned short id) 00325 { 00326 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00327 { 00328 if (incomingQoS2messages[i] == id) 00329 { 00330 incomingQoS2messages[i] = 0; 00331 return; 00332 } 00333 } 00334 } 00335 #endif 00336 00337 00338 template<class Network, class OldTimer, int a, int b> 00339 int MQTT::Client<Network, OldTimer, a, b>::sendPacket(int length, OldTimer& timer) 00340 { 00341 int rc = FAILURE, 00342 sent = 0; 00343 00344 while (sent < length && !timer.expired()) 00345 { 00346 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00347 if (rc < 0) // there was an error writing the data 00348 break; 00349 sent += rc; 00350 } 00351 if (sent == length) 00352 { 00353 if (this->keepAliveInterval > 0) 00354 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00355 rc = SUCCESS; 00356 } 00357 else 00358 rc = FAILURE; 00359 00360 #if defined(MQTT_DEBUG) 00361 char printbuf[150]; 00362 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00363 #endif 00364 return rc; 00365 } 00366 00367 00368 template<class Network, class OldTimer, int a, int b> 00369 int MQTT::Client<Network, OldTimer, a, b>::decodePacket(int* value, int timeout) 00370 { 00371 unsigned char c; 00372 int multiplier = 1; 00373 int len = 0; 00374 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00375 00376 *value = 0; 00377 do 00378 { 00379 int rc = MQTTPACKET_READ_ERROR; 00380 00381 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00382 { 00383 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00384 goto exit; 00385 } 00386 rc = ipstack.read(&c, 1, timeout); 00387 if (rc != 1) 00388 goto exit; 00389 *value += (c & 127) * multiplier; 00390 multiplier *= 128; 00391 } while ((c & 128) != 0); 00392 exit: 00393 return len; 00394 } 00395 00396 00397 /** 00398 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00399 * the packets can be retried. 00400 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00401 * @return the MQTT packet type, or -1 if none 00402 */ 00403 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00404 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::readPacket(OldTimer& timer) 00405 { 00406 int rc = FAILURE; 00407 MQTTHeader header = {0}; 00408 int len = 0; 00409 int rem_len = 0; 00410 00411 /* 1. read the header byte. This has the packet type in it */ 00412 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00413 goto exit; 00414 00415 len = 1; 00416 /* 2. read the remaining length. This is variable in itself */ 00417 decodePacket(&rem_len, timer.left_ms()); 00418 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00419 00420 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00421 { 00422 rc = BUFFER_OVERFLOW; 00423 goto exit; 00424 } 00425 00426 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00427 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00428 goto exit; 00429 00430 header.byte = readbuf[0]; 00431 rc = header.bits.type; 00432 if (this->keepAliveInterval > 0) 00433 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00434 exit: 00435 00436 #if defined(MQTT_DEBUG) 00437 if (rc >= 0) 00438 { 00439 char printbuf[50]; 00440 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00441 } 00442 #endif 00443 return rc; 00444 } 00445 00446 00447 // assume topic filter and name is in correct format 00448 // # can only be at end 00449 // + and # can only be next to separator 00450 template<class Network, class OldTimer, int a, int b> 00451 bool MQTT::Client<Network, OldTimer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00452 { 00453 char* curf = topicFilter; 00454 char* curn = topicName.lenstring.data; 00455 char* curn_end = curn + topicName.lenstring.len; 00456 00457 while (*curf && curn < curn_end) 00458 { 00459 if (*curn == '/' && *curf != '/') 00460 break; 00461 if (*curf != '+' && *curf != '#' && *curf != *curn) 00462 break; 00463 if (*curf == '+') 00464 { // skip until we meet the next separator, or end of string 00465 char* nextpos = curn + 1; 00466 while (nextpos < curn_end && *nextpos != '/') 00467 nextpos = ++curn + 1; 00468 } 00469 else if (*curf == '#') 00470 curn = curn_end - 1; // skip until end of string 00471 curf++; 00472 curn++; 00473 }; 00474 00475 return (curn == curn_end) && (*curf == '\0'); 00476 } 00477 00478 00479 00480 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS> 00481 int MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00482 { 00483 int rc = FAILURE; 00484 00485 // we have to find the right message handler - indexed by topic 00486 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00487 { 00488 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00489 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00490 { 00491 if (messageHandlers[i].fp.attached()) 00492 { 00493 MessageData md(topicName, message); 00494 messageHandlers[i].fp(md); 00495 rc = SUCCESS; 00496 } 00497 } 00498 } 00499 00500 if (rc == FAILURE && defaultMessageHandler.attached()) 00501 { 00502 MessageData md(topicName, message); 00503 defaultMessageHandler(md); 00504 rc = SUCCESS; 00505 } 00506 00507 return rc; 00508 } 00509 00510 00511 00512 template<class Network, class OldTimer, int a, int b> 00513 int MQTT::Client<Network, OldTimer, a, b>::yield(unsigned long timeout_ms) 00514 { 00515 int rc = SUCCESS; 00516 OldTimer timer = OldTimer(); 00517 00518 timer.countdown_ms(timeout_ms); 00519 while (!timer.expired()) 00520 { 00521 if (cycle(timer) < 0) 00522 { 00523 rc = FAILURE; 00524 break; 00525 } 00526 Thread::wait(10); 00527 } 00528 00529 return rc; 00530 } 00531 00532 00533 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00534 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::cycle(OldTimer& timer) 00535 { 00536 /* get one piece of work off the wire and one pass through */ 00537 00538 // read the socket, see what work is due 00539 int packet_type = readPacket(timer); 00540 00541 int len = 0, 00542 rc = SUCCESS; 00543 00544 switch (packet_type) 00545 { 00546 case FAILURE: 00547 case BUFFER_OVERFLOW: 00548 rc = packet_type; 00549 break; 00550 case CONNACK: 00551 case PUBACK: 00552 case SUBACK: 00553 break; 00554 case PUBLISH: 00555 { 00556 MQTTString topicName = MQTTString_initializer; 00557 Message msg; 00558 int intQoS; 00559 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00560 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00561 goto exit; 00562 msg.qos = (enum QoS)intQoS; 00563 #if MQTTCLIENT_QOS2 00564 if (msg.qos != QOS2) 00565 #endif 00566 deliverMessage(topicName, msg); 00567 #if MQTTCLIENT_QOS2 00568 else if (isQoS2msgidFree(msg.id)) 00569 { 00570 if (useQoS2msgid(msg.id)) 00571 deliverMessage(topicName, msg); 00572 else 00573 WARN("Maximum number of incoming QoS2 messages exceeded"); 00574 } 00575 #endif 00576 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00577 if (msg.qos != QOS0) 00578 { 00579 if (msg.qos == QOS1) 00580 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00581 else if (msg.qos == QOS2) 00582 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00583 if (len <= 0) 00584 rc = FAILURE; 00585 else 00586 rc = sendPacket(len, timer); 00587 if (rc == FAILURE) 00588 goto exit; // there was a problem 00589 } 00590 break; 00591 #endif 00592 } 00593 #if MQTTCLIENT_QOS2 00594 case PUBREC: 00595 case PUBREL: 00596 unsigned short mypacketid; 00597 unsigned char dup, type; 00598 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00599 rc = FAILURE; 00600 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00601 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00602 rc = FAILURE; 00603 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00604 rc = FAILURE; // there was a problem 00605 if (rc == FAILURE) 00606 goto exit; // there was a problem 00607 if (packet_type == PUBREL) 00608 freeQoS2msgid(mypacketid); 00609 break; 00610 00611 case PUBCOMP: 00612 break; 00613 #endif 00614 case PINGRESP: 00615 ping_outstanding = false; 00616 break; 00617 } 00618 keepalive(); 00619 exit: 00620 if (rc == SUCCESS) 00621 rc = packet_type; 00622 return rc; 00623 } 00624 00625 00626 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00627 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00628 { 00629 int rc = FAILURE; 00630 00631 if (keepAliveInterval == 0) 00632 { 00633 rc = SUCCESS; 00634 goto exit; 00635 } 00636 00637 if (last_sent.expired() || last_received.expired()) 00638 { 00639 if (!ping_outstanding) 00640 { 00641 OldTimer timer(1000); 00642 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00643 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00644 ping_outstanding = true; 00645 } 00646 } 00647 00648 exit: 00649 return rc; 00650 } 00651 00652 00653 // only used in single-threaded mode where one command at a time is in process 00654 template<class Network, class OldTimer, int a, int b> 00655 int MQTT::Client<Network, OldTimer, a, b>::waitfor(int packet_type, OldTimer& timer) 00656 { 00657 int rc = FAILURE; 00658 00659 do 00660 { 00661 if (timer.expired()) 00662 break; // we timed out 00663 } 00664 while ((rc = cycle(timer)) != packet_type); 00665 00666 return rc; 00667 } 00668 00669 00670 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00671 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00672 { 00673 OldTimer connect_timer(command_timeout_ms); 00674 int rc = FAILURE; 00675 int len = 0; 00676 00677 if (isconnected) // don't send connect packet again if we are already connected 00678 goto exit; 00679 00680 this->keepAliveInterval = options.keepAliveInterval; 00681 this->cleansession = options.cleansession; 00682 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00683 goto exit; 00684 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00685 goto exit; // there was a problem 00686 00687 if (this->keepAliveInterval > 0) 00688 last_received.countdown(this->keepAliveInterval); 00689 // this will be a blocking call, wait for the connack 00690 if (waitfor(CONNACK, connect_timer) == CONNACK) 00691 { 00692 unsigned char connack_rc = 255; 00693 bool sessionPresent = false; 00694 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00695 rc = connack_rc; 00696 else 00697 rc = FAILURE; 00698 } 00699 else 00700 rc = FAILURE; 00701 00702 #if MQTTCLIENT_QOS2 00703 // resend any inflight publish 00704 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00705 { 00706 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00707 rc = FAILURE; 00708 else 00709 rc = publish(len, connect_timer, inflightQoS); 00710 } 00711 else 00712 #endif 00713 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00714 if (inflightMsgid > 0) 00715 { 00716 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00717 rc = publish(inflightLen, connect_timer, inflightQoS); 00718 } 00719 #endif 00720 00721 exit: 00722 if (rc == SUCCESS) 00723 isconnected = true; 00724 return rc; 00725 } 00726 00727 00728 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00729 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect() 00730 { 00731 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00732 return connect(default_options); 00733 } 00734 00735 00736 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00737 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00738 { 00739 int rc = FAILURE; 00740 OldTimer timer(command_timeout_ms); 00741 int len = 0; 00742 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00743 00744 if (!isconnected) 00745 goto exit; 00746 00747 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00748 if (len <= 0) 00749 goto exit; 00750 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00751 goto exit; // there was a problem 00752 00753 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00754 { 00755 int count = 0, grantedQoS = -1; 00756 unsigned short mypacketid; 00757 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00758 rc = grantedQoS; // 0, 1, 2 or 0x80 00759 if (rc != 0x80) 00760 { 00761 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00762 { 00763 if (messageHandlers[i].topicFilter == 0) 00764 { 00765 messageHandlers[i].topicFilter = topicFilter; 00766 messageHandlers[i].fp.attach(messageHandler); 00767 rc = 0; 00768 break; 00769 } 00770 } 00771 } 00772 } 00773 else 00774 rc = FAILURE; 00775 00776 exit: 00777 if (rc != SUCCESS) 00778 cleanSession(); 00779 return rc; 00780 } 00781 00782 00783 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00784 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00785 { 00786 int rc = FAILURE; 00787 OldTimer timer(command_timeout_ms); 00788 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00789 int len = 0; 00790 00791 if (!isconnected) 00792 goto exit; 00793 00794 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00795 goto exit; 00796 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00797 goto exit; // there was a problem 00798 00799 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00800 { 00801 unsigned short mypacketid; // should be the same as the packetid above 00802 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00803 { 00804 rc = 0; 00805 00806 // remove the subscription message handler associated with this topic, if there is one 00807 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00808 { 00809 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00810 { 00811 messageHandlers[i].topicFilter = 0; 00812 break; 00813 } 00814 } 00815 } 00816 } 00817 else 00818 rc = FAILURE; 00819 00820 exit: 00821 if (rc != SUCCESS) 00822 cleanSession(); 00823 return rc; 00824 } 00825 00826 00827 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00828 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, OldTimer& timer, enum QoS qos) 00829 { 00830 int rc; 00831 00832 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00833 goto exit; // there was a problem 00834 00835 #if MQTTCLIENT_QOS1 00836 if (qos == QOS1) 00837 { 00838 if (waitfor(PUBACK, timer) == PUBACK) 00839 { 00840 unsigned short mypacketid; 00841 unsigned char dup, type; 00842 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00843 rc = FAILURE; 00844 else if (inflightMsgid == mypacketid) 00845 inflightMsgid = 0; 00846 } 00847 else 00848 rc = FAILURE; 00849 } 00850 #elif MQTTCLIENT_QOS2 00851 else if (qos == QOS2) 00852 { 00853 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00854 { 00855 unsigned short mypacketid; 00856 unsigned char dup, type; 00857 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00858 rc = FAILURE; 00859 else if (inflightMsgid == mypacketid) 00860 inflightMsgid = 0; 00861 } 00862 else 00863 rc = FAILURE; 00864 } 00865 #endif 00866 00867 exit: 00868 if (rc != SUCCESS) 00869 cleanSession(); 00870 return rc; 00871 } 00872 00873 00874 00875 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00876 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00877 { 00878 int rc = FAILURE; 00879 OldTimer timer(command_timeout_ms); 00880 MQTTString topicString = MQTTString_initializer; 00881 int len = 0; 00882 00883 if (!isconnected) 00884 goto exit; 00885 00886 topicString.cstring = (char*)topicName; 00887 00888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00889 if (qos == QOS1 || qos == QOS2) 00890 id = packetid.getNext(); 00891 #endif 00892 00893 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00894 topicString, (unsigned char*)payload, payloadlen); 00895 if (len <= 0) 00896 goto exit; 00897 00898 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00899 if (!cleansession) 00900 { 00901 memcpy(pubbuf, sendbuf, len); 00902 inflightMsgid = id; 00903 inflightLen = len; 00904 inflightQoS = qos; 00905 #if MQTTCLIENT_QOS2 00906 pubrel = false; 00907 #endif 00908 } 00909 #endif 00910 00911 rc = publish(len, timer, qos); 00912 exit: 00913 return rc; 00914 } 00915 00916 00917 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00918 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00919 { 00920 unsigned short id = 0; // dummy - not used for anything 00921 return publish(topicName, payload, payloadlen, id, qos, retained); 00922 } 00923 00924 00925 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00926 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00927 { 00928 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00929 } 00930 00931 00932 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b> 00933 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00934 { 00935 int rc = FAILURE; 00936 OldTimer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00937 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00938 if (len > 0) 00939 rc = sendPacket(len, timer); // send the disconnect packet 00940 00941 if (cleansession) 00942 cleanSession(); 00943 else 00944 isconnected = false; 00945 return rc; 00946 } 00947 00948 00949 #endif
Generated on Tue Jul 12 2022 21:37:29 by
1.7.2