Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNClient.h Source File

MQTTSNClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *******************************************************************************/
00016 
00017 #if !defined(MQTTSNCLIENT_H)
00018 #define MQTTSNCLIENT_H
00019 
00020 #include "FP.h"
00021 #include "MQTTSNPacket.h"
00022 #include "stdio.h"
00023 #include "MQTTLogging.h"
00024 
// Data limits
// Each macro below is only defined when the including translation unit has not
// already defined it, so a project can override any of them before including this header.

// Number of entries in Client::registrations.
#if !defined(MAX_REGISTRATIONS)
    #define MAX_REGISTRATIONS 5
#endif
// Size in bytes of the name buffer held by each Client::registrations entry.
#if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH)
    #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20
#endif
// Number of slots in Client::incomingQoS2messages (only compiled when QoS 2 support is enabled).
#if !defined(MAX_INCOMING_QOS2_MESSAGES)
    #define MAX_INCOMING_QOS2_MESSAGES 10
#endif

// NOTE(review): the conditional sections further down in this file test
// MQTTCLIENT_QOS1 / MQTTCLIENT_QOS2 (no "SN"), not the two macros defined here,
// so these defaults do not switch any code in this header on or off.
// Confirm which spelling is intended before relying on them.
#if !defined(MQTTSNCLIENT_QOS1)
    #define MQTTSNCLIENT_QOS1 1
#endif
#if !defined(MQTTSNCLIENT_QOS2)
    #define MQTTSNCLIENT_QOS2 0
#endif
00042 
00043 namespace MQTTSN
00044 {
00045 
00046 
/** Quality-of-service levels; the enumerators carry the values 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
00048 
// Result codes returned by the client API.
// Every failure code must be negative; SUCCESS is zero.
enum returnCode
{
    SUCCESS = 0,
    FAILURE = -1,
    BUFFER_OVERFLOW = -2,
    MAX_SUBSCRIPTIONS_EXCEEDED = -3
};
00051 
00052 
00053 struct Message
00054 {
00055     enum QoS qos;
00056     bool retained;
00057     bool dup;
00058     unsigned short id;
00059     void *payload;
00060     size_t payloadlen;
00061 };
00062 
00063 
00064 struct MessageData
00065 {
00066     MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage)  : message(aMessage), topic(aTopic)
00067     { }
00068 
00069     struct Message &message;
00070     MQTTSN_topicid &topic;
00071 };
00072 
00073 
/** Hands out packet identifiers that cycle through 1 .. MAX_PACKET_ID. */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     * Step to the following identifier and return it.
     * Once MAX_PACKET_ID has been handed out the sequence restarts at 1,
     * so 0 is never returned.
     */
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            next = next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // most recently issued identifier (0 before the first call)
};
00091 
00092 
/**
 * @class MQTTSNClient
 * @brief blocking, non-threaded MQTTSN client API
 *
 * This version of the API blocks on all method calls, until they are complete.  This means that only one
 * MQTT request can be in process at any one time.
 * @param Network a network class; the client calls read(buffer, length, timeout_ms) and
 *      write(buffer, length, timeout_ms) on it
 * @param Timer a timer class; the client default-constructs it, constructs it from a millisecond
 *      count, and calls expired(), left_ms(), countdown() and countdown_ms() on it
 * @param MAX_PACKET_SIZE size in bytes of the send buffer and of the receive buffer
 * @param MAX_MESSAGE_HANDLERS number of subscription handler slots
 */
template<class Network, class Timer, int MAX_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
class Client
{

public:

    /** Signature of a message callback. */
    typedef void (*messageHandler)(MessageData&);

    /** Construct the client
     *  @param network - reference to an instance of the Network class - must be connected to the endpoint
     *      before calling MQTT connect
     *  @param command_timeout_ms - time budget, in milliseconds, given to each blocking command
     */
    Client(Network& network, unsigned int command_timeout_ms = 30000);

