Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTCLIENT_H) 00018 #define MQTTCLIENT_H 00019 00020 #include "FP.h" 00021 #include "MQTTPacket.h" 00022 #include "stdio.h" 00023 #include "MQTTLogging.h" 00024 00025 #if !defined(MQTTCLIENT_QOS1) 00026 #define MQTTCLIENT_QOS1 1 00027 #endif 00028 #if !defined(MQTTCLIENT_QOS2) 00029 #define MQTTCLIENT_QOS2 0 00030 #endif 00031 00032 namespace MQTT 00033 { 00034 00035 00036 enum QoS { QOS0, QOS1, QOS2 }; 00037 00038 // all failure return codes must be negative 00039 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00040 00041 00042 struct Message 00043 { 00044 enum QoS qos; 00045 bool retained; 00046 bool dup; 00047 unsigned short id; 00048 void *payload; 00049 size_t payloadlen; 00050 }; 00051 00052 00053 struct MessageData 00054 { 00055 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00056 { } 00057 00058 struct Message &message; 00059 MQTTString &topicName; 00060 }; 00061 00062 00063 class PacketId 00064 { 00065 public: 00066 PacketId() 00067 { 00068 next = 0; 00069 } 00070 00071 int getNext() 00072 { 00073 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00074 } 00075 00076 private: 00077 static const int MAX_PACKET_ID = 65535; 00078 int next; 00079 }; 00080 00081 00082 /** 00083 * @class Client 00084 * @brief blocking, non-threaded MQTT client API 00085 * 00086 * This version of the API blocks on all method calls, until they are complete. This means that only one 00087 * MQTT request can be in process at any one time. 00088 * @param Network a network class which supports send, receive 00089 * @param Timer a timer class with the methods: 00090 */ 00091 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 500, int MAX_MESSAGE_HANDLERS = 5> 00092 class Client 00093 { 00094 00095 public: 00096 00097 typedef void (*messageHandler)(MessageData&); 00098 00099 /** Construct the client 00100 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00101 * before calling MQTT connect 00102 * @param limits an instance of the Limit class - to alter limits as required 00103 */ 00104 Client(Network& network, unsigned int command_timeout_ms = 30000); 00105 00106 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00107 * @param mh - pointer to the callback function 00108 */ 00109 void setDefaultMessageHandler(messageHandler mh) 00110 { 00111 defaultMessageHandler.attach(mh); 00112 } 00113 00114 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00115 * The nework object must be connected to the network endpoint before calling this 00116 * Default connect options are used 00117 * @return success code - 00118 */ 00119 int connect(); 00120 00121 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00122 * The nework object must be connected to the network endpoint before calling this 00123 * @param options - connect options 00124 * @return success code - 00125 */ 00126 int connect(MQTTPacket_connectData& options); 00127 00128 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00129 * @param topic - the topic to publish to 00130 * @param message - the message to send 00131 * @return success code - 00132 */ 00133 int publish(const char* topicName, Message& message); 00134 00135 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00136 * @param topic - the topic to publish to 00137 * @param payload - the data to send 00138 * @param payloadlen - the length of the data 00139 * @param qos - the QoS to send the publish at 00140 * @param retained - whether the message should be retained 00141 * @return success code - 00142 */ 00143 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00144 00145 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00146 * @param topic - the topic to publish to 00147 * @param payload - the data to send 00148 * @param payloadlen - the length of the data 00149 * @param id - the packet id used - returned 00150 * @param qos - the QoS to send the publish at 00151 * @param retained - whether the message should be retained 00152 * @return success code - 00153 */ 00154 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00155 00156 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00157 * @param topicFilter - a topic pattern which can include wildcards 00158 * @param qos - the MQTT QoS to subscribe at 00159 * @param mh - the callback function to be invoked when a message is received for this subscription 00160 * @return success code - 00161 */ 00162 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00163 00164 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00165 * @param topicFilter - a topic pattern which can include wildcards 00166 * @return success code - 00167 */ 00168 int unsubscribe(const char* topicFilter); 00169 00170 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00171 * @return success code - 00172 */ 00173 int disconnect(); 00174 00175 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00176 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00177 * received. 00178 * @param timeout_ms the time to wait, in milliseconds 00179 * @return success code - on failure, this means the client has disconnected 00180 */ 00181 int yield(unsigned long timeout_ms = 1000L); 00182 00183 /** Is the client connected? 00184 * @return flag - is the client connected or not? 00185 */ 00186 bool isConnected() 00187 { 00188 return isconnected; 00189 } 00190 00191 private: 00192 00193 int cycle(Timer& timer); 00194 int waitfor(int packet_type, Timer& timer); 00195 int keepalive(); 00196 int publish(int len, Timer& timer, enum QoS qos); 00197 00198 int decodePacket(int* value, int timeout); 00199 int readPacket(Timer& timer); 00200 int sendPacket(int length, Timer& timer); 00201 int deliverMessage(MQTTString& topicName, Message& message); 00202 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00203 00204 Network& ipstack; 00205 unsigned long command_timeout_ms; 00206 00207 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00208 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00209 00210 Timer last_sent, last_received; 00211 unsigned int keepAliveInterval; 00212 bool ping_outstanding; 00213 bool cleansession; 00214 00215 PacketId packetid; 00216 00217 struct MessageHandlers 00218 { 00219 const char* topicFilter; 00220 FP<void, MessageData&> fp; 00221 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00222 00223 FP<void, MessageData&> defaultMessageHandler; 00224 00225 bool isconnected; 00226 00227 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00228 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00229 int inflightLen; 00230 unsigned short inflightMsgid; 00231 enum QoS inflightQoS; 00232 #endif 00233 00234 #if MQTTCLIENT_QOS2 00235 bool pubrel; 00236 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00237 #define MAX_INCOMING_QOS2_MESSAGES 10 00238 #endif 00239 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00240 bool isQoS2msgidFree(unsigned short id); 00241 bool useQoS2msgid(unsigned short id); 00242 #endif 00243 00244 }; 00245 00246 } 00247 00248 00249 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00250 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00251 { 00252 last_sent = Timer(); 00253 last_received = Timer(); 00254 ping_outstanding = false; 00255 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00256 messageHandlers[i].topicFilter = 0; 00257 this->command_timeout_ms = command_timeout_ms; 00258 isconnected = false; 00259 00260 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00261 inflightMsgid = 0; 00262 inflightQoS = QOS0; 00263 #endif 00264 00265 00266 #if MQTTCLIENT_QOS2 00267 pubrel = false; 00268 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00269 incomingQoS2messages[i] = 0; 00270 #endif 00271 } 00272 00273 #if MQTTCLIENT_QOS2 00274 template<class Network, class Timer, int a, int b> 00275 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00276 { 00277 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00278 { 00279 if (incomingQoS2messages[i] == id) 00280 return false; 00281 } 00282 return true; 00283 } 00284 00285 00286 template<class Network, class Timer, int a, int b> 00287 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00288 { 00289 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00290 { 00291 if (incomingQoS2messages[i] == 0) 00292 { 00293 incomingQoS2messages[i] = id; 00294 return true; 00295 } 00296 } 00297 return false; 00298 } 00299 #endif 00300 00301 00302 template<class Network, class Timer, int a, int b> 00303 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00304 { 00305 int rc = FAILURE, 00306 sent = 0; 00307 00308 while (sent < length && !timer.expired()) 00309 { 00310 rc = ipstack.write(&sendbuf[sent], length, timer.left_ms()); 00311 if (rc < 0) // there was an error writing the data 00312 break; 00313 sent += rc; 00314 } 00315 if (sent == length) 00316 { 00317 if (this->keepAliveInterval > 0) 00318 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00319 rc = SUCCESS; 00320 } 00321 else 00322 rc = FAILURE; 00323 00324 #if defined(MQTT_DEBUG) 00325 char printbuf[50]; 00326 DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length)); 00327 #endif 00328 return rc; 00329 } 00330 00331 00332 template<class Network, class Timer, int a, int b> 00333 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00334 { 00335 unsigned char c; 00336 int multiplier = 1; 00337 int len = 0; 00338 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00339 00340 *value = 0; 00341 do 00342 { 00343 int rc = MQTTPACKET_READ_ERROR; 00344 00345 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00346 { 00347 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00348 goto exit; 00349 } 00350 rc = ipstack.read(&c, 1, timeout); 00351 if (rc != 1) 00352 goto exit; 00353 *value += (c & 127) * multiplier; 00354 multiplier *= 128; 00355 } while ((c & 128) != 0); 00356 exit: 00357 return len; 00358 } 00359 00360 00361 /** 00362 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00363 * the packets can be retried. 00364 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00365 * @return the MQTT packet type, or -1 if none 00366 */ 00367 template<class Network, class Timer, int a, int b> 00368 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 00369 { 00370 int rc = FAILURE; 00371 MQTTHeader header = {0}; 00372 int len = 0; 00373 int rem_len = 0; 00374 00375 /* 1. read the header byte. This has the packet type in it */ 00376 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00377 goto exit; 00378 00379 len = 1; 00380 /* 2. read the remaining length. This is variable in itself */ 00381 decodePacket(&rem_len, timer.left_ms()); 00382 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00383 00384 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00385 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00386 goto exit; 00387 00388 header.byte = readbuf[0]; 00389 rc = header.bits.type; 00390 if (this->keepAliveInterval > 0) 00391 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00392 exit: 00393 00394 #if defined(MQTT_DEBUG) 00395 char printbuf[50]; 00396 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len)); 00397 #endif 00398 return rc; 00399 } 00400 00401 00402 // assume topic filter and name is in correct format 00403 // # can only be at end 00404 // + and # can only be next to separator 00405 template<class Network, class Timer, int a, int b> 00406 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00407 { 00408 char* curf = topicFilter; 00409 char* curn = topicName.lenstring.data; 00410 char* curn_end = curn + topicName.lenstring.len; 00411 00412 while (*curf && curn < curn_end) 00413 { 00414 if (*curn == '/' && *curf != '/') 00415 break; 00416 if (*curf != '+' && *curf != '#' && *curf != *curn) 00417 break; 00418 if (*curf == '+') 00419 { // skip until we meet the next separator, or end of string 00420 char* nextpos = curn + 1; 00421 while (nextpos < curn_end && *nextpos != '/') 00422 nextpos = ++curn + 1; 00423 } 00424 else if (*curf == '#') 00425 curn = curn_end - 1; // skip until end of string 00426 curf++; 00427 curn++; 00428 }; 00429 00430 return (curn == curn_end) && (*curf == '\0'); 00431 } 00432 00433 00434 00435 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00436 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00437 { 00438 int rc = FAILURE; 00439 00440 // we have to find the right message handler - indexed by topic 00441 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00442 { 00443 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00444 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00445 { 00446 if (messageHandlers[i].fp.attached()) 00447 { 00448 MessageData md(topicName, message); 00449 messageHandlers[i].fp(md); 00450 rc = SUCCESS; 00451 } 00452 } 00453 } 00454 00455 if (rc == FAILURE && defaultMessageHandler.attached()) 00456 { 00457 MessageData md(topicName, message); 00458 defaultMessageHandler(md); 00459 rc = SUCCESS; 00460 } 00461 00462 return rc; 00463 } 00464 00465 00466 00467 template<class Network, class Timer, int a, int b> 00468 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00469 { 00470 int rc = SUCCESS; 00471 Timer timer = Timer(); 00472 00473 timer.countdown_ms(timeout_ms); 00474 while (!timer.expired()) 00475 { 00476 if (cycle(timer) == FAILURE) 00477 { 00478 rc = FAILURE; 00479 break; 00480 } 00481 } 00482 00483 return rc; 00484 } 00485 00486 00487 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00488 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00489 { 00490 /* get one piece of work off the wire and one pass through */ 00491 00492 // read the socket, see what work is due 00493 unsigned short packet_type = readPacket(timer); 00494 00495 int len = 0, 00496 rc = SUCCESS; 00497 00498 switch (packet_type) 00499 { 00500 case CONNACK: 00501 case PUBACK: 00502 case SUBACK: 00503 break; 00504 case PUBLISH: 00505 MQTTString topicName; 00506 Message msg; 00507 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00508 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00509 goto exit; 00510 #if MQTTCLIENT_QOS2 00511 if (msg.qos != QOS2) 00512 #endif 00513 deliverMessage(topicName, msg); 00514 #if MQTTCLIENT_QOS2 00515 else if (isQoS2msgidFree(msg.id)) 00516 { 00517 if (useQoS2msgid(msg.id)) 00518 deliverMessage(topicName, msg); 00519 else 00520 WARN("Maximum number of incoming QoS2 messages exceeded"); 00521 } 00522 #endif 00523 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00524 if (msg.qos != QOS0) 00525 { 00526 if (msg.qos == QOS1) 00527 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00528 else if (msg.qos == QOS2) 00529 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00530 if (len <= 0) 00531 rc = FAILURE; 00532 else 00533 rc = sendPacket(len, timer); 00534 if (rc == FAILURE) 00535 goto exit; // there was a problem 00536 } 00537 break; 00538 #endif 00539 #if MQTTCLIENT_QOS2 00540 case PUBREC: 00541 unsigned short mypacketid; 00542 unsigned char dup, type; 00543 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00544 rc = FAILURE; 00545 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0) 00546 rc = FAILURE; 00547 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00548 rc = FAILURE; // there was a problem 00549 if (rc == FAILURE) 00550 goto exit; // there was a problem 00551 break; 00552 case PUBCOMP: 00553 break; 00554 #endif 00555 case PINGRESP: 00556 ping_outstanding = false; 00557 break; 00558 } 00559 keepalive(); 00560 exit: 00561 if (rc == SUCCESS) 00562 rc = packet_type; 00563 return rc; 00564 } 00565 00566 00567 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00568 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00569 { 00570 int rc = FAILURE; 00571 00572 if (keepAliveInterval == 0) 00573 { 00574 rc = SUCCESS; 00575 goto exit; 00576 } 00577 00578 if (last_sent.expired() || last_received.expired()) 00579 { 00580 if (!ping_outstanding) 00581 { 00582 Timer timer = Timer(1000); 00583 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00584 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00585 ping_outstanding = true; 00586 } 00587 } 00588 00589 exit: 00590 return rc; 00591 } 00592 00593 00594 // only used in single-threaded mode where one command at a time is in process 00595 template<class Network, class Timer, int a, int b> 00596 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00597 { 00598 int rc = FAILURE; 00599 00600 do 00601 { 00602 if (timer.expired()) 00603 break; // we timed out 00604 } 00605 while ((rc = cycle(timer)) != packet_type); 00606 00607 return rc; 00608 } 00609 00610 00611 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00612 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00613 { 00614 Timer connect_timer = Timer(command_timeout_ms); 00615 int rc = FAILURE; 00616 int len = 0; 00617 00618 if (isconnected) // don't send connect packet again if we are already connected 00619 goto exit; 00620 00621 this->keepAliveInterval = options.keepAliveInterval; 00622 this->cleansession = options.cleansession; 00623 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00624 goto exit; 00625 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00626 goto exit; // there was a problem 00627 00628 if (this->keepAliveInterval > 0) 00629 last_received.countdown(this->keepAliveInterval); 00630 // this will be a blocking call, wait for the connack 00631 if (waitfor(CONNACK, connect_timer) == CONNACK) 00632 { 00633 unsigned char connack_rc = 255; 00634 bool sessionPresent = false; 00635 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00636 rc = connack_rc; 00637 else 00638 rc = FAILURE; 00639 } 00640 else 00641 rc = FAILURE; 00642 00643 #if MQTTCLIENT_QOS2 00644 // resend an inflight publish 00645 if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel) 00646 { 00647 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00648 rc = FAILURE; 00649 else 00650 rc = publish(len, connect_timer, inflightQoS); 00651 } 00652 else 00653 #endif 00654 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00655 if (inflightMsgid > 0) 00656 { 00657 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00658 rc = publish(inflightLen, connect_timer, inflightQoS); 00659 } 00660 #endif 00661 00662 exit: 00663 if (rc == SUCCESS) 00664 isconnected = true; 00665 return rc; 00666 } 00667 00668 00669 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00670 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00671 { 00672 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00673 return connect(default_options); 00674 } 00675 00676 00677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00679 { 00680 int rc = FAILURE; 00681 Timer timer = Timer(command_timeout_ms); 00682 int len = 0; 00683 MQTTString topic = {(char*)topicFilter, 0, 0}; 00684 00685 if (!isconnected) 00686 goto exit; 00687 00688 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00689 if (len <= 0) 00690 goto exit; 00691 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00692 goto exit; // there was a problem 00693 00694 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00695 { 00696 int count = 0, grantedQoS = -1; 00697 unsigned short mypacketid; 00698 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00699 rc = grantedQoS; // 0, 1, 2 or 0x80 00700 if (rc != 0x80) 00701 { 00702 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00703 { 00704 if (messageHandlers[i].topicFilter == 0) 00705 { 00706 messageHandlers[i].topicFilter = topicFilter; 00707 messageHandlers[i].fp.attach(messageHandler); 00708 rc = 0; 00709 break; 00710 } 00711 } 00712 } 00713 } 00714 else 00715 rc = FAILURE; 00716 00717 exit: 00718 if (rc != SUCCESS) 00719 isconnected = false; 00720 return rc; 00721 } 00722 00723 00724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00726 { 00727 int rc = FAILURE; 00728 Timer timer = Timer(command_timeout_ms); 00729 MQTTString topic = {(char*)topicFilter, 0, 0}; 00730 int len = 0; 00731 00732 if (!isconnected) 00733 goto exit; 00734 00735 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00736 goto exit; 00737 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00738 goto exit; // there was a problem 00739 00740 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00741 { 00742 unsigned short mypacketid; // should be the same as the packetid above 00743 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00744 rc = 0; 00745 } 00746 else 00747 rc = FAILURE; 00748 00749 exit: 00750 if (rc != SUCCESS) 00751 isconnected = false; 00752 return rc; 00753 } 00754 00755 00756 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00757 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00758 { 00759 int rc; 00760 00761 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00762 goto exit; // there was a problem 00763 00764 #if MQTTCLIENT_QOS1 00765 if (qos == QOS1) 00766 { 00767 if (waitfor(PUBACK, timer) == PUBACK) 00768 { 00769 unsigned short mypacketid; 00770 unsigned char dup, type; 00771 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00772 rc = FAILURE; 00773 else if (inflightMsgid == mypacketid) 00774 inflightMsgid = 0; 00775 } 00776 else 00777 rc = FAILURE; 00778 } 00779 #elif MQTTCLIENT_QOS2 00780 else if (qos == QOS2) 00781 { 00782 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00783 { 00784 unsigned short mypacketid; 00785 unsigned char dup, type; 00786 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00787 rc = FAILURE; 00788 else if (inflightMsgid == mypacketid) 00789 inflightMsgid = 0; 00790 } 00791 else 00792 rc = FAILURE; 00793 } 00794 #endif 00795 00796 exit: 00797 if (rc != SUCCESS) 00798 isconnected = false; 00799 return rc; 00800 } 00801 00802 00803 00804 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00805 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00806 { 00807 int rc = FAILURE; 00808 Timer timer = Timer(command_timeout_ms); 00809 MQTTString topicString = MQTTString_initializer; 00810 int len = 0; 00811 00812 if (!isconnected) 00813 goto exit; 00814 00815 topicString.cstring = (char*)topicName; 00816 00817 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00818 if (qos == QOS1 || qos == QOS2) 00819 id = packetid.getNext(); 00820 #endif 00821 00822 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00823 topicString, (unsigned char*)payload, payloadlen); 00824 if (len <= 0) 00825 goto exit; 00826 00827 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00828 if (!cleansession) 00829 { 00830 memcpy(pubbuf, sendbuf, len); 00831 inflightMsgid = id; 00832 inflightLen = len; 00833 inflightQoS = qos; 00834 #if MQTTCLIENT_QOS2 00835 pubrel = false; 00836 #endif 00837 } 00838 #endif 00839 00840 rc = publish(len, timer, qos); 00841 exit: 00842 return rc; 00843 } 00844 00845 00846 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00847 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00848 { 00849 unsigned short id = 0; // dummy - not used for anything 00850 return publish(topicName, payload, payloadlen, id, qos, retained); 00851 } 00852 00853 00854 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00855 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00856 { 00857 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00858 } 00859 00860 00861 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00862 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00863 { 00864 int rc = FAILURE; 00865 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00866 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00867 if (len > 0) 00868 rc = sendPacket(len, timer); // send the disconnect packet 00869 00870 isconnected = false; 00871 return rc; 00872 } 00873 00874 00875 #endif
Generated on Fri Jul 15 2022 17:16:07 by
1.7.2
