Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTSNCLIENT_H) 00018 #define MQTTSNCLIENT_H 00019 00020 #include "FP.h" 00021 #include "MQTTSNPacket.h" 00022 #include "stdio.h" 00023 #include "MQTTLogging.h" 00024 00025 // Data limits 00026 #if !defined(MAX_REGISTRATIONS) 00027 #define MAX_REGISTRATIONS 5 00028 #endif 00029 #if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH) 00030 #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20 00031 #endif 00032 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00033 #define MAX_INCOMING_QOS2_MESSAGES 10 00034 #endif 00035 00036 #if !defined(MQTTSNCLIENT_QOS1) 00037 #define MQTTSNCLIENT_QOS1 1 00038 #endif 00039 #if !defined(MQTTSNCLIENT_QOS2) 00040 #define MQTTSNCLIENT_QOS2 0 00041 #endif 00042 00043 namespace MQTTSN 00044 { 00045 00046 00047 enum QoS { QOS0, QOS1, QOS2 }; 00048 00049 // all failure return codes must be negative 00050 enum returnCode { MAX_SUBSCRIPTIONS_EXCEEDED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00051 00052 00053 struct Message 00054 { 00055 enum QoS qos; 00056 bool retained; 00057 bool dup; 00058 unsigned short id; 00059 void *payload; 00060 size_t payloadlen; 00061 }; 00062 00063 00064 struct MessageData 00065 { 00066 MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage) : message(aMessage), topic(aTopic) 00067 { } 00068 00069 struct Message &message; 00070 MQTTSN_topicid &topic; 00071 }; 00072 00073 00074 class PacketId 00075 { 00076 public: 00077 PacketId() 00078 { 00079 next = 0; 00080 } 00081 00082 int getNext() 00083 { 00084 return next = (next == MAX_PACKET_ID) ? 1 : next + 1; 00085 } 00086 00087 private: 00088 static const int MAX_PACKET_ID = 65535; 00089 int next; 00090 }; 00091 00092 00093 /** 00094 * @class MQTTSNClient 00095 * @brief blocking, non-threaded MQTTSN client API 00096 * 00097 * This version of the API blocks on all method calls, until they are complete. This means that only one 00098 * MQTT request can be in process at any one time. 00099 * @param Network a network class which supports send, receive 00100 * @param Timer a timer class with the methods: 00101 */ 00102 template<class Network, class Timer, int MAX_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00103 class Client 00104 { 00105 00106 public: 00107 00108 typedef void (*messageHandler)(MessageData&); 00109 00110 /** Construct the client 00111 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00112 * before calling MQTT connect 00113 * @param limits an instance of the Limit class - to alter limits as required 00114 */ 00115 Client(Network& network, unsigned int command_timeout_ms = 30000); 00116 00117 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00118 * @param mh - pointer to the callback function 00119 */ 00120 void setDefaultMessageHandler(messageHandler mh) 00121 { 00122 defaultMessageHandler.attach(mh); 00123 } 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * Default connect options are used 00128 * @return success code - 00129 */ 00130 int connect(); 00131 00132 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00133 * The nework object must be connected to the network endpoint before calling this 00134 * @param options - connect options 00135 * @return success code - 00136 */ 00137 int connect(MQTTSNPacket_connectData& options); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param message - the message to send 00142 * @return success code - 00143 */ 00144 int publish(MQTTSN_topicid& topic, Message& message); 00145 00146 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00147 * @param topic - the topic to publish to 00148 * @param payload - the data to send 00149 * @param payloadlen - the length of the data 00150 * @param qos - the QoS to send the publish at 00151 * @param retained - whether the message should be retained 00152 * @return success code - 00153 */ 00154 int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00155 00156 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00157 * @param topic - the topic to publish to 00158 * @param payload - the data to send 00159 * @param payloadlen - the length of the data 00160 * @param id - the packet id used - returned 00161 * @param qos - the QoS to send the publish at 00162 * @param retained - whether the message should be retained 00163 * @return success code - 00164 */ 00165 int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00166 00167 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00168 * @param topicFilter - a topic pattern which can include wildcards 00169 * @param qos - the MQTT QoS to subscribe at 00170 * @param mh - the callback function to be invoked when a message is received for this subscription 00171 * @return success code - 00172 */ 00173 int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, messageHandler mh); 00174 00175 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00176 * @param topicFilter - a topic pattern which can include wildcards 00177 * @return success code - 00178 */ 00179 int unsubscribe(MQTTSN_topicid& topicFilter); 00180 00181 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00182 * @param duration - used for sleeping clients, 0 means no duration 00183 * @return success code - 00184 */ 00185 int disconnect(unsigned short duration = 0); 00186 00187 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00188 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00189 * received. 00190 * @param timeout_ms the time to wait, in milliseconds 00191 * @return success code - on failure, this means the client has disconnected 00192 */ 00193 int yield(unsigned long timeout_ms = 1000L); 00194 00195 /** Is the client connected? 00196 * @return flag - is the client connected or not? 00197 */ 00198 bool isConnected() 00199 { 00200 return isconnected; 00201 } 00202 00203 protected: 00204 00205 int cycle(Timer& timer); 00206 int waitfor(int packet_type, Timer& timer); 00207 00208 private: 00209 00210 int keepalive(); 00211 int publish(int len, Timer& timer, enum QoS qos); 00212 00213 int decodePacket(int* value, int timeout); 00214 int readPacket(Timer& timer); 00215 int sendPacket(int length, Timer& timer); 00216 int deliverMessage(MQTTSN_topicid& topic, Message& message); 00217 bool isTopicMatched(char* topicFilter, MQTTSNString& topicName); 00218 00219 Network& ipstack; 00220 unsigned long command_timeout_ms; 00221 00222 unsigned char sendbuf[MAX_PACKET_SIZE]; 00223 unsigned char readbuf[MAX_PACKET_SIZE]; 00224 00225 Timer last_sent, last_received; 00226 unsigned short duration; 00227 bool ping_outstanding; 00228 bool cleansession; 00229 00230 PacketId packetid; 00231 00232 struct MessageHandlers 00233 { 00234 MQTTSN_topicid* topicFilter; 00235 FP<void, MessageData&> fp; 00236 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00237 00238 FP<void, MessageData&> defaultMessageHandler; 00239 00240 bool isconnected; 00241 00242 struct Registrations 00243 { 00244 unsigned short id; 00245 char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH]; 00246 } registrations[MAX_REGISTRATIONS]; 00247 00248 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00249 unsigned char pubbuf[MAX_PACKET_SIZE]; // store the last publish for sending on reconnect 00250 int inflightLen; 00251 unsigned short inflightMsgid; 00252 enum QoS inflightQoS; 00253 #endif 00254 00255 #if MQTTCLIENT_QOS2 00256 bool pubrel; 00257 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00258 bool isQoS2msgidFree(unsigned short id); 00259 bool useQoS2msgid(unsigned short id); 00260 #endif 00261 00262 }; 00263 00264 } 00265 00266 00267 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00268 MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00269 { 00270 ping_outstanding = false; 00271 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00272 messageHandlers[i].topicFilter = 0; 00273 this->command_timeout_ms = command_timeout_ms; 00274 isconnected = false; 00275 00276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00277 inflightMsgid = 0; 00278 inflightQoS = QOS0; 00279 #endif 00280 00281 00282 #if MQTTCLIENT_QOS2 00283 pubrel = false; 00284 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00285 incomingQoS2messages[i] = 0; 00286 #endif 00287 } 00288 00289 #if MQTTCLIENT_QOS2 00290 template<class Network, class Timer, int a, int b> 00291 bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00292 { 00293 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00294 { 00295 if (incomingQoS2messages[i] == id) 00296 return false; 00297 } 00298 return true; 00299 } 00300 00301 00302 template<class Network, class Timer, int a, int b> 00303 bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00304 { 00305 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00306 { 00307 if (incomingQoS2messages[i] == 0) 00308 { 00309 incomingQoS2messages[i] = id; 00310 return true; 00311 } 00312 } 00313 return false; 00314 } 00315 #endif 00316 00317 00318 template<class Network, class Timer, int a, int b> 00319 int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00320 { 00321 int rc = FAILURE, 00322 sent = 0; 00323 00324 while (sent < length && !timer.expired()) 00325 { 00326 rc = ipstack.write(&sendbuf[sent], length, timer.left_ms()); 00327 if (rc < 0) // there was an error writing the data 00328 break; 00329 sent += rc; 00330 } 00331 if (sent == length) 00332 { 00333 if (this->duration > 0) 00334 last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet 00335 rc = SUCCESS; 00336 } 00337 else 00338 rc = FAILURE; 00339 00340 #if defined(MQTT_DEBUG) 00341 char printbuf[50]; 00342 DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length)); 00343 #endif 00344 return rc; 00345 } 00346 00347 00348 template<class Network, class Timer, int a, int b> 00349 int MQTTSN::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) 00350 { 00351 unsigned char c; 00352 int multiplier = 1; 00353 int len = 0; 00354 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00355 00356 *value = 0; 00357 do 00358 { 00359 int rc = MQTTSNPACKET_READ_ERROR; 00360 00361 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00362 { 00363 rc = MQTTSNPACKET_READ_ERROR; /* bad data */ 00364 goto exit; 00365 } 00366 rc = ipstack.read(&c, 1, timeout); 00367 if (rc != 1) 00368 goto exit; 00369 *value += (c & 127) * multiplier; 00370 multiplier *= 128; 00371 } while ((c & 128) != 0); 00372 exit: 00373 return len; 00374 } 00375 00376 00377 /** 00378 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00379 * the packets can be retried. 00380 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00381 * @return the MQTT packet type, or -1 if none 00382 */ 00383 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00384 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer) 00385 { 00386 int rc = FAILURE; 00387 int len = 0; // the length of the whole packet including length field 00388 int lenlen = 0; 00389 int datalen = 0; 00390 00391 #define MQTTSN_MIN_PACKET_LENGTH 2 00392 // 1. read the packet, datagram style 00393 if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH) 00394 goto exit; 00395 00396 // 2. read the length. This is variable in itself 00397 lenlen = MQTTSNPacket_decode(readbuf, len, &datalen); 00398 if (datalen != len) 00399 goto exit; // there was an error 00400 00401 rc = readbuf[lenlen]; 00402 if (this->duration > 0) 00403 last_received.countdown(this->duration); // record the fact that we have successfully received a packet 00404 exit: 00405 00406 #if defined(MQTT_DEBUG) 00407 char printbuf[50]; 00408 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len)); 00409 #endif 00410 return rc; 00411 } 00412 00413 00414 // assume topic filter and name is in correct format 00415 // # can only be at end 00416 // + and # can only be next to separator 00417 template<class Network, class Timer, int a, int b> 00418 bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName) 00419 { 00420 char* curf = topicFilter; 00421 char* curn = topicName.lenstring.data; 00422 char* curn_end = curn + topicName.lenstring.len; 00423 00424 while (*curf && curn < curn_end) 00425 { 00426 if (*curn == '/' && *curf != '/') 00427 break; 00428 if (*curf != '+' && *curf != '#' && *curf != *curn) 00429 break; 00430 if (*curf == '+') 00431 { // skip until we meet the next separator, or end of string 00432 char* nextpos = curn + 1; 00433 while (nextpos < curn_end && *nextpos != '/') 00434 nextpos = ++curn + 1; 00435 } 00436 else if (*curf == '#') 00437 curn = curn_end - 1; // skip until end of string 00438 curf++; 00439 curn++; 00440 }; 00441 00442 return (curn == curn_end) && (*curf == '\0'); 00443 } 00444 00445 00446 00447 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00448 int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message) 00449 { 00450 int rc = FAILURE; 00451 00452 // we have to find the right message handler - indexed by topic 00453 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00454 { 00455 MQTTSNString str = MQTTSNString_initializer; 00456 str.lenstring.data = topic.data.long_.name; 00457 str.lenstring.len = topic.data.long_.len; 00458 if (messageHandlers[i].topicFilter != 0 && (MQTTSNTopic_equals(&topic, messageHandlers[i].topicFilter) || 00459 isTopicMatched(messageHandlers[i].topicFilter->data.long_.name, str))) 00460 { 00461 if (messageHandlers[i].fp.attached()) 00462 { 00463 MessageData md(topic, message); 00464 messageHandlers[i].fp(md); 00465 rc = SUCCESS; 00466 } 00467 } 00468 } 00469 00470 if (rc == FAILURE && defaultMessageHandler.attached()) 00471 { 00472 MessageData md(topic, message); 00473 defaultMessageHandler(md); 00474 rc = SUCCESS; 00475 } 00476 00477 return rc; 00478 } 00479 00480 00481 00482 template<class Network, class Timer, int a, int b> 00483 int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00484 { 00485 int rc = SUCCESS; 00486 Timer timer; 00487 00488 timer.countdown_ms(timeout_ms); 00489 while (!timer.expired()) 00490 { 00491 if (cycle(timer) == FAILURE) 00492 { 00493 rc = FAILURE; 00494 break; 00495 } 00496 } 00497 00498 return rc; 00499 } 00500 00501 00502 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00503 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer) 00504 { 00505 /* get one piece of work off the wire and one pass through */ 00506 00507 // read the socket, see what work is due 00508 unsigned short packet_type = readPacket(timer); 00509 00510 int len = 0; 00511 unsigned char rc = SUCCESS; 00512 00513 switch (packet_type) 00514 { 00515 case MQTTSN_CONNACK: 00516 case MQTTSN_PUBACK: 00517 case MQTTSN_SUBACK: 00518 case MQTTSN_REGACK: 00519 break; 00520 case MQTTSN_REGISTER: 00521 { 00522 unsigned short topicid, packetid; 00523 MQTTSNString topicName; 00524 rc = MQTTSN_RC_ACCEPTED; 00525 if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1) 00526 goto exit; 00527 len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, rc); 00528 if (len <= 0) 00529 rc = FAILURE; 00530 else 00531 rc = sendPacket(len, timer); 00532 break; 00533 } 00534 case MQTTSN_PUBLISH: 00535 MQTTSN_topicid topicid; 00536 Message msg; 00537 if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicid, 00538 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1) 00539 goto exit; 00540 #if MQTTCLIENT_QOS2 00541 if (msg.qos != QOS2) 00542 #endif 00543 deliverMessage(topicid, msg); 00544 #if MQTTCLIENT_QOS2 00545 else if (isQoS2msgidFree(msg.id)) 00546 { 00547 if (useQoS2msgid(msg.id)) 00548 deliverMessage(topicid, msg); 00549 else 00550 WARN("Maximum number of incoming QoS2 messages exceeded"); 00551 } 00552 #endif 00553 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00554 if (msg.qos != QOS0) 00555 { 00556 if (msg.qos == QOS1) 00557 len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0); 00558 else if (msg.qos == QOS2) 00559 len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id); 00560 if (len <= 0) 00561 rc = FAILURE; 00562 else 00563 rc = sendPacket(len, timer); 00564 if (rc == FAILURE) 00565 goto exit; // there was a problem 00566 } 00567 break; 00568 #endif 00569 #if MQTTCLIENT_QOS2 00570 case MQTTSN_PUBREC: 00571 unsigned short mypacketid; 00572 unsigned char dup, type; 00573 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00574 rc = FAILURE; 00575 else if ((len = MQTTSerialize_ack(sendbuf, MAX_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0) 00576 rc = FAILURE; 00577 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00578 rc = FAILURE; // there was a problem 00579 if (rc == FAILURE) 00580 goto exit; // there was a problem 00581 break; 00582 case MQTTSN_PUBCOMP: 00583 break; 00584 #endif 00585 case MQTTSN_PINGRESP: 00586 ping_outstanding = false; 00587 break; 00588 } 00589 keepalive(); 00590 exit: 00591 if (rc == SUCCESS) 00592 rc = packet_type; 00593 return rc; 00594 } 00595 00596 00597 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00598 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive() 00599 { 00600 int rc = FAILURE; 00601 00602 if (duration == 0) 00603 { 00604 rc = SUCCESS; 00605 goto exit; 00606 } 00607 00608 if (last_sent.expired() || last_received.expired()) 00609 { 00610 if (!ping_outstanding) 00611 { 00612 MQTTSNString clientid = MQTTSNString_initializer; 00613 Timer timer(1000); 00614 int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid); 00615 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00616 ping_outstanding = true; 00617 } 00618 } 00619 00620 exit: 00621 return rc; 00622 } 00623 00624 00625 // only used in single-threaded mode where one command at a time is in process 00626 template<class Network, class Timer, int a, int b> 00627 int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00628 { 00629 int rc = FAILURE; 00630 00631 do 00632 { 00633 if (timer.expired()) 00634 break; // we timed out 00635 } 00636 while ((rc = cycle(timer)) != packet_type); 00637 00638 return rc; 00639 } 00640 00641 00642 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00643 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options) 00644 { 00645 Timer connect_timer(command_timeout_ms); 00646 int rc = FAILURE; 00647 int len = 0; 00648 00649 if (isconnected) // don't send connect packet again if we are already connected 00650 goto exit; 00651 00652 this->duration = options.duration; 00653 this->cleansession = options.cleansession; 00654 if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0) 00655 goto exit; 00656 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00657 goto exit; // there was a problem 00658 00659 if (this->duration > 0) 00660 last_received.countdown(this->duration); 00661 // this will be a blocking call, wait for the connack 00662 if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK) 00663 { 00664 //unsigned char connack_rc = 255; 00665 int connack_rc = 255; 00666 if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1) 00667 rc = connack_rc; 00668 else 00669 rc = FAILURE; 00670 } 00671 else 00672 rc = FAILURE; 00673 00674 #if MQTTCLIENT_QOS2 00675 // resend an inflight publish 00676 if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel) 00677 { 00678 if ((len = MQTTSerialize_ack(sendbuf, MAX_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) 00679 rc = FAILURE; 00680 else 00681 rc = publish(len, connect_timer, inflightQoS); 00682 } 00683 else 00684 #endif 00685 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00686 if (inflightMsgid > 0) 00687 { 00688 memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE); 00689 rc = publish(inflightLen, connect_timer, inflightQoS); 00690 } 00691 #endif 00692 00693 exit: 00694 if (rc == SUCCESS) 00695 isconnected = true; 00696 return rc; 00697 } 00698 00699 00700 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00701 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect() 00702 { 00703 MQTTSNPacket_connectData default_options = MQTTSNPacket_connectData_initializer; 00704 return connect(default_options); 00705 } 00706 00707 00708 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00709 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, messageHandler messageHandler) 00710 { 00711 int rc = FAILURE; 00712 Timer timer(command_timeout_ms); 00713 int len = 0; 00714 00715 if (!isconnected) 00716 return FAILURE; // goto exit cannot cross variable initialization 00717 00718 bool freeHandler = false; 00719 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00720 { 00721 if (messageHandlers[i].topicFilter == 0) 00722 { 00723 freeHandler = true; 00724 break; 00725 } 00726 } 00727 if (!freeHandler) 00728 { // No message handler free 00729 rc = MAX_SUBSCRIPTIONS_EXCEEDED; 00730 goto exit; 00731 } 00732 00733 len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter); 00734 if (len <= 0) 00735 goto exit; 00736 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00737 goto exit; // there was a problem 00738 00739 if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK) // wait for suback 00740 { 00741 int grantedQoS = -1; 00742 unsigned short mypacketid; 00743 unsigned char rc; 00744 if (MQTTSNDeserialize_suback(&grantedQoS, &topicFilter.data.id, &mypacketid, &rc, readbuf, MAX_PACKET_SIZE) != 1) 00745 goto exit; 00746 00747 if (qos != grantedQoS) 00748 goto exit; 00749 00750 if (rc == MQTTSN_RC_ACCEPTED) 00751 { 00752 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00753 { 00754 if (messageHandlers[i].topicFilter == 0) 00755 { 00756 messageHandlers[i].topicFilter = &topicFilter; 00757 messageHandlers[i].fp.attach(messageHandler); 00758 rc = 0; 00759 break; 00760 } 00761 } 00762 } 00763 } 00764 else 00765 rc = FAILURE; 00766 00767 exit: 00768 if (rc != SUCCESS) 00769 isconnected = false; 00770 return rc; 00771 } 00772 00773 00774 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00775 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter) 00776 { 00777 int rc = FAILURE; 00778 Timer timer(command_timeout_ms); 00779 int len = 0; 00780 00781 if (!isconnected) 00782 goto exit; 00783 00784 if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0) 00785 goto exit; 00786 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00787 goto exit; // there was a problem 00788 00789 if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK) 00790 { 00791 unsigned short mypacketid; // should be the same as the packetid above 00792 if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1) 00793 rc = 0; 00794 } 00795 else 00796 rc = FAILURE; 00797 00798 exit: 00799 if (rc != SUCCESS) 00800 isconnected = false; 00801 return rc; 00802 } 00803 00804 00805 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00806 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00807 { 00808 int rc; 00809 00810 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00811 goto exit; // there was a problem 00812 00813 #if MQTTCLIENT_QOS1 00814 if (qos == QOS1) 00815 { 00816 if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK) 00817 { 00818 unsigned short mypacketid; 00819 unsigned char type; 00820 if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00821 rc = FAILURE; 00822 else if (inflightMsgid == mypacketid) 00823 inflightMsgid = 0; 00824 } 00825 else 00826 rc = FAILURE; 00827 } 00828 #elif MQTTCLIENT_QOS2 00829 else if (qos == QOS2) 00830 { 00831 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00832 { 00833 unsigned short mypacketid; 00834 unsigned char type; 00835 if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00836 rc = FAILURE; 00837 else if (inflightMsgid == mypacketid) 00838 inflightMsgid = 0; 00839 } 00840 else 00841 rc = FAILURE; 00842 } 00843 #endif 00844 00845 exit: 00846 if (rc != SUCCESS) 00847 isconnected = false; 00848 return rc; 00849 } 00850 00851 00852 00853 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00854 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00855 { 00856 int rc = FAILURE; 00857 Timer timer(command_timeout_ms); 00858 int len = 0; 00859 00860 if (!isconnected) 00861 goto exit; 00862 00863 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00864 if (qos == QOS1 || qos == QOS2) 00865 id = packetid.getNext(); 00866 #endif 00867 00868 len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id, 00869 topic, (unsigned char*)payload, payloadlen); 00870 if (len <= 0) 00871 goto exit; 00872 00873 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 00874 if (!cleansession) 00875 { 00876 memcpy(pubbuf, sendbuf, len); 00877 inflightMsgid = id; 00878 inflightLen = len; 00879 inflightQoS = qos; 00880 #if MQTTCLIENT_QOS2 00881 pubrel = false; 00882 #endif 00883 } 00884 #endif 00885 00886 rc = publish(len, timer, qos); 00887 exit: 00888 return rc; 00889 } 00890 00891 00892 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00893 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00894 { 00895 unsigned short id = 0; // dummy - not used for anything 00896 return publish(topicName, payload, payloadlen, id, qos, retained); 00897 } 00898 00899 00900 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00901 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message) 00902 { 00903 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00904 } 00905 00906 00907 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00908 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration) 00909 { 00910 int rc = FAILURE; 00911 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00912 int int_duration = (duration == 0) ? -1 : (int)duration; 00913 int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration); 00914 if (len > 0) 00915 rc = sendPacket(len, timer); // send the disconnect packet 00916 00917 isconnected = false; 00918 return rc; 00919 } 00920 00921 00922 #endif 00923
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2