    /** Set the default message handling callback - used for any message which does not match a subscription message handler
     *  @param mh - pointer to the callback function
     */
    void setDefaultMessageHandler(messageHandler mh)
    {
        defaultMessageHandler.attach(mh);
    }

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  Default connect options are used
     *  @return success code -
     */
    int connect();

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  @param options - connect options
     *  @return success code -
     */
    int connect(MQTTSNPacket_connectData& options);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topic - the topic to publish to
     *  @param message - the message to send
     *  @return success code -
     */
    int publish(MQTTSN_topicid& topic, Message& message);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topic - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code -
     */
    int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topic - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param id - the packet id used - returned
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code -
     */
    int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);

    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
     *  @param topicFilter - a topic pattern which can include wildcards; the client keeps a pointer
     *      to this object for the handler table, so it must stay alive while the subscription is in use
     *  @param qos - the MQTT QoS to subscribe at
     *  @param mh - the callback function to be invoked when a message is received for this subscription
     *  @return success code -
     */
    int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, messageHandler mh);

    /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @return success code -
     */
    int unsubscribe(MQTTSN_topicid& topicFilter);

    /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
     *  @param duration - used for sleeping clients, 0 means no duration
     *  @return success code -
     */
    int disconnect(unsigned short duration = 0);

    /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
     *  received.
     *  @param timeout_ms the time to wait, in milliseconds
     *  @return success code - on failure, this means the client has disconnected
     */
    int yield(unsigned long timeout_ms = 1000L);

    /** Is the client connected?
     *  @return flag - is the client connected or not?
     */
    bool isConnected()
    {
        return isconnected;
    }

protected:

    // Read at most one packet, handle it, and run the keep-alive check.
    int cycle(Timer& timer);
    // Keep calling cycle() until it reports packet_type or the timer expires.
    int waitfor(int packet_type, Timer& timer);

private:

    int keepalive();
    // Send the already-serialized packet held in sendbuf and wait for its acknowledgement, if any.
    int publish(int len, Timer& timer, enum QoS qos);

    int decodePacket(int* value, int timeout);
    int readPacket(Timer& timer);
    int sendPacket(int length, Timer& timer);
    int deliverMessage(MQTTSN_topicid& topic, Message& message);
    bool isTopicMatched(char* topicFilter, MQTTSNString& topicName);

    Network& ipstack;
    unsigned long command_timeout_ms;

    unsigned char sendbuf[MAX_PACKET_SIZE];
    unsigned char readbuf[MAX_PACKET_SIZE];

    // Countdown timers re-armed with `duration` after each successful send / receive.
    Timer last_sent, last_received;
    unsigned short duration;    // keep-alive duration taken from the connect options; 0 disables the timers
    bool ping_outstanding;      // set after a ping request is sent, cleared when a ping response arrives
    bool cleansession;          // copied from the connect options

    PacketId packetid;

    struct MessageHandlers
    {
        MQTTSN_topicid* topicFilter;    // 0 marks a free slot
        FP<void, MessageData&>  fp;
    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic

    FP<void, MessageData&>  defaultMessageHandler;

    bool isconnected;

    // NOTE(review): nothing in this header reads or writes this table.
    struct Registrations
    {
        unsigned short id;
        char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH];
    } registrations[MAX_REGISTRATIONS];

// NOTE(review): these guards test MQTTCLIENT_QOS1/2, while the defaults at the top of
// this header define MQTTSNCLIENT_QOS1/2 - confirm which spelling is intended.
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
    unsigned char pubbuf[MAX_PACKET_SIZE];  // store the last publish for sending on reconnect
    int inflightLen;
    unsigned short inflightMsgid;
    enum QoS inflightQoS;
#endif

#if MQTTCLIENT_QOS2
    bool pubrel;
    unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];   // 0 marks a free slot
    bool isQoS2msgidFree(unsigned short id);
    bool useQoS2msgid(unsigned short id);
#endif

};
00263 
00264 }
00265 
00266 
00267 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00268 MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00269 {
00270     ping_outstanding = false;
00271     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00272         messageHandlers[i].topicFilter = 0;
00273     this->command_timeout_ms = command_timeout_ms;
00274     isconnected = false;
00275     
00276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00277     inflightMsgid = 0;
00278     inflightQoS = QOS0;
00279 #endif
00280 
00281     
00282 #if MQTTCLIENT_QOS2
00283     pubrel = false;
00284     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00285         incomingQoS2messages[i] = 0;
00286 #endif
00287 }
00288 
00289 #if MQTTCLIENT_QOS2
00290 template<class Network, class Timer, int a, int b>
00291 bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00292 {
00293     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00294     {
00295         if (incomingQoS2messages[i] == id)
00296             return false;
00297     }
00298     return true;
00299 }
00300 
00301 
00302 template<class Network, class Timer, int a, int b>
00303 bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00304 {
00305     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00306     {
00307         if (incomingQoS2messages[i] == 0)
00308         {
00309             incomingQoS2messages[i] = id;
00310             return true;
00311         }
00312     }
00313     return false;
00314 }
00315 #endif
00316 
00317 
00318 template<class Network, class Timer, int a, int b>
00319 int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00320 {
00321     int rc = FAILURE,
00322         sent = 0;
00323 
00324     while (sent < length && !timer.expired())
00325     {
00326         rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
00327         if (rc < 0)  // there was an error writing the data
00328             break;
00329         sent += rc;
00330     }
00331     if (sent == length)
00332     {
00333         if (this->duration > 0)
00334             last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet
00335         rc = SUCCESS;
00336     }
00337     else
00338         rc = FAILURE;
00339         
00340 #if defined(MQTT_DEBUG)
00341     char printbuf[50];
00342     DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
00343 #endif
00344     return rc;
00345 }
00346 
00347 
00348 template<class Network, class Timer, int a, int b>
00349 int MQTTSN::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00350 {
00351     unsigned char c;
00352     int multiplier = 1;
00353     int len = 0;
00354     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00355 
00356     *value = 0;
00357     do
00358     {
00359         int rc = MQTTSNPACKET_READ_ERROR;
00360 
00361         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00362         {
00363             rc = MQTTSNPACKET_READ_ERROR; /* bad data */
00364             goto exit;
00365         }
00366         rc = ipstack.read(&c, 1, timeout);
00367         if (rc != 1)
00368             goto exit;
00369         *value += (c & 127) * multiplier;
00370         multiplier *= 128;
00371     } while ((c & 128) != 0);
00372 exit:
00373     return len;
00374 }
00375 
00376 
00377 /**
00378  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00379  * the packets can be retried.
00380  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00381  * @return the MQTT packet type, or -1 if none
00382  */
00383 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00384 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer)
00385 {
00386     int rc = FAILURE;
00387     int len = 0;  // the length of the whole packet including length field 
00388     int lenlen = 0;
00389     int datalen = 0;
00390 
00391     #define MQTTSN_MIN_PACKET_LENGTH 2
00392     // 1. read the packet, datagram style 
00393     if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH)
00394         goto exit;
00395         
00396     // 2. read the length.  This is variable in itself 
00397     lenlen = MQTTSNPacket_decode(readbuf, len, &datalen);
00398     if (datalen != len)
00399         goto exit; // there was an error 
00400         
00401     rc = readbuf[lenlen];
00402     if (this->duration > 0)
00403         last_received.countdown(this->duration); // record the fact that we have successfully received a packet
00404 exit:
00405         
00406 #if defined(MQTT_DEBUG)
00407     char printbuf[50];
00408     DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
00409 #endif
00410     return rc;
00411 }
00412 
00413 
00414 // assume topic filter and name is in correct format
00415 // # can only be at end
00416 // + and # can only be next to separator
00417 template<class Network, class Timer, int a, int b>
00418 bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName)
00419 {
00420     char* curf = topicFilter;
00421     char* curn = topicName.lenstring.data;
00422     char* curn_end = curn + topicName.lenstring.len;
00423 
00424     while (*curf && curn < curn_end)
00425     {
00426         if (*curn == '/' && *curf != '/')
00427             break;
00428         if (*curf != '+' && *curf != '#' && *curf != *curn)
00429             break;
00430         if (*curf == '+')
00431         {   // skip until we meet the next separator, or end of string
00432             char* nextpos = curn + 1;
00433             while (nextpos < curn_end && *nextpos != '/')
00434                 nextpos = ++curn + 1;
00435         }
00436         else if (*curf == '#')
00437             curn = curn_end - 1;    // skip until end of string
00438         curf++;
00439         curn++;
00440     };
00441 
00442     return (curn == curn_end) && (*curf == '\0');
00443 }
00444 
00445 
00446 
00447 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00448 int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message)
00449 {
00450     int rc = FAILURE;
00451 
00452     // we have to find the right message handler - indexed by topic
00453     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00454     {
00455         MQTTSNString str = MQTTSNString_initializer;
00456         str.lenstring.data = topic.data.long_.name;
00457         str.lenstring.len = topic.data.long_.len;
00458         if (messageHandlers[i].topicFilter != 0 && (MQTTSNTopic_equals(&topic, messageHandlers[i].topicFilter) ||
00459                 isTopicMatched(messageHandlers[i].topicFilter->data.long_.name, str)))
00460         {
00461             if (messageHandlers[i].fp.attached())
00462             {
00463                 MessageData md(topic, message);
00464                 messageHandlers[i].fp(md);
00465                 rc = SUCCESS;
00466             }
00467         }
00468     }
00469 
00470     if (rc == FAILURE && defaultMessageHandler.attached())
00471     {
00472         MessageData md(topic, message);
00473         defaultMessageHandler(md);
00474         rc = SUCCESS;
00475     }
00476 
00477     return rc;
00478 }
00479 
00480 
00481 
00482 template<class Network, class Timer, int a, int b>
00483 int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00484 {
00485     int rc = SUCCESS;
00486     Timer timer;
00487 
00488     timer.countdown_ms(timeout_ms);
00489     while (!timer.expired())
00490     {
00491         if (cycle(timer) == FAILURE)
00492         {
00493             rc = FAILURE;
00494             break;
00495         }
00496     }
00497 
00498     return rc;
00499 }
00500 
00501 
00502 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00503 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer)
00504 {
00505     /* get one piece of work off the wire and one pass through */
00506 
00507     // read the socket, see what work is due
00508     unsigned short packet_type = readPacket(timer);
00509 
00510     int len = 0;
00511     unsigned char rc = SUCCESS;
00512 
00513     switch (packet_type)
00514     {
00515         case MQTTSN_CONNACK:
00516         case MQTTSN_PUBACK:
00517         case MQTTSN_SUBACK:
00518         case MQTTSN_REGACK:
00519             break;
00520         case MQTTSN_REGISTER:
00521         {
00522             unsigned short topicid, packetid;
00523             MQTTSNString topicName;
00524             rc = MQTTSN_RC_ACCEPTED;
00525             if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1)
00526                 goto exit;
00527             len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, rc);
00528             if (len <= 0)
00529                 rc = FAILURE;
00530             else
00531                 rc = sendPacket(len, timer);
00532             break;
00533         }
00534         case MQTTSN_PUBLISH:
00535             MQTTSN_topicid topicid;
00536             Message msg;
00537             if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicid,
00538                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1)
00539                 goto exit;
00540 #if MQTTCLIENT_QOS2
00541             if (msg.qos != QOS2)
00542 #endif
00543                 deliverMessage(topicid, msg);
00544 #if MQTTCLIENT_QOS2
00545             else if (isQoS2msgidFree(msg.id))
00546             {
00547                 if (useQoS2msgid(msg.id))
00548                     deliverMessage(topicid, msg);
00549                 else
00550                     WARN("Maximum number of incoming QoS2 messages exceeded");
00551             }   
00552 #endif
00553 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00554             if (msg.qos != QOS0)
00555             {
00556                 if (msg.qos == QOS1)
00557                     len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0);
00558                 else if (msg.qos == QOS2)
00559                     len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id);
00560                 if (len <= 0)
00561                     rc = FAILURE;
00562                 else
00563                     rc = sendPacket(len, timer);
00564                 if (rc == FAILURE)
00565                     goto exit; // there was a problem
00566             }
00567             break;
00568 #endif
00569 #if MQTTCLIENT_QOS2
00570         case MQTTSN_PUBREC:
00571             unsigned short mypacketid;
00572             unsigned char dup, type;
00573             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00574                 rc = FAILURE;
00575             else if ((len = MQTTSerialize_ack(sendbuf, MAX_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
00576                 rc = FAILURE;
00577             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00578                 rc = FAILURE; // there was a problem
00579             if (rc == FAILURE)
00580                 goto exit; // there was a problem
00581             break;
00582         case MQTTSN_PUBCOMP:
00583             break;
00584 #endif
00585         case MQTTSN_PINGRESP:
00586             ping_outstanding = false;
00587             break;
00588     }
00589     keepalive();
00590 exit:
00591     if (rc == SUCCESS)
00592         rc = packet_type;
00593     return rc;
00594 }
00595 
00596 
00597 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00598 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive()
00599 {
00600     int rc = FAILURE;
00601 
00602     if (duration == 0)
00603     {
00604         rc = SUCCESS;
00605         goto exit;
00606     }
00607 
00608     if (last_sent.expired() || last_received.expired())
00609     {
00610         if (!ping_outstanding)
00611         {
00612             MQTTSNString clientid = MQTTSNString_initializer;
00613             Timer timer(1000);
00614             int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid);
00615             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00616                 ping_outstanding = true;
00617         }
00618     }
00619 
00620 exit:
00621     return rc;
00622 }
00623 
00624 
00625 // only used in single-threaded mode where one command at a time is in process
00626 template<class Network, class Timer, int a, int b>
00627 int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00628 {
00629     int rc = FAILURE;
00630 
00631     do
00632     {
00633         if (timer.expired())
00634             break; // we timed out
00635     }
00636     while ((rc = cycle(timer)) != packet_type);
00637 
00638     return rc;
00639 }
00640 
00641 
00642 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00643 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options)
00644 {
00645     Timer connect_timer(command_timeout_ms);
00646     int rc = FAILURE;
00647     int len = 0;
00648 
00649     if (isconnected) // don't send connect packet again if we are already connected
00650         goto exit;
00651 
00652     this->duration = options.duration;
00653     this->cleansession = options.cleansession;
00654     if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0)
00655         goto exit;
00656     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00657         goto exit; // there was a problem
00658 
00659     if (this->duration > 0)
00660         last_received.countdown(this->duration);
00661     // this will be a blocking call, wait for the connack
00662     if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK)
00663     {
00664         //unsigned char connack_rc = 255;
00665         int connack_rc = 255;
00666         if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1)
00667             rc = connack_rc;
00668         else
00669             rc = FAILURE;
00670     }
00671     else
00672         rc = FAILURE;
00673         
00674 #if MQTTCLIENT_QOS2
00675     // resend an inflight publish
00676     if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
00677     {
00678         if ((len = MQTTSerialize_ack(sendbuf, MAX_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00679             rc = FAILURE;
00680         else
00681             rc = publish(len, connect_timer, inflightQoS);
00682     }
00683     else
00684 #endif
00685 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00686     if (inflightMsgid > 0)
00687     {
00688         memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE);
00689         rc = publish(inflightLen, connect_timer, inflightQoS);
00690     }
00691 #endif
00692 
00693 exit:
00694     if (rc == SUCCESS)
00695         isconnected = true;
00696     return rc;
00697 }
00698 
00699 
00700 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00701 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect()
00702 {
00703     MQTTSNPacket_connectData default_options = MQTTSNPacket_connectData_initializer;
00704     return connect(default_options);
00705 }
00706 
00707 
00708 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00709 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, messageHandler messageHandler)
00710 {
00711     int rc = FAILURE;
00712     Timer timer(command_timeout_ms);
00713     int len = 0;
00714 
00715     if (!isconnected)
00716         return FAILURE; // goto exit cannot cross variable initialization
00717         
00718     bool freeHandler = false;
00719     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00720     {
00721         if (messageHandlers[i].topicFilter == 0)
00722         {
00723             freeHandler = true;
00724             break;
00725         }
00726     }
00727     if (!freeHandler)
00728     {                                 // No message handler free
00729         rc = MAX_SUBSCRIPTIONS_EXCEEDED;
00730         goto exit; 
00731     }
00732 
00733     len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter);
00734     if (len <= 0)
00735         goto exit;
00736     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00737         goto exit;             // there was a problem
00738 
00739     if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK)      // wait for suback
00740     {
00741         int grantedQoS = -1;
00742         unsigned short mypacketid;
00743         unsigned char rc;
00744         if (MQTTSNDeserialize_suback(&grantedQoS, &topicFilter.data.id, &mypacketid, &rc, readbuf, MAX_PACKET_SIZE) != 1)
00745             goto exit;
00746 
00747         if (qos != grantedQoS)
00748             goto exit;
00749 
00750         if (rc == MQTTSN_RC_ACCEPTED)
00751         {
00752             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00753             {
00754                 if (messageHandlers[i].topicFilter == 0)
00755                 {
00756                     messageHandlers[i].topicFilter = &topicFilter;
00757                     messageHandlers[i].fp.attach(messageHandler);
00758                     rc = 0;
00759                     break;
00760                 }
00761             }
00762         }
00763     }
00764     else
00765         rc = FAILURE;
00766 
00767 exit:
00768     if (rc != SUCCESS)
00769         isconnected = false;
00770     return rc;
00771 }
00772 
00773 
00774 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00775 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter)
00776 {
00777     int rc = FAILURE;
00778     Timer timer(command_timeout_ms);
00779     int len = 0;
00780 
00781     if (!isconnected)
00782         goto exit;
00783 
00784     if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0)
00785         goto exit;
00786     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00787         goto exit; // there was a problem
00788 
00789     if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK)
00790     {
00791         unsigned short mypacketid;  // should be the same as the packetid above
00792         if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1)
00793             rc = 0;
00794     }
00795     else
00796         rc = FAILURE;
00797 
00798 exit:
00799     if (rc != SUCCESS)
00800         isconnected = false;
00801     return rc;
00802 }
00803 
00804 
00805 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00806 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00807 {
00808     int rc;
00809     
00810     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00811         goto exit; // there was a problem
00812 
00813 #if MQTTCLIENT_QOS1 
00814     if (qos == QOS1)
00815     {
00816         if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK)
00817         {
00818             unsigned short mypacketid;
00819             unsigned char type;
00820             if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00821                 rc = FAILURE;
00822             else if (inflightMsgid == mypacketid)
00823                 inflightMsgid = 0;
00824         }
00825         else
00826             rc = FAILURE;
00827     }
00828 #elif MQTTCLIENT_QOS2
00829     else if (qos == QOS2)
00830     {
00831         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00832         {
00833             unsigned short mypacketid;
00834             unsigned char type;
00835             if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00836                 rc = FAILURE;
00837             else if (inflightMsgid == mypacketid)
00838                 inflightMsgid = 0;
00839         }
00840         else
00841             rc = FAILURE;
00842     }
00843 #endif
00844 
00845 exit:
00846     if (rc != SUCCESS)
00847         isconnected = false;
00848     return rc;
00849 }
00850 
00851 
00852 
00853 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00854 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00855 {
00856     int rc = FAILURE;
00857     Timer timer(command_timeout_ms);
00858     int len = 0;
00859 
00860     if (!isconnected)
00861         goto exit;
00862 
00863 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00864     if (qos == QOS1 || qos == QOS2)
00865         id = packetid.getNext();
00866 #endif
00867 
00868     len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id,
00869               topic, (unsigned char*)payload, payloadlen);
00870     if (len <= 0)
00871         goto exit;
00872         
00873 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00874     if (!cleansession)
00875     {
00876         memcpy(pubbuf, sendbuf, len);
00877         inflightMsgid = id;
00878         inflightLen = len;
00879         inflightQoS = qos;
00880 #if MQTTCLIENT_QOS2
00881         pubrel = false;
00882 #endif
00883     }
00884 #endif
00885         
00886     rc = publish(len, timer, qos);
00887 exit:
00888     return rc;
00889 }
00890 
00891 
00892 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00893 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00894 {
00895     unsigned short id = 0;  // dummy - not used for anything
00896     return publish(topicName, payload, payloadlen, id, qos, retained);
00897 }
00898 
00899 
00900 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00901 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message)
00902 {
00903     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00904 }
00905 
00906 
00907 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00908 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration)
00909 {
00910     int rc = FAILURE;
00911     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
00912     int int_duration = (duration == 0) ? -1 : (int)duration;
00913     int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration);
00914     if (len > 0)
00915         rc = sendPacket(len, timer);            // send the disconnect packet
00916 
00917     isconnected = false;
00918     return rc;
00919 }
00920 
00921 
00922 #endif
00923