Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 * Ian Craggs - add ability to set message handler separately #6 00022 *******************************************************************************/ 00023 00024 #if !defined(MQTTCLIENT_H) 00025 #define MQTTCLIENT_H 00026 00027 #include "FP.h" 00028 #include "MQTTPacket.h" 00029 #include <stdio.h> 00030 #include "MQTTLogging.h" 00031 00032 #if !defined(MQTTCLIENT_QOS1) 00033 #define MQTTCLIENT_QOS1 1 00034 #endif 00035 #if !defined(MQTTCLIENT_QOS2) 00036 #define MQTTCLIENT_QOS2 0 00037 #endif 00038 00039 namespace MQTT 00040 { 00041 00042 00043 enum QoS { QOS0, QOS1, QOS2 }; 00044 00045 // all failure return codes must be negative 00046 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00047 00048 00049 struct Message 00050 { 00051 enum QoS qos; 00052 bool retained; 00053 bool dup; 00054 unsigned short id; 00055 void *payload; 00056 size_t payloadlen; 00057 }; 00058 00059 00060 struct MessageData 00061 { 00062 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00063 { } 00064 00065 struct Message &message; 00066 MQTTString &topicName; 00067 }; 00068 00069 00070 struct connackData 00071 { 00072 int rc; 00073 bool sessionPresent; 00074 }; 00075 00076 00077 struct subackData 00078 { 00079 int grantedQoS; 00080 }; 00081 00082 00083 class PacketId 00084 { 00085 public: 00086 PacketId() 00087 { 00088 next = 0; 00089 } 00090 00091 int getNext() 00092 { 00093 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00094 } 00095 00096 private: 00097 static const int MAX_PACKET_ID = 65535; 00098 int next; 00099 }; 00100 00101 00102 /** 00103 * @class Client 00104 * @brief blocking, non-threaded MQTT client API 00105 * 00106 * This version of the API blocks on all method calls, until they are complete. This means that only one 00107 * MQTT request can be in process at any one time. 00108 * @param Network a network class which supports send, receive 00109 * @param Timer a timer class with the methods: 00110 */ 00111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00112 class Client 00113 { 00114 00115 public: 00116 00117 typedef void (*messageHandler)(MessageData&); 00118 00119 /** Construct the client 00120 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00121 * before calling MQTT connect 00122 * @param limits an instance of the Limit class - to alter limits as required 00123 */ 00124 Client(Network& network, unsigned int command_timeout_ms = 30000); 00125 00126 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00127 * @param mh - pointer to the callback function. Set to 0 to remove. 00128 */ 00129 void setDefaultMessageHandler(messageHandler mh) 00130 { 00131 if (mh != 0) 00132 defaultMessageHandler.attach(mh); 00133 else 00134 defaultMessageHandler.detach(); 00135 } 00136 00137 /** Set a message handling callback. This can be used outside of the the subscribe method. 00138 * @param topicFilter - a topic pattern which can include wildcards 00139 * @param mh - pointer to the callback function. If 0, removes the callback if any 00140 */ 00141 int setMessageHandler(const char* topicFilter, messageHandler mh); 00142 00143 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00144 * The nework object must be connected to the network endpoint before calling this 00145 * Default connect options are used 00146 * @return success code - 00147 */ 00148 int connect(); 00149 00150 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00151 * The nework object must be connected to the network endpoint before calling this 00152 * @param options - connect options 00153 * @return success code - 00154 */ 00155 int connect(MQTTPacket_connectData& options); 00156 00157 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00158 * The nework object must be connected to the network endpoint before calling this 00159 * @param options - connect options 00160 * @param connackData - connack data to be returned 00161 * @return success code - 00162 */ 00163 int connect(MQTTPacket_connectData& options, connackData& data); 00164 00165 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00166 * @param topic - the topic to publish to 00167 * @param message - the message to send 00168 * @return success code - 00169 */ 00170 int publish(const char* topicName, Message& message); 00171 00172 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00173 * @param topic - the topic to publish to 00174 * @param payload - the data to send 00175 * @param payloadlen - the length of the data 00176 * @param qos - the QoS to send the publish at 00177 * @param retained - whether the message should be retained 00178 * @return success code - 00179 */ 00180 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00181 00182 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00183 * @param topic - the topic to publish to 00184 * @param payload - the data to send 00185 * @param payloadlen - the length of the data 00186 * @param id - the packet id used - returned 00187 * @param qos - the QoS to send the publish at 00188 * @param retained - whether the message should be retained 00189 * @return success code - 00190 */ 00191 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00192 00193 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00194 * @param topicFilter - a topic pattern which can include wildcards 00195 * @param qos - the MQTT QoS to subscribe at 00196 * @param mh - the callback function to be invoked when a message is received for this subscription 00197 * @return success code - 00198 */ 00199 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00200 00201 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00202 * @param topicFilter - a topic pattern which can include wildcards 00203 * @param qos - the MQTT QoS to subscribe at© 00204 * @param mh - the callback function to be invoked when a message is received for this subscription 00205 * @param 00206 * @return success code - 00207 */ 00208 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); 00209 00210 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00211 * @param topicFilter - a topic pattern which can include wildcards 00212 * @return success code - 00213 */ 00214 int unsubscribe(const char* topicFilter); 00215 00216 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00217 * @return success code - 00218 */ 00219 int disconnect(); 00220 00221 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00222 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00223 * received. 00224 * @param timeout_ms the time to wait, in milliseconds 00225 * @return success code - on failure, this means the client has disconnected 00226 */ 00227 int yield(unsigned long timeout_ms = 1000L); 00228 00229 /** Is the client connected? 00230 * @return flag - is the client connected or not? 00231 */ 00232 bool isConnected() 00233 { 00234 return isconnected; 00235 } 00236 00237 private: 00238 00239 void closeSession(); 00240 void cleanSession(); 00241 int cycle(Timer& timer); 00242 int waitfor(int packet_type, Timer& timer); 00243 int keepalive(); 00244 int publish(int len, Timer& timer, enum QoS qos); 00245 00246 int decodePacket(int* value, int timeout); 00247 int readPacket(Timer& timer); 00248 int sendPacket(int length, Timer& timer); 00249 int deliverMessage(MQTTString& topicName, Message& message); 00250 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00251 00252 Network& ipstack; 00253 unsigned long command_timeout_ms; 00254 00255 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00256 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00257 00258 Timer last_sent, last_received; 00259 unsigned int keepAliveInterval; 00260 bool ping_outstanding; 00261 bool cleansession; 00262 00263 PacketId packetid; 00264 00265 struct MessageHandlers 00266 { 00267 const char* topicFilter; 00268 FP<void, MessageData&> fp; 00269 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00270 00271 FP<void, MessageData&> defaultMessageHandler; 00272 00273 bool isconnected; 00274 00275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00276 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00277 int inflightLen; 00278 unsigned short inflightMsgid; 00279 enum QoS inflightQoS; 00280 #endif 00281 00282 #if MQTTCLIENT_QOS2 00283 bool pubrel; 00284 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00285 #define MAX_INCOMING_QOS2_MESSAGES 10 00286 #endif 00287 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00288 bool isQoS2msgidFree(unsigned short id); 00289 bool useQoS2msgid(unsigned short id); 00290 void freeQoS2msgid(unsigned short id); 00291 #endif 00292 00293 }; 00294 00295 } 00296 00297 00298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00300 { 00301 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00302 messageHandlers[i].topicFilter = 0; 00303 00304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00305 inflightMsgid = 0; 00306 inflightQoS = QOS0; 00307 #endif 00308 00309 #if MQTTCLIENT_QOS2 00310 pubrel = false; 00311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00312 incomingQoS2messages[i] = 0; 00313 #endif 00314 } 00315 00316 00317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() 00319 { 00320 ping_outstanding = false; 00321 isconnected = false; 00322 if (cleansession) 00323 cleanSession(); 00324 } 00325 00326 00327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00329 { 00330 this->command_timeout_ms = command_timeout_ms; 00331 cleansession = true; 00332 closeSession(); 00333 } 00334 00335 00336 #if MQTTCLIENT_QOS2 00337 template<class Network, class Timer, int a, int b> 00338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00339 { 00340 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00341 { 00342 if (incomingQoS2messages[i] == id) 00343 return false; 00344 } 00345 return true; 00346 } 00347 00348 00349 template<class Network, class Timer, int a, int b> 00350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00351 { 00352 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00353 { 00354 if (incomingQoS2messages[i] == 0) 00355 { 00356 incomingQoS2messages[i] = id; 00357 return true; 00358 } 00359 } 00360 return false; 00361 } 00362 00363 00364 template<class Network, class Timer, int a, int b> 00365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00366 { 00367 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00368 { 00369 if (incomingQoS2messages[i] == id) 00370 { 00371 incomingQoS2messages[i] = 0; 00372 return; 00373 } 00374 } 00375 } 00376 #endif 00377 00378 00379 template<class Network, class Timer, int a, int b> 00380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00381 { 00382 int rc = FAILURE, 00383 sent = 0; 00384 00385 while (sent < length) 00386 { 00387 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00388 if (rc < 0) // there was an error writing the data 00389 break; 00390 sent += rc; 00391 if (timer.expired()) // only check expiry after at least one attempt to write 00392 break; 00393 } 00394 if (sent == length) 00395 { 00396 if (this->keepAliveInterval > 0) 00397 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00398 rc = SUCCESS; 00399 } 00400 else 00401 rc = FAILURE; 00402 00403 #if defined(MQTT_DEBUG) 00404 char printbuf[150]; 00405 DEBUG("Rc %d from sending packet %s\r\n", rc, 00406 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00407 #endif 00408 return rc; 00409 } 00410 00411 00412 template<class Network, class Timer, int a, int b> 00413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00414 { 00415 unsigned char c; 00416 int multiplier = 1; 00417 int len = 0; 00418 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00419 00420 *value = 0; 00421 do 00422 { 00423 int rc = MQTTPACKET_READ_ERROR; 00424 00425 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00426 { 00427 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00428 goto exit; 00429 } 00430 rc = ipstack.read(&c, 1, timeout); 00431 if (rc != 1) 00432 goto exit; 00433 *value += (c & 127) * multiplier; 00434 multiplier *= 128; 00435 } while ((c & 128) != 0); 00436 exit: 00437 return len; 00438 } 00439 00440 00441 /** 00442 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00443 * the packets can be retried. 00444 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00445 * @return the MQTT packet type, 0 if none, -1 if error 00446 */ 00447 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00448 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00449 { 00450 int rc = FAILURE; 00451 MQTTHeader header = {0}; 00452 int len = 0; 00453 int rem_len = 0; 00454 00455 /* 1. read the header byte. This has the packet type in it */ 00456 rc = ipstack.read(readbuf, 1, timer.left_ms()); 00457 if (rc != 1) 00458 goto exit; 00459 00460 len = 1; 00461 /* 2. read the remaining length. This is variable in itself */ 00462 decodePacket(&rem_len, timer.left_ms()); 00463 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00464 00465 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00466 { 00467 rc = BUFFER_OVERFLOW; 00468 goto exit; 00469 } 00470 00471 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00472 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00473 goto exit; 00474 00475 header.byte = readbuf[0]; 00476 rc = header.bits.type; 00477 if (this->keepAliveInterval > 0) 00478 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00479 exit: 00480 00481 #if defined(MQTT_DEBUG) 00482 if (rc >= 0) 00483 { 00484 char printbuf[50]; 00485 DEBUG("Rc %d receiving packet %s\r\n", rc, 00486 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00487 } 00488 #endif 00489 return rc; 00490 } 00491 00492 00493 // assume topic filter and name is in correct format 00494 // # can only be at end 00495 // + and # can only be next to separator 00496 template<class Network, class Timer, int a, int b> 00497 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00498 { 00499 char* curf = topicFilter; 00500 char* curn = topicName.lenstring.data; 00501 char* curn_end = curn + topicName.lenstring.len; 00502 00503 while (*curf && curn < curn_end) 00504 { 00505 if (*curn == '/' && *curf != '/') 00506 break; 00507 if (*curf != '+' && *curf != '#' && *curf != *curn) 00508 break; 00509 if (*curf == '+') 00510 { // skip until we meet the next separator, or end of string 00511 char* nextpos = curn + 1; 00512 while (nextpos < curn_end && *nextpos != '/') 00513 nextpos = ++curn + 1; 00514 } 00515 else if (*curf == '#') 00516 curn = curn_end - 1; // skip until end of string 00517 curf++; 00518 curn++; 00519 }; 00520 00521 return (curn == curn_end) && (*curf == '\0'); 00522 } 00523 00524 00525 00526 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00527 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00528 { 00529 int rc = FAILURE; 00530 00531 // we have to find the right message handler - indexed by topic 00532 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00533 { 00534 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00535 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00536 { 00537 if (messageHandlers[i].fp.attached()) 00538 { 00539 MessageData md(topicName, message); 00540 messageHandlers[i].fp(md); 00541 rc = SUCCESS; 00542 } 00543 } 00544 } 00545 00546 if (rc == FAILURE && defaultMessageHandler.attached()) 00547 { 00548 MessageData md(topicName, message); 00549 defaultMessageHandler(md); 00550 rc = SUCCESS; 00551 } 00552 00553 return rc; 00554 } 00555 00556 00557 00558 template<class Network, class Timer, int a, int b> 00559 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00560 { 00561 int rc = SUCCESS; 00562 Timer timer; 00563 00564 timer.countdown_ms(timeout_ms); 00565 while (!timer.expired()) 00566 { 00567 if (cycle(timer) < 0) 00568 { 00569 rc = FAILURE; 00570 break; 00571 } 00572 } 00573 00574 return rc; 00575 } 00576 00577 00578 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00579 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00580 { 00581 // get one piece of work off the wire and one pass through 00582 int len = 0, 00583 rc = SUCCESS; 00584 00585 int packet_type = readPacket(timer); // read the socket, see what work is due 00586 00587 switch (packet_type) 00588 { 00589 default: 00590 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error 00591 rc = packet_type; 00592 goto exit; 00593 case 0: // timed out reading packet 00594 break; 00595 case CONNACK: 00596 case PUBACK: 00597 case SUBACK: 00598 break; 00599 case PUBLISH: 00600 { 00601 MQTTString topicName = MQTTString_initializer; 00602 Message msg; 00603 int intQoS; 00604 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ 00605 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00606 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00607 goto exit; 00608 msg.qos = (enum QoS)intQoS; 00609 #if MQTTCLIENT_QOS2 00610 if (msg.qos != QOS2) 00611 #endif 00612 deliverMessage(topicName, msg); 00613 #if MQTTCLIENT_QOS2 00614 else if (isQoS2msgidFree(msg.id)) 00615 { 00616 if (useQoS2msgid(msg.id)) 00617 deliverMessage(topicName, msg); 00618 else 00619 WARN("Maximum number of incoming QoS2 messages exceeded"); 00620 } 00621 #endif 00622 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00623 if (msg.qos != QOS0) 00624 { 00625 if (msg.qos == QOS1) 00626 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00627 else if (msg.qos == QOS2) 00628 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00629 if (len <= 0) 00630 rc = FAILURE; 00631 else 00632 rc = sendPacket(len, timer); 00633 if (rc == FAILURE) 00634 goto exit; // there was a problem 00635 } 00636 break; 00637 #endif 00638 } 00639 #if MQTTCLIENT_QOS2 00640 case PUBREC: 00641 case PUBREL: 00642 unsigned short mypacketid; 00643 unsigned char dup, type; 00644 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00645 rc = FAILURE; 00646 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00647 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00648 rc = FAILURE; 00649 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00650 rc = FAILURE; // there was a problem 00651 if (rc == FAILURE) 00652 goto exit; // there was a problem 00653 if (packet_type == PUBREL) 00654 freeQoS2msgid(mypacketid); 00655 break; 00656 00657 case PUBCOMP: 00658 break; 00659 #endif 00660 case PINGRESP: 00661 ping_outstanding = false; 00662 break; 00663 } 00664 00665 if (keepalive() != SUCCESS) 00666 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT 00667 rc = FAILURE; 00668 00669 exit: 00670 if (rc == SUCCESS) 00671 rc = packet_type; 00672 else if (isconnected) 00673 closeSession(); 00674 return rc; 00675 } 00676 00677 00678 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00679 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00680 { 00681 int rc = SUCCESS; 00682 static Timer ping_sent; 00683 00684 if (keepAliveInterval == 0) 00685 goto exit; 00686 00687 if (ping_outstanding) 00688 { 00689 if (ping_sent.expired()) 00690 { 00691 rc = FAILURE; // session failure 00692 #if defined(MQTT_DEBUG) 00693 DEBUG("PINGRESP not received in keepalive interval\r\n"); 00694 #endif 00695 } 00696 } 00697 else if (last_sent.expired() || last_received.expired()) 00698 { 00699 Timer timer(1000); 00700 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00701 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00702 { 00703 ping_outstanding = true; 00704 ping_sent.countdown(this->keepAliveInterval); 00705 } 00706 } 00707 exit: 00708 return rc; 00709 } 00710 00711 00712 // only used in single-threaded mode where one command at a time is in process 00713 template<class Network, class Timer, int a, int b> 00714 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00715 { 00716 int rc = FAILURE; 00717 00718 do 00719 { 00720 if (timer.expired()) 00721 break; // we timed out 00722 rc = cycle(timer); 00723 } 00724 while (rc != packet_type && rc >= 0); 00725 00726 return rc; 00727 } 00728 00729 00730 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00731 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) 00732 { 00733 Timer connect_timer(command_timeout_ms); 00734 int rc = FAILURE; 00735 int len = 0; 00736 00737 if (isconnected) // don't send connect packet again if we are already connected 00738 goto exit; 00739 00740 this->keepAliveInterval = options.keepAliveInterval; 00741 this->cleansession = options.cleansession; 00742 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00743 goto exit; 00744 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00745 goto exit; // there was a problem 00746 00747 if (this->keepAliveInterval > 0) 00748 last_received.countdown(this->keepAliveInterval); 00749 // this will be a blocking call, wait for the connack 00750 if (waitfor(CONNACK, connect_timer) == CONNACK) 00751 { 00752 data.rc = 0; 00753 data.sessionPresent = false; 00754 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, 00755 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00756 rc = data.rc; 00757 else 00758 rc = FAILURE; 00759 } 00760 else 00761 rc = FAILURE; 00762 00763 #if MQTTCLIENT_QOS2 00764 // resend any inflight publish 00765 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00766 { 00767 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00768 rc = FAILURE; 00769 else 00770 rc = publish(len, connect_timer, inflightQoS); 00771 } 00772 else 00773 #endif 00774 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00775 if (inflightMsgid > 0) 00776 { 00777 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00778 rc = publish(inflightLen, connect_timer, inflightQoS); 00779 } 00780 #endif 00781 00782 exit: 00783 if (rc == SUCCESS) 00784 { 00785 isconnected = true; 00786 ping_outstanding = false; 00787 } 00788 return rc; 00789 } 00790 00791 00792 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00793 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00794 { 00795 connackData data; 00796 return connect(options, data); 00797 } 00798 00799 00800 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00801 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00802 { 00803 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00804 return connect(default_options); 00805 } 00806 00807 00808 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00809 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) 00810 { 00811 int rc = FAILURE; 00812 int i = -1; 00813 00814 // first check for an existing matching slot 00815 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00816 { 00817 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00818 { 00819 if (messageHandler == 0) // remove existing 00820 { 00821 messageHandlers[i].topicFilter = 0; 00822 messageHandlers[i].fp.detach(); 00823 } 00824 rc = SUCCESS; // return i when adding new subscription 00825 break; 00826 } 00827 } 00828 // if no existing, look for empty slot (unless we are removing) 00829 if (messageHandler != 0) { 00830 if (rc == FAILURE) 00831 { 00832 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00833 { 00834 if (messageHandlers[i].topicFilter == 0) 00835 { 00836 rc = SUCCESS; 00837 break; 00838 } 00839 } 00840 } 00841 if (i < MAX_MESSAGE_HANDLERS) 00842 { 00843 messageHandlers[i].topicFilter = topicFilter; 00844 messageHandlers[i].fp.attach(messageHandler); 00845 } 00846 } 00847 return rc; 00848 } 00849 00850 00851 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00852 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, 00853 enum QoS qos, messageHandler messageHandler, subackData& data) 00854 { 00855 int rc = FAILURE; 00856 Timer timer(command_timeout_ms); 00857 int len = 0; 00858 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00859 00860 if (!isconnected) 00861 goto exit; 00862 00863 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00864 if (len <= 0) 00865 goto exit; 00866 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00867 goto exit; // there was a problem 00868 00869 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00870 { 00871 int count = 0; 00872 unsigned short mypacketid; 00873 data.grantedQoS = 0; 00874 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00875 { 00876 if (data.grantedQoS != 0x80) 00877 rc = setMessageHandler(topicFilter, messageHandler); 00878 } 00879 } 00880 else 00881 rc = FAILURE; 00882 00883 exit: 00884 if (rc == FAILURE) 00885 closeSession(); 00886 return rc; 00887 } 00888 00889 00890 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00891 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00892 { 00893 subackData data; 00894 return subscribe(topicFilter, qos, messageHandler, data); 00895 } 00896 00897 00898 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00899 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00900 { 00901 int rc = FAILURE; 00902 Timer timer(command_timeout_ms); 00903 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00904 int len = 0; 00905 00906 if (!isconnected) 00907 goto exit; 00908 00909 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00910 goto exit; 00911 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00912 goto exit; // there was a problem 00913 00914 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00915 { 00916 unsigned short mypacketid; // should be the same as the packetid above 00917 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00918 { 00919 // remove the subscription message handler associated with this topic, if there is one 00920 setMessageHandler(topicFilter, 0); 00921 } 00922 } 00923 else 00924 rc = FAILURE; 00925 00926 exit: 00927 if (rc != SUCCESS) 00928 closeSession(); 00929 return rc; 00930 } 00931 00932 00933 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00934 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00935 { 00936 int rc; 00937 00938 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00939 goto exit; // there was a problem 00940 00941 #if MQTTCLIENT_QOS1 00942 if (qos == QOS1) 00943 { 00944 if (waitfor(PUBACK, timer) == PUBACK) 00945 { 00946 unsigned short mypacketid; 00947 unsigned char dup, type; 00948 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00949 rc = FAILURE; 00950 else if (inflightMsgid == mypacketid) 00951 inflightMsgid = 0; 00952 } 00953 else 00954 rc = FAILURE; 00955 } 00956 #endif 00957 #if MQTTCLIENT_QOS2 00958 else if (qos == QOS2) 00959 { 00960 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00961 { 00962 unsigned short mypacketid; 00963 unsigned char dup, type; 00964 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00965 rc = FAILURE; 00966 else if (inflightMsgid == mypacketid) 00967 inflightMsgid = 0; 00968 } 00969 else 00970 rc = FAILURE; 00971 } 00972 #endif 00973 00974 exit: 00975 if (rc != SUCCESS) 00976 closeSession(); 00977 return rc; 00978 } 00979 00980 00981 00982 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00983 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00984 { 00985 int rc = FAILURE; 00986 Timer timer(command_timeout_ms); 00987 MQTTString topicString = MQTTString_initializer; 00988 int len = 0; 00989 00990 if (!isconnected) 00991 goto exit; 00992 00993 topicString.cstring = (char*)topicName; 00994 00995 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00996 if (qos == QOS1 || qos == QOS2) 00997 id = packetid.getNext(); 00998 #endif 00999 01000 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 01001 topicString, (unsigned char*)payload, payloadlen); 01002 if (len <= 0) 01003 goto exit; 01004 01005 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01006 if (!cleansession) 01007 { 01008 memcpy(pubbuf, sendbuf, len); 01009 inflightMsgid = id; 01010 inflightLen = len; 01011 inflightQoS = qos; 01012 #if MQTTCLIENT_QOS2 01013 pubrel = false; 01014 #endif 01015 } 01016 #endif 01017 01018 rc = publish(len, timer, qos); 01019 exit: 01020 return rc; 01021 } 01022 01023 01024 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01025 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01026 { 01027 unsigned short id = 0; // dummy - not used for anything 01028 return publish(topicName, payload, payloadlen, id, qos, retained); 01029 } 01030 01031 01032 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01033 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01034 { 01035 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01036 } 01037 01038 01039 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01040 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01041 { 01042 int rc = FAILURE; 01043 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01044 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01045 if (len > 0) 01046 rc = sendPacket(len, timer); // send the disconnect packet 01047 closeSession(); 01048 return rc; 01049 } 01050 01051 #endif
Generated on Tue Jul 12 2022 23:30:50 by
1.7.2