High level MQTT-SN C++ library
Dependencies: EthernetInterface FP MQTTSNPacket
MQTTSNClient.h
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #if !defined(MQTTSNCLIENT_H) 00018 #define MQTTSNCLIENT_H 00019 00020 #include "FP.h" 00021 #include "MQTTSNPacket.h" 00022 #include "stdio.h" 00023 #include "MQTTLogging.h" 00024 00025 // Data limits 00026 #if !defined(MAX_REGISTRATIONS) 00027 #define MAX_REGISTRATIONS 5 00028 #endif 00029 #if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH) 00030 #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20 00031 #endif 00032 #if !defined(MAX_INCOMING_QOS2_MESSAGES) 00033 #define MAX_INCOMING_QOS2_MESSAGES 10 00034 #endif 00035 00036 #if !defined(MQTTSNCLIENT_QOS1) 00037 #define MQTTSNCLIENT_QOS1 1 00038 #endif 00039 #if !defined(MQTTSNCLIENT_QOS2) 00040 #define MQTTSNCLIENT_QOS2 0 00041 #endif 00042 00043 namespace MQTTSN 00044 { 00045 00046 00047 enum QoS { QOS0, QOS1, QOS2 }; 00048 00049 // all failure return codes must be negative 00050 enum returnCode { MAX_SUBSCRIPTIONS_EXCEEDED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; 00051 00052 00053 struct Message 00054 { 00055 enum QoS qos; 00056 bool retained; 00057 bool dup; 00058 unsigned short id; 00059 void *payload; 00060 size_t payloadlen; 00061 }; 00062 00063 00064 struct MessageData 00065 { 00066 MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage) : message(aMessage), topic(aTopic) 00067 { } 00068 00069 struct Message &message; 00070 MQTTSN_topicid &topic; 00071 }; 00072 00073 00074 class PacketId 00075 { 00076 public: 00077 PacketId() 00078 { 00079 next = 0; 00080 } 00081 00082 int getNext() 00083 { 00084 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00085 } 00086 00087 private: 00088 static const int MAX_PACKET_ID = 65535; 00089 int next; 00090 }; 00091 00092 00093 /** 00094 * @class MQTTSNClient 00095 * @brief blocking, non-threaded MQTTSN client API 00096 * 00097 * This version of the API blocks on all method calls, until they are complete. This means that only one 00098 * MQTT request can be in process at any one time. 00099 * @param Network a network class which supports send, receive 00100 * @param Timer a timer class with the methods: 00101 */ 00102 template<class Network, class Timer, int MAX_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> 00103 class Client 00104 { 00105 00106 public: 00107 00108 typedef void (*messageHandler)(MessageData&); 00109 00110 /** Construct the client 00111 * @param network - pointer to an instance of the Network class - must be connected to the endpoint 00112 * before calling MQTT connect 00113 * @param limits an instance of the Limit class - to alter limits as required 00114 */ 00115 Client(Network& network, unsigned int command_timeout_ms = 30000); 00116 00117 /** Set the default message handling callback - used for any message which does not match a subscription message handler 00118 * @param mh - pointer to the callback function 00119 */ 00120 void setDefaultMessageHandler(messageHandler mh) 00121 { 00122 defaultMessageHandler.attach(mh); 00123 } 00124 00125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00126 * The nework object must be connected to the network endpoint before calling this 00127 * Default connect options are used 00128 * @return success code - 00129 */ 00130 int connect(); 00131 00132 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack 00133 * The nework object must be connected to the network endpoint before calling this 00134 * @param options - connect options 00135 * @return success code - 00136 */ 00137 int connect(MQTTSNPacket_connectData& options); 00138 00139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00140 * @param topic - the topic to publish to 00141 * @param message - the message to send 00142 * @return success code - 00143 */ 00144 int publish(MQTTSN_topicid& topic, Message& message); 00145 00146 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00147 * @param topic - the topic to publish to 00148 * @param payload - the data to send 00149 * @param payloadlen - the length of the data 00150 * @param qos - the QoS to send the publish at 00151 * @param retained - whether the message should be retained 00152 * @return success code - 00153 */ 00154 int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); 00155 00156 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs 00157 * @param topic - the topic to publish to 00158 * @param payload - the data to send 00159 * @param payloadlen - the length of the data 00160 * @param id - the packet id used - returned 00161 * @param qos - the QoS to send the publish at 00162 * @param retained - whether the message should be retained 00163 * @return success code - 00164 */ 00165 int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); 00166 00167 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback 00168 * @param topicFilter - a topic pattern which can include wildcards 00169 * @param qos - the MQTT QoS to subscribe at 00170 * @param mh - the callback function to be invoked when a message is received for this subscription 00171 * @return success code - 00172 */ 00173 int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler mh); 00174 00175 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback 00176 * @param topicFilter - a topic pattern which can include wildcards 00177 * @return success code - 00178 */ 00179 int unsubscribe(MQTTSN_topicid& topicFilter); 00180 00181 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state 00182 * @param duration - used for sleeping clients, 0 means no duration 00183 * @return success code - 00184 */ 00185 int disconnect(unsigned short duration = 0); 00186 00187 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive 00188 * yield can be called if no other MQTT operation is needed. This will also allow messages to be 00189 * received. 00190 * @param timeout_ms the time to wait, in milliseconds 00191 * @return success code - on failure, this means the client has disconnected 00192 */ 00193 int yield(unsigned long timeout_ms = 1000L); 00194 00195 /** Is the client connected? 00196 * @return flag - is the client connected or not? 00197 */ 00198 bool isConnected() 00199 { 00200 return isconnected; 00201 } 00202 00203 protected: 00204 00205 int cycle(Timer& timer); 00206 int waitfor(int packet_type, Timer& timer); 00207 00208 private: 00209 00210 int keepalive(); 00211 int publish(int len, Timer& timer, enum QoS qos); 00212 00213 int readPacket(Timer& timer); 00214 int sendPacket(int length, Timer& timer); 00215 int deliverMessage(MQTTSN_topicid& topic, Message& message); 00216 bool isTopicMatched(char* topicFilter, MQTTSNString& topicName); 00217 00218 Network& ipstack; 00219 unsigned long command_timeout_ms; 00220 00221 unsigned char sendbuf[MAX_PACKET_SIZE]; 00222 unsigned char readbuf[MAX_PACKET_SIZE]; 00223 00224 Timer last_sent, last_received; 00225 unsigned short duration; 00226 bool ping_outstanding; 00227 bool cleansession; 00228 00229 PacketId packetid; 00230 00231 struct MessageHandlers 00232 { 00233 MQTTSN_topicid* topicFilter; 00234 FP<void, MessageData&> fp; 00235 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic 00236 00237 FP<void, MessageData&> defaultMessageHandler; 00238 00239 bool isconnected; 00240 00241 struct Registrations 00242 { 00243 unsigned short id; 00244 char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH]; 00245 } registrations[MAX_REGISTRATIONS]; 00246 00247 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00248 unsigned char pubbuf[MAX_PACKET_SIZE]; // store the last publish for sending on reconnect 00249 int inflightLen; 00250 unsigned short inflightMsgid; 00251 enum QoS inflightQoS; 00252 #endif 00253 00254 #if MQTTSNCLIENT_QOS2 00255 bool pubrel; 00256 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; 00257 bool isQoS2msgidFree(unsigned short id); 00258 bool useQoS2msgid(unsigned short id); 00259 #endif 00260 00261 }; 00262 00263 } 00264 00265 00266 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00267 MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() 00268 { 00269 last_sent = Timer(); 00270 last_received = Timer(); 00271 ping_outstanding = false; 00272 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00273 messageHandlers[i].topicFilter = 0; 00274 this->command_timeout_ms = command_timeout_ms; 00275 isconnected = false; 00276 00277 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00278 inflightMsgid = 0; 00279 inflightQoS = QOS0; 00280 #endif 00281 00282 00283 #if MQTTSNCLIENT_QOS2 00284 pubrel = false; 00285 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00286 incomingQoS2messages[i] = 0; 00287 #endif 00288 } 00289 00290 #if MQTTSNCLIENT_QOS2 00291 template<class Network, class Timer, int a, int b> 00292 bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) 00293 { 00294 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00295 { 00296 if (incomingQoS2messages[i] == id) 00297 return false; 00298 } 00299 return true; 00300 } 00301 00302 00303 template<class Network, class Timer, int a, int b> 00304 bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) 00305 { 00306 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) 00307 { 00308 if (incomingQoS2messages[i] == 0) 00309 { 00310 incomingQoS2messages[i] = id; 00311 return true; 00312 } 00313 } 00314 return false; 00315 } 00316 #endif 00317 00318 00319 template<class Network, class Timer, int a, int b> 00320 int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) 00321 { 00322 int rc = FAILURE, 00323 sent = 0; 00324 00325 do 00326 { 00327 sent = ipstack.write(sendbuf, length, timer.left_ms()); 00328 printf("sendPacket, rc %d from write of %d bytes\n", sent, length); 00329 if (sent < 0) // there was an error writing the data 00330 break; 00331 } 00332 while (sent != length && !timer.expired()); 00333 00334 if (sent == length) 00335 { 00336 if (this->duration > 0) 00337 last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet 00338 rc = SUCCESS; 00339 } 00340 else 00341 rc = FAILURE; 00342 00343 #if defined(MQTT_DEBUG) 00344 char printbuf[50]; 00345 DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length)); 00346 #endif 00347 return rc; 00348 } 00349 00350 00351 /** 00352 * If any read fails in this method, then we should disconnect from the network, as on reconnect 00353 * the packets can be retried. 00354 * @param timeout the max time to wait for the packet read to complete, in milliseconds 00355 * @return the MQTT packet type, or -1 if none 00356 */ 00357 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00358 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer) 00359 { 00360 int rc = FAILURE; 00361 int len = 0; // the length of the whole packet including length field 00362 int lenlen = 0; 00363 int datalen = 0; 00364 00365 #define MQTTSN_MIN_PACKET_LENGTH 3 00366 // 1. read the packet, datagram style 00367 if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH) 00368 goto exit; 00369 00370 // 2. read the length. This is variable in itself 00371 lenlen = MQTTSNPacket_decode(readbuf, len, &datalen); 00372 if (datalen != len) 00373 goto exit; // there was an error 00374 00375 rc = readbuf[lenlen]; 00376 if (this->duration > 0) 00377 last_received.countdown(this->duration); // record the fact that we have successfully received a packet 00378 exit: 00379 00380 #if defined(MQTT_DEBUG) 00381 char printbuf[50]; 00382 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len)); 00383 #endif 00384 return rc; 00385 } 00386 00387 00388 // assume topic filter and name is in correct format 00389 // # can only be at end 00390 // + and # can only be next to separator 00391 template<class Network, class Timer, int a, int b> 00392 bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName) 00393 { 00394 char* curf = topicFilter; 00395 char* curn = topicName.lenstring.data; 00396 char* curn_end = curn + topicName.lenstring.len; 00397 00398 while (*curf && curn < curn_end) 00399 { 00400 if (*curn == '/' && *curf != '/') 00401 break; 00402 if (*curf != '+' && *curf != '#' && *curf != *curn) 00403 break; 00404 if (*curf == '+') 00405 { // skip until we meet the next separator, or end of string 00406 char* nextpos = curn + 1; 00407 while (nextpos < curn_end && *nextpos != '/') 00408 nextpos = ++curn + 1; 00409 } 00410 else if (*curf == '#') 00411 curn = curn_end - 1; // skip until end of string 00412 curf++; 00413 curn++; 00414 }; 00415 00416 return (curn == curn_end) && (*curf == '\0'); 00417 } 00418 00419 00420 00421 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 00422 int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message) 00423 { 00424 int rc = FAILURE; 00425 printf("deliverMessage topic id is %d\n", topic.data.id); 00426 00427 // we have to find the right message handler - indexed by topic 00428 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00429 { 00430 if (messageHandlers[i].topicFilter != 0) 00431 printf("messageHandler %d topic id is %d\n", i, messageHandlers[i].topicFilter->data.id); 00432 if (messageHandlers[i].topicFilter != 0 && (topic.data.id == messageHandlers[i].topicFilter->data.id)) 00433 //MQTTSNtopic_equals(&topic, messageHandlers[i].topicFilter) || 00434 // isTopicMatched(messageHandlers[i].topicFilter, topic))) 00435 { 00436 if (messageHandlers[i].fp.attached()) 00437 { 00438 MessageData md(topic, message); 00439 messageHandlers[i].fp(md); 00440 rc = SUCCESS; 00441 } 00442 } 00443 } 00444 00445 if (rc == FAILURE && defaultMessageHandler.attached()) 00446 { 00447 MessageData md(topic, message); 00448 defaultMessageHandler(md); 00449 rc = SUCCESS; 00450 } 00451 00452 return rc; 00453 } 00454 00455 00456 00457 template<class Network, class Timer, int a, int b> 00458 int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) 00459 { 00460 int rc = SUCCESS; 00461 Timer timer = Timer(); 00462 00463 timer.countdown_ms(timeout_ms); 00464 while (!timer.expired()) 00465 { 00466 if (cycle(timer) == FAILURE) 00467 { 00468 rc = FAILURE; 00469 break; 00470 } 00471 } 00472 00473 return rc; 00474 } 00475 00476 00477 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00478 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer) 00479 { 00480 /* get one piece of work off the wire and one pass through */ 00481 00482 // read the socket, see what work is due 00483 unsigned short packet_type = readPacket(timer); 00484 00485 int len = 0; 00486 int rc = SUCCESS; 00487 00488 switch (packet_type) 00489 { 00490 case MQTTSN_CONNACK: 00491 case MQTTSN_PUBACK: 00492 case MQTTSN_SUBACK: 00493 case MQTTSN_REGACK: 00494 break; 00495 case MQTTSN_REGISTER: 00496 { 00497 unsigned short topicid, packetid; 00498 MQTTSNString topicName; 00499 unsigned char reg_rc = MQTTSN_RC_ACCEPTED; 00500 if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1) 00501 goto exit; 00502 len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, reg_rc); 00503 if (len <= 0) 00504 rc = FAILURE; 00505 else 00506 rc = sendPacket(len, timer); 00507 break; 00508 } 00509 case MQTTSN_PUBLISH: 00510 MQTTSN_topicid topicid; 00511 Message msg; 00512 if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, &msg.id, &topicid, 00513 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1) 00514 goto exit; 00515 #if MQTTSNCLIENT_QOS2 00516 if (msg.qos != QOS2) 00517 #endif 00518 deliverMessage(topicid, msg); 00519 #if MQTTSNCLIENT_QOS2 00520 else if (isQoS2msgidFree(msg.id)) 00521 { 00522 if (useQoS2msgid(msg.id)) 00523 deliverMessage(topicid, msg); 00524 else 00525 WARN("Maximum number of incoming QoS2 messages exceeded"); 00526 } 00527 #endif 00528 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00529 if (msg.qos != QOS0) 00530 { 00531 if (msg.qos == QOS1) 00532 len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0); 00533 else if (msg.qos == QOS2) 00534 len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id); 00535 if (len <= 0) 00536 rc = FAILURE; 00537 else 00538 { 00539 printf("sending puback len %d\n", len); 00540 rc = sendPacket(len, timer); 00541 printf("rc %d from sending puback\n", rc); 00542 } 00543 if (rc == FAILURE) 00544 goto exit; // there was a problem 00545 } 00546 break; 00547 #endif 00548 #if MQTTSNCLIENT_QOS2 00549 case MQTTSN_PUBREC: 00550 unsigned short mypacketid; 00551 unsigned char type; 00552 if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00553 rc = FAILURE; 00554 else if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, mypacketid)) <= 0) 00555 rc = FAILURE; 00556 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet 00557 rc = FAILURE; // there was a problem 00558 if (rc == FAILURE) 00559 goto exit; // there was a problem 00560 break; 00561 case MQTTSN_PUBCOMP: 00562 break; 00563 #endif 00564 case MQTTSN_PINGRESP: 00565 ping_outstanding = false; 00566 break; 00567 } 00568 keepalive(); 00569 exit: 00570 if (rc == SUCCESS) 00571 rc = packet_type; 00572 return rc; 00573 } 00574 00575 00576 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00577 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive() 00578 { 00579 int rc = FAILURE; 00580 00581 if (duration == 0) 00582 { 00583 rc = SUCCESS; 00584 goto exit; 00585 } 00586 00587 if (last_sent.expired() || last_received.expired()) 00588 { 00589 if (!ping_outstanding) 00590 { 00591 MQTTSNString clientid = MQTTSNString_initializer; 00592 Timer timer = Timer(1000); 00593 int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid); 00594 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet 00595 ping_outstanding = true; 00596 } 00597 } 00598 00599 exit: 00600 return rc; 00601 } 00602 00603 00604 // only used in single-threaded mode where one command at a time is in process 00605 template<class Network, class Timer, int a, int b> 00606 int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) 00607 { 00608 int rc = FAILURE; 00609 00610 do 00611 { 00612 if (timer.expired()) 00613 break; // we timed out 00614 } 00615 while ((rc = cycle(timer)) != packet_type); 00616 00617 return rc; 00618 } 00619 00620 00621 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00622 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options) 00623 { 00624 Timer connect_timer = Timer(command_timeout_ms); 00625 int rc = FAILURE; 00626 int len = 0; 00627 00628 if (isconnected) // don't send connect packet again if we are already connected 00629 goto exit; 00630 00631 this->duration = options.duration; 00632 this->cleansession = options.cleansession; 00633 if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0) 00634 goto exit; 00635 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet 00636 goto exit; // there was a problem 00637 00638 if (this->duration > 0) 00639 last_received.countdown(this->duration); 00640 // this will be a blocking call, wait for the connack 00641 if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK) 00642 { 00643 //unsigned char connack_rc = 255; 00644 int connack_rc = 255; 00645 if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1) 00646 rc = connack_rc; 00647 else 00648 rc = FAILURE; 00649 } 00650 else 00651 rc = FAILURE; 00652 00653 #if MQTTSNCLIENT_QOS2 00654 // resend an inflight publish 00655 if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel) 00656 { 00657 if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, inflightMsgid)) <= 0) 00658 rc = FAILURE; 00659 else 00660 rc = publish(len, connect_timer, inflightQoS); 00661 } 00662 else 00663 #endif 00664 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00665 if (inflightMsgid > 0) 00666 { 00667 memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE); 00668 rc = publish(inflightLen, connect_timer, inflightQoS); 00669 } 00670 #endif 00671 00672 exit: 00673 if (rc == SUCCESS) 00674 isconnected = true; 00675 return rc; 00676 } 00677 00678 00679 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00680 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect() 00681 { 00682 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00683 return connect(default_options); 00684 } 00685 00686 00687 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00688 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler messageHandler) 00689 { 00690 int rc = FAILURE; 00691 Timer timer = Timer(command_timeout_ms); 00692 int len = 0; 00693 00694 if (!isconnected) 00695 goto exit; 00696 00697 bool freeHandler = false; 00698 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00699 { 00700 if (messageHandlers[i].topicFilter == 0) 00701 { 00702 freeHandler = true; 00703 break; 00704 } 00705 } 00706 if (!freeHandler) 00707 { // No message handler free 00708 rc = MAX_SUBSCRIPTIONS_EXCEEDED; 00709 goto exit; 00710 } 00711 00712 len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter); 00713 if (len <= 0) 00714 goto exit; 00715 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet 00716 goto exit; // there was a problem 00717 00718 if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK) // wait for suback 00719 { 00720 unsigned short mypacketid; 00721 unsigned char suback_rc; 00722 if (MQTTSNDeserialize_suback((int*)&grantedQoS, &topicFilter.data.id, &mypacketid, &suback_rc, readbuf, MAX_PACKET_SIZE) != 1) 00723 rc = FAILURE; 00724 else 00725 rc = suback_rc; 00726 if (suback_rc == MQTTSN_RC_ACCEPTED) 00727 { 00728 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) 00729 { 00730 if (messageHandlers[i].topicFilter == 0) 00731 { 00732 messageHandlers[i].topicFilter = &topicFilter; 00733 messageHandlers[i].fp.attach(messageHandler); 00734 rc = 0; 00735 break; 00736 } 00737 } 00738 } 00739 } 00740 else 00741 rc = FAILURE; 00742 00743 exit: 00744 if (rc != SUCCESS) 00745 isconnected = false; 00746 return rc; 00747 } 00748 00749 00750 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 00751 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter) 00752 { 00753 int rc = FAILURE; 00754 Timer timer = Timer(command_timeout_ms); 00755 int len = 0; 00756 00757 if (!isconnected) 00758 goto exit; 00759 00760 if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0) 00761 goto exit; 00762 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet 00763 goto exit; // there was a problem 00764 00765 if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK) 00766 { 00767 unsigned short mypacketid; // should be the same as the packetid above 00768 if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1) 00769 rc = 0; 00770 } 00771 else 00772 rc = FAILURE; 00773 00774 exit: 00775 if (rc != SUCCESS) 00776 isconnected = false; 00777 return rc; 00778 } 00779 00780 00781 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00782 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) 00783 { 00784 int rc; 00785 00786 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet 00787 goto exit; // there was a problem 00788 00789 #if MQTTSNCLIENT_QOS1 00790 if (qos == QOS1) 00791 { 00792 if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK) 00793 { 00794 unsigned short mypacketid; 00795 unsigned char type; 00796 if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00797 rc = FAILURE; 00798 else if (inflightMsgid == mypacketid) 00799 inflightMsgid = 0; 00800 } 00801 else 00802 rc = FAILURE; 00803 } 00804 #elif MQTTSNCLIENT_QOS2 00805 else if (qos == QOS2) 00806 { 00807 if (waitfor(PUBCOMP, timer) == PUBCOMP) 00808 { 00809 unsigned short mypacketid; 00810 unsigned char type; 00811 if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) 00812 rc = FAILURE; 00813 else if (inflightMsgid == mypacketid) 00814 inflightMsgid = 0; 00815 } 00816 else 00817 rc = FAILURE; 00818 } 00819 #endif 00820 00821 exit: 00822 if (rc != SUCCESS) 00823 isconnected = false; 00824 return rc; 00825 } 00826 00827 00828 00829 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00830 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) 00831 { 00832 int rc = FAILURE; 00833 Timer timer = Timer(command_timeout_ms); 00834 int len = 0; 00835 00836 if (!isconnected) 00837 goto exit; 00838 00839 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00840 if (qos == QOS1 || qos == QOS2) 00841 id = packetid.getNext(); 00842 #endif 00843 00844 len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id, 00845 topic, (unsigned char*)payload, payloadlen); 00846 if (len <= 0) 00847 goto exit; 00848 00849 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 00850 if (!cleansession) 00851 { 00852 memcpy(pubbuf, sendbuf, len); 00853 inflightMsgid = id; 00854 inflightLen = len; 00855 inflightQoS = qos; 00856 #if MQTTSNCLIENT_QOS2 00857 pubrel = false; 00858 #endif 00859 } 00860 #endif 00861 00862 rc = publish(len, timer, qos); 00863 exit: 00864 return rc; 00865 } 00866 00867 00868 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00869 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) 00870 { 00871 unsigned short id = 0; // dummy - not used for anything 00872 return publish(topicName, payload, payloadlen, id, qos, retained); 00873 } 00874 00875 00876 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00877 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message) 00878 { 00879 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); 00880 } 00881 00882 00883 template<class Network, class Timer, int MAX_PACKET_SIZE, int b> 00884 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration) 00885 { 00886 int rc = FAILURE; 00887 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete 00888 int int_duration = (duration == 0) ? -1 : (int)duration; 00889 int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration); 00890 if (len > 0) 00891 rc = sendPacket(len, timer); // send the disconnect packet 00892 00893 isconnected = false; 00894 return rc; 00895 } 00896 00897 00898 #endif
Generated on Fri Jul 15 2022 11:00:18 by 1.7.2