No idea why this is forking.
Dependents: MQTTw7500 MQTTw7500 wizwiki_wizfi310_mqtt MQTT_TestByChandana ... more
Fork of MQTT by
MQTTClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTCLIENT_H) 00018 #define MQTTCLIENT_H 00019 00020 #include "FP.h" 00021 #include "MQTTPacket.h" 00022 #include "stdio.h" 00023 #include "MQTTLogging.h" 00024 00025 #if !defined(MQTTCLIENT_QOS1) 00026 #define MQTTCLIENT_QOS1 1 00027 #endif 00028 #if !defined(MQTTCLIENT_QOS2) 00029 #define MQTTCLIENT_QOS2 0 00030 #endif 00031 00032 namespace MQTT 00033 { 00034 00035 00036 enum QoS { QOS0, QOS1, QOS2 }; 00037 00038 // all failure return codes must be negative 00039 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00040 00041 00042 struct Message 00043 { 00044 enum QoS qos; 00045 bool retained; 00046 bool dup; 00047 unsigned short id; 00048 void *payload; 00049 size_t payloadlen; 00050 }; 00051 00052 00053 struct MessageData 00054 { 00055 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) 00056 { } 00057 00058 struct Message &message; 00059 MQTTString &topicName; 00060 }; 00061 00062 00063 class PacketId 00064 { 00065 public: 00066 PacketId() 00067 { 00068 next = 0; 00069 } 00070 00071 int getNext() 00072 { 00073 return next = (next == MAX_PACKET_ID) ? 
1 : ++next; 00074 } 00075 00076 private: 00077 static const int MAX_PACKET_ID = 65535; 00078 int next; 00079 }; 00080 00081 00082 /** 00083 * @class Client 00084 * @brief blocking, non-threaded MQTT client API 00085 * 00086 * This version of the API blocks on all method calls, until they are complete. This means that only one 00087 * MQTT request can be in process at any one time. 00088 * @param Network a network class which supports send, receive 00089 * @param Timer a timer class with the methods: 00090 */ 00091 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00092 class Client 00093 { 00094 00095 public: 00096 00097 typedef void (*messageHandler)(MessageData&); 00098 00099 /** Construct the client 00100 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00101 * before calling MQTT connect 00102 * @param limits an instance of the Limit class - to alter limits as required 00103 */ 00104 Client(Network& network, unsigned int command_timeout_ms = 30000); 00105 00106 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00107 * @param mh - pointer to the callback function 00108 */ 00109 void setDefaultMessageHandler(messageHandler mh) 00110 { 00111 defaultMessageHandler.attach(mh); 00112 } 00113 00114 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00115 * The nework object must be connected to the network endpoint before calling this 00116 * Default connect options are used 00117 * @return success code - 00118 */ 00119 int connect(); 00120 00121 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00122 * The nework object must be connected to the network endpoint before calling this 00123 * @param options - connect options 00124 * @return success code - 00125 */ 00126 int connect(MQTTPacket_connectData& options); 00127 00128 /** 
MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00129 * @param topic - the topic to publish to 00130 * @param message - the message to send 00131 * @return success code - 00132 */ 00133 int publish(const char* topicName, Message& message); 00134 00135 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00136 * @param topic - the topic to publish to 00137 * @param payload - the data to send 00138 * @param payloadlen - the length of the data 00139 * @param qos - the QoS to send the publish at 00140 * @param retained - whether the message should be retained 00141 * @return success code - 00142 */ 00143 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00144 00145 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00146 * @param topic - the topic to publish to 00147 * @param payload - the data to send 00148 * @param payloadlen - the length of the data 00149 * @param id - the packet id used - returned 00150 * @param qos - the QoS to send the publish at 00151 * @param retained - whether the message should be retained 00152 * @return success code - 00153 */ 00154 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00155 00156 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00157 * @param topicFilter - a topic pattern which can include wildcards 00158 * @param qos - the MQTT QoS to subscribe at 00159 * @param mh - the callback function to be invoked when a message is received for this subscription 00160 * @return success code - 00161 */ 00162 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); 00163 00164 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00165 * @param topicFilter - a topic pattern which can include wildcards 
00166 * @return success code - 00167 */ 00168 int unsubscribe(const char* topicFilter); 00169 00170 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00171 * @return success code - 00172 */ 00173 int disconnect(); 00174 00175 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00176 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00177 * received. 00178 * @param timeout_ms the time to wait, in milliseconds 00179 * @return success code - on failure, this means the client has disconnected 00180 */ 00181 int yield(unsigned long timeout_ms = 1000L); 00182 00183 /** Is the client connected? 00184 * @return flag - is the client connected or not? 00185 */ 00186 bool isConnected() 00187 { 00188 return isconnected; 00189 } 00190 00191 private: 00192 00193 int cycle(Timer& timer); 00194 int waitfor(int packet_type, Timer& timer); 00195 int keepalive(); 00196 int publish(int len, Timer& timer, enum QoS qos); 00197 00198 int decodePacket(int* value, int timeout); 00199 int readPacket(Timer& timer); 00200 int sendPacket(int length, Timer& timer); 00201 int deliverMessage(MQTTString& topicName, Message& message); 00202 bool isTopicMatched(char* topicFilter, MQTTString& topicName); 00203 00204 Network& ipstack; 00205 unsigned long command_timeout_ms; 00206 00207 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00208 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00209 00210 Timer last_sent, last_received; 00211 unsigned int keepAliveInterval; 00212 bool ping_outstanding; 00213 bool cleansession; 00214 00215 PacketId packetid; 00216 00217 struct MessageHandlers 00218 { 00219 const char* topicFilter; 00220 FP<void, MessageData&> fp; 00221 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00222 00223 FP<void, MessageData&> defaultMessageHandler; 00224 00225 bool isconnected; 00226 00227 #if MQTTCLIENT_QOS1 || 
MQTTCLIENT_QOS2 00228 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect 00229 int inflightLen; 00230 unsigned short inflightMsgid; 00231 enum QoS inflightQoS; 00232 #endif 00233 00234 #if MQTTCLIENT_QOS2 00235 bool pubrel; 00236 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00237 #define MAX_INCOMING_QOS2_MESSAGES 10 00238 #endif 00239 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00240 bool isQoS2msgidFree(unsigned short id); 00241 bool useQoS2msgid(unsigned short id); 00242 #endif 00243 00244 }; 00245 00246 } 00247 00248 00249 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00250 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00251 { 00252 last_sent = Timer(); 00253 last_received = Timer(); 00254 ping_outstanding = false; 00255 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00256 messageHandlers[i].topicFilter = 0; 00257 this->command_timeout_ms = command_timeout_ms; 00258 isconnected = false; 00259 00260 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00261 inflightMsgid = 0; 00262 inflightQoS = QOS0; 00263 #endif 00264 00265 00266 #if MQTTCLIENT_QOS2 00267 pubrel = false; 00268 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00269 incomingQoS2messages[i] = 0; 00270 #endif 00271 } 00272 00273 #if MQTTCLIENT_QOS2 00274 template<class Network, class Timer, int a, int b> 00275 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00276 { 00277 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00278 { 00279 if (incomingQoS2messages[i] == id) 00280 return false; 00281 } 00282 return true; 00283 } 00284 00285 00286 template<class Network, class Timer, int a, int b> 00287 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00288 { 00289 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00290 { 00291 if (incomingQoS2messages[i] == 0) 00292 { 00293 
incomingQoS2messages[i] = id; 00294 return true; 00295 } 00296 } 00297 return false; 00298 } 00299 #endif 00300 00301 00302 template<class Network, class Timer, int a, int b> 00303 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00304 { 00305 int rc = FAILURE, 00306 sent = 0; 00307 00308 while (sent < length && !timer.expired()) 00309 { 00310 rc = ipstack.write(&sendbuf[sent], length, timer.left_ms()); 00311 if (rc < 0) // there was an error writing the data 00312 break; 00313 sent += rc; 00314 } 00315 if (sent == length) 00316 { 00317 if (this->keepAliveInterval > 0) 00318 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet 00319 rc = SUCCESS; 00320 } 00321 else 00322 rc = FAILURE; 00323 00324 #if defined(MQTT_DEBUG) 00325 char printbuf[50]; 00326 DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length)); 00327 #endif 00328 return rc; 00329 } 00330 00331 00332 template<class Network, class Timer, int a, int b> 00333 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00334 { 00335 unsigned char c; 00336 int multiplier = 1; 00337 int len = 0; 00338 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00339 00340 *value = 0; 00341 do 00342 { 00343 int rc = MQTTPACKET_READ_ERROR; 00344 00345 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00346 { 00347 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00348 goto exit; 00349 } 00350 rc = ipstack.read(&c, 1, timeout); 00351 if (rc != 1) 00352 goto exit; 00353 *value += (c & 127) * multiplier; 00354 multiplier *= 128; 00355 } while ((c & 128) != 0); 00356 exit: 00357 return len; 00358 } 00359 00360 00361 /** 00362 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00363 * the packets can be retried. 
00364 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00365 * @return the MQTT packet type, or -1 if none 00366 */ 00367 template<class Network, class Timer, int a, int b> 00368 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 00369 { 00370 int rc = FAILURE; 00371 MQTTHeader header = {0}; 00372 int len = 0; 00373 int rem_len = 0; 00374 00375 /* 1. read the header byte. This has the packet type in it */ 00376 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) 00377 goto exit; 00378 00379 len = 1; 00380 /* 2. read the remaining length. This is variable in itself */ 00381 decodePacket(&rem_len, timer.left_ms()); 00382 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00383 00384 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00385 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) 00386 goto exit; 00387 00388 header.byte = readbuf[0]; 00389 rc = header.bits.type; 00390 if (this->keepAliveInterval > 0) 00391 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet 00392 exit: 00393 00394 #if defined(MQTT_DEBUG) 00395 char printbuf[50]; 00396 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len)); 00397 #endif 00398 return rc; 00399 } 00400 00401 00402 // assume topic filter and name is in correct format 00403 // # can only be at end 00404 // + and # can only be next to separator 00405 template<class Network, class Timer, int a, int b> 00406 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) 00407 { 00408 char* curf = topicFilter; 00409 char* curn = topicName.lenstring.data; 00410 char* curn_end = curn + topicName.lenstring.len; 00411 00412 while (*curf && curn < curn_end) 00413 { 00414 if (*curn == '/' && *curf != '/') 00415 break; 
00416 if (*curf != '+' && *curf != '#' && *curf != *curn) 00417 break; 00418 if (*curf == '+') 00419 { // skip until we meet the next separator, or end of string 00420 char* nextpos = curn + 1; 00421 while (nextpos < curn_end && *nextpos != '/') 00422 nextpos = ++curn + 1; 00423 } 00424 else if (*curf == '#') 00425 curn = curn_end - 1; // skip until end of string 00426 curf++; 00427 curn++; 00428 }; 00429 00430 return (curn == curn_end) && (*curf == '\0'); 00431 } 00432 00433 00434 00435 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00436 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) 00437 { 00438 int rc = FAILURE; 00439 00440 // we have to find the right message handler - indexed by topic 00441 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00442 { 00443 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || 00444 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) 00445 { 00446 if (messageHandlers[i].fp.attached()) 00447 { 00448 MessageData md(topicName, message); 00449 messageHandlers[i].fp(md); 00450 rc = SUCCESS; 00451 } 00452 } 00453 } 00454 00455 if (rc == FAILURE && defaultMessageHandler.attached()) 00456 { 00457 MessageData md(topicName, message); 00458 defaultMessageHandler(md); 00459 rc = SUCCESS; 00460 } 00461 00462 return rc; 00463 } 00464 00465 00466 00467 template<class Network, class Timer, int a, int b> 00468 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00469 { 00470 int rc = SUCCESS; 00471 Timer timer = Timer(); 00472 00473 timer.countdown_ms(timeout_ms); 00474 while (!timer.expired()) 00475 { 00476 if (cycle(timer) == FAILURE) 00477 { 00478 rc = FAILURE; 00479 break; 00480 } 00481 } 00482 00483 return rc; 00484 } 00485 00486 00487 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00488 int MQTT::Client<Network, Timer, 
MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) 00489 { 00490 /* get one piece of work off the wire and one pass through */ 00491 00492 // read the socket, see what work is due 00493 unsigned short packet_type = readPacket(timer); 00494 00495 int len = 0, 00496 rc = SUCCESS; 00497 00498 switch (packet_type) 00499 { 00500 case CONNACK: 00501 case PUBACK: 00502 case SUBACK: 00503 break; 00504 case PUBLISH: 00505 MQTTString topicName; 00506 Message msg; 00507 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, 00508 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00509 goto exit; 00510 #if MQTTCLIENT_QOS2 00511 if (msg.qos != QOS2) 00512 #endif 00513 deliverMessage(topicName, msg); 00514 #if MQTTCLIENT_QOS2 00515 else if (isQoS2msgidFree(msg.id)) 00516 { 00517 if (useQoS2msgid(msg.id)) 00518 deliverMessage(topicName, msg); 00519 else 00520 WARN("Maximum number of incoming QoS2 messages exceeded"); 00521 } 00522 #endif 00523 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00524 if (msg.qos != QOS0) 00525 { 00526 if (msg.qos == QOS1) 00527 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); 00528 else if (msg.qos == QOS2) 00529 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); 00530 if (len <= 0) 00531 rc = FAILURE; 00532 else 00533 rc = sendPacket(len, timer); 00534 if (rc == FAILURE) 00535 goto exit; // there was a problem 00536 } 00537 break; 00538 #endif 00539 #if MQTTCLIENT_QOS2 00540 case PUBREC: 00541 unsigned short mypacketid; 00542 unsigned char dup, type; 00543 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00544 rc = FAILURE; 00545 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0) 00546 rc = FAILURE; 00547 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00548 rc = FAILURE; // there 
was a problem 00549 if (rc == FAILURE) 00550 goto exit; // there was a problem 00551 break; 00552 case PUBCOMP: 00553 break; 00554 #endif 00555 case PINGRESP: 00556 ping_outstanding = false; 00557 break; 00558 } 00559 keepalive(); 00560 exit: 00561 if (rc == SUCCESS) 00562 rc = packet_type; 00563 return rc; 00564 } 00565 00566 00567 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00568 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() 00569 { 00570 int rc = FAILURE; 00571 00572 if (keepAliveInterval == 0) 00573 { 00574 rc = SUCCESS; 00575 goto exit; 00576 } 00577 00578 if (last_sent.expired() || last_received.expired()) 00579 { 00580 if (!ping_outstanding) 00581 { 00582 Timer timer = Timer(1000); 00583 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00584 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00585 ping_outstanding = true; 00586 } 00587 } 00588 00589 exit: 00590 return rc; 00591 } 00592 00593 00594 // only used in single-threaded mode where one command at a time is in process 00595 template<class Network, class Timer, int a, int b> 00596 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00597 { 00598 int rc = FAILURE; 00599 00600 do 00601 { 00602 if (timer.expired()) 00603 break; // we timed out 00604 } 00605 while ((rc = cycle(timer)) != packet_type); 00606 00607 return rc; 00608 } 00609 00610 00611 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00612 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) 00613 { 00614 Timer connect_timer = Timer(command_timeout_ms); 00615 int rc = FAILURE; 00616 int len = 0; 00617 00618 if (isconnected) // don't send connect packet again if we are already connected 00619 goto exit; 00620 00621 this->keepAliveInterval = options.keepAliveInterval; 00622 this->cleansession = options.cleansession; 00623 if ((len = 
MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) 00624 goto exit; 00625 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00626 goto exit; // there was a problem 00627 00628 if (this->keepAliveInterval > 0) 00629 last_received.countdown(this->keepAliveInterval); 00630 // this will be a blocking call, wait for the connack 00631 if (waitfor(CONNACK, connect_timer) == CONNACK) 00632 { 00633 unsigned char connack_rc = 255; 00634 bool sessionPresent = false; 00635 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00636 rc = connack_rc; 00637 else 00638 rc = FAILURE; 00639 } 00640 else 00641 rc = FAILURE; 00642 00643 #if MQTTCLIENT_QOS2 00644 // resend an inflight publish 00645 if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel) 00646 { 00647 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00648 rc = FAILURE; 00649 else 00650 rc = publish(len, connect_timer, inflightQoS); 00651 } 00652 else 00653 #endif 00654 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00655 if (inflightMsgid > 0) 00656 { 00657 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); 00658 rc = publish(inflightLen, connect_timer, inflightQoS); 00659 } 00660 #endif 00661 00662 exit: 00663 if (rc == SUCCESS) 00664 isconnected = true; 00665 return rc; 00666 } 00667 00668 00669 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00670 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() 00671 { 00672 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00673 return connect(default_options); 00674 } 00675 00676 00677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) 00679 { 00680 int rc = FAILURE; 00681 Timer 
timer = Timer(command_timeout_ms); 00682 int len = 0; 00683 MQTTString topic = {(char*)topicFilter, 0, 0}; 00684 00685 if (!isconnected) 00686 goto exit; 00687 00688 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00689 if (len <= 0) 00690 goto exit; 00691 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00692 goto exit; // there was a problem 00693 00694 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback 00695 { 00696 int count = 0, grantedQoS = -1; 00697 unsigned short mypacketid; 00698 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00699 rc = grantedQoS; // 0, 1, 2 or 0x80 00700 if (rc != 0x80) 00701 { 00702 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00703 { 00704 if (messageHandlers[i].topicFilter == 0) 00705 { 00706 messageHandlers[i].topicFilter = topicFilter; 00707 messageHandlers[i].fp.attach(messageHandler); 00708 rc = 0; 00709 break; 00710 } 00711 } 00712 } 00713 } 00714 else 00715 rc = FAILURE; 00716 00717 exit: 00718 if (rc != SUCCESS) 00719 isconnected = false; 00720 return rc; 00721 } 00722 00723 00724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) 00726 { 00727 int rc = FAILURE; 00728 Timer timer = Timer(command_timeout_ms); 00729 MQTTString topic = {(char*)topicFilter, 0, 0}; 00730 int len = 0; 00731 00732 if (!isconnected) 00733 goto exit; 00734 00735 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) 00736 goto exit; 00737 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00738 goto exit; // there was a problem 00739 00740 if (waitfor(UNSUBACK, timer) == UNSUBACK) 00741 { 00742 unsigned short mypacketid; // should be the same as the packetid above 00743 if 
(MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00744 rc = 0; 00745 } 00746 else 00747 rc = FAILURE; 00748 00749 exit: 00750 if (rc != SUCCESS) 00751 isconnected = false; 00752 return rc; 00753 } 00754 00755 00756 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00757 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00758 { 00759 int rc; 00760 00761 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00762 goto exit; // there was a problem 00763 00764 #if MQTTCLIENT_QOS1 00765 if (qos == QOS1) 00766 { 00767 if (waitfor(PUBACK, timer) == PUBACK) 00768 { 00769 unsigned short mypacketid; 00770 unsigned char dup, type; 00771 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00772 rc = FAILURE; 00773 else if (inflightMsgid == mypacketid) 00774 inflightMsgid = 0; 00775 } 00776 else 00777 rc = FAILURE; 00778 } 00779 #elif MQTTCLIENT_QOS2 00780 else if (qos == QOS2) 00781 { 00782 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00783 { 00784 unsigned short mypacketid; 00785 unsigned char dup, type; 00786 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00787 rc = FAILURE; 00788 else if (inflightMsgid == mypacketid) 00789 inflightMsgid = 0; 00790 } 00791 else 00792 rc = FAILURE; 00793 } 00794 #endif 00795 00796 exit: 00797 if (rc != SUCCESS) 00798 isconnected = false; 00799 return rc; 00800 } 00801 00802 00803 00804 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00805 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00806 { 00807 int rc = FAILURE; 00808 Timer timer = Timer(command_timeout_ms); 00809 MQTTString topicString = MQTTString_initializer; 00810 int len = 0; 00811 00812 if (!isconnected) 00813 goto exit; 00814 00815 
topicString.cstring = (char*)topicName; 00816 00817 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00818 if (qos == QOS1 || qos == QOS2) 00819 id = packetid.getNext(); 00820 #endif 00821 00822 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, 00823 topicString, (unsigned char*)payload, payloadlen); 00824 if (len <= 0) 00825 goto exit; 00826 00827 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00828 if (!cleansession) 00829 { 00830 memcpy(pubbuf, sendbuf, len); 00831 inflightMsgid = id; 00832 inflightLen = len; 00833 inflightQoS = qos; 00834 #if MQTTCLIENT_QOS2 00835 pubrel = false; 00836 #endif 00837 } 00838 #endif 00839 00840 rc = publish(len, timer, qos); 00841 exit: 00842 return rc; 00843 } 00844 00845 00846 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00847 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00848 { 00849 unsigned short id = 0; // dummy - not used for anything 00850 return publish(topicName, payload, payloadlen, id, qos, retained); 00851 } 00852 00853 00854 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00855 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) 00856 { 00857 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00858 } 00859 00860 00861 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 00862 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() 00863 { 00864 int rc = FAILURE; 00865 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00866 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); 00867 if (len > 0) 00868 rc = sendPacket(len, timer); // send the disconnect packet 00869 00870 isconnected = false; 00871 return rc; 00872 } 00873 00874 00875 #endif
Generated on Tue Jul 12 2022 15:44:48 by Doxygen 1.7.2