Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 * Ian Craggs - add ability to set message handler separately #6 00022 *******************************************************************************/ 00023 00024 #if !defined(MQTTCLIENT_H) 00025 #define MQTTCLIENT_H 00026 00027 #include "FP.h" 00028 #include "MQTTPacket.h" 00029 #include <stdio.h> 00030 #include <string.h> 00031 #include "MQTTLogging.h" 00032 00033 #if !defined(MQTTCLIENT_QOS1) 00034 #define MQTTCLIENT_QOS1 1 00035 #endif 00036 #if !defined(MQTTCLIENT_QOS2) 00037 #define MQTTCLIENT_QOS2 0 00038 #endif 00039 00040 namespace MQTT 00041 { 00042 00043 00044 enum QoS { QOS0, QOS1, QOS2 }; 00045 00046 // all failure return codes must be negative 00047 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00048 00049 00050 struct Message 00051 { 00052 enum QoS qos; 00053 bool retained; 00054 bool dup; 00055 unsigned short id; 00056 void *payload; 00057 size_t payloadlen; 00058 }; 00059 00060 00061 struct MessageData 00062 { 00063 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00064 { } 00065 00066 struct Message &message; 00067 MQTTString &topicName; 00068 }; 00069 00070 00071 struct connackData 00072 { 00073 int rc; 00074 bool sessionPresent; 00075 }; 00076 00077 00078 struct subackData 00079 { 00080 int grantedQoS; 00081 }; 00082 00083 00084 class PacketId 00085 { 00086 public: 00087 PacketId() 00088 { 00089 next = 0; 00090 } 00091 00092 int getNext() 00093 { 00094 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00095 } 00096 00097 private: 00098 static const int MAX_PACKET_ID = 65535; 00099 int next; 00100 }; 00101 00102 00103 /** 00104 * @class Client 00105 * @brief blocking, non-threaded MQTT client API 00106 * 00107 * This version of the API blocks on all method calls, until they are complete. This means that only one 00108 * MQTT request can be in process at any one time. 00109 * @param Network a network class which supports send, receive 00110 * @param Timer a timer class with the methods: 00111 */ 00112 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00113 class Client 00114 { 00115 00116 public: 00117 00118 typedef void (*messageHandler)(MessageData&); 00119 00120 /** Construct the client 00121 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00122 * before calling MQTT connect 00123 * @param limits an instance of the Limit class - to alter limits as required 00124 */ 00125 Client(Network& network, unsigned int command_timeout_ms = 30000); 00126 00127 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00128 * @param mh - pointer to the callback function. Set to 0 to remove. 00129 */ 00130 void setDefaultMessageHandler(messageHandler mh) 00131 { 00132 if (mh != 0) 00133 defaultMessageHandler.attach(mh); 00134 else 00135 defaultMessageHandler.detach(); 00136 } 00137 00138 /** Set a message handling callback. This can be used outside of the the subscribe method. 00139 * @param topicFilter - a topic pattern which can include wildcards 00140 * @param mh - pointer to the callback function. If 0, removes the callback if any 00141 */ 00142 int setMessageHandler(const char* topicFilter, messageHandler mh); 00143 00144 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00145 * The nework object must be connected to the network endpoint before calling this 00146 * Default connect options are used 00147 * @return success code - 00148 */ 00149 int connect(); 00150 00151 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00152 * The nework object must be connected to the network endpoint before calling this 00153 * @param options - connect options 00154 * @return success code - 00155 */ 00156 int connect(MQTTPacket_connectData& options); 00157 00158 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00159 * The nework object must be connected to the network endpoint before calling this 00160 * @param options - connect options 00161 * @param connackData - connack data to be returned 00162 * @return success code - 00163 */ 00164 int connect(MQTTPacket_connectData& options, connackData& data); 00165 00166 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00167 * @param topic - the topic to publish to 00168 * @param message - the message to send 00169 * @return success code - 00170 */ 00171 int publish(const char* topicName, Message& message); 00172 00173 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00174 * @param topic - the topic to publish to 00175 * @param payload - the data to send 00176 * @param payloadlen - the length of the data 00177 * @param qos - the QoS to send the publish at 00178 * @param retained - whether the message should be retained 00179 * @return success code - 00180 */ 00181 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00182 00183 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00184 * @param topic - the topic to publish to 00185 * @param payload - the data to send 00186 * @param payloadlen - the length of the data 00187 * @param id - the packet id used - returned 00188 * @param qos - the QoS to send the publish at 00189 * @param retained - whether the message should be retained 00190 * @return success code - 00191 */ 00192 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00193 00194 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00195 * @param topicFilter - a topic pattern which can include wildcards 00196 * @param qos - the MQTT QoS to subscribe at 00197 * @param mh - the callback function to be invoked when a message is received for this subscription 00198 * @return success code - 00199 */ 00200 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00201 00202 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00203 * @param topicFilter - a topic pattern which can include wildcards 00204 * @param qos - the MQTT QoS to subscribe at© 00205 * @param mh - the callback function to be invoked when a message is received for this subscription 00206 * @param 00207 * @return success code - 00208 */ 00209 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); 00210 00211 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00212 * @param topicFilter - a topic pattern which can include wildcards 00213 * @return success code - 00214 */ 00215 int unsubscribe(const char* topicFilter); 00216 00217 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00218 * @return success code - 00219 */ 00220 int disconnect(); 00221 00222 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00223 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00224 * received. 00225 * @param timeout_ms the time to wait, in milliseconds 00226 * @return success code - on failure, this means the client has disconnected 00227 */ 00228 int yield(unsigned long timeout_ms = 1000L); 00229 00230 /** Is the client connected? 00231 * @return flag - is the client connected or not? 00232 */ 00233 bool isConnected() 00234 { 00235 return isconnected; 00236 } 00237 00238 private: 00239 00240 void closeSession(); 00241 void cleanSession(); 00242 int cycle(Timer& timer); 00243 int waitfor(int packet_type, Timer& timer); 00244 int keepalive(); 00245 int publish(int len, Timer& timer, enum QoS qos); 00246 00247 int decodePacket(int* value, int timeout); 00248 int readPacket(Timer& timer); 00249 int sendPacket(int length, Timer& timer); 00250 int deliverMessage(MQTTString& topicName, Message& message); 00251 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00252 00253 Network& ipstack; 00254 unsigned long command_timeout_ms; 00255 00256 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00257 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00258 00259 Timer last_sent, last_received; 00260 unsigned int keepAliveInterval; 00261 bool ping_outstanding; 00262 bool cleansession; 00263 00264 PacketId packetid; 00265 00266 struct MessageHandlers 00267 { 00268 const char* topicFilter; 00269 FP<void, MessageData&> fp; 00270 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00271 00272 FP<void, MessageData&> defaultMessageHandler; 00273 00274 bool isconnected; 00275 00276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00277 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00278 int inflightLen; 00279 unsigned short inflightMsgid; 00280 enum QoS inflightQoS; 00281 #endif 00282 00283 #if MQTTCLIENT_QOS2 00284 bool pubrel; 00285 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00286 #define MAX_INCOMING_QOS2_MESSAGES 10 00287 #endif 00288 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00289 bool isQoS2msgidFree(unsigned short id); 00290 bool useQoS2msgid(unsigned short id); 00291 void freeQoS2msgid(unsigned short id); 00292 #endif 00293 00294 }; 00295 00296 } 00297 00298 00299 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00300 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00301 { 00302 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00303 messageHandlers[i].topicFilter = 0; 00304 00305 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00306 inflightMsgid = 0; 00307 inflightQoS = QOS0; 00308 #endif 00309 00310 #if MQTTCLIENT_QOS2 00311 pubrel = false; 00312 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00313 incomingQoS2messages[i] = 0; 00314 #endif 00315 } 00316 00317 00318 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00319 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() 00320 { 00321 ping_outstanding = false; 00322 isconnected = false; 00323 if (cleansession) 00324 cleanSession(); 00325 } 00326 00327 00328 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00329 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00330 { 00331 this->command_timeout_ms = command_timeout_ms; 00332 cleansession = true; 00333 closeSession(); 00334 } 00335 00336 00337 #if MQTTCLIENT_QOS2 00338 template<class Network, class Timer, int a, int b> 00339 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00340 { 00341 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00342 { 00343 if (incomingQoS2messages[i] == id) 00344 return false; 00345 } 00346 return true; 00347 } 00348 00349 00350 template<class Network, class Timer, int a, int b> 00351 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00352 { 00353 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00354 { 00355 if (incomingQoS2messages[i] == 0) 00356 { 00357 incomingQoS2messages[i] = id; 00358 return true; 00359 } 00360 } 00361 return false; 00362 } 00363 00364 00365 template<class Network, class Timer, int a, int b> 00366 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00367 { 00368 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00369 { 00370 if (incomingQoS2messages[i] == id) 00371 { 00372 incomingQoS2messages[i] = 0; 00373 return; 00374 } 00375 } 00376 } 00377 #endif 00378 00379 00380 template<class Network, class Timer, int a, int b> 00381 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00382 { 00383 int rc = FAILURE, 00384 sent = 0; 00385 00386 while (sent < length) 00387 { 00388 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00389 if (rc < 0) // there was an error writing the data 00390 break; 00391 sent += rc; 00392 if (timer.expired()) // only check expiry after at least one attempt to write 00393 break; 00394 } 00395 if (sent == length) 00396 { 00397 if (this->keepAliveInterval > 0) 00398 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00399 rc = SUCCESS; 00400 } 00401 else 00402 rc = FAILURE; 00403 00404 #if defined(MQTT_DEBUG) 00405 char printbuf[150]; 00406 DEBUG("Rc %d from sending packet %s\r\n", rc, 00407 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00408 #endif 00409 return rc; 00410 } 00411 00412 00413 template<class Network, class Timer, int a, int b> 00414 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00415 { 00416 unsigned char c; 00417 int multiplier = 1; 00418 int len = 0; 00419 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00420 00421 *value = 0; 00422 do 00423 { 00424 int rc = MQTTPACKET_READ_ERROR; 00425 00426 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00427 { 00428 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00429 goto exit; 00430 } 00431 rc = ipstack.read(&c, 1, timeout); 00432 if (rc != 1) 00433 goto exit; 00434 *value += (c & 127) * multiplier; 00435 multiplier *= 128; 00436 } while ((c & 128) != 0); 00437 exit: 00438 return len; 00439 } 00440 00441 00442 /** 00443 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00444 * the packets can be retried. 00445 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00446 * @return the MQTT packet type, 0 if none, -1 if error 00447 */ 00448 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00449 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00450 { 00451 int rc = FAILURE; 00452 MQTTHeader header = {0}; 00453 int len = 0; 00454 int rem_len = 0; 00455 00456 /* 1. read the header byte. This has the packet type in it */ 00457 rc = ipstack.read(readbuf, 1, timer.left_ms()); 00458 if (rc != 1) 00459 { 00460 rc = 0; // timed out reading packet 00461 goto exit; 00462 } 00463 00464 len = 1; 00465 /* 2. read the remaining length. This is variable in itself */ 00466 decodePacket(&rem_len, timer.left_ms()); 00467 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00468 00469 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00470 { 00471 rc = BUFFER_OVERFLOW; 00472 goto exit; 00473 } 00474 00475 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00476 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00477 { 00478 rc = FAILURE; 00479 goto exit; 00480 } 00481 00482 header.byte = readbuf[0]; 00483 rc = header.bits.type; 00484 if (this->keepAliveInterval > 0) 00485 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00486 exit: 00487 00488 #if defined(MQTT_DEBUG) 00489 if (rc >= 0) 00490 { 00491 char printbuf[50]; 00492 DEBUG("Rc %d receiving packet %s\r\n", rc, 00493 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00494 } 00495 #endif 00496 return rc; 00497 } 00498 00499 00500 // assume topic filter and name is in correct format 00501 // # can only be at end 00502 // + and # can only be next to separator 00503 template<class Network, class Timer, int a, int b> 00504 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00505 { 00506 char* curf = topicFilter; 00507 char* curn = topicName.lenstring.data; 00508 char* curn_end = curn + topicName.lenstring.len; 00509 00510 while (*curf && curn < curn_end) 00511 { 00512 if (*curn == '/' && *curf != '/') 00513 break; 00514 if (*curf != '+' && *curf != '#' && *curf != *curn) 00515 break; 00516 if (*curf == '+') 00517 { // skip until we meet the next separator, or end of string 00518 char* nextpos = curn + 1; 00519 while (nextpos < curn_end && *nextpos != '/') 00520 nextpos = ++curn + 1; 00521 } 00522 else if (*curf == '#') 00523 curn = curn_end - 1; // skip until end of string 00524 curf++; 00525 curn++; 00526 }; 00527 00528 return (curn == curn_end) && (*curf == '\0'); 00529 } 00530 00531 00532 00533 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00534 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00535 { 00536 int rc = FAILURE; 00537 00538 // we have to find the right message handler - indexed by topic 00539 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00540 { 00541 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00542 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00543 { 00544 if (messageHandlers[i].fp.attached()) 00545 { 00546 MessageData md(topicName, message); 00547 messageHandlers[i].fp(md); 00548 rc = SUCCESS; 00549 } 00550 } 00551 } 00552 00553 if (rc == FAILURE && defaultMessageHandler.attached()) 00554 { 00555 MessageData md(topicName, message); 00556 defaultMessageHandler(md); 00557 rc = SUCCESS; 00558 } 00559 00560 return rc; 00561 } 00562 00563 00564 00565 template<class Network, class Timer, int a, int b> 00566 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00567 { 00568 int rc = SUCCESS; 00569 Timer timer; 00570 00571 timer.countdown_ms(timeout_ms); 00572 while (!timer.expired()) 00573 { 00574 if (cycle(timer) < 0) 00575 { 00576 rc = FAILURE; 00577 break; 00578 } 00579 } 00580 00581 return rc; 00582 } 00583 00584 00585 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00586 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00587 { 00588 // get one piece of work off the wire and one pass through 00589 int len = 0, 00590 rc = SUCCESS; 00591 00592 int packet_type = readPacket(timer); // read the socket, see what work is due 00593 00594 switch (packet_type) 00595 { 00596 default: 00597 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error 00598 rc = packet_type; 00599 goto exit; 00600 case 0: // timed out reading packet 00601 break; 00602 case CONNACK: 00603 case PUBACK: 00604 case SUBACK: 00605 case UNSUBACK: 00606 break; 00607 case PUBLISH: 00608 { 00609 MQTTString topicName = MQTTString_initializer; 00610 Message msg; 00611 int intQoS; 00612 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ 00613 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00614 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) { 00615 rc = FAILURE; 00616 goto exit; 00617 } 00618 msg.qos = (enum QoS)intQoS; 00619 #if MQTTCLIENT_QOS2 00620 if (msg.qos != QOS2) 00621 #endif 00622 deliverMessage(topicName, msg); 00623 #if MQTTCLIENT_QOS2 00624 else if (isQoS2msgidFree(msg.id)) 00625 { 00626 if (useQoS2msgid(msg.id)) 00627 deliverMessage(topicName, msg); 00628 else 00629 WARN("Maximum number of incoming QoS2 messages exceeded"); 00630 } 00631 #endif 00632 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00633 if (msg.qos != QOS0) 00634 { 00635 if (msg.qos == QOS1) 00636 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00637 else if (msg.qos == QOS2) 00638 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00639 if (len <= 0) 00640 rc = FAILURE; 00641 else 00642 rc = sendPacket(len, timer); 00643 if (rc == FAILURE) 00644 goto exit; // there was a problem 00645 } 00646 break; 00647 #endif 00648 } 00649 #if MQTTCLIENT_QOS2 00650 case PUBREC: 00651 case PUBREL: 00652 unsigned short mypacketid; 00653 unsigned char dup, type; 00654 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00655 rc = FAILURE; 00656 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00657 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00658 rc = FAILURE; 00659 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00660 rc = FAILURE; // there was a problem 00661 if (rc == FAILURE) 00662 goto exit; // there was a problem 00663 if (packet_type == PUBREL) 00664 freeQoS2msgid(mypacketid); 00665 break; 00666 00667 case PUBCOMP: 00668 break; 00669 #endif 00670 case PINGRESP: 00671 ping_outstanding = false; 00672 break; 00673 } 00674 00675 if (keepalive() != SUCCESS) 00676 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT 00677 rc = FAILURE; 00678 00679 exit: 00680 if (rc == SUCCESS) 00681 rc = packet_type; 00682 else if (isconnected) 00683 closeSession(); 00684 return rc; 00685 } 00686 00687 00688 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00689 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00690 { 00691 int rc = SUCCESS; 00692 static Timer ping_sent; 00693 00694 if (keepAliveInterval == 0) 00695 goto exit; 00696 00697 if (ping_outstanding) 00698 { 00699 if (ping_sent.expired()) 00700 { 00701 rc = FAILURE; // session failure 00702 #if defined(MQTT_DEBUG) 00703 DEBUG("PINGRESP not received in keepalive interval\r\n"); 00704 #endif 00705 } 00706 } 00707 else if (last_sent.expired() || last_received.expired()) 00708 { 00709 Timer timer(1000); 00710 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00711 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00712 { 00713 ping_outstanding = true; 00714 ping_sent.countdown(this->keepAliveInterval); 00715 } 00716 } 00717 exit: 00718 return rc; 00719 } 00720 00721 00722 // only used in single-threaded mode where one command at a time is in process 00723 template<class Network, class Timer, int a, int b> 00724 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00725 { 00726 int rc = FAILURE; 00727 00728 do 00729 { 00730 if (timer.expired()) 00731 break; // we timed out 00732 rc = cycle(timer); 00733 } 00734 while (rc != packet_type && rc >= 0); 00735 00736 return rc; 00737 } 00738 00739 00740 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00741 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) 00742 { 00743 Timer connect_timer(command_timeout_ms); 00744 int rc = FAILURE; 00745 int len = 0; 00746 00747 if (isconnected) // don't send connect packet again if we are already connected 00748 goto exit; 00749 00750 this->keepAliveInterval = options.keepAliveInterval; 00751 this->cleansession = options.cleansession; 00752 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00753 goto exit; 00754 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00755 goto exit; // there was a problem 00756 00757 if (this->keepAliveInterval > 0) 00758 last_received.countdown(this->keepAliveInterval); 00759 // this will be a blocking call, wait for the connack 00760 if (waitfor(CONNACK, connect_timer) == CONNACK) 00761 { 00762 data.rc = 0; 00763 data.sessionPresent = false; 00764 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, 00765 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00766 rc = data.rc; 00767 else 00768 rc = FAILURE; 00769 } 00770 else 00771 rc = FAILURE; 00772 00773 #if MQTTCLIENT_QOS2 00774 // resend any inflight publish 00775 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00776 { 00777 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00778 rc = FAILURE; 00779 else 00780 rc = publish(len, connect_timer, inflightQoS); 00781 } 00782 else 00783 #endif 00784 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00785 if (inflightMsgid > 0) 00786 { 00787 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00788 rc = publish(inflightLen, connect_timer, inflightQoS); 00789 } 00790 #endif 00791 00792 exit: 00793 if (rc == SUCCESS) 00794 { 00795 isconnected = true; 00796 ping_outstanding = false; 00797 } 00798 return rc; 00799 } 00800 00801 00802 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00803 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00804 { 00805 connackData data; 00806 return connect(options, data); 00807 } 00808 00809 00810 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00811 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00812 { 00813 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00814 return connect(default_options); 00815 } 00816 00817 00818 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00819 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) 00820 { 00821 int rc = FAILURE; 00822 int i = -1; 00823 00824 // first check for an existing matching slot 00825 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00826 { 00827 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00828 { 00829 if (messageHandler == 0) // remove existing 00830 { 00831 messageHandlers[i].topicFilter = 0; 00832 messageHandlers[i].fp.detach(); 00833 } 00834 rc = SUCCESS; // return i when adding new subscription 00835 break; 00836 } 00837 } 00838 // if no existing, look for empty slot (unless we are removing) 00839 if (messageHandler != 0) { 00840 if (rc == FAILURE) 00841 { 00842 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00843 { 00844 if (messageHandlers[i].topicFilter == 0) 00845 { 00846 rc = SUCCESS; 00847 break; 00848 } 00849 } 00850 } 00851 if (i < MAX_MESSAGE_HANDLERS) 00852 { 00853 messageHandlers[i].topicFilter = topicFilter; 00854 messageHandlers[i].fp.attach(messageHandler); 00855 } 00856 } 00857 return rc; 00858 } 00859 00860 00861 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00862 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, 00863 enum QoS qos, messageHandler messageHandler, subackData& data) 00864 { 00865 int rc = FAILURE; 00866 Timer timer(command_timeout_ms); 00867 int len = 0; 00868 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00869 00870 if (!isconnected) 00871 goto exit; 00872 00873 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00874 if (len <= 0) 00875 goto exit; 00876 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00877 goto exit; // there was a problem 00878 00879 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00880 { 00881 int count = 0; 00882 unsigned short mypacketid; 00883 data.grantedQoS = 0; 00884 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00885 { 00886 if (data.grantedQoS != 0x80) 00887 rc = setMessageHandler(topicFilter, messageHandler); 00888 } 00889 } 00890 else 00891 rc = FAILURE; 00892 00893 exit: 00894 if (rc == FAILURE) 00895 closeSession(); 00896 return rc; 00897 } 00898 00899 00900 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00901 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00902 { 00903 subackData data; 00904 return subscribe(topicFilter, qos, messageHandler, data); 00905 } 00906 00907 00908 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00909 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00910 { 00911 int rc = FAILURE; 00912 Timer timer(command_timeout_ms); 00913 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00914 int len = 0; 00915 00916 if (!isconnected) 00917 goto exit; 00918 00919 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00920 goto exit; 00921 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00922 goto exit; // there was a problem 00923 00924 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00925 { 00926 unsigned short mypacketid; // should be the same as the packetid above 00927 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00928 { 00929 // remove the subscription message handler associated with this topic, if there is one 00930 setMessageHandler(topicFilter, 0); 00931 } 00932 } 00933 else 00934 rc = FAILURE; 00935 00936 exit: 00937 if (rc != SUCCESS) 00938 closeSession(); 00939 return rc; 00940 } 00941 00942 00943 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00944 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00945 { 00946 int rc; 00947 00948 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00949 goto exit; // there was a problem 00950 00951 #if MQTTCLIENT_QOS1 00952 if (qos == QOS1) 00953 { 00954 if (waitfor(PUBACK, timer) == PUBACK) 00955 { 00956 unsigned short mypacketid; 00957 unsigned char dup, type; 00958 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00959 rc = FAILURE; 00960 else if (inflightMsgid == mypacketid) 00961 inflightMsgid = 0; 00962 } 00963 else 00964 rc = FAILURE; 00965 } 00966 #endif 00967 #if MQTTCLIENT_QOS2 00968 else if (qos == QOS2) 00969 { 00970 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00971 { 00972 unsigned short mypacketid; 00973 unsigned char dup, type; 00974 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00975 rc = FAILURE; 00976 else if (inflightMsgid == mypacketid) 00977 inflightMsgid = 0; 00978 } 00979 else 00980 rc = FAILURE; 00981 } 00982 #endif 00983 00984 exit: 00985 if (rc != SUCCESS) 00986 closeSession(); 00987 return rc; 00988 } 00989 00990 00991 00992 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00993 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00994 { 00995 int rc = FAILURE; 00996 Timer timer(command_timeout_ms); 00997 MQTTString topicString = MQTTString_initializer; 00998 int len = 0; 00999 01000 if (!isconnected) 01001 goto exit; 01002 01003 topicString.cstring = (char*)topicName; 01004 01005 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01006 if (qos == QOS1 || qos == QOS2) 01007 id = packetid.getNext(); 01008 #endif 01009 01010 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 01011 topicString, (unsigned char*)payload, payloadlen); 01012 if (len <= 0) 01013 goto exit; 01014 01015 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01016 if (!cleansession) 01017 { 01018 memcpy(pubbuf, sendbuf, len); 01019 inflightMsgid = id; 01020 inflightLen = len; 01021 inflightQoS = qos; 01022 #if MQTTCLIENT_QOS2 01023 pubrel = false; 01024 #endif 01025 } 01026 #endif 01027 01028 rc = publish(len, timer, qos); 01029 exit: 01030 return rc; 01031 } 01032 01033 01034 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01035 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01036 { 01037 unsigned short id = 0; // dummy - not used for anything 01038 return publish(topicName, payload, payloadlen, id, qos, retained); 01039 } 01040 01041 01042 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01043 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01044 { 01045 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01046 } 01047 01048 01049 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01050 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01051 { 01052 int rc = FAILURE; 01053 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01054 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01055 if (len > 0) 01056 rc = sendPacket(len, timer); // send the disconnect packet 01057 closeSession(); 01058 return rc; 01059 } 01060 01061 #endif
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2