Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 * Ian Craggs - add ability to set message handler separately #6 00022 *******************************************************************************/ 00023 00024 #if !defined(MQTTCLIENT_H) 00025 #define MQTTCLIENT_H 00026 00027 #include "MQTTPacket.h" 00028 #include <stdio.h> 00029 #include "MQTTLogging.h" 00030 00031 #if !defined(MQTTCLIENT_QOS1) 00032 #define MQTTCLIENT_QOS1 1 00033 #endif 00034 #if !defined(MQTTCLIENT_QOS2) 00035 #define MQTTCLIENT_QOS2 0 00036 #endif 00037 00038 namespace MQTT 00039 { 00040 00041 00042 enum QoS { QOS0, QOS1, QOS2 }; 00043 00044 // all failure return codes must be negative 00045 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00046 00047 00048 struct Message 00049 { 00050 enum QoS qos; 00051 bool retained; 00052 bool dup; 00053 unsigned short id; 00054 void *payload; 00055 size_t payloadlen; 00056 }; 00057 00058 00059 struct MessageData 00060 { 00061 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00062 { } 00063 00064 struct Message &message; 00065 MQTTString &topicName; 00066 }; 00067 00068 00069 struct connackData 00070 { 00071 int rc; 00072 bool sessionPresent; 00073 }; 00074 00075 00076 struct subackData 00077 { 00078 int grantedQoS; 00079 }; 00080 00081 00082 class PacketId 00083 { 00084 public: 00085 PacketId() 00086 { 00087 next = 0; 00088 } 00089 00090 int getNext() 00091 { 00092 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00093 } 00094 00095 private: 00096 static const int MAX_PACKET_ID = 65535; 00097 int next; 00098 }; 00099 00100 00101 /** 00102 * @class Client 00103 * @brief blocking, non-threaded MQTT client API 00104 * 00105 * This version of the API blocks on all method calls, until they are complete. This means that only one 00106 * MQTT request can be in process at any one time. 00107 * @param Network a network class which supports send, receive 00108 * @param Timer a timer class with the methods: 00109 */ 00110 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00111 class Client 00112 { 00113 00114 public: 00115 00116 typedef void (*messageHandler)(MessageData&); 00117 00118 /** Construct the client 00119 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00120 * before calling MQTT connect 00121 * @param limits an instance of the Limit class - to alter limits as required 00122 */ 00123 Client(Network& network, unsigned int command_timeout_ms = 30000); 00124 00125 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00126 * @param mh - pointer to the callback function. Set to 0 to remove. 00127 */ 00128 void setDefaultMessageHandler(messageHandler mh) 00129 { 00130 if (mh != 0) 00131 defaultMessageHandler=mh; 00132 else 00133 defaultMessageHandler=NULL; 00134 } 00135 00136 /** Set a message handling callback. This can be used outside of the the subscribe method. 00137 * @param topicFilter - a topic pattern which can include wildcards 00138 * @param mh - pointer to the callback function. If 0, removes the callback if any 00139 */ 00140 int setMessageHandler(const char* topicFilter, messageHandler mh); 00141 00142 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00143 * The nework object must be connected to the network endpoint before calling this 00144 * Default connect options are used 00145 * @return success code - 00146 */ 00147 int connect(); 00148 00149 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00150 * The nework object must be connected to the network endpoint before calling this 00151 * @param options - connect options 00152 * @return success code - 00153 */ 00154 int connect(MQTTPacket_connectData& options); 00155 00156 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00157 * The nework object must be connected to the network endpoint before calling this 00158 * @param options - connect options 00159 * @param connackData - connack data to be returned 00160 * @return success code - 00161 */ 00162 int connect(MQTTPacket_connectData& options, connackData& data); 00163 00164 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00165 * @param topic - the topic to publish to 00166 * @param message - the message to send 00167 * @return success code - 00168 */ 00169 int publish(const char* topicName, Message& message); 00170 00171 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00172 * @param topic - the topic to publish to 00173 * @param payload - the data to send 00174 * @param payloadlen - the length of the data 00175 * @param qos - the QoS to send the publish at 00176 * @param retained - whether the message should be retained 00177 * @return success code - 00178 */ 00179 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00180 00181 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00182 * @param topic - the topic to publish to 00183 * @param payload - the data to send 00184 * @param payloadlen - the length of the data 00185 * @param id - the packet id used - returned 00186 * @param qos - the QoS to send the publish at 00187 * @param retained - whether the message should be retained 00188 * @return success code - 00189 */ 00190 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00191 00192 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00193 * @param topicFilter - a topic pattern which can include wildcards 00194 * @param qos - the MQTT QoS to subscribe at 00195 * @param mh - the callback function to be invoked when a message is received for this subscription 00196 * @return success code - 00197 */ 00198 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00199 00200 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00201 * @param topicFilter - a topic pattern which can include wildcards 00202 * @param qos - the MQTT QoS to subscribe at© 00203 * @param mh - the callback function to be invoked when a message is received for this subscription 00204 * @param 00205 * @return success code - 00206 */ 00207 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); 00208 00209 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00210 * @param topicFilter - a topic pattern which can include wildcards 00211 * @return success code - 00212 */ 00213 int unsubscribe(const char* topicFilter); 00214 00215 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00216 * @return success code - 00217 */ 00218 int disconnect(); 00219 00220 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00221 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00222 * received. 00223 * @param timeout_ms the time to wait, in milliseconds 00224 * @return success code - on failure, this means the client has disconnected 00225 */ 00226 int yield(unsigned long timeout_ms = 1000L); 00227 00228 /** Is the client connected? 00229 * @return flag - is the client connected or not? 00230 */ 00231 bool isConnected() 00232 { 00233 return isconnected; 00234 } 00235 00236 private: 00237 00238 void closeSession(); 00239 void cleanSession(); 00240 int cycle(Timer& timer); 00241 int waitfor(int packet_type, Timer& timer); 00242 int keepalive(); 00243 int publish(int len, Timer& timer, enum QoS qos); 00244 00245 int decodePacket(int* value, int timeout); 00246 int readPacket(Timer& timer); 00247 int sendPacket(int length, Timer& timer); 00248 int deliverMessage(MQTTString& topicName, Message& message); 00249 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00250 00251 Network& ipstack; 00252 unsigned long command_timeout_ms; 00253 00254 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00255 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00256 00257 Timer last_sent, last_received; 00258 unsigned int keepAliveInterval; 00259 bool ping_outstanding; 00260 bool cleansession; 00261 00262 PacketId packetid; 00263 00264 struct MessageHandlers 00265 { 00266 const char* topicFilter; 00267 Callback<void( MessageData&)> fp; 00268 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00269 00270 Callback<void(MessageData&)> defaultMessageHandler; 00271 00272 bool isconnected; 00273 00274 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00275 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00276 int inflightLen; 00277 unsigned short inflightMsgid; 00278 enum QoS inflightQoS; 00279 #endif 00280 00281 #if MQTTCLIENT_QOS2 00282 bool pubrel; 00283 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00284 #define MAX_INCOMING_QOS2_MESSAGES 10 00285 #endif 00286 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00287 bool isQoS2msgidFree(unsigned short id); 00288 bool useQoS2msgid(unsigned short id); 00289 void freeQoS2msgid(unsigned short id); 00290 #endif 00291 00292 }; 00293 00294 } 00295 00296 00297 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00298 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00299 { 00300 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00301 messageHandlers[i].topicFilter = 0; 00302 00303 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00304 inflightMsgid = 0; 00305 inflightQoS = QOS0; 00306 #endif 00307 00308 #if MQTTCLIENT_QOS2 00309 pubrel = false; 00310 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00311 incomingQoS2messages[i] = 0; 00312 #endif 00313 } 00314 00315 00316 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00317 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() 00318 { 00319 ping_outstanding = false; 00320 isconnected = false; 00321 if (cleansession) 00322 cleanSession(); 00323 } 00324 00325 00326 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00327 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00328 { 00329 this->command_timeout_ms = command_timeout_ms; 00330 cleansession = true; 00331 closeSession(); 00332 } 00333 00334 00335 #if MQTTCLIENT_QOS2 00336 template<class Network, class Timer, int a, int b> 00337 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00338 { 00339 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00340 { 00341 if (incomingQoS2messages[i] == id) 00342 return false; 00343 } 00344 return true; 00345 } 00346 00347 00348 template<class Network, class Timer, int a, int b> 00349 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00350 { 00351 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00352 { 00353 if (incomingQoS2messages[i] == 0) 00354 { 00355 incomingQoS2messages[i] = id; 00356 return true; 00357 } 00358 } 00359 return false; 00360 } 00361 00362 00363 template<class Network, class Timer, int a, int b> 00364 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00365 { 00366 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00367 { 00368 if (incomingQoS2messages[i] == id) 00369 { 00370 incomingQoS2messages[i] = 0; 00371 return; 00372 } 00373 } 00374 } 00375 #endif 00376 00377 00378 template<class Network, class Timer, int a, int b> 00379 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00380 { 00381 int rc = FAILURE, 00382 sent = 0; 00383 00384 while (sent < length) 00385 { 00386 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00387 if (rc < 0) // there was an error writing the data 00388 break; 00389 sent += rc; 00390 if (timer.expired()) // only check expiry after at least one attempt to write 00391 break; 00392 } 00393 if (sent == length) 00394 { 00395 if (this->keepAliveInterval > 0) 00396 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00397 rc = SUCCESS; 00398 } 00399 else 00400 rc = FAILURE; 00401 00402 #if defined(MQTT_DEBUG) 00403 char printbuf[150]; 00404 DEBUG("Rc %d from sending packet %s\r\n", rc, 00405 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00406 #endif 00407 return rc; 00408 } 00409 00410 00411 template<class Network, class Timer, int a, int b> 00412 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00413 { 00414 unsigned char c; 00415 int multiplier = 1; 00416 int len = 0; 00417 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00418 00419 *value = 0; 00420 do 00421 { 00422 int rc = MQTTPACKET_READ_ERROR; 00423 00424 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00425 { 00426 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00427 goto exit; 00428 } 00429 rc = ipstack.read(&c, 1, timeout); 00430 if (rc != 1) 00431 goto exit; 00432 *value += (c & 127) * multiplier; 00433 multiplier *= 128; 00434 } while ((c & 128) != 0); 00435 exit: 00436 return len; 00437 } 00438 00439 00440 /** 00441 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00442 * the packets can be retried. 00443 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00444 * @return the MQTT packet type, 0 if none, -1 if error 00445 */ 00446 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00447 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00448 { 00449 int rc = FAILURE; 00450 MQTTHeader header = {0}; 00451 int len = 0; 00452 int rem_len = 0; 00453 00454 /* 1. read the header byte. This has the packet type in it */ 00455 rc = ipstack.read(readbuf, 1, timer.left_ms()); 00456 if (rc != 1) 00457 goto exit; 00458 00459 len = 1; 00460 /* 2. read the remaining length. This is variable in itself */ 00461 decodePacket(&rem_len, timer.left_ms()); 00462 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00463 00464 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00465 { 00466 rc = BUFFER_OVERFLOW; 00467 goto exit; 00468 } 00469 00470 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00471 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00472 goto exit; 00473 00474 header.byte = readbuf[0]; 00475 rc = header.bits.type; 00476 if (this->keepAliveInterval > 0) 00477 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00478 exit: 00479 00480 #if defined(MQTT_DEBUG) 00481 if (rc >= 0) 00482 { 00483 char printbuf[50]; 00484 DEBUG("Rc %d receiving packet %s\r\n", rc, 00485 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00486 } 00487 #endif 00488 return rc; 00489 } 00490 00491 00492 // assume topic filter and name is in correct format 00493 // # can only be at end 00494 // + and # can only be next to separator 00495 template<class Network, class Timer, int a, int b> 00496 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00497 { 00498 char* curf = topicFilter; 00499 char* curn = topicName.lenstring.data; 00500 char* curn_end = curn + topicName.lenstring.len; 00501 00502 while (*curf && curn < curn_end) 00503 { 00504 if (*curn == '/' && *curf != '/') 00505 break; 00506 if (*curf != '+' && *curf != '#' && *curf != *curn) 00507 break; 00508 if (*curf == '+') 00509 { // skip until we meet the next separator, or end of string 00510 char* nextpos = curn + 1; 00511 while (nextpos < curn_end && *nextpos != '/') 00512 nextpos = ++curn + 1; 00513 } 00514 else if (*curf == '#') 00515 curn = curn_end - 1; // skip until end of string 00516 curf++; 00517 curn++; 00518 }; 00519 00520 return (curn == curn_end) && (*curf == '\0'); 00521 } 00522 00523 00524 00525 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00526 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00527 { 00528 int rc = FAILURE; 00529 00530 // we have to find the right message handler - indexed by topic 00531 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00532 { 00533 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00534 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00535 { 00536 if (messageHandlers[i].fp) //test to see if it is attached 00537 { 00538 MessageData md(topicName, message); 00539 messageHandlers[i].fp(md); 00540 rc = SUCCESS; 00541 } 00542 } 00543 } 00544 00545 if (rc == FAILURE && defaultMessageHandler ) 00546 { 00547 MessageData md(topicName, message); 00548 defaultMessageHandler(md); 00549 rc = SUCCESS; 00550 } 00551 00552 return rc; 00553 } 00554 00555 00556 00557 template<class Network, class Timer, int a, int b> 00558 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00559 { 00560 int rc = SUCCESS; 00561 Timer timer; 00562 00563 timer.countdown_ms(timeout_ms); 00564 while (!timer.expired()) 00565 { 00566 if (cycle(timer) < 0) 00567 { 00568 rc = FAILURE; 00569 break; 00570 } 00571 } 00572 00573 return rc; 00574 } 00575 00576 00577 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00578 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00579 { 00580 // get one piece of work off the wire and one pass through 00581 int len = 0, 00582 rc = SUCCESS; 00583 00584 int packet_type = readPacket(timer); // read the socket, see what work is due 00585 00586 switch (packet_type) 00587 { 00588 default: 00589 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error 00590 rc = packet_type; 00591 goto exit; 00592 case 0: // timed out reading packet 00593 break; 00594 case CONNACK: 00595 case PUBACK: 00596 case SUBACK: 00597 break; 00598 case PUBLISH: 00599 { 00600 MQTTString topicName = MQTTString_initializer; 00601 Message msg; 00602 int intQoS; 00603 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ 00604 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00605 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00606 goto exit; 00607 msg.qos = (enum QoS)intQoS; 00608 #if MQTTCLIENT_QOS2 00609 if (msg.qos != QOS2) 00610 #endif 00611 deliverMessage(topicName, msg); 00612 #if MQTTCLIENT_QOS2 00613 else if (isQoS2msgidFree(msg.id)) 00614 { 00615 if (useQoS2msgid(msg.id)) 00616 deliverMessage(topicName, msg); 00617 else 00618 WARN("Maximum number of incoming QoS2 messages exceeded"); 00619 } 00620 #endif 00621 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00622 if (msg.qos != QOS0) 00623 { 00624 if (msg.qos == QOS1) 00625 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00626 else if (msg.qos == QOS2) 00627 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00628 if (len <= 0) 00629 rc = FAILURE; 00630 else 00631 rc = sendPacket(len, timer); 00632 if (rc == FAILURE) 00633 goto exit; // there was a problem 00634 } 00635 break; 00636 #endif 00637 } 00638 #if MQTTCLIENT_QOS2 00639 case PUBREC: 00640 case PUBREL: 00641 unsigned short mypacketid; 00642 unsigned char dup, type; 00643 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00644 rc = FAILURE; 00645 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00646 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00647 rc = FAILURE; 00648 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00649 rc = FAILURE; // there was a problem 00650 if (rc == FAILURE) 00651 goto exit; // there was a problem 00652 if (packet_type == PUBREL) 00653 freeQoS2msgid(mypacketid); 00654 break; 00655 00656 case PUBCOMP: 00657 break; 00658 #endif 00659 case PINGRESP: 00660 ping_outstanding = false; 00661 break; 00662 } 00663 00664 if (keepalive() != SUCCESS) 00665 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT 00666 rc = FAILURE; 00667 00668 exit: 00669 if (rc == SUCCESS) 00670 rc = packet_type; 00671 else if (isconnected) 00672 closeSession(); 00673 return rc; 00674 } 00675 00676 00677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00679 { 00680 int rc = SUCCESS; 00681 static Timer ping_sent; 00682 00683 if (keepAliveInterval == 0) 00684 goto exit; 00685 00686 if (ping_outstanding) 00687 { 00688 if (ping_sent.expired()) 00689 { 00690 rc = FAILURE; // session failure 00691 #if defined(MQTT_DEBUG) 00692 DEBUG("PINGRESP not received in keepalive interval\r\n"); 00693 #endif 00694 } 00695 } 00696 else if (last_sent.expired() || last_received.expired()) 00697 { 00698 Timer timer(1000); 00699 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00700 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00701 { 00702 ping_outstanding = true; 00703 ping_sent.countdown(this->keepAliveInterval); 00704 } 00705 } 00706 exit: 00707 return rc; 00708 } 00709 00710 00711 // only used in single-threaded mode where one command at a time is in process 00712 template<class Network, class Timer, int a, int b> 00713 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00714 { 00715 int rc = FAILURE; 00716 00717 do 00718 { 00719 if (timer.expired()) 00720 break; // we timed out 00721 rc = cycle(timer); 00722 } 00723 while (rc != packet_type && rc >= 0); 00724 00725 return rc; 00726 } 00727 00728 00729 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00730 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) 00731 { 00732 Timer connect_timer(command_timeout_ms); 00733 int rc = FAILURE; 00734 int len = 0; 00735 00736 if (isconnected) // don't send connect packet again if we are already connected 00737 goto exit; 00738 00739 this->keepAliveInterval = options.keepAliveInterval; 00740 this->cleansession = options.cleansession; 00741 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00742 goto exit; 00743 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00744 goto exit; // there was a problem 00745 00746 if (this->keepAliveInterval > 0) 00747 last_received.countdown(this->keepAliveInterval); 00748 // this will be a blocking call, wait for the connack 00749 if (waitfor(CONNACK, connect_timer) == CONNACK) 00750 { 00751 data.rc = 0; 00752 data.sessionPresent = false; 00753 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, 00754 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00755 rc = data.rc; 00756 else 00757 rc = FAILURE; 00758 } 00759 else 00760 rc = FAILURE; 00761 00762 #if MQTTCLIENT_QOS2 00763 // resend any inflight publish 00764 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00765 { 00766 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00767 rc = FAILURE; 00768 else 00769 rc = publish(len, connect_timer, inflightQoS); 00770 } 00771 else 00772 #endif 00773 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00774 if (inflightMsgid > 0) 00775 { 00776 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00777 rc = publish(inflightLen, connect_timer, inflightQoS); 00778 } 00779 #endif 00780 00781 exit: 00782 if (rc == SUCCESS) 00783 { 00784 isconnected = true; 00785 ping_outstanding = false; 00786 } 00787 return rc; 00788 } 00789 00790 00791 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00792 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00793 { 00794 connackData data; 00795 return connect(options, data); 00796 } 00797 00798 00799 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00800 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00801 { 00802 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00803 return connect(default_options); 00804 } 00805 00806 00807 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00808 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) 00809 { 00810 int rc = FAILURE; 00811 int i = -1; 00812 00813 // first check for an existing matching slot 00814 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00815 { 00816 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00817 { 00818 if (messageHandler == 0) // remove existing 00819 { 00820 messageHandlers[i].topicFilter = 0; 00821 messageHandlers[i].fp.detach(); 00822 } 00823 rc = SUCCESS; // return i when adding new subscription 00824 break; 00825 } 00826 } 00827 // if no existing, look for empty slot (unless we are removing) 00828 if (messageHandler != 0) { 00829 if (rc == FAILURE) 00830 { 00831 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00832 { 00833 if (messageHandlers[i].topicFilter == 0) 00834 { 00835 rc = SUCCESS; 00836 break; 00837 } 00838 } 00839 } 00840 if (i < MAX_MESSAGE_HANDLERS) 00841 { 00842 messageHandlers[i].topicFilter = topicFilter; 00843 messageHandlers[i].fp.attach(messageHandler); 00844 } 00845 } 00846 return rc; 00847 } 00848 00849 00850 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00851 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, 00852 enum QoS qos, messageHandler messageHandler, subackData& data) 00853 { 00854 int rc = FAILURE; 00855 Timer timer(command_timeout_ms); 00856 int len = 0; 00857 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00858 00859 if (!isconnected) 00860 goto exit; 00861 00862 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00863 if (len <= 0) 00864 goto exit; 00865 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00866 goto exit; // there was a problem 00867 00868 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00869 { 00870 int count = 0; 00871 unsigned short mypacketid; 00872 data.grantedQoS = 0; 00873 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00874 { 00875 if (data.grantedQoS != 0x80) 00876 rc = setMessageHandler(topicFilter, messageHandler); 00877 } 00878 } 00879 else 00880 rc = FAILURE; 00881 00882 exit: 00883 if (rc == FAILURE) 00884 closeSession(); 00885 return rc; 00886 } 00887 00888 00889 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00890 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00891 { 00892 subackData data; 00893 return subscribe(topicFilter, qos, messageHandler, data); 00894 } 00895 00896 00897 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00898 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00899 { 00900 int rc = FAILURE; 00901 Timer timer(command_timeout_ms); 00902 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00903 int len = 0; 00904 00905 if (!isconnected) 00906 goto exit; 00907 00908 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00909 goto exit; 00910 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00911 goto exit; // there was a problem 00912 00913 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00914 { 00915 unsigned short mypacketid; // should be the same as the packetid above 00916 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00917 { 00918 // remove the subscription message handler associated with this topic, if there is one 00919 setMessageHandler(topicFilter, 0); 00920 } 00921 } 00922 else 00923 rc = FAILURE; 00924 00925 exit: 00926 if (rc != SUCCESS) 00927 closeSession(); 00928 return rc; 00929 } 00930 00931 00932 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00933 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00934 { 00935 int rc; 00936 00937 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00938 goto exit; // there was a problem 00939 00940 #if MQTTCLIENT_QOS1 00941 if (qos == QOS1) 00942 { 00943 if (waitfor(PUBACK, timer) == PUBACK) 00944 { 00945 unsigned short mypacketid; 00946 unsigned char dup, type; 00947 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00948 rc = FAILURE; 00949 else if (inflightMsgid == mypacketid) 00950 inflightMsgid = 0; 00951 } 00952 else 00953 rc = FAILURE; 00954 } 00955 #endif 00956 #if MQTTCLIENT_QOS2 00957 else if (qos == QOS2) 00958 { 00959 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00960 { 00961 unsigned short mypacketid; 00962 unsigned char dup, type; 00963 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00964 rc = FAILURE; 00965 else if (inflightMsgid == mypacketid) 00966 inflightMsgid = 0; 00967 } 00968 else 00969 rc = FAILURE; 00970 } 00971 #endif 00972 00973 exit: 00974 if (rc != SUCCESS) 00975 closeSession(); 00976 return rc; 00977 } 00978 00979 00980 00981 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00982 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00983 { 00984 int rc = FAILURE; 00985 Timer timer(command_timeout_ms); 00986 MQTTString topicString = MQTTString_initializer; 00987 int len = 0; 00988 00989 if (!isconnected) 00990 goto exit; 00991 00992 topicString.cstring = (char*)topicName; 00993 00994 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00995 if (qos == QOS1 || qos == QOS2) 00996 id = packetid.getNext(); 00997 #endif 00998 00999 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 01000 topicString, (unsigned char*)payload, payloadlen); 01001 if (len <= 0) 01002 goto exit; 01003 01004 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01005 if (!cleansession) 01006 { 01007 memcpy(pubbuf, sendbuf, len); 01008 inflightMsgid = id; 01009 inflightLen = len; 01010 inflightQoS = qos; 01011 #if MQTTCLIENT_QOS2 01012 pubrel = false; 01013 #endif 01014 } 01015 #endif 01016 01017 rc = publish(len, timer, qos); 01018 exit: 01019 return rc; 01020 } 01021 01022 01023 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01024 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01025 { 01026 unsigned short id = 0; // dummy - not used for anything 01027 return publish(topicName, payload, payloadlen, id, qos, retained); 01028 } 01029 01030 01031 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01032 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01033 { 01034 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01035 } 01036 01037 01038 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01039 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01040 { 01041 int rc = FAILURE; 01042 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01043 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01044 if (len > 0) 01045 rc = sendPacket(len, timer); // send the disconnect packet 01046 closeSession(); 01047 return rc; 01048 } 01049 01050 #endif
Generated on Mon Jul 18 2022 07:40:17 by
1.7.2