d
Embed:
(wiki syntax)
Show/hide line numbers
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer 00020 * Ian Craggs - fix for bug 475749 - packetid modified twice 00021 * Ian Craggs - add ability to set message handler separately #6 00022 *******************************************************************************/ 00023 00024 #if !defined(MQTTCLIENT_H) 00025 #define MQTTCLIENT_H 00026 00027 #include "FP.h" 00028 #include "MQTTPacket.h" 00029 #include <stdio.h> 00030 #include "MQTTLogging.h" 00031 00032 #if !defined(MQTTCLIENT_QOS1) 00033 #define MQTTCLIENT_QOS1 1 00034 #endif 00035 #if !defined(MQTTCLIENT_QOS2) 00036 #define MQTTCLIENT_QOS2 0 00037 #endif 00038 00039 namespace MQTT 00040 { 00041 00042 00043 enum QoS { QOS0, QOS1, QOS2 }; 00044 00045 // all failure return codes must be negative 00046 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00047 00048 00049 struct Message 00050 { 00051 enum QoS qos; 00052 bool retained; 00053 bool dup; 00054 unsigned short id; 00055 void *payload; 00056 size_t payloadlen; 00057 }; 00058 00059 00060 struct MessageData 00061 { 00062 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00063 { } 00064 00065 struct Message &message; 00066 MQTTString &topicName; 00067 }; 00068 00069 00070 struct connackData 00071 { 00072 int rc; 00073 bool sessionPresent; 00074 }; 00075 00076 00077 struct subackData 00078 { 00079 int grantedQoS; 00080 }; 00081 00082 00083 class PacketId 00084 { 00085 public: 00086 PacketId() 00087 { 00088 next = 0; 00089 } 00090 00091 int getNext() 00092 { 00093 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00094 } 00095 00096 private: 00097 static const int MAX_PACKET_ID = 65535; 00098 int next; 00099 }; 00100 00101 00102 /** 00103 * @class Client 00104 * @brief blocking, non-threaded MQTT client API 00105 * 00106 * This version of the API blocks on all method calls, until they are complete. This means that only one 00107 * MQTT request can be in process at any one time. 00108 * @param Network a network class which supports send, receive 00109 * @param Timer a timer class with the methods: 00110 */ 00111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 11> 00112 class Client 00113 { 00114 00115 public: 00116 00117 typedef void (*messageHandler)(MessageData&); 00118 00119 /** Construct the client 00120 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00121 * before calling MQTT connect 00122 * @param limits an instance of the Limit class - to alter limits as required 00123 */ 00124 Client(Network& network, unsigned int command_timeout_ms = 30000); 00125 00126 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00127 * @param mh - pointer to the callback function. Set to 0 to remove. 00128 */ 00129 void setDefaultMessageHandler(messageHandler mh) 00130 { 00131 if (mh != 0) 00132 defaultMessageHandler.attach(mh); 00133 else 00134 defaultMessageHandler.detach(); 00135 } 00136 00137 /** Set a message handling callback. This can be used outside of the the subscribe method. 00138 * @param topicFilter - a topic pattern which can include wildcards 00139 * @param mh - pointer to the callback function. If 0, removes the callback if any 00140 */ 00141 int setMessageHandler(const char* topicFilter, messageHandler mh); 00142 00143 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00144 * The nework object must be connected to the network endpoint before calling this 00145 * Default connect options are used 00146 * @return success code - 00147 */ 00148 int connect(); 00149 00150 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00151 * The nework object must be connected to the network endpoint before calling this 00152 * @param options - connect options 00153 * @return success code - 00154 */ 00155 int connect(MQTTPacket_connectData& options); 00156 00157 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00158 * The nework object must be connected to the network endpoint before calling this 00159 * @param options - connect options 00160 * @param connackData - connack data to be returned 00161 * @return success code - 00162 */ 00163 int connect(MQTTPacket_connectData& options, connackData& data); 00164 00165 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00166 * @param topic - the topic to publish to 00167 * @param message - the message to send 00168 * @return success code - 00169 */ 00170 int publish(const char* topicName, Message& message); 00171 00172 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00173 * @param topic - the topic to publish to 00174 * @param payload - the data to send 00175 * @param payloadlen - the length of the data 00176 * @param qos - the QoS to send the publish at 00177 * @param retained - whether the message should be retained 00178 * @return success code - 00179 */ 00180 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00181 00182 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00183 * @param topic - the topic to publish to 00184 * @param payload - the data to send 00185 * @param payloadlen - the length of the data 00186 * @param id - the packet id used - returned 00187 * @param qos - the QoS to send the publish at 00188 * @param retained - whether the message should be retained 00189 * @return success code - 00190 */ 00191 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00192 00193 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00194 * @param topicFilter - a topic pattern which can include wildcards 00195 * @param qos - the MQTT QoS to subscribe at 00196 * @param mh - the callback function to be invoked when a message is received for this subscription 00197 * @return success code - 00198 */ 00199 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00200 00201 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00202 * @param topicFilter - a topic pattern which can include wildcards 00203 * @param qos - the MQTT QoS to subscribe at© 00204 * @param mh - the callback function to be invoked when a message is received for this subscription 00205 * @param 00206 * @return success code - 00207 */ 00208 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); 00209 00210 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00211 * @param topicFilter - a topic pattern which can include wildcards 00212 * @return success code - 00213 */ 00214 int unsubscribe(const char* topicFilter); 00215 00216 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00217 * @return success code - 00218 */ 00219 int disconnect(); 00220 00221 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00222 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00223 * received. 00224 * @param timeout_ms the time to wait, in milliseconds 00225 * @return success code - on failure, this means the client has disconnected 00226 */ 00227 int yield(unsigned long timeout_ms = 1000L); 00228 00229 /** Is the client connected? 00230 * @return flag - is the client connected or not? 00231 */ 00232 bool isConnected() 00233 { 00234 return isconnected; 00235 } 00236 00237 private: 00238 00239 void closeSession(); 00240 void cleanSession(); 00241 int cycle(Timer& timer); 00242 int waitfor(int packet_type, Timer& timer); 00243 int keepalive(); 00244 int publish(int len, Timer& timer, enum QoS qos); 00245 00246 int decodePacket(int* value, int timeout); 00247 int readPacket(Timer& timer); 00248 int sendPacket(int length, Timer& timer); 00249 int deliverMessage(MQTTString& topicName, Message& message); 00250 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00251 00252 Network& ipstack; 00253 unsigned long command_timeout_ms; 00254 00255 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00256 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00257 00258 Timer last_sent, last_received; 00259 unsigned int keepAliveInterval; 00260 bool ping_outstanding; 00261 bool cleansession; 00262 00263 PacketId packetid; 00264 00265 struct MessageHandlers 00266 { 00267 const char* topicFilter; 00268 FP<void, MessageData&> fp; 00269 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00270 00271 FP<void, MessageData&> defaultMessageHandler; 00272 00273 bool isconnected; 00274 00275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00276 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00277 int inflightLen; 00278 unsigned short inflightMsgid; 00279 enum QoS inflightQoS; 00280 #endif 00281 00282 #if MQTTCLIENT_QOS2 00283 bool pubrel; 00284 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00285 #define MAX_INCOMING_QOS2_MESSAGES 10 00286 #endif 00287 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00288 bool isQoS2msgidFree(unsigned short id); 00289 bool useQoS2msgid(unsigned short id); 00290 void freeQoS2msgid(unsigned short id); 00291 #endif 00292 00293 }; 00294 00295 } 00296 00297 00298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00300 { 00301 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00302 messageHandlers[i].topicFilter = 0; 00303 00304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00305 inflightMsgid = 0; 00306 inflightQoS = QOS0; 00307 #endif 00308 00309 #if MQTTCLIENT_QOS2 00310 pubrel = false; 00311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00312 incomingQoS2messages[i] = 0; 00313 #endif 00314 } 00315 00316 00317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() 00319 { 00320 ping_outstanding = false; 00321 isconnected = false; 00322 if (cleansession) 00323 cleanSession(); 00324 } 00325 00326 00327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00329 { 00330 this->command_timeout_ms = command_timeout_ms; 00331 cleansession = true; 00332 closeSession(); 00333 } 00334 00335 00336 #if MQTTCLIENT_QOS2 00337 template<class Network, class Timer, int a, int b> 00338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00339 { 00340 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00341 { 00342 if (incomingQoS2messages[i] == id) 00343 return false; 00344 } 00345 return true; 00346 } 00347 00348 00349 template<class Network, class Timer, int a, int b> 00350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00351 { 00352 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00353 { 00354 if (incomingQoS2messages[i] == 0) 00355 { 00356 incomingQoS2messages[i] = id; 00357 return true; 00358 } 00359 } 00360 return false; 00361 } 00362 00363 00364 template<class Network, class Timer, int a, int b> 00365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00366 { 00367 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00368 { 00369 if (incomingQoS2messages[i] == id) 00370 { 00371 incomingQoS2messages[i] = 0; 00372 return; 00373 } 00374 } 00375 } 00376 #endif 00377 00378 00379 template<class Network, class Timer, int a, int b> 00380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00381 { 00382 int rc = FAILURE, 00383 sent = 0; 00384 00385 while (sent < length) 00386 { 00387 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00388 if (rc < 0) // there was an error writing the data 00389 break; 00390 sent += rc; 00391 if (timer.expired()) // only check expiry after at least one attempt to write 00392 break; 00393 } 00394 if (sent == length) 00395 { 00396 if (this->keepAliveInterval > 0) 00397 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00398 rc = SUCCESS; 00399 } 00400 else 00401 rc = FAILURE; 00402 00403 #if defined(MQTT_DEBUG) 00404 char printbuf[150]; 00405 DEBUG("Rc %d from sending packet %s\r\n", rc, 00406 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00407 #endif 00408 return rc; 00409 } 00410 00411 00412 template<class Network, class Timer, int a, int b> 00413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00414 { 00415 unsigned char c; 00416 int multiplier = 1; 00417 int len = 0; 00418 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00419 00420 *value = 0; 00421 do 00422 { 00423 int rc = MQTTPACKET_READ_ERROR; 00424 00425 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00426 { 00427 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00428 goto exit; 00429 } 00430 rc = ipstack.read(&c, 1, timeout); 00431 //printf("decode: %d\r\n",rc); 00432 if (rc != 1) 00433 goto exit; 00434 *value += (c & 127) * multiplier; 00435 multiplier *= 128; 00436 //printf("c: %d\r\n",c); 00437 //printf("value: %d\r\n",value); 00438 //printf("multiplier: %d\r\n",multiplier); 00439 } while ((c & 128) != 0); 00440 exit: 00441 return len; 00442 } 00443 00444 00445 /** 00446 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00447 * the packets can be retried. 00448 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00449 * @return the MQTT packet type, 0 if none, -1 if error 00450 */ 00451 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00452 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00453 { 00454 int rc = FAILURE; 00455 MQTTHeader header = {0}; 00456 int len = 0; 00457 int rem_len = 0; 00458 00459 /* 1. read the header byte. This has the packet type in it */ 00460 rc = ipstack.read(readbuf, 1, timer.left_ms()); 00461 //printf("read: %d\r\n",rc); 00462 //printf("%c\r\n",readbuf[0]); 00463 if (rc != 1) 00464 goto exit; 00465 00466 len = 1; 00467 /* 2. read the remaining length. This is variable in itself */ 00468 decodePacket(&rem_len, timer.left_ms()); 00469 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00470 00471 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00472 { 00473 rc = BUFFER_OVERFLOW; 00474 goto exit; 00475 } 00476 00477 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00478 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00479 goto exit; 00480 00481 header.byte = readbuf[0]; 00482 rc = header.bits.type; 00483 if (this->keepAliveInterval > 0) 00484 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00485 exit: 00486 00487 #if defined(MQTT_DEBUG) 00488 if (rc >= 0) 00489 { 00490 char printbuf[50]; 00491 DEBUG("Rc %d receiving packet %s\r\n", rc, 00492 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00493 } 00494 #endif 00495 return rc; 00496 } 00497 00498 00499 // assume topic filter and name is in correct format 00500 // # can only be at end 00501 // + and # can only be next to separator 00502 template<class Network, class Timer, int a, int b> 00503 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00504 { 00505 char* curf = topicFilter; 00506 char* curn = topicName.lenstring.data; 00507 char* curn_end = curn + topicName.lenstring.len; 00508 00509 while (*curf && curn < curn_end) 00510 { 00511 if (*curn == '/' && *curf != '/') 00512 break; 00513 if (*curf != '+' && *curf != '#' && *curf != *curn) 00514 break; 00515 if (*curf == '+') 00516 { // skip until we meet the next separator, or end of string 00517 char* nextpos = curn + 1; 00518 while (nextpos < curn_end && *nextpos != '/') 00519 nextpos = ++curn + 1; 00520 } 00521 else if (*curf == '#') 00522 curn = curn_end - 1; // skip until end of string 00523 curf++; 00524 curn++; 00525 }; 00526 00527 return (curn == curn_end) && (*curf == '\0'); 00528 } 00529 00530 00531 00532 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00533 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00534 { 00535 int rc = FAILURE; 00536 00537 // we have to find the right message handler - indexed by topic 00538 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00539 { 00540 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00541 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00542 { 00543 if (messageHandlers[i].fp.attached()) 00544 { 00545 MessageData md(topicName, message); 00546 messageHandlers[i].fp(md); 00547 rc = SUCCESS; 00548 } 00549 } 00550 } 00551 00552 if (rc == FAILURE && defaultMessageHandler.attached()) 00553 { 00554 MessageData md(topicName, message); 00555 defaultMessageHandler(md); 00556 rc = SUCCESS; 00557 } 00558 00559 return rc; 00560 } 00561 00562 00563 00564 template<class Network, class Timer, int a, int b> 00565 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00566 { 00567 int rc = SUCCESS; 00568 Timer timer; 00569 00570 timer.countdown_ms(timeout_ms); 00571 while (!timer.expired()) 00572 { 00573 //printf("while...\r\n"); 00574 int ans=cycle(timer); 00575 if (ans == -3004) 00576 { 00577 rc = -3004; 00578 break; 00579 } 00580 else if (ans < 0) 00581 { 00582 rc = FAILURE; 00583 break; 00584 } 00585 } 00586 00587 return rc; 00588 } 00589 00590 00591 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00592 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00593 { 00594 // get one piece of work off the wire and one pass through 00595 int len = 0, 00596 rc = SUCCESS; 00597 00598 00599 int packet_type = readPacket(timer); // read the socket, see what work is due 00600 //printf("packet_type: %d\r\n",packet_type); 00601 00602 switch (packet_type) 00603 { 00604 default: 00605 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error 00606 rc = packet_type; 00607 //printf("default\r\n"); 00608 goto exit; 00609 case 0: // timed out reading packet 00610 //printf("timeout\r\n"); 00611 break; 00612 case -3001: // timed out reading packet 00613 //printf("timeout\r\n"); 00614 break; 00615 case CONNACK: 00616 case PUBACK: 00617 case SUBACK: 00618 break; 00619 case PUBLISH: 00620 { 00621 MQTTString topicName = MQTTString_initializer; 00622 Message msg; 00623 int intQoS; 00624 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ 00625 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00626 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00627 goto exit; 00628 msg.qos = (enum QoS)intQoS; 00629 #if MQTTCLIENT_QOS2 00630 if (msg.qos != QOS2) 00631 #endif 00632 deliverMessage(topicName, msg); 00633 #if MQTTCLIENT_QOS2 00634 else if (isQoS2msgidFree(msg.id)) 00635 { 00636 if (useQoS2msgid(msg.id)) 00637 deliverMessage(topicName, msg); 00638 else 00639 WARN("Maximum number of incoming QoS2 messages exceeded"); 00640 } 00641 #endif 00642 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00643 if (msg.qos != QOS0) 00644 { 00645 if (msg.qos == QOS1) 00646 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00647 else if (msg.qos == QOS2) 00648 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00649 if (len <= 0) 00650 rc = FAILURE; 00651 else 00652 rc = sendPacket(len, timer); 00653 if (rc == FAILURE) 00654 goto exit; // there was a problem 00655 } 00656 break; 00657 #endif 00658 } 00659 #if MQTTCLIENT_QOS2 00660 case PUBREC: 00661 case PUBREL: 00662 unsigned short mypacketid; 00663 unsigned char dup, type; 00664 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00665 rc = FAILURE; 00666 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00667 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00668 rc = FAILURE; 00669 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00670 rc = FAILURE; // there was a problem 00671 if (rc == FAILURE) 00672 goto exit; // there was a problem 00673 if (packet_type == PUBREL) 00674 freeQoS2msgid(mypacketid); 00675 break; 00676 00677 case PUBCOMP: 00678 break; 00679 #endif 00680 case PINGRESP: 00681 ping_outstanding = false; 00682 break; 00683 } 00684 00685 if (keepalive() != SUCCESS) 00686 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT 00687 rc = FAILURE; 00688 00689 exit: 00690 if (rc == SUCCESS) 00691 rc = packet_type; 00692 else if (isconnected) 00693 closeSession(); 00694 return rc; 00695 } 00696 00697 00698 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00699 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00700 { 00701 int rc = SUCCESS; 00702 static Timer ping_sent; 00703 00704 if (keepAliveInterval == 0) 00705 goto exit; 00706 00707 if (ping_outstanding) 00708 { 00709 if (ping_sent.expired()) 00710 { 00711 rc = FAILURE; // session failure 00712 #if defined(MQTT_DEBUG) 00713 DEBUG("PINGRESP not received in keepalive interval\r\n"); 00714 #endif 00715 } 00716 } 00717 else if (last_sent.expired() || last_received.expired()) 00718 { 00719 Timer timer(1000); 00720 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00721 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00722 { 00723 ping_outstanding = true; 00724 ping_sent.countdown(this->keepAliveInterval); 00725 } 00726 } 00727 exit: 00728 return rc; 00729 } 00730 00731 00732 // only used in single-threaded mode where one command at a time is in process 00733 template<class Network, class Timer, int a, int b> 00734 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00735 { 00736 int rc = FAILURE; 00737 00738 do 00739 { 00740 if (timer.expired()) 00741 break; // we timed out 00742 rc = cycle(timer); 00743 } 00744 while (rc != packet_type && rc >= 0); 00745 00746 return rc; 00747 } 00748 00749 00750 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00751 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) 00752 { 00753 Timer connect_timer(command_timeout_ms); 00754 int rc = FAILURE; 00755 int len = 0; 00756 00757 if (isconnected) // don't send connect packet again if we are already connected 00758 goto exit; 00759 00760 this->keepAliveInterval = options.keepAliveInterval; 00761 this->cleansession = options.cleansession; 00762 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00763 goto exit; 00764 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00765 goto exit; // there was a problem 00766 00767 if (this->keepAliveInterval > 0) 00768 last_received.countdown(this->keepAliveInterval); 00769 // this will be a blocking call, wait for the connack 00770 if (waitfor(CONNACK, connect_timer) == CONNACK) 00771 { 00772 data.rc = 0; 00773 data.sessionPresent = false; 00774 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, 00775 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00776 rc = data.rc; 00777 else 00778 rc = FAILURE; 00779 } 00780 else 00781 rc = FAILURE; 00782 00783 #if MQTTCLIENT_QOS2 00784 // resend any inflight publish 00785 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00786 { 00787 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00788 rc = FAILURE; 00789 else 00790 rc = publish(len, connect_timer, inflightQoS); 00791 } 00792 else 00793 #endif 00794 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00795 if (inflightMsgid > 0) 00796 { 00797 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00798 rc = publish(inflightLen, connect_timer, inflightQoS); 00799 } 00800 #endif 00801 00802 exit: 00803 if (rc == SUCCESS) 00804 { 00805 isconnected = true; 00806 ping_outstanding = false; 00807 } 00808 return rc; 00809 } 00810 00811 00812 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00813 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00814 { 00815 connackData data; 00816 return connect(options, data); 00817 } 00818 00819 00820 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00821 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00822 { 00823 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00824 return connect(default_options); 00825 } 00826 00827 00828 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00829 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) 00830 { 00831 int rc = FAILURE; 00832 int i = -1; 00833 00834 // first check for an existing matching slot 00835 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00836 { 00837 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00838 { 00839 if (messageHandler == 0) // remove existing 00840 { 00841 messageHandlers[i].topicFilter = 0; 00842 messageHandlers[i].fp.detach(); 00843 } 00844 rc = SUCCESS; // return i when adding new subscription 00845 break; 00846 } 00847 } 00848 // if no existing, look for empty slot (unless we are removing) 00849 if (messageHandler != 0) { 00850 if (rc == FAILURE) 00851 { 00852 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00853 { 00854 if (messageHandlers[i].topicFilter == 0) 00855 { 00856 rc = SUCCESS; 00857 break; 00858 } 00859 } 00860 } 00861 if (i < MAX_MESSAGE_HANDLERS) 00862 { 00863 messageHandlers[i].topicFilter = topicFilter; 00864 messageHandlers[i].fp.attach(messageHandler); 00865 } 00866 } 00867 return rc; 00868 } 00869 00870 00871 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00872 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, 00873 enum QoS qos, messageHandler messageHandler, subackData& data) 00874 { 00875 int rc = FAILURE; 00876 Timer timer(command_timeout_ms); 00877 int len = 0; 00878 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00879 00880 if (!isconnected) 00881 goto exit; 00882 00883 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00884 if (len <= 0) 00885 goto exit; 00886 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00887 goto exit; // there was a problem 00888 00889 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00890 { 00891 int count = 0; 00892 unsigned short mypacketid; 00893 data.grantedQoS = 0; 00894 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00895 { 00896 if (data.grantedQoS != 0x80) 00897 rc = setMessageHandler(topicFilter, messageHandler); 00898 } 00899 } 00900 else 00901 rc = FAILURE; 00902 00903 exit: 00904 if (rc == FAILURE) 00905 closeSession(); 00906 return rc; 00907 } 00908 00909 00910 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00911 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00912 { 00913 subackData data; 00914 return subscribe(topicFilter, qos, messageHandler, data); 00915 } 00916 00917 00918 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00919 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00920 { 00921 int rc = FAILURE; 00922 Timer timer(command_timeout_ms); 00923 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00924 int len = 0; 00925 00926 if (!isconnected) 00927 goto exit; 00928 00929 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00930 goto exit; 00931 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00932 goto exit; // there was a problem 00933 00934 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00935 { 00936 unsigned short mypacketid; // should be the same as the packetid above 00937 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00938 { 00939 // remove the subscription message handler associated with this topic, if there is one 00940 setMessageHandler(topicFilter, 0); 00941 } 00942 } 00943 else 00944 rc = FAILURE; 00945 00946 exit: 00947 if (rc != SUCCESS) 00948 closeSession(); 00949 return rc; 00950 } 00951 00952 00953 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00954 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00955 { 00956 int rc; 00957 00958 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00959 goto exit; // there was a problem 00960 00961 #if MQTTCLIENT_QOS1 00962 if (qos == QOS1) 00963 { 00964 if (waitfor(PUBACK, timer) == PUBACK) 00965 { 00966 unsigned short mypacketid; 00967 unsigned char dup, type; 00968 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00969 rc = FAILURE; 00970 else if (inflightMsgid == mypacketid) 00971 inflightMsgid = 0; 00972 } 00973 else 00974 rc = FAILURE; 00975 } 00976 #endif 00977 #if MQTTCLIENT_QOS2 00978 else if (qos == QOS2) 00979 { 00980 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00981 { 00982 unsigned short mypacketid; 00983 unsigned char dup, type; 00984 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00985 rc = FAILURE; 00986 else if (inflightMsgid == mypacketid) 00987 inflightMsgid = 0; 00988 } 00989 else 00990 rc = FAILURE; 00991 } 00992 #endif 00993 00994 exit: 00995 if (rc != SUCCESS) 00996 closeSession(); 00997 return rc; 00998 } 00999 01000 01001 01002 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01003 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 01004 { 01005 int rc = FAILURE; 01006 Timer timer(command_timeout_ms); 01007 MQTTString topicString = MQTTString_initializer; 01008 int len = 0; 01009 01010 if (!isconnected) 01011 goto exit; 01012 01013 topicString.cstring = (char*)topicName; 01014 01015 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01016 if (qos == QOS1 || qos == QOS2) 01017 id = packetid.getNext(); 01018 #endif 01019 01020 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 01021 topicString, (unsigned char*)payload, payloadlen); 01022 if (len <= 0) 01023 goto exit; 01024 01025 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 01026 if (!cleansession) 01027 { 01028 memcpy(pubbuf, sendbuf, len); 01029 inflightMsgid = id; 01030 inflightLen = len; 01031 inflightQoS = qos; 01032 #if MQTTCLIENT_QOS2 01033 pubrel = false; 01034 #endif 01035 } 01036 #endif 01037 01038 rc = publish(len, timer, qos); 01039 exit: 01040 return rc; 01041 } 01042 01043 01044 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01045 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 01046 { 01047 unsigned short id = 0; // dummy - not used for anything 01048 return publish(topicName, payload, payloadlen, id, qos, retained); 01049 } 01050 01051 01052 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01053 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 01054 { 01055 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 01056 } 01057 01058 01059 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 01060 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 01061 { 01062 int rc = FAILURE; 01063 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 01064 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 01065 if (len > 0) 01066 rc = sendPacket(len, timer); // send the disconnect packet 01067 closeSession(); 01068 return rc; 01069 } 01070 01071 #endif
Generated on Tue Jul 12 2022 17:35:33 by 1.7.2