Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 00029 #if !defined(MQTTCLIENT_QOS1) 00030 #define MQTTCLIENT_QOS1 1 00031 #endif 00032 #if !defined(MQTTCLIENT_QOS2) 00033 #define MQTTCLIENT_QOS2 0 00034 #endif 00035 00036 namespace MQTT 00037 { 00038 00039 00040 enum QoS { QOS0, QOS1, QOS2 }; 00041 00042 // all failure return codes must be negative 00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00044 00045 00046 struct Message 00047 { 00048 enum QoS qos; 00049 bool retained; 00050 bool dup; 00051 unsigned short id; 00052 void *payload; 00053 size_t payloadlen; 00054 }; 00055 00056 00057 struct MessageData 00058 { 00059 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00060 { } 00061 00062 struct 
Message &message; 00063 MQTTString &topicName; 00064 }; 00065 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 /** 00087 * @class Client 00088 * @brief blocking, non-threaded MQTT client API 00089 * 00090 * This version of the API blocks on all method calls, until they are complete. This means that only one 00091 * MQTT request can be in process at any one time. 00092 * @param Network a network class which supports send, receive 00093 * @param Timer a timer class with the methods: 00094 */ 00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00096 class Client 00097 { 00098 00099 public: 00100 00101 typedef void (*messageHandler)(MessageData&); 00102 00103 /** Construct the client 00104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00105 * before calling MQTT connect 00106 * @param limits an instance of the Limit class - to alter limits as required 00107 */ 00108 Client(Network& network, unsigned int command_timeout_ms = 30000); 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00119 * The nework object must be connected to the network endpoint before calling this 00120 * Default connect options are used 00121 * @return success code - 00122 */ 00123 int connect(); 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a 
Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * @param options - connect options 00128 * @return success code - 00129 */ 00130 int connect(MQTTPacket_connectData& options); 00131 00132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00133 * @param topic - the topic to publish to 00134 * @param message - the message to send 00135 * @return success code - 00136 */ 00137 int publish(const char* topicName, Message& message); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param payload - the data to send 00142 * @param payloadlen - the length of the data 00143 * @param qos - the QoS to send the publish at 00144 * @param retained - whether the message should be retained 00145 * @return success code - 00146 */ 00147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00148 00149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00150 * @param topic - the topic to publish to 00151 * @param payload - the data to send 00152 * @param payloadlen - the length of the data 00153 * @param id - the packet id used - returned 00154 * @param qos - the QoS to send the publish at 00155 * @param retained - whether the message should be retained 00156 * @return success code - 00157 */ 00158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00159 00160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00161 * @param topicFilter - a topic pattern which can include wildcards 00162 * @param qos - the MQTT QoS to subscribe at 00163 * @param mh - the callback function to be invoked when a message is received for this subscription 00164 * @return success code - 00165 */ 
00166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00167 00168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00169 * @param topicFilter - a topic pattern which can include wildcards 00170 * @return success code - 00171 */ 00172 int unsubscribe(const char* topicFilter); 00173 00174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00175 * @return success code - 00176 */ 00177 int disconnect(); 00178 00179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00181 * received. 00182 * @param timeout_ms the time to wait, in milliseconds 00183 * @return success code - on failure, this means the client has disconnected 00184 */ 00185 int yield(unsigned long timeout_ms = 1000L); 00186 00187 /** Is the client connected? 00188 * @return flag - is the client connected or not? 
00189 */ 00190 bool isConnected() 00191 { 00192 return isconnected; 00193 } 00194 00195 private: 00196 00197 void cleanSession(); 00198 int cycle(Timer& timer); 00199 int waitfor(int packet_type, Timer& timer); 00200 int keepalive(); 00201 int publish(int len, Timer& timer, enum QoS qos); 00202 00203 int decodePacket(int* value, int timeout); 00204 int readPacket(Timer& timer); 00205 int sendPacket(int length, Timer& timer); 00206 int deliverMessage(MQTTString& topicName, Message& message); 00207 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00208 00209 Network& ipstack; 00210 unsigned long command_timeout_ms; 00211 00212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00214 00215 Timer last_sent, last_received; 00216 unsigned int keepAliveInterval; 00217 bool ping_outstanding; 00218 bool cleansession; 00219 00220 PacketId packetid; 00221 00222 struct MessageHandlers 00223 { 00224 const char* topicFilter; 00225 FP<void, MessageData&> fp; 00226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00227 00228 FP<void, MessageData&> defaultMessageHandler; 00229 00230 bool isconnected; 00231 00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00234 int inflightLen; 00235 unsigned short inflightMsgid; 00236 enum QoS inflightQoS; 00237 #endif 00238 00239 #if MQTTCLIENT_QOS2 00240 bool pubrel; 00241 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00242 #define MAX_INCOMING_QOS2_MESSAGES 10 00243 #endif 00244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00245 bool isQoS2msgidFree(unsigned short id); 00246 bool useQoS2msgid(unsigned short id); 00247 void freeQoS2msgid(unsigned short id); 00248 #endif 00249 00250 }; 00251 00252 } 00253 00254 00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00256 void MQTT::Client<Network, Timer, a, 
MAX_MESSAGE_HANDLERS>::cleanSession() 00257 { 00258 ping_outstanding = false; 00259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00260 messageHandlers[i].topicFilter = 0; 00261 isconnected = false; 00262 00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00264 inflightMsgid = 0; 00265 inflightQoS = QOS0; 00266 #endif 00267 00268 #if MQTTCLIENT_QOS2 00269 pubrel = false; 00270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00271 incomingQoS2messages[i] = 0; 00272 #endif 00273 } 00274 00275 00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00278 { 00279 last_sent = Timer(); 00280 last_received = Timer(); 00281 this->command_timeout_ms = command_timeout_ms; 00282 cleanSession(); 00283 } 00284 00285 00286 #if MQTTCLIENT_QOS2 00287 template<class Network, class Timer, int a, int b> 00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00289 { 00290 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00291 { 00292 if (incomingQoS2messages[i] == id) 00293 return false; 00294 } 00295 return true; 00296 } 00297 00298 00299 template<class Network, class Timer, int a, int b> 00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00301 { 00302 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00303 { 00304 if (incomingQoS2messages[i] == 0) 00305 { 00306 incomingQoS2messages[i] = id; 00307 return true; 00308 } 00309 } 00310 return false; 00311 } 00312 00313 00314 template<class Network, class Timer, int a, int b> 00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00316 { 00317 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00318 { 00319 if (incomingQoS2messages[i] == id) 00320 { 00321 incomingQoS2messages[i] = 0; 00322 return; 00323 } 00324 } 00325 } 00326 #endif 00327 00328 00329 template<class Network, class 
Timer, int a, int b> 00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00331 { 00332 int rc = FAILURE, 00333 sent = 0; 00334 00335 while (sent < length && !timer.expired()) 00336 { 00337 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00338 if (rc < 0) // there was an error writing the data 00339 break; 00340 sent += rc; 00341 } 00342 if (sent == length) 00343 { 00344 if (this->keepAliveInterval > 0) 00345 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00346 rc = SUCCESS; 00347 } 00348 else 00349 rc = FAILURE; 00350 00351 #if defined(MQTT_DEBUG) 00352 char printbuf[150]; 00353 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00354 #endif 00355 return rc; 00356 } 00357 00358 00359 template<class Network, class Timer, int a, int b> 00360 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00361 { 00362 unsigned char c; 00363 int multiplier = 1; 00364 int len = 0; 00365 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00366 00367 *value = 0; 00368 do 00369 { 00370 int rc = MQTTPACKET_READ_ERROR; 00371 00372 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00373 { 00374 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00375 goto exit; 00376 } 00377 rc = ipstack.read(&c, 1, timeout); 00378 if (rc != 1) 00379 goto exit; 00380 *value += (c & 127) * multiplier; 00381 multiplier *= 128; 00382 } while ((c & 128) != 0); 00383 exit: 00384 return len; 00385 } 00386 00387 00388 /** 00389 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00390 * the packets can be retried. 
00391 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00392 * @return the MQTT packet type, or -1 if none 00393 */ 00394 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00395 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00396 { 00397 int rc = FAILURE; 00398 MQTTHeader header = {0}; 00399 int len = 0; 00400 int rem_len = 0; 00401 00402 /* 1. read the header byte. This has the packet type in it */ 00403 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00404 goto exit; 00405 00406 len = 1; 00407 /* 2. read the remaining length. This is variable in itself */ 00408 decodePacket(&rem_len, timer.left_ms()); 00409 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00410 00411 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00412 { 00413 rc = BUFFER_OVERFLOW; 00414 goto exit; 00415 } 00416 00417 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00418 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00419 goto exit; 00420 00421 header.byte = readbuf[0]; 00422 rc = header.bits.type; 00423 if (this->keepAliveInterval > 0) 00424 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00425 exit: 00426 00427 #if defined(MQTT_DEBUG) 00428 if (rc >= 0) 00429 { 00430 char printbuf[50]; 00431 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00432 } 00433 #endif 00434 return rc; 00435 } 00436 00437 00438 // assume topic filter and name is in correct format 00439 // # can only be at end 00440 // + and # can only be next to separator 00441 template<class Network, class Timer, int a, int b> 00442 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00443 { 00444 char* curf = topicFilter; 00445 char* 
curn = topicName.lenstring.data; 00446 char* curn_end = curn + topicName.lenstring.len; 00447 00448 while (*curf && curn < curn_end) 00449 { 00450 if (*curn == '/' && *curf != '/') 00451 break; 00452 if (*curf != '+' && *curf != '#' && *curf != *curn) 00453 break; 00454 if (*curf == '+') 00455 { // skip until we meet the next separator, or end of string 00456 char* nextpos = curn + 1; 00457 while (nextpos < curn_end && *nextpos != '/') 00458 nextpos = ++curn + 1; 00459 } 00460 else if (*curf == '#') 00461 curn = curn_end - 1; // skip until end of string 00462 curf++; 00463 curn++; 00464 }; 00465 00466 return (curn == curn_end) && (*curf == '\0'); 00467 } 00468 00469 00470 00471 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00472 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00473 { 00474 int rc = FAILURE; 00475 00476 // we have to find the right message handler - indexed by topic 00477 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00478 { 00479 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00480 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00481 { 00482 if (messageHandlers[i].fp.attached()) 00483 { 00484 MessageData md(topicName, message); 00485 messageHandlers[i].fp(md); 00486 rc = SUCCESS; 00487 } 00488 } 00489 } 00490 00491 if (rc == FAILURE && defaultMessageHandler.attached()) 00492 { 00493 MessageData md(topicName, message); 00494 defaultMessageHandler(md); 00495 rc = SUCCESS; 00496 } 00497 00498 return rc; 00499 } 00500 00501 00502 00503 template<class Network, class Timer, int a, int b> 00504 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00505 { 00506 int rc = SUCCESS; 00507 Timer timer = Timer(); 00508 00509 timer.countdown_ms(timeout_ms); 00510 while (!timer.expired()) 00511 { 00512 if (cycle(timer) < 0) 00513 { 00514 rc = FAILURE; 00515 break; 
00516 } 00517 } 00518 00519 return rc; 00520 } 00521 00522 00523 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00524 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00525 { 00526 /* get one piece of work off the wire and one pass through */ 00527 00528 // read the socket, see what work is due 00529 int packet_type = readPacket(timer); 00530 00531 int len = 0, 00532 rc = SUCCESS; 00533 00534 switch (packet_type) 00535 { 00536 case FAILURE: 00537 case BUFFER_OVERFLOW: 00538 rc = packet_type; 00539 break; 00540 case CONNACK: 00541 case PUBACK: 00542 case SUBACK: 00543 break; 00544 case PUBLISH: 00545 { 00546 MQTTString topicName = MQTTString_initializer; 00547 Message msg; 00548 int intQoS; 00549 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00550 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00551 goto exit; 00552 msg.qos = (enum QoS)intQoS; 00553 #if MQTTCLIENT_QOS2 00554 if (msg.qos != QOS2) 00555 #endif 00556 deliverMessage(topicName, msg); 00557 #if MQTTCLIENT_QOS2 00558 else if (isQoS2msgidFree(msg.id)) 00559 { 00560 if (useQoS2msgid(msg.id)) 00561 deliverMessage(topicName, msg); 00562 else 00563 WARN("Maximum number of incoming QoS2 messages exceeded"); 00564 } 00565 #endif 00566 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00567 if (msg.qos != QOS0) 00568 { 00569 if (msg.qos == QOS1) 00570 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00571 else if (msg.qos == QOS2) 00572 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00573 if (len <= 0) 00574 rc = FAILURE; 00575 else 00576 rc = sendPacket(len, timer); 00577 if (rc == FAILURE) 00578 goto exit; // there was a problem 00579 } 00580 break; 00581 #endif 00582 } 00583 #if MQTTCLIENT_QOS2 00584 case PUBREC: 00585 case PUBREL: 00586 unsigned short mypacketid; 00587 unsigned char dup, 
type; 00588 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00589 rc = FAILURE; 00590 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00591 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00592 rc = FAILURE; 00593 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00594 rc = FAILURE; // there was a problem 00595 if (rc == FAILURE) 00596 goto exit; // there was a problem 00597 if (packet_type == PUBREL) 00598 freeQoS2msgid(mypacketid); 00599 break; 00600 00601 case PUBCOMP: 00602 break; 00603 #endif 00604 case PINGRESP: 00605 ping_outstanding = false; 00606 break; 00607 } 00608 keepalive(); 00609 exit: 00610 if (rc == SUCCESS) 00611 rc = packet_type; 00612 return rc; 00613 } 00614 00615 00616 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00617 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00618 { 00619 int rc = FAILURE; 00620 00621 if (keepAliveInterval == 0) 00622 { 00623 rc = SUCCESS; 00624 goto exit; 00625 } 00626 00627 if (last_sent.expired() || last_received.expired()) 00628 { 00629 if (!ping_outstanding) 00630 { 00631 Timer timer(1000); 00632 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00633 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00634 ping_outstanding = true; 00635 } 00636 } 00637 00638 exit: 00639 return rc; 00640 } 00641 00642 00643 // only used in single-threaded mode where one command at a time is in process 00644 template<class Network, class Timer, int a, int b> 00645 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00646 { 00647 int rc = FAILURE; 00648 00649 do 00650 { 00651 if (timer.expired()) 00652 break; // we timed out 00653 } 00654 while ((rc = cycle(timer)) != packet_type); 00655 00656 return rc; 00657 } 00658 00659 00660 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00661 int 
MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00662 { 00663 Timer connect_timer(command_timeout_ms); 00664 int rc = FAILURE; 00665 int len = 0; 00666 00667 if (isconnected) // don't send connect packet again if we are already connected 00668 goto exit; 00669 00670 this->keepAliveInterval = options.keepAliveInterval; 00671 this->cleansession = options.cleansession; 00672 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00673 goto exit; 00674 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00675 goto exit; // there was a problem 00676 00677 if (this->keepAliveInterval > 0) 00678 last_received.countdown(this->keepAliveInterval); 00679 // this will be a blocking call, wait for the connack 00680 if (waitfor(CONNACK, connect_timer) == CONNACK) 00681 { 00682 unsigned char connack_rc = 255; 00683 bool sessionPresent = false; 00684 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00685 rc = connack_rc; 00686 else 00687 rc = FAILURE; 00688 } 00689 else 00690 rc = FAILURE; 00691 00692 #if MQTTCLIENT_QOS2 00693 // resend any inflight publish 00694 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00695 { 00696 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00697 rc = FAILURE; 00698 else 00699 rc = publish(len, connect_timer, inflightQoS); 00700 } 00701 else 00702 #endif 00703 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00704 if (inflightMsgid > 0) 00705 { 00706 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00707 rc = publish(inflightLen, connect_timer, inflightQoS); 00708 } 00709 #endif 00710 00711 exit: 00712 if (rc == SUCCESS) 00713 isconnected = true; 00714 return rc; 00715 } 00716 00717 00718 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00719 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00720 { 00721 
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00722 return connect(default_options); 00723 } 00724 00725 00726 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00727 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00728 { 00729 int rc = FAILURE; 00730 Timer timer(command_timeout_ms); 00731 int len = 0; 00732 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00733 00734 if (!isconnected) 00735 goto exit; 00736 00737 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00738 if (len <= 0) 00739 goto exit; 00740 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00741 goto exit; // there was a problem 00742 00743 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00744 { 00745 int count = 0, grantedQoS = -1; 00746 unsigned short mypacketid; 00747 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00748 rc = grantedQoS; // 0, 1, 2 or 0x80 00749 if (rc != 0x80) 00750 { 00751 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00752 { 00753 if (messageHandlers[i].topicFilter == 0) 00754 { 00755 messageHandlers[i].topicFilter = topicFilter; 00756 messageHandlers[i].fp.attach(messageHandler); 00757 rc = 0; 00758 break; 00759 } 00760 } 00761 } 00762 } 00763 else 00764 rc = FAILURE; 00765 00766 exit: 00767 if (rc != SUCCESS) 00768 cleanSession(); 00769 return rc; 00770 } 00771 00772 00773 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00774 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00775 { 00776 int rc = FAILURE; 00777 Timer timer(command_timeout_ms); 00778 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00779 int len = 0; 00780 00781 if (!isconnected) 
00782 goto exit; 00783 00784 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00785 goto exit; 00786 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00787 goto exit; // there was a problem 00788 00789 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00790 { 00791 unsigned short mypacketid; // should be the same as the packetid above 00792 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00793 { 00794 rc = 0; 00795 00796 // remove the subscription message handler associated with this topic, if there is one 00797 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00798 { 00799 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00800 { 00801 messageHandlers[i].topicFilter = 0; 00802 break; 00803 } 00804 } 00805 } 00806 } 00807 else 00808 rc = FAILURE; 00809 00810 exit: 00811 if (rc != SUCCESS) 00812 cleanSession(); 00813 return rc; 00814 } 00815 00816 00817 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00818 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00819 { 00820 int rc; 00821 00822 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00823 goto exit; // there was a problem 00824 00825 #if MQTTCLIENT_QOS1 00826 if (qos == QOS1) 00827 { 00828 if (waitfor(PUBACK, timer) == PUBACK) 00829 { 00830 unsigned short mypacketid; 00831 unsigned char dup, type; 00832 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00833 rc = FAILURE; 00834 else if (inflightMsgid == mypacketid) 00835 inflightMsgid = 0; 00836 } 00837 else 00838 rc = FAILURE; 00839 } 00840 #elif MQTTCLIENT_QOS2 00841 else if (qos == QOS2) 00842 { 00843 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00844 { 00845 unsigned short mypacketid; 00846 unsigned char dup, type; 00847 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, 
readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00848 rc = FAILURE; 00849 else if (inflightMsgid == mypacketid) 00850 inflightMsgid = 0; 00851 } 00852 else 00853 rc = FAILURE; 00854 } 00855 #endif 00856 00857 exit: 00858 if (rc != SUCCESS) 00859 cleanSession(); 00860 return rc; 00861 } 00862 00863 00864 00865 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00866 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00867 { 00868 int rc = FAILURE; 00869 Timer timer(command_timeout_ms); 00870 MQTTString topicString = MQTTString_initializer; 00871 int len = 0; 00872 00873 if (!isconnected) 00874 goto exit; 00875 00876 topicString.cstring = (char*)topicName; 00877 00878 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00879 if (qos == QOS1 || qos == QOS2) 00880 id = packetid.getNext(); 00881 #endif 00882 00883 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00884 topicString, (unsigned char*)payload, payloadlen); 00885 if (len <= 0) 00886 goto exit; 00887 00888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00889 if (!cleansession) 00890 { 00891 memcpy(pubbuf, sendbuf, len); 00892 inflightMsgid = id; 00893 inflightLen = len; 00894 inflightQoS = qos; 00895 #if MQTTCLIENT_QOS2 00896 pubrel = false; 00897 #endif 00898 } 00899 #endif 00900 00901 rc = publish(len, timer, qos); 00902 exit: 00903 return rc; 00904 } 00905 00906 00907 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00908 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00909 { 00910 unsigned short id = 0; // dummy - not used for anything 00911 return publish(topicName, payload, payloadlen, id, qos, retained); 00912 } 00913 00914 00915 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00916 int MQTT::Client<Network, Timer, 
MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00917 { 00918 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00919 } 00920 00921 00922 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00923 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00924 { 00925 int rc = FAILURE; 00926 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00927 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00928 if (len > 0) 00929 rc = sendPacket(len, timer); // send the disconnect packet 00930 00931 if (cleansession) 00932 cleanSession(); 00933 else 00934 isconnected = false; 00935 return rc; 00936 } 00937 00938 00939 #endif
Generated on Tue Jul 12 2022 15:36:17 by Doxygen 1.7.2
