Asavie IoT Connect MQTT cloud service connectors
Dependents: AsavieIoTConnectCloudConnector
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 * Ian Craggs - add ability to set message handler separately #6 00022 *******************************************************************************/ 00023 00024 #if !defined(MQTTCLIENT_H) 00025 #define MQTTCLIENT_H 00026 00027 #include <string.h> 00028 #include "FP.h" 00029 #include "MQTTPacket.h" 00030 #include <stdio.h> 00031 #include "MQTTLogging.h" 00032 00033 #if !defined(MQTTCLIENT_QOS1) 00034 #define MQTTCLIENT_QOS1 1 00035 #endif 00036 #if !defined(MQTTCLIENT_QOS2) 00037 #define MQTTCLIENT_QOS2 0 00038 #endif 00039 00040 namespace MQTT 00041 { 00042 00043 00044 enum QoS { QOS0, QOS1, QOS2 }; 00045 00046 // all failure return codes must be negative 00047 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00048 00049 00050 struct Message 00051 { 00052 enum QoS qos; 00053 bool retained; 00054 bool dup; 00055 unsigned short id; 00056 void *payload; 00057 size_t payloadlen; 00058 }; 00059 00060 00061 struct MessageData 00062 { 00063 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00064 { } 00065 00066 struct Message &message; 00067 MQTTString &topicName; 00068 }; 00069 00070 00071 struct connackData 00072 { 00073 int rc; 00074 bool sessionPresent; 00075 }; 00076 00077 00078 struct subackData 00079 { 00080 int grantedQoS; 00081 }; 00082 00083 00084 class PacketId 00085 { 00086 public: 00087 PacketId() 00088 { 00089 next = 0; 00090 } 00091 00092 int getNext() 00093 { 00094 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00095 } 00096 00097 private: 00098 static const int MAX_PACKET_ID = 65535; 00099 int next; 00100 }; 00101 00102 00103 /** 00104 * @class Client 00105 * @brief blocking, non-threaded MQTT client API 00106 * 00107 * This version of the API blocks on all method calls, until they are complete. This means that only one 00108 * MQTT request can be in process at any one time. 00109 * @param Network a network class which supports send, receive 00110 * @param Timer a timer class with the methods: 00111 */ 00112 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00113 class Client 00114 { 00115 00116 public: 00117 00118 typedef void (*messageHandler)(MessageData&); 00119 00120 /** Construct the client 00121 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00122 * before calling MQTT connect 00123 * @param limits an instance of the Limit class - to alter limits as required 00124 */ 00125 Client(Network& network, unsigned int command_timeout_ms = 30000); 00126 00127 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00128 * @param mh - pointer to the callback function. Set to 0 to remove. 00129 */ 00130 void setDefaultMessageHandler(messageHandler mh) 00131 { 00132 if (mh != 0) 00133 defaultMessageHandler.attach(mh); 00134 else 00135 defaultMessageHandler.detach(); 00136 } 00137 00138 /** Set a message handling callback. This can be used outside of the the subscribe method. 00139 * @param topicFilter - a topic pattern which can include wildcards 00140 * @param mh - pointer to the callback function. If 0, removes the callback if any 00141 */ 00142 int setMessageHandler(const char* topicFilter, messageHandler mh); 00143 00144 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00145 * The nework object must be connected to the network endpoint before calling this 00146 * Default connect options are used 00147 * @return success code - 00148 */ 00149 int connect(); 00150 00151 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00152 * The nework object must be connected to the network endpoint before calling this 00153 * @param options - connect options 00154 * @return success code - 00155 */ 00156 int connect(MQTTPacket_connectData& options); 00157 00158 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00159 * The nework object must be connected to the network endpoint before calling this 00160 * @param options - connect options 00161 * @param connackData - connack data to be returned 00162 * @return success code - 00163 */ 00164 int connect(MQTTPacket_connectData& options, connackData& data); 00165 00166 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00167 * @param topic - the topic to publish to 00168 * @param message - the message to send 00169 * @return success code - 00170 */ 00171 int publish(const char* topicName, Message& message); 00172 00173 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00174 * @param topic - the topic to publish to 00175 * @param payload - the data to send 00176 * @param payloadlen - the length of the data 00177 * @param qos - the QoS to send the publish at 00178 * @param retained - whether the message should be retained 00179 * @return success code - 00180 */ 00181 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00182 00183 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00184 * @param topic - the topic to publish to 00185 * @param payload - the data to send 00186 * @param payloadlen - the length of the data 00187 * @param id - the packet id used - returned 00188 * @param qos - the QoS to send the publish at 00189 * @param retained - whether the message should be retained 00190 * @return success code - 00191 */ 00192 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00193 00194 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00195 * @param topicFilter - a topic pattern which can include wildcards 00196 * @param qos - the MQTT QoS to subscribe at 00197 * @param mh - the callback function to be invoked when a message is received for this subscription 00198 * @return success code - 00199 */ 00200 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00201 00202 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00203 * @param topicFilter - a topic pattern which can include wildcards 00204 * @param qos - the MQTT QoS to subscribe at© 00205 * @param mh - the callback function to be invoked when a message is received for this subscription 00206 * @param 00207 * @return success code - 00208 */ 00209 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); 00210 00211 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00212 * @param topicFilter - a topic pattern which can include wildcards 00213 * @return success code - 00214 */ 00215 int unsubscribe(const char* topicFilter); 00216 00217 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00218 * @return success code - 00219 */ 00220 int disconnect(); 00221 00222 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00223 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00224 * received. 00225 * @param timeout_ms the time to wait, in milliseconds 00226 * @return success code - on failure, this means the client has disconnected 00227 */ 00228 int yield(unsigned long timeout_ms = 1000L); 00229 00230 /** Is the client connected? 00231 * @return flag - is the client connected or not? 00232 */ 00233 bool isConnected() 00234 { 00235 return isconnected; 00236 } 00237 00238 private: 00239 00240 void closeSession(); 00241 void cleanSession(); 00242 int cycle(Timer& timer); 00243 int waitfor(int packet_type, Timer& timer); 00244 int keepalive(); 00245 int publish(int len, Timer& timer, enum QoS qos); 00246 00247 int decodePacket(int* value, int timeout); 00248 int readPacket(Timer& timer); 00249 int sendPacket(int length, Timer& timer); 00250 int deliverMessage(MQTTString& topicName, Message& message); 00251 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00252 00253 Network& ipstack; 00254 unsigned long command_timeout_ms; 00255 00256 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00257 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00258 00259 Timer last_sent, last_received; 00260 unsigned int keepAliveInterval; 00261 bool ping_outstanding; 00262 bool cleansession; 00263 00264 PacketId packetid; 00265 00266 struct MessageHandlers 00267 { 00268 const char* topicFilter; 00269 FP<void, MessageData&> fp; 00270 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00271 00272 FP<void, MessageData&> defaultMessageHandler; 00273 00274 bool isconnected; 00275 00276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00277 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00278 int inflightLen; 00279 unsigned short inflightMsgid; 00280 enum QoS inflightQoS; 00281 #endif 00282 00283 #if MQTTCLIENT_QOS2 00284 bool pubrel; 00285 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00286 #define MAX_INCOMING_QOS2_MESSAGES 10 00287 #endif 00288 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00289 bool isQoS2msgidFree(unsigned short id); 00290 bool useQoS2msgid(unsigned short id); 00291 void freeQoS2msgid(unsigned short id); 00292 #endif 00293 00294 }; 00295 00296 } 00297 00298 00299 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00300 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00301 { 00302 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00303 messageHandlers[i].topicFilter = 0; 00304 00305 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00306 inflightMsgid = 0; 00307 inflightQoS = QOS0; 00308 #endif 00309 00310 #if MQTTCLIENT_QOS2 00311 pubrel = false; 00312 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00313 incomingQoS2messages[i] = 0; 00314 #endif 00315 } 00316 00317 00318 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00319 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() 00320 { 00321 ping_outstanding = false; 00322 isconnected = false; 00323 if (cleansession) 00324 cleanSession(); 00325 } 00326 00327 00328 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00329 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00330 { 00331 this->command_timeout_ms = command_timeout_ms; 00332 cleansession = true; 00333 closeSession(); 00334 } 00335 00336 00337 #if MQTTCLIENT_QOS2 00338 template<class Network, class Timer, int a, int b> 00339 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00340 { 00341 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00342 { 00343 if (incomingQoS2messages[i] == id) 00344 return false; 00345 } 00346 return true; 00347 } 00348 00349 00350 template<class Network, class Timer, int a, int b> 00351 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00352 { 00353 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00354 { 00355 if (incomingQoS2messages[i] == 0) 00356 { 00357 incomingQoS2messages[i] = id; 00358 return true; 00359 } 00360 } 00361 return false; 00362 } 00363 00364 00365 template<class Network, class Timer, int a, int b> 00366 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00367 { 00368 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00369 { 00370 if (incomingQoS2messages[i] == id) 00371 { 00372 incomingQoS2messages[i] = 0; 00373 return; 00374 } 00375 } 00376 } 00377 #endif 00378 00379 00380 template<class Network, class Timer, int a, int b> 00381 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00382 { 00383 int rc = FAILURE, 00384 sent = 0; 00385 00386 while (sent < length) 00387 { 00388 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00389 if (rc < 0) // there was an error writing the data 00390 break; 00391 sent += rc; 00392 if (timer.expired()) // only check expiry after at least one attempt to write 00393 break; 00394 } 00395 if (sent == length) 00396 { 00397 if (this->keepAliveInterval > 0) 00398 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00399 rc = SUCCESS; 00400 } 00401 else 00402 rc = FAILURE; 00403 00404 #if defined(MQTT_DEBUG) 00405 char printbuf[150]; 00406 DEBUG("Rc %d from sending packet %s\r\n", rc, 00407 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00408 #endif 00409 return rc; 00410 } 00411 00412 00413 template<class Network, class Timer, int a, int b> 00414 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00415 { 00416 unsigned char c; 00417 int multiplier = 1; 00418 int len = 0; 00419 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00420 00421 *value = 0; 00422 do 00423 { 00424 int rc = MQTTPACKET_READ_ERROR; 00425 00426 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00427 { 00428 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00429 goto exit; 00430 } 00431 rc = ipstack.read(&c, 1, timeout); 00432 if (rc != 1) 00433 goto exit; 00434 *value += (c & 127) * multiplier; 00435 multiplier *= 128; 00436 } while ((c & 128) != 0); 00437 exit: 00438 return len; 00439 } 00440 00441 00442 /** 00443 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00444 * the packets can be retried. 00445 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00446 * @return the MQTT packet type, 0 if none, -1 if error 00447 */ 00448 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00449 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00450 { 00451 int rc = FAILURE; 00452 MQTTHeader header = {0}; 00453 int len = 0; 00454 int rem_len = 0; 00455 00456 /* 1. read the header byte. This has the packet type in it */ 00457 rc = ipstack.read(readbuf, 1, timer.left_ms()); 00458 if (rc != 1) 00459 goto exit; 00460 00461 len = 1; 00462 /* 2. read the remaining length. This is variable in itself */ 00463 decodePacket(&rem_len, timer.left_ms()); 00464 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00465 00466 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00467 { 00468 rc = BUFFER_OVERFLOW; 00469 goto exit; 00470 } 00471 00472 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00473 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00474 goto exit; 00475 00476 header.byte = readbuf[0]; 00477 rc = header.bits.type; 00478 if (this->keepAliveInterval > 0) 00479 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00480 exit: 00481 00482 #if defined(MQTT_DEBUG) 00483 if (rc >= 0) 00484 { 00485 char printbuf[50]; 00486 DEBUG("Rc %d receiving packet %s\r\n", rc, 00487 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00488 } 00489 #endif 00490 return rc; 00491 } 00492 00493 00494 // assume topic filter and name is in correct format 00495 // # can only be at end 00496 // + and # can only be next to separator 00497 template<class Network, class Timer, int a, int b> 00498 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00499 { 00500 char* curf = topicFilter; 00501 char* curn = topicName.lenstring.data; 00502 char* curn_end = curn + topicName.lenstring.len; 00503 00504 while (*curf && curn < curn_end) 00505 { 00506 if (*curn == '/' && *curf != '/') 00507 break; 00508 if (*curf != '+' && *curf != '#' && *curf != *curn) 00509 break; 00510 if (*curf == '+') 00511 { // skip until we meet the next separator, or end of string 00512 char* nextpos = curn + 1; 00513 while (nextpos < curn_end && *nextpos != '/') 00514 nextpos = ++curn + 1; 00515 } 00516 else if (*curf == '#') 00517 curn = curn_end - 1; // skip until end of string 00518 curf++; 00519 curn++; 00520 }; 00521 00522 return (curn == curn_end) && (*curf == '\0'); 00523 } 00524 00525 00526 00527 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00528 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00529 { 00530 int rc = FAILURE; 00531 00532 // we have to find the right message handler - indexed by topic 00533 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00534 { 00535 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00536 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00537 { 00538 if (messageHandlers[i].fp.attached()) 00539 { 00540 MessageData md(topicName, message); 00541 messageHandlers[i].fp(md); 00542 rc = SUCCESS; 00543 } 00544 } 00545 } 00546 00547 if (rc == FAILURE && defaultMessageHandler.attached()) 00548 { 00549 MessageData md(topicName, message); 00550 defaultMessageHandler(md); 00551 rc = SUCCESS; 00552 } 00553 00554 return rc; 00555 } 00556 00557 00558 00559 template<class Network, class Timer, int a, int b> 00560 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00561 { 00562 int rc = SUCCESS; 00563 Timer timer; 00564 00565 timer.countdown_ms(timeout_ms); 00566 while (!timer.expired()) 00567 { 00568 if (cycle(timer) < 0) 00569 { 00570 rc = FAILURE; 00571 break; 00572 } 00573 } 00574 00575 return rc; 00576 } 00577 00578 00579 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00580 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00581 { 00582 // get one piece of work off the wire and one pass through 00583 int len = 0, 00584 rc = SUCCESS; 00585 00586 int packet_type = readPacket(timer); // read the socket, see what work is due 00587 00588 switch (packet_type) 00589 { 00590 default: 00591 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error 00592 rc = packet_type; 00593 goto exit; 00594 case 0: // timed out reading packet 00595 break; 00596 case CONNACK: 00597 case PUBACK: 00598 case SUBACK: 00599 break; 00600 case PUBLISH: 00601 { 00602 MQTTString topicName = MQTTString_initializer; 00603 Message msg; 00604 int intQoS; 00605 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ 00606 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00607 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00608 goto exit; 00609 msg.qos = (enum QoS)intQoS; 00610 #if MQTTCLIENT_QOS2 00611 if (msg.qos != QOS2) 00612 #endif 00613 deliverMessage(topicName, msg); 00614 #if MQTTCLIENT_QOS2 00615 else if (isQoS2msgidFree(msg.id)) 00616 { 00617 if (useQoS2msgid(msg.id)) 00618 deliverMessage(topicName, msg); 00619 else 00620 WARN("Maximum number of incoming QoS2 messages exceeded"); 00621 } 00622 #endif 00623 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00624 if (msg.qos != QOS0) 00625 { 00626 if (msg.qos == QOS1) 00627 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00628 else if (msg.qos == QOS2) 00629 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00630 if (len <= 0) 00631 rc = FAILURE; 00632 else 00633 rc = sendPacket(len, timer); 00634 if (rc == FAILURE) 00635 goto exit; // there was a problem 00636 } 00637 break; 00638 #endif 00639 } 00640 #if MQTTCLIENT_QOS2 00641 case PUBREC: 00642 case PUBREL: 00643 unsigned short mypacketid; 00644 unsigned char dup, type; 00645 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00646 rc = FAILURE; 00647 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00648 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00649 rc = FAILURE; 00650 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00651 rc = FAILURE; // there was a problem 00652 if (rc == FAILURE) 00653 goto exit; // there was a problem 00654 if (packet_type == PUBREL) 00655 freeQoS2msgid(mypacketid); 00656 break; 00657 00658 case PUBCOMP: 00659 break; 00660 #endif 00661 case PINGRESP: 00662 ping_outstanding = false; 00663 break; 00664 } 00665 00666 if (keepalive() != SUCCESS) 00667 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT 00668 rc = FAILURE; 00669 00670 exit: 00671 if (rc == SUCCESS) 00672 rc = packet_type; 00673 else if (isconnected) 00674 closeSession(); 00675 return rc; 00676 } 00677 00678 00679 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00680 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00681 { 00682 int rc = SUCCESS; 00683 static Timer ping_sent; 00684 00685 if (keepAliveInterval == 0) 00686 goto exit; 00687 00688 if (ping_outstanding) 00689 { 00690 if (ping_sent.expired()) 00691 { 00692 rc = FAILURE; // session failure 00693 #if defined(MQTT_DEBUG) 00694 DEBUG("PINGRESP not received in keepalive interval\r\n"); 00695 #endif 00696 } 00697 } 00698 else if (last_sent.expired() || last_received.expired()) 00699 { 00700 Timer timer(1000); 00701 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00702 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00703 { 00704 ping_outstanding = true; 00705 ping_sent.countdown(this->keepAliveInterval); 00706 } 00707 } 00708 exit: 00709 return rc; 00710 } 00711 00712 00713 // only used in single-threaded mode where one command at a time is in process 00714 template<class Network, class Timer, int a, int b> 00715 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00716 { 00717 int rc = FAILURE; 00718 00719 do 00720 { 00721 if (timer.expired()) 00722 break; // we timed out 00723 rc = cycle(timer); 00724 } 00725 while (rc != packet_type && rc >= 0); 00726 00727 return rc; 00728 } 00729 00730 00731 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00732 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) 00733 { 00734 Timer connect_timer(command_timeout_ms); 00735 int rc = FAILURE; 00736 int len = 0; 00737 00738 if (isconnected) // don't send connect packet again if we are already connected 00739 goto exit; 00740 00741 this->keepAliveInterval = options.keepAliveInterval; 00742 this->cleansession = options.cleansession; 00743 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00744 goto exit; 00745 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00746 goto exit; // there was a problem 00747 00748 if (this->keepAliveInterval > 0) 00749 last_received.countdown(this->keepAliveInterval); 00750 // this will be a blocking call, wait for the connack 00751 if (waitfor(CONNACK, connect_timer) == CONNACK) 00752 { 00753 data.rc = 0; 00754 data.sessionPresent = false; 00755 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, 00756 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00757 rc = data.rc; 00758 else 00759 rc = FAILURE; 00760 } 00761 else 00762 rc = FAILURE; 00763 00764 #if MQTTCLIENT_QOS2 00765 // resend any inflight publish 00766 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00767 { 00768 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00769 rc = FAILURE; 00770 else 00771 rc = publish(len, connect_timer, inflightQoS); 00772 } 00773 else 00774 #endif 00775 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00776 if (inflightMsgid > 0) 00777 { 00778 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00779 rc = publish(inflightLen, connect_timer, inflightQoS); 00780 } 00781 #endif 00782 00783 exit: 00784 if (rc == SUCCESS) 00785 { 00786 isconnected = true; 00787 ping_outstanding = false; 00788 } 00789 return rc; 00790 } 00791 00792 00793 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00794 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00795 { 00796 connackData data; 00797 return connect(options, data); 00798 } 00799 00800 00801 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00802 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00803 { 00804 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00805 return connect(default_options); 00806 } 00807 00808 00809 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00810 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) 00811 { 00812 int rc = FAILURE; 00813 int i = -1; 00814 00815 // first check for an existing matching slot 00816 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00817 { 00818 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00819 { 00820 if (messageHandler == 0) // remove existing 00821 { 00822 messageHandlers[i].topicFilter = 0; 00823 messageHandlers[i].fp.detach(); 00824 } 00825 rc = SUCCESS; // return i when adding new subscription 00826 break; 00827 } 00828 } 00829 // if no existing, look for empty slot (unless we are removing) 00830 if (messageHandler != 0) { 00831 if (rc == FAILURE) 00832 { 00833 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00834 { 00835 if (messageHandlers[i].topicFilter == 0) 00836 { 00837 rc = SUCCESS; 00838 break; 00839 } 00840 } 00841 } 00842 if (i < MAX_MESSAGE_HANDLERS) 00843 { 00844 messageHandlers[i].topicFilter = topicFilter; 00845 messageHandlers[i].fp.attach(messageHandler); 00846 } 00847 } 00848 return rc; 00849 } 00850 00851 00852 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00853 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, 00854 enum QoS qos, messageHandler messageHandler, subackData& data) 00855 { 00856 int rc = FAILURE; 00857 Timer timer(command_timeout_ms); 00858 int len = 0; 00859 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00860 00861 if (!isconnected) 00862 goto exit; 00863 00864 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00865 if (len <= 0) 00866 goto exit; 00867 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00868 goto exit; // there was a problem 00869 00870 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00871 { 00872 int count = 0; 00873 unsigned short mypacketid; 00874 data.grantedQoS = 0; 00875 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00876 { 00877 if (data.grantedQoS != 0x80) 00878 rc = setMessageHandler(topicFilter, messageHandler); 00879 } 00880 } 00881 else 00882 rc = FAILURE; 00883 00884 exit: 00885 if (rc == FAILURE) 00886 closeSession(); 00887 return rc; 00888 } 00889 00890 00891 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00892 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00893 { 00894 subackData data; 00895 return subscribe(topicFilter, qos, messageHandler, data); 00896 } 00897 00898 00899 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00900 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00901 { 00902 int rc = FAILURE; 00903 Timer timer(command_timeout_ms); 00904 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00905 int len = 0; 00906 00907 if (!isconnected) 00908 goto exit; 00909 00910 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00911 goto exit; 00912 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00913 goto exit; // there was a problem 00914 00915 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00916 { 00917 unsigned short mypacketid; // should be the same as the packetid above 00918 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00919 { 00920 // remove the subscription message handler associated with this topic, if there is one 00921 setMessageHandler(topicFilter, 0); 00922 } 00923 } 00924 else 00925 rc = FAILURE; 00926 00927 exit: 00928 if (rc != SUCCESS) 00929 closeSession(); 00930 return rc; 00931 } 00932 00933 00934 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00935 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00936 { 00937 int rc; 00938 00939 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00940 goto exit; // there was a problem 00941 00942 #if MQTTCLIENT_QOS1 00943 if (qos == QOS1) 00944 { 00945 if (waitfor(PUBACK, timer) == PUBACK) 00946 { 00947 unsigned short mypacketid; 00948 unsigned char dup, type; 00949 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00950 rc = FAILURE; 00951 else if (inflightMsgid == mypacketid) 00952 inflightMsgid = 0; 00953 } 00954 else 00955 rc = FAILURE; 00956 } 00957 #endif 00958 #if MQTTCLIENT_QOS2 00959 else if (qos == QOS2) 00960 { 00961 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00962 { 00963 unsigned short mypacketid; 00964 unsigned char dup, type; 00965 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00966 rc = FAILURE; 00967 else if (inflightMsgid == mypacketid) 00968 inflightMsgid = 0; 00969 } 00970 else 00971 rc = FAILURE; 00972 } 00973 #endif 00974 00975 exit: 00976 if (rc != SUCCESS) 00977 closeSession(); 00978 return rc; 00979 } 00980 00981 00982 00983 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00984 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00985 { 00986 int rc = FAILURE; 00987 Timer timer(command_timeout_ms); 00988 MQTTString topicString = MQTTString_initializer; 00989 int len = 0; 00990 00991 if (!isconnected) 00992 goto exit; 00993 00994 topicString.cstring = (char*)topicName; 00995 00996 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00997 if (qos == QOS1 || qos == QOS2) 00998 id = packetid.getNext(); 00999 #endif 01000 01001 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 01002 topicString, (unsigned char*)payload, payloadlen); 01003 if (len <= 0) 01004 goto exit; 01005 01006 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01007 if (!cleansession) 01008 { 01009 memcpy(pubbuf, sendbuf, len); 01010 inflightMsgid = id; 01011 inflightLen = len; 01012 inflightQoS = qos; 01013 #if MQTTCLIENT_QOS2 01014 pubrel = false; 01015 #endif 01016 } 01017 #endif 01018 01019 rc = publish(len, timer, qos); 01020 exit: 01021 return rc; 01022 } 01023 01024 01025 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01026 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01027 { 01028 unsigned short id = 0; // dummy - not used for anything 01029 return publish(topicName, payload, payloadlen, id, qos, retained); 01030 } 01031 01032 01033 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01034 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01035 { 01036 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01037 } 01038 01039 01040 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01041 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01042 { 01043 int rc = FAILURE; 01044 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01045 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01046 if (len > 0) 01047 rc = sendPacket(len, timer); // send the disconnect packet 01048 closeSession(); 01049 return rc; 01050 } 01051 01052 #endif
Generated on Fri Aug 5 2022 17:48:29 by 1.7.2