cc
Embed:
(wiki syntax)
Show/hide line numbers
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - fix for bug 458512 - QoS 2 messages 00016 * Ian Craggs - fix for bug 460389 - send loop uses wrong length 00017 * Ian Craggs - fix for bug 464169 - clearing subscriptions 00018 * Ian Craggs - fix for bug 464551 - enums and ints can be different size 00019 *******************************************************************************/ 00020 00021 #if !defined(MQTTCLIENT_H) 00022 #define MQTTCLIENT_H 00023 00024 #include "FP.h" 00025 #include "MQTTPacket.h" 00026 #include "stdio.h" 00027 #include "MQTTLogging.h" 00028 00029 #if !defined(MQTTCLIENT_QOS1) 00030 #define MQTTCLIENT_QOS1 1 00031 #endif 00032 #if !defined(MQTTCLIENT_QOS2) 00033 #define MQTTCLIENT_QOS2 0 00034 #endif 00035 00036 namespace MQTT 00037 { 00038 00039 00040 enum QoS { QOS0, QOS1, QOS2 }; 00041 00042 // all failure return codes must be negative 00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00044 00045 00046 struct Message 00047 { 00048 enum QoS qos; 00049 bool retained; 00050 bool dup; 00051 unsigned short id; 00052 void *payload; 00053 size_t payloadlen; 00054 }; 00055 00056 00057 struct MessageData 00058 { 00059 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00060 { } 00061 00062 struct Message &message; 00063 MQTTString &topicName; 00064 }; 00065 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 /** 00087 * @class Client 00088 * @brief blocking, non-threaded MQTT client API 00089 * 00090 * This version of the API blocks on all method calls, until they are complete. This means that only one 00091 * MQTT request can be in process at any one time. 00092 * @param Network a network class which supports send, receive 00093 * @param Timer a timer class with the methods: 00094 */ 00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00096 class Client 00097 { 00098 00099 public: 00100 00101 typedef void (*messageHandler)(MessageData&); 00102 00103 /** Construct the client 00104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00105 * before calling MQTT connect 00106 * @param limits an instance of the Limit class - to alter limits as required 00107 */ 00108 Client(Network& network, unsigned int command_timeout_ms = 30000); 00109 00110 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00111 * @param mh - pointer to the callback function 00112 */ 00113 void setDefaultMessageHandler(messageHandler mh) 00114 { 00115 defaultMessageHandler.attach(mh); 00116 } 00117 00118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00119 * The nework object must be connected to the network endpoint before calling this 00120 * Default connect options are used 00121 * @return success code - 00122 */ 00123 int connect(); 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * @param options - connect options 00128 * @return success code - 00129 */ 00130 int connect(MQTTPacket_connectData& options); 00131 00132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00133 * @param topic - the topic to publish to 00134 * @param message - the message to send 00135 * @return success code - 00136 */ 00137 int publish(const char* topicName, Message& message); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param payload - the data to send 00142 * @param payloadlen - the length of the data 00143 * @param qos - the QoS to send the publish at 00144 * @param retained - whether the message should be retained 00145 * @return success code - 00146 */ 00147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00148 00149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00150 * @param topic - the topic to publish to 00151 * @param payload - the data to send 00152 * @param payloadlen - the length of the data 00153 * @param id - the packet id used - returned 00154 * @param qos - the QoS to send the publish at 00155 * @param retained - whether the message should be retained 00156 * @return success code - 00157 */ 00158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00159 00160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00161 * @param topicFilter - a topic pattern which can include wildcards 00162 * @param qos - the MQTT QoS to subscribe at 00163 * @param mh - the callback function to be invoked when a message is received for this subscription 00164 * @return success code - 00165 */ 00166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00167 00168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00169 * @param topicFilter - a topic pattern which can include wildcards 00170 * @return success code - 00171 */ 00172 int unsubscribe(const char* topicFilter); 00173 00174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00175 * @return success code - 00176 */ 00177 int disconnect(); 00178 00179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00181 * received. 00182 * @param timeout_ms the time to wait, in milliseconds 00183 * @return success code - on failure, this means the client has disconnected 00184 */ 00185 int yield(unsigned long timeout_ms = 1000L); 00186 00187 /** Is the client connected? 00188 * @return flag - is the client connected or not? 00189 */ 00190 bool isConnected() 00191 { 00192 return isconnected; 00193 } 00194 00195 private: 00196 00197 void cleanSession(); 00198 int cycle(Timer& timer); 00199 int waitfor(int packet_type, Timer& timer); 00200 int keepalive(); 00201 int publish(int len, Timer& timer, enum QoS qos); 00202 00203 int decodePacket(int* value, int timeout); 00204 int readPacket(Timer& timer); 00205 int sendPacket(int length, Timer& timer); 00206 int deliverMessage(MQTTString& topicName, Message& message); 00207 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00208 00209 Network& ipstack; 00210 unsigned long command_timeout_ms; 00211 00212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00214 00215 Timer last_sent, last_received; 00216 unsigned int keepAliveInterval; 00217 bool ping_outstanding; 00218 bool cleansession; 00219 00220 PacketId packetid; 00221 00222 struct MessageHandlers 00223 { 00224 const char* topicFilter; 00225 FP<void, MessageData&> fp; 00226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00227 00228 FP<void, MessageData&> defaultMessageHandler; 00229 00230 bool isconnected; 00231 00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00234 int inflightLen; 00235 unsigned short inflightMsgid; 00236 enum QoS inflightQoS; 00237 #endif 00238 00239 #if MQTTCLIENT_QOS2 00240 bool pubrel; 00241 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00242 #define MAX_INCOMING_QOS2_MESSAGES 10 00243 #endif 00244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00245 bool isQoS2msgidFree(unsigned short id); 00246 bool useQoS2msgid(unsigned short id); 00247 void freeQoS2msgid(unsigned short id); 00248 #endif 00249 00250 }; 00251 00252 } 00253 00254 00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 00257 { 00258 ping_outstanding = false; 00259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00260 messageHandlers[i].topicFilter = 0; 00261 isconnected = false; 00262 00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00264 inflightMsgid = 0; 00265 inflightQoS = QOS0; 00266 #endif 00267 00268 #if MQTTCLIENT_QOS2 00269 pubrel = false; 00270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00271 incomingQoS2messages[i] = 0; 00272 #endif 00273 } 00274 00275 00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00278 { 00279 this->command_timeout_ms = command_timeout_ms; 00280 cleanSession(); 00281 } 00282 00283 00284 #if MQTTCLIENT_QOS2 00285 template<class Network, class Timer, int a, int b> 00286 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00287 { 00288 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00289 { 00290 if (incomingQoS2messages[i] == id) 00291 return false; 00292 } 00293 return true; 00294 } 00295 00296 00297 template<class Network, class Timer, int a, int b> 00298 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00299 { 00300 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00301 { 00302 if (incomingQoS2messages[i] == 0) 00303 { 00304 incomingQoS2messages[i] = id; 00305 return true; 00306 } 00307 } 00308 return false; 00309 } 00310 00311 00312 template<class Network, class Timer, int a, int b> 00313 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) 00314 { 00315 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00316 { 00317 if (incomingQoS2messages[i] == id) 00318 { 00319 incomingQoS2messages[i] = 0; 00320 return; 00321 } 00322 } 00323 } 00324 #endif 00325 00326 00327 template<class Network, class Timer, int a, int b> 00328 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00329 { 00330 int rc = FAILURE, 00331 sent = 0; 00332 00333 while (sent < length && !timer.expired()) 00334 { 00335 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); 00336 if (rc < 0) // there was an error writing the data 00337 break; 00338 sent += rc; 00339 } 00340 if (sent == length) 00341 { 00342 if (this->keepAliveInterval > 0) 00343 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00344 rc = SUCCESS; 00345 } 00346 else 00347 rc = FAILURE; 00348 00349 #if defined(MQTT_DEBUG) 00350 char printbuf[150]; 00351 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); 00352 #endif 00353 return rc; 00354 } 00355 00356 00357 template<class Network, class Timer, int a, int b> 00358 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00359 { 00360 unsigned char c; 00361 int multiplier = 1; 00362 int len = 0; 00363 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00364 00365 *value = 0; 00366 do 00367 { 00368 int rc = MQTTPACKET_READ_ERROR; 00369 00370 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00371 { 00372 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00373 goto exit; 00374 } 00375 rc = ipstack.read(&c, 1, timeout); 00376 if (rc != 1) 00377 goto exit; 00378 *value += (c & 127) * multiplier; 00379 multiplier *= 128; 00380 } while ((c & 128) != 0); 00381 exit: 00382 return len; 00383 } 00384 00385 00386 /** 00387 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00388 * the packets can be retried. 00389 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00390 * @return the MQTT packet type, or -1 if none 00391 */ 00392 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00393 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) 00394 { 00395 int rc = FAILURE; 00396 MQTTHeader header = {0}; 00397 int len = 0; 00398 int rem_len = 0; 00399 00400 /* 1. read the header byte. This has the packet type in it */ 00401 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00402 goto exit; 00403 00404 len = 1; 00405 /* 2. read the remaining length. This is variable in itself */ 00406 decodePacket(&rem_len, timer.left_ms()); 00407 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00408 00409 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00410 { 00411 rc = BUFFER_OVERFLOW; 00412 goto exit; 00413 } 00414 00415 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00416 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00417 goto exit; 00418 00419 header.byte = readbuf[0]; 00420 rc = header.bits.type; 00421 if (this->keepAliveInterval > 0) 00422 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00423 exit: 00424 00425 #if defined(MQTT_DEBUG) 00426 if (rc >= 0) 00427 { 00428 char printbuf[50]; 00429 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); 00430 } 00431 #endif 00432 return rc; 00433 } 00434 00435 00436 // assume topic filter and name is in correct format 00437 // # can only be at end 00438 // + and # can only be next to separator 00439 template<class Network, class Timer, int a, int b> 00440 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00441 { 00442 char* curf = topicFilter; 00443 char* curn = topicName.lenstring.data; 00444 char* curn_end = curn + topicName.lenstring.len; 00445 00446 while (*curf && curn < curn_end) 00447 { 00448 if (*curn == '/' && *curf != '/') 00449 break; 00450 if (*curf != '+' && *curf != '#' && *curf != *curn) 00451 break; 00452 if (*curf == '+') 00453 { // skip until we meet the next separator, or end of string 00454 char* nextpos = curn + 1; 00455 while (nextpos < curn_end && *nextpos != '/') 00456 nextpos = ++curn + 1; 00457 } 00458 else if (*curf == '#') 00459 curn = curn_end - 1; // skip until end of string 00460 curf++; 00461 curn++; 00462 }; 00463 00464 return (curn == curn_end) && (*curf == '\0'); 00465 } 00466 00467 00468 00469 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00470 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00471 { 00472 int rc = FAILURE; 00473 00474 // we have to find the right message handler - indexed by topic 00475 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00476 { 00477 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00478 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00479 { 00480 if (messageHandlers[i].fp.attached()) 00481 { 00482 MessageData md(topicName, message); 00483 messageHandlers[i].fp(md); 00484 rc = SUCCESS; 00485 } 00486 } 00487 } 00488 00489 if (rc == FAILURE && defaultMessageHandler.attached()) 00490 { 00491 MessageData md(topicName, message); 00492 defaultMessageHandler(md); 00493 rc = SUCCESS; 00494 } 00495 00496 return rc; 00497 } 00498 00499 00500 00501 template<class Network, class Timer, int a, int b> 00502 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00503 { 00504 int rc = SUCCESS; 00505 Timer timer = Timer(); 00506 00507 timer.countdown_ms(timeout_ms); 00508 while (!timer.expired()) 00509 { 00510 if (cycle(timer) < 0) 00511 { 00512 rc = FAILURE; 00513 break; 00514 } 00515 } 00516 00517 return rc; 00518 } 00519 00520 00521 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00522 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00523 { 00524 /* get one piece of work off the wire and one pass through */ 00525 00526 // read the socket, see what work is due 00527 int packet_type = readPacket(timer); 00528 00529 int len = 0, 00530 rc = SUCCESS; 00531 00532 switch (packet_type) 00533 { 00534 case FAILURE: 00535 case BUFFER_OVERFLOW: 00536 rc = packet_type; 00537 break; 00538 case CONNACK: 00539 case PUBACK: 00540 case SUBACK: 00541 break; 00542 case PUBLISH: 00543 { 00544 MQTTString topicName = MQTTString_initializer; 00545 Message msg; 00546 int intQoS; 00547 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00548 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00549 goto exit; 00550 msg.qos = (enum QoS)intQoS; 00551 #if MQTTCLIENT_QOS2 00552 if (msg.qos != QOS2) 00553 #endif 00554 deliverMessage(topicName, msg); 00555 #if MQTTCLIENT_QOS2 00556 else if (isQoS2msgidFree(msg.id)) 00557 { 00558 if (useQoS2msgid(msg.id)) 00559 deliverMessage(topicName, msg); 00560 else 00561 WARN("Maximum number of incoming QoS2 messages exceeded"); 00562 } 00563 #endif 00564 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00565 if (msg.qos != QOS0) 00566 { 00567 if (msg.qos == QOS1) 00568 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00569 else if (msg.qos == QOS2) 00570 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00571 if (len <= 0) 00572 rc = FAILURE; 00573 else 00574 rc = sendPacket(len, timer); 00575 if (rc == FAILURE) 00576 goto exit; // there was a problem 00577 } 00578 break; 00579 #endif 00580 } 00581 #if MQTTCLIENT_QOS2 00582 case PUBREC: 00583 case PUBREL: 00584 unsigned short mypacketid; 00585 unsigned char dup, type; 00586 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00587 rc = FAILURE; 00588 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 00589 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) 00590 rc = FAILURE; 00591 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00592 rc = FAILURE; // there was a problem 00593 if (rc == FAILURE) 00594 goto exit; // there was a problem 00595 if (packet_type == PUBREL) 00596 freeQoS2msgid(mypacketid); 00597 break; 00598 00599 case PUBCOMP: 00600 break; 00601 #endif 00602 case PINGRESP: 00603 ping_outstanding = false; 00604 break; 00605 } 00606 keepalive(); 00607 exit: 00608 if (rc == SUCCESS) 00609 rc = packet_type; 00610 return rc; 00611 } 00612 00613 00614 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00615 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00616 { 00617 int rc = FAILURE; 00618 00619 if (keepAliveInterval == 0) 00620 { 00621 rc = SUCCESS; 00622 goto exit; 00623 } 00624 00625 if (last_sent.expired() || last_received.expired()) 00626 { 00627 if (!ping_outstanding) 00628 { 00629 Timer timer(1000); 00630 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00631 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00632 ping_outstanding = true; 00633 } 00634 } 00635 00636 exit: 00637 return rc; 00638 } 00639 00640 00641 // only used in single-threaded mode where one command at a time is in process 00642 template<class Network, class Timer, int a, int b> 00643 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00644 { 00645 int rc = FAILURE; 00646 00647 do 00648 { 00649 if (timer.expired()) 00650 break; // we timed out 00651 } 00652 while ((rc = cycle(timer)) != packet_type); 00653 00654 return rc; 00655 } 00656 00657 00658 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00659 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00660 { 00661 Timer connect_timer(command_timeout_ms); 00662 int rc = FAILURE; 00663 int len = 0; 00664 00665 if (isconnected) // don't send connect packet again if we are already connected 00666 goto exit; 00667 00668 this->keepAliveInterval = options.keepAliveInterval; 00669 this->cleansession = options.cleansession; 00670 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00671 goto exit; 00672 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00673 goto exit; // there was a problem 00674 00675 if (this->keepAliveInterval > 0) 00676 last_received.countdown(this->keepAliveInterval); 00677 // this will be a blocking call, wait for the connack 00678 if (waitfor(CONNACK, connect_timer) == CONNACK) 00679 { 00680 unsigned char connack_rc = 255; 00681 bool sessionPresent = false; 00682 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00683 rc = connack_rc; 00684 else 00685 rc = FAILURE; 00686 } 00687 else 00688 rc = FAILURE; 00689 00690 #if MQTTCLIENT_QOS2 00691 // resend any inflight publish 00692 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) 00693 { 00694 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00695 rc = FAILURE; 00696 else 00697 rc = publish(len, connect_timer, inflightQoS); 00698 } 00699 else 00700 #endif 00701 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00702 if (inflightMsgid > 0) 00703 { 00704 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00705 rc = publish(inflightLen, connect_timer, inflightQoS); 00706 } 00707 #endif 00708 00709 exit: 00710 if (rc == SUCCESS) 00711 isconnected = true; 00712 return rc; 00713 } 00714 00715 00716 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00717 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00718 { 00719 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00720 return connect(default_options); 00721 } 00722 00723 00724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00726 { 00727 int rc = FAILURE; 00728 Timer timer(command_timeout_ms); 00729 int len = 0; 00730 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00731 00732 if (!isconnected) 00733 goto exit; 00734 00735 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00736 if (len <= 0) 00737 goto exit; 00738 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00739 goto exit; // there was a problem 00740 00741 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00742 { 00743 int count = 0, grantedQoS = -1; 00744 unsigned short mypacketid; 00745 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00746 rc = grantedQoS; // 0, 1, 2 or 0x80 00747 if (rc != 0x80) 00748 { 00749 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00750 { 00751 if (messageHandlers[i].topicFilter == 0) 00752 { 00753 messageHandlers[i].topicFilter = topicFilter; 00754 messageHandlers[i].fp.attach(messageHandler); 00755 rc = 0; 00756 break; 00757 } 00758 } 00759 } 00760 } 00761 else 00762 rc = FAILURE; 00763 00764 exit: 00765 if (rc != SUCCESS) 00766 cleanSession(); 00767 return rc; 00768 } 00769 00770 00771 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00772 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00773 { 00774 int rc = FAILURE; 00775 Timer timer(command_timeout_ms); 00776 MQTTString topic = {(char*)topicFilter, {0, 0}}; 00777 int len = 0; 00778 00779 if (!isconnected) 00780 goto exit; 00781 00782 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00783 goto exit; 00784 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00785 goto exit; // there was a problem 00786 00787 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00788 { 00789 unsigned short mypacketid; // should be the same as the packetid above 00790 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00791 { 00792 rc = 0; 00793 00794 // remove the subscription message handler associated with this topic, if there is one 00795 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00796 { 00797 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) 00798 { 00799 messageHandlers[i].topicFilter = 0; 00800 break; 00801 } 00802 } 00803 } 00804 } 00805 else 00806 rc = FAILURE; 00807 00808 exit: 00809 if (rc != SUCCESS) 00810 cleanSession(); 00811 return rc; 00812 } 00813 00814 00815 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00816 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00817 { 00818 int rc; 00819 00820 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00821 goto exit; // there was a problem 00822 00823 #if MQTTCLIENT_QOS1 00824 if (qos == QOS1) 00825 { 00826 if (waitfor(PUBACK, timer) == PUBACK) 00827 { 00828 unsigned short mypacketid; 00829 unsigned char dup, type; 00830 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00831 rc = FAILURE; 00832 else if (inflightMsgid == mypacketid) 00833 inflightMsgid = 0; 00834 } 00835 else 00836 rc = FAILURE; 00837 } 00838 #elif MQTTCLIENT_QOS2 00839 else if (qos == QOS2) 00840 { 00841 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00842 { 00843 unsigned short mypacketid; 00844 unsigned char dup, type; 00845 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00846 rc = FAILURE; 00847 else if (inflightMsgid == mypacketid) 00848 inflightMsgid = 0; 00849 } 00850 else 00851 rc = FAILURE; 00852 } 00853 #endif 00854 00855 exit: 00856 if (rc != SUCCESS) 00857 cleanSession(); 00858 return rc; 00859 } 00860 00861 00862 00863 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00864 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00865 { 00866 int rc = FAILURE; 00867 Timer timer(command_timeout_ms); 00868 MQTTString topicString = MQTTString_initializer; 00869 int len = 0; 00870 00871 if (!isconnected) 00872 goto exit; 00873 00874 topicString.cstring = (char*)topicName; 00875 00876 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00877 if (qos == QOS1 || qos == QOS2) 00878 id = packetid.getNext(); 00879 #endif 00880 00881 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00882 topicString, (unsigned char*)payload, payloadlen); 00883 if (len <= 0) 00884 goto exit; 00885 00886 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00887 if (!cleansession) 00888 { 00889 memcpy(pubbuf, sendbuf, len); 00890 inflightMsgid = id; 00891 inflightLen = len; 00892 inflightQoS = qos; 00893 #if MQTTCLIENT_QOS2 00894 pubrel = false; 00895 #endif 00896 } 00897 #endif 00898 00899 rc = publish(len, timer, qos); 00900 exit: 00901 return rc; 00902 } 00903 00904 00905 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00906 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00907 { 00908 unsigned short id = 0; // dummy - not used for anything 00909 return publish(topicName, payload, payloadlen, id, qos, retained); 00910 } 00911 00912 00913 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00914 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00915 { 00916 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00917 } 00918 00919 00920 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00921 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00922 { 00923 int rc = FAILURE; 00924 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00925 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00926 if (len > 0) 00927 rc = sendPacket(len, timer); // send the disconnect packet 00928 00929 if (cleansession) 00930 cleanSession(); 00931 else 00932 isconnected = false; 00933 return rc; 00934 } 00935 00936 00937 #endif
Generated on Wed Jan 25 2023 07:20:50 by 1.7.2