Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 00029 #if !defined(MQTTCLIENT_QOS1) 00030 #define MQTTCLIENT_QOS1 1 00031 #endif 00032 #if !defined(MQTTCLIENT_QOS2) 00033 #define MQTTCLIENT_QOS2 0 00034 #endif 00035 00036 namespace MQTT 00037 { 00038 00039 00040 enum QoS { QOS0, QOS1, QOS2 }; 00041 00042 // all failure return codes must be negative 00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00044 00045 00046 struct Message 00047 { 00048 enum QoS qos; 00049 bool retained; 00050 bool dup; 00051 unsigned short id; 00052 void *payload; 00053 size_t payloadlen; 00054 }; 00055 00056 00057 struct MessageData 00058 { 00059 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00060 { } 00061 00062 struct 
Message &message; 00063 MQTTString &topicName; 00064 }; 00065 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 /** 00087 * @class Client 00088 * @brief blocking, non-threaded MQTT client API 00089 * 00090 * This version of the API blocks on all method calls, until they are complete. This means that only one 00091 * MQTT request can be in process at any one time. 00092 * @param Network a network class which supports send, receive 00093 * @param Timer a timer class with the methods: 00094 */ 00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00096 class Client 00097 { 00098 00099 public: 00100 00101 typedef void (*messageHandler)(MessageData&); 00102 00103 /** Construct the client 00104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00105 * before calling MQTT connect 00106 * @param limits an instance of the Limit class - to alter limits as required 00107 */ 00108 Client(Network& network, unsigned int command_timeout_ms = 30000); 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00119 * The nework object must be connected to the network endpoint before calling this 00120 * Default connect options are used 00121 * @return success code - 00122 */ 00123 int connect(); 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a 
Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * @param options - connect options 00128 * @return success code - 00129 */ 00130 int connect(MQTTPacket_connectData& options); 00131 00132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00133 * @param topic - the topic to publish to 00134 * @param message - the message to send 00135 * @return success code - 00136 */ 00137 int publish(const char* topicName, Message& message); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param payload - the data to send 00142 * @param payloadlen - the length of the data 00143 * @param qos - the QoS to send the publish at 00144 * @param retained - whether the message should be retained 00145 * @return success code - 00146 */ 00147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00148 00149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00150 * @param topic - the topic to publish to 00151 * @param payload - the data to send 00152 * @param payloadlen - the length of the data 00153 * @param id - the packet id used - returned 00154 * @param qos - the QoS to send the publish at 00155 * @param retained - whether the message should be retained 00156 * @return success code - 00157 */ 00158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00159 00160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00161 * @param topicFilter - a topic pattern which can include wildcards 00162 * @param qos - the MQTT QoS to subscribe at 00163 * @param mh - the callback function to be invoked when a message is received for this subscription 00164 * @return success code - 00165 */ 
00166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00167 00168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00169 * @param topicFilter - a topic pattern which can include wildcards 00170 * @return success code - 00171 */ 00172 int unsubscribe(const char* topicFilter); 00173 00174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00175 * @return success code - 00176 */ 00177 int disconnect(); 00178 00179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00181 * received. 00182 * @param timeout_ms the time to wait, in milliseconds 00183 * @return success code - on failure, this means the client has disconnected 00184 */ 00185 int yield(unsigned long timeout_ms = 1000L); 00186 00187 /** Is the client connected? 00188 * @return flag - is the client connected or not? 
00189 */ 00190 bool isConnected() 00191 { 00192 return isconnected; 00193 } 00194 00195 private: 00196 00197 void cleanSession(); 00198 int cycle(Timer& timer); 00199 int waitfor(int packet_type, Timer& timer); 00200 int keepalive(); 00201 int publish(int len, Timer& timer, enum QoS qos); 00202 00203 int decodePacket(int* value, int timeout); 00204 int readPacket(Timer& timer); 00205 int sendPacket(int length, Timer& timer); 00206 int deliverMessage(MQTTString& topicName, Message& message); 00207 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00208 00209 Network& ipstack; 00210 unsigned long command_timeout_ms; 00211 00212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00214 00215 Timer last_sent, last_received; 00216 unsigned int keepAliveInterval; 00217 bool ping_outstanding; 00218 bool cleansession; 00219 00220 PacketId packetid; 00221 00222 struct MessageHandlers 00223 { 00224 const char* topicFilter; 00225 FP<void, MessageData&> fp; 00226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00227 00228 FP<void, MessageData&> defaultMessageHandler; 00229 00230 bool isconnected; 00231 00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00234 int inflightLen; 00235 unsigned short inflightMsgid; 00236 enum QoS inflightQoS; 00237 #endif 00238 00239 #if MQTTCLIENT_QOS2 00240 bool pubrel; 00241 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00242 #define MAX_INCOMING_QOS2_MESSAGES 10 00243 #endif 00244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00245 bool isQoS2msgidFree(unsigned short id); 00246 bool useQoS2msgid(unsigned short id); 00247 void freeQoS2msgid(unsigned short id); 00248 #endif 00249 00250 }; 00251 00252 } 00253 00254 00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00256 void MQTT::Client<Network, Timer, a, 
MAX_MESSAGE_HANDLERS>::cleanSession() 00257 { 00258 ping_outstanding = false; 00259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00260 messageHandlers[i].topicFilter = 0; 00261 isconnected = false; 00262 00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00264 inflightMsgid = 0; 00265 inflightQoS = QOS0; 00266 #endif 00267 00268 #if MQTTCLIENT_QOS2 00269 pubrel = false; 00270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00271 incomingQoS2messages[i] = 0; 00272 #endif 00273 } 00274 00275 00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00278 { 00279 this->command_timeout_ms = command_timeout_ms; 00280 cleanSession(); 00281 } 00282 00283 00284 #if MQTTCLIENT_QOS2 00285 template<class Network, class Timer, int a, int b> 00286 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00287 { 00288 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00289 { 00290 if (incomingQoS2messages[i] == id) 00291 return false; 00292 } 00293 return true; 00294 } 00295 00296 00297 template<class Network, class Timer, int a, int b> 00298 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00299 { 00300 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00301 { 00302 if (incomingQoS2messages[i] == 0) 00303 { 00304 incomingQoS2messages[i] = id; 00305 return true; 00306 } 00307 } 00308 return false; 00309 } 00310 00311 00312 template<class Network, class Timer, int a, int b> 00313 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00314 { 00315 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00316 { 00317 if (incomingQoS2messages[i] == id) 00318 { 00319 incomingQoS2messages[i] = 0; 00320 return; 00321 } 00322 } 00323 } 00324 #endif 00325 00326 00327 template<class Network, class Timer, int a, int b> 00328 int MQTT::Client<Network, Timer, 
a, b>::sendPacket(int length, Timer& timer) 00329 { 00330 int rc = FAILURE, 00331 sent = 0; 00332 00333 while (sent < length && !timer.expired()) 00334 { 00335 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00336 if (rc < 0) // there was an error writing the data 00337 break; 00338 sent += rc; 00339 } 00340 if (sent == length) 00341 { 00342 if (this->keepAliveInterval > 0) 00343 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00344 rc = SUCCESS; 00345 } 00346 else 00347 rc = FAILURE; 00348 00349 #if defined(MQTT_DEBUG) 00350 char printbuf[150]; 00351 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00352 #endif 00353 return rc; 00354 } 00355 00356 00357 template<class Network, class Timer, int a, int b> 00358 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00359 { 00360 unsigned char c; 00361 int multiplier = 1; 00362 int len = 0; 00363 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00364 00365 *value = 0; 00366 do 00367 { 00368 int rc = MQTTPACKET_READ_ERROR; 00369 00370 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00371 { 00372 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00373 goto exit; 00374 } 00375 rc = ipstack.read(&c, 1, timeout); 00376 if (rc != 1) 00377 goto exit; 00378 *value += (c & 127) * multiplier; 00379 multiplier *= 128; 00380 } while ((c & 128) != 0); 00381 exit: 00382 return len; 00383 } 00384 00385 00386 /** 00387 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00388 * the packets can be retried. 
00389 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00390 * @return the MQTT packet type, or -1 if none 00391 */ 00392 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00393 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00394 { 00395 int rc = FAILURE; 00396 MQTTHeader header = {0}; 00397 int len = 0; 00398 int rem_len = 0; 00399 00400 /* 1. read the header byte. This has the packet type in it */ 00401 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00402 goto exit; 00403 00404 len = 1; 00405 /* 2. read the remaining length. This is variable in itself */ 00406 decodePacket(&rem_len, timer.left_ms()); 00407 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00408 00409 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00410 { 00411 rc = BUFFER_OVERFLOW; 00412 goto exit; 00413 } 00414 00415 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00416 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00417 goto exit; 00418 00419 header.byte = readbuf[0]; 00420 rc = header.bits.type; 00421 if (this->keepAliveInterval > 0) 00422 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00423 exit: 00424 00425 #if defined(MQTT_DEBUG) 00426 if (rc >= 0) 00427 { 00428 char printbuf[50]; 00429 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00430 } 00431 #endif 00432 return rc; 00433 } 00434 00435 00436 // assume topic filter and name is in correct format 00437 // # can only be at end 00438 // + and # can only be next to separator 00439 template<class Network, class Timer, int a, int b> 00440 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00441 { 00442 char* curf = topicFilter; 00443 char* 
curn = topicName.lenstring.data; 00444 char* curn_end = curn + topicName.lenstring.len; 00445 00446 while (*curf && curn < curn_end) 00447 { 00448 if (*curn == '/' && *curf != '/') 00449 break; 00450 if (*curf != '+' && *curf != '#' && *curf != *curn) 00451 break; 00452 if (*curf == '+') 00453 { // skip until we meet the next separator, or end of string 00454 char* nextpos = curn + 1; 00455 while (nextpos < curn_end && *nextpos != '/') 00456 nextpos = ++curn + 1; 00457 } 00458 else if (*curf == '#') 00459 curn = curn_end - 1; // skip until end of string 00460 curf++; 00461 curn++; 00462 }; 00463 00464 return (curn == curn_end) && (*curf == '\0'); 00465 } 00466 00467 00468 00469 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00470 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00471 { 00472 int rc = FAILURE; 00473 00474 // we have to find the right message handler - indexed by topic 00475 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00476 { 00477 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00478 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00479 { 00480 if (messageHandlers[i].fp.attached()) 00481 { 00482 MessageData md(topicName, message); 00483 messageHandlers[i].fp(md); 00484 rc = SUCCESS; 00485 } 00486 } 00487 } 00488 00489 if (rc == FAILURE && defaultMessageHandler.attached()) 00490 { 00491 MessageData md(topicName, message); 00492 defaultMessageHandler(md); 00493 rc = SUCCESS; 00494 } 00495 00496 return rc; 00497 } 00498 00499 00500 00501 template<class Network, class Timer, int a, int b> 00502 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00503 { 00504 int rc = SUCCESS; 00505 Timer timer = Timer(); 00506 00507 timer.countdown_ms(timeout_ms); 00508 while (!timer.expired()) 00509 { 00510 if (cycle(timer) < 0) 00511 { 00512 rc = FAILURE; 00513 break; 
00514 } 00515 } 00516 00517 return rc; 00518 } 00519 00520 00521 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00522 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00523 { 00524 /* get one piece of work off the wire and one pass through */ 00525 00526 // read the socket, see what work is due 00527 int packet_type = readPacket(timer); 00528 00529 int len = 0, 00530 rc = SUCCESS; 00531 00532 switch (packet_type) 00533 { 00534 case FAILURE: 00535 case BUFFER_OVERFLOW: 00536 rc = packet_type; 00537 break; 00538 case CONNACK: 00539 case PUBACK: 00540 case SUBACK: 00541 break; 00542 case PUBLISH: 00543 { 00544 MQTTString topicName = MQTTString_initializer; 00545 Message msg; 00546 int intQoS; 00547 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00548 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00549 goto exit; 00550 msg.qos = (enum QoS)intQoS; 00551 #if MQTTCLIENT_QOS2 00552 if (msg.qos != QOS2) 00553 #endif 00554 deliverMessage(topicName, msg); 00555 #if MQTTCLIENT_QOS2 00556 else if (isQoS2msgidFree(msg.id)) 00557 { 00558 if (useQoS2msgid(msg.id)) 00559 deliverMessage(topicName, msg); 00560 else 00561 WARN("Maximum number of incoming QoS2 messages exceeded"); 00562 } 00563 #endif 00564 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00565 if (msg.qos != QOS0) 00566 { 00567 if (msg.qos == QOS1) 00568 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00569 else if (msg.qos == QOS2) 00570 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00571 if (len <= 0) 00572 rc = FAILURE; 00573 else 00574 rc = sendPacket(len, timer); 00575 if (rc == FAILURE) 00576 goto exit; // there was a problem 00577 } 00578 break; 00579 #endif 00580 } 00581 #if MQTTCLIENT_QOS2 00582 case PUBREC: 00583 case PUBREL: 00584 unsigned short mypacketid; 00585 unsigned char dup, 
type; 00586 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00587 rc = FAILURE; 00588 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00589 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00590 rc = FAILURE; 00591 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00592 rc = FAILURE; // there was a problem 00593 if (rc == FAILURE) 00594 goto exit; // there was a problem 00595 if (packet_type == PUBREL) 00596 freeQoS2msgid(mypacketid); 00597 break; 00598 00599 case PUBCOMP: 00600 break; 00601 #endif 00602 case PINGRESP: 00603 ping_outstanding = false; 00604 break; 00605 } 00606 keepalive(); 00607 exit: 00608 if (rc == SUCCESS) 00609 rc = packet_type; 00610 return rc; 00611 } 00612 00613 00614 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00615 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00616 { 00617 int rc = FAILURE; 00618 00619 if (keepAliveInterval == 0) 00620 { 00621 rc = SUCCESS; 00622 goto exit; 00623 } 00624 00625 if (last_sent.expired() || last_received.expired()) 00626 { 00627 if (!ping_outstanding) 00628 { 00629 Timer timer(1000); 00630 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00631 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00632 ping_outstanding = true; 00633 } 00634 } 00635 00636 exit: 00637 return rc; 00638 } 00639 00640 00641 // only used in single-threaded mode where one command at a time is in process 00642 template<class Network, class Timer, int a, int b> 00643 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00644 { 00645 int rc = FAILURE; 00646 00647 do 00648 { 00649 if (timer.expired()) 00650 break; // we timed out 00651 } 00652 while ((rc = cycle(timer)) != packet_type); 00653 00654 return rc; 00655 } 00656 00657 00658 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00659 int 
MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00660 { 00661 Timer connect_timer(command_timeout_ms); 00662 int rc = FAILURE; 00663 int len = 0; 00664 00665 if (isconnected) // don't send connect packet again if we are already connected 00666 goto exit; 00667 00668 this->keepAliveInterval = options.keepAliveInterval; 00669 this->cleansession = options.cleansession; 00670 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00671 goto exit; 00672 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00673 goto exit; // there was a problem 00674 00675 if (this->keepAliveInterval > 0) 00676 last_received.countdown(this->keepAliveInterval); 00677 // this will be a blocking call, wait for the connack 00678 if (waitfor(CONNACK, connect_timer) == CONNACK) 00679 { 00680 unsigned char connack_rc = 255; 00681 bool sessionPresent = false; 00682 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00683 rc = connack_rc; 00684 else 00685 rc = FAILURE; 00686 } 00687 else 00688 rc = FAILURE; 00689 00690 #if MQTTCLIENT_QOS2 00691 // resend any inflight publish 00692 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00693 { 00694 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00695 rc = FAILURE; 00696 else 00697 rc = publish(len, connect_timer, inflightQoS); 00698 } 00699 else 00700 #endif 00701 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00702 if (inflightMsgid > 0) 00703 { 00704 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00705 rc = publish(inflightLen, connect_timer, inflightQoS); 00706 } 00707 #endif 00708 00709 exit: 00710 if (rc == SUCCESS) 00711 isconnected = true; 00712 return rc; 00713 } 00714 00715 00716 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00717 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00718 { 00719 
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00720 return connect(default_options); 00721 } 00722 00723 00724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00726 { 00727 int rc = FAILURE; 00728 Timer timer(command_timeout_ms); 00729 int len = 0; 00730 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00731 00732 if (!isconnected) 00733 goto exit; 00734 00735 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00736 if (len <= 0) 00737 goto exit; 00738 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00739 goto exit; // there was a problem 00740 00741 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00742 { 00743 int count = 0, grantedQoS = -1; 00744 unsigned short mypacketid; 00745 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00746 rc = grantedQoS; // 0, 1, 2 or 0x80 00747 if (rc != 0x80) 00748 { 00749 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00750 { 00751 if (messageHandlers[i].topicFilter == 0) 00752 { 00753 messageHandlers[i].topicFilter = topicFilter; 00754 messageHandlers[i].fp.attach(messageHandler); 00755 rc = 0; 00756 break; 00757 } 00758 } 00759 } 00760 } 00761 else 00762 rc = FAILURE; 00763 00764 exit: 00765 if (rc != SUCCESS) 00766 cleanSession(); 00767 return rc; 00768 } 00769 00770 00771 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00772 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00773 { 00774 int rc = FAILURE; 00775 Timer timer(command_timeout_ms); 00776 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00777 int len = 0; 00778 00779 if (!isconnected) 
00780 goto exit; 00781 00782 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00783 goto exit; 00784 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00785 goto exit; // there was a problem 00786 00787 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00788 { 00789 unsigned short mypacketid; // should be the same as the packetid above 00790 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00791 { 00792 rc = 0; 00793 00794 // remove the subscription message handler associated with this topic, if there is one 00795 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00796 { 00797 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00798 { 00799 messageHandlers[i].topicFilter = 0; 00800 break; 00801 } 00802 } 00803 } 00804 } 00805 else 00806 rc = FAILURE; 00807 00808 exit: 00809 if (rc != SUCCESS) 00810 cleanSession(); 00811 return rc; 00812 } 00813 00814 00815 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00816 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00817 { 00818 int rc; 00819 00820 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00821 goto exit; // there was a problem 00822 00823 #if MQTTCLIENT_QOS1 00824 if (qos == QOS1) 00825 { 00826 if (waitfor(PUBACK, timer) == PUBACK) 00827 { 00828 unsigned short mypacketid; 00829 unsigned char dup, type; 00830 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00831 rc = FAILURE; 00832 else if (inflightMsgid == mypacketid) 00833 inflightMsgid = 0; 00834 } 00835 else 00836 rc = FAILURE; 00837 } 00838 #elif MQTTCLIENT_QOS2 00839 else if (qos == QOS2) 00840 { 00841 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00842 { 00843 unsigned short mypacketid; 00844 unsigned char dup, type; 00845 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, 
readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00846 rc = FAILURE; 00847 else if (inflightMsgid == mypacketid) 00848 inflightMsgid = 0; 00849 } 00850 else 00851 rc = FAILURE; 00852 } 00853 #endif 00854 00855 exit: 00856 if (rc != SUCCESS) 00857 cleanSession(); 00858 return rc; 00859 } 00860 00861 00862 00863 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00864 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00865 { 00866 int rc = FAILURE; 00867 Timer timer(command_timeout_ms); 00868 MQTTString topicString = MQTTString_initializer; 00869 int len = 0; 00870 00871 if (!isconnected) 00872 goto exit; 00873 00874 topicString.cstring = (char*)topicName; 00875 00876 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00877 if (qos == QOS1 || qos == QOS2) 00878 id = packetid.getNext(); 00879 #endif 00880 00881 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00882 topicString, (unsigned char*)payload, payloadlen); 00883 if (len <= 0) 00884 goto exit; 00885 00886 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00887 if (!cleansession) 00888 { 00889 memcpy(pubbuf, sendbuf, len); 00890 inflightMsgid = id; 00891 inflightLen = len; 00892 inflightQoS = qos; 00893 #if MQTTCLIENT_QOS2 00894 pubrel = false; 00895 #endif 00896 } 00897 #endif 00898 00899 rc = publish(len, timer, qos); 00900 exit: 00901 return rc; 00902 } 00903 00904 00905 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00906 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00907 { 00908 unsigned short id = 0; // dummy - not used for anything 00909 return publish(topicName, payload, payloadlen, id, qos, retained); 00910 } 00911 00912 00913 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00914 int MQTT::Client<Network, Timer, 
MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00915 { 00916 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00917 } 00918 00919 00920 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00921 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00922 { 00923 int rc = FAILURE; 00924 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00925 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00926 if (len > 0) 00927 rc = sendPacket(len, timer); // send the disconnect packet 00928 00929 if (cleansession) 00930 cleanSession(); 00931 else 00932 isconnected = false; 00933 return rc; 00934 } 00935 00936 00937 #endif
Generated on Thu Jul 14 2022 00:21:46 by
1.7.2
