High level MQTT-SN C++ library

Dependencies:   EthernetInterface FP MQTTSNPacket

Dependents:   HelloMQTTSN

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNClient.h Source File

MQTTSNClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *******************************************************************************/
00016 
00017 #if !defined(MQTTSNCLIENT_H)
00018 #define MQTTSNCLIENT_H
00019 
00020 #include "FP.h"
00021 #include "MQTTSNPacket.h"
00022 #include "stdio.h"
00023 #include "MQTTLogging.h"
00024 
00025 // Data limits
00026 #if !defined(MAX_REGISTRATIONS)
00027     #define MAX_REGISTRATIONS 5
00028 #endif
00029 #if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH)
00030     #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20
00031 #endif
00032 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00033     #define MAX_INCOMING_QOS2_MESSAGES 10
00034 #endif
00035 
00036 #if !defined(MQTTSNCLIENT_QOS1)
00037     #define MQTTSNCLIENT_QOS1 1
00038 #endif
00039 #if !defined(MQTTSNCLIENT_QOS2)
00040     #define MQTTSNCLIENT_QOS2 0
00041 #endif
00042 
00043 namespace MQTTSN
00044 {
00045 
00046 
00047 enum QoS { QOS0, QOS1, QOS2 };
00048 
00049 // all failure return codes must be negative
00050 enum returnCode { MAX_SUBSCRIPTIONS_EXCEEDED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00051 
00052 
00053 struct Message
00054 {
00055     enum QoS qos;
00056     bool retained;
00057     bool dup;
00058     unsigned short id;
00059     void *payload;
00060     size_t payloadlen;
00061 };
00062 
00063 
00064 struct MessageData
00065 {
00066     MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage)  : message(aMessage), topic(aTopic)
00067     { }
00068 
00069     struct Message &message;
00070     MQTTSN_topicid &topic;
00071 };
00072 
00073 
00074 class PacketId
00075 {
00076 public:
00077     PacketId()
00078     {
00079         next = 0;
00080     }
00081 
00082     int getNext()
00083     {
00084         return next = (next == MAX_PACKET_ID) ? 1 : ++next;
00085     }
00086 
00087 private:
00088     static const int MAX_PACKET_ID = 65535;
00089     int next;
00090 };
00091 
00092 
00093 /**
00094  * @class MQTTSNClient
00095  * @brief blocking, non-threaded MQTTSN client API
00096  *
00097  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00098  * MQTT request can be in process at any one time.
00099  * @param Network a network class which supports send, receive
00100  * @param Timer a timer class with the methods:
00101  */
00102 template<class Network, class Timer, int MAX_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00103 class Client
00104 {
00105 
00106 public:
00107 
00108     typedef void (*messageHandler)(MessageData&);
00109 
00110     /** Construct the client
00111      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00112      *      before calling MQTT connect
00113      *  @param limits an instance of the Limit class - to alter limits as required
00114      */
00115     Client(Network& network, unsigned int command_timeout_ms = 30000);
00116 
00117     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00118      *  @param mh - pointer to the callback function
00119      */
00120     void setDefaultMessageHandler(messageHandler mh)
00121     {
00122         defaultMessageHandler.attach(mh);
00123     }
00124 
00125     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00126      *  The nework object must be connected to the network endpoint before calling this
00127      *  Default connect options are used
00128      *  @return success code -
00129      */
00130     int connect();
00131     
00132     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00133      *  The nework object must be connected to the network endpoint before calling this
00134      *  @param options - connect options
00135      *  @return success code -
00136      */
00137     int connect(MQTTSNPacket_connectData& options);
00138 
00139     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00140      *  @param topic - the topic to publish to
00141      *  @param message - the message to send
00142      *  @return success code -
00143      */
00144     int publish(MQTTSN_topicid& topic, Message& message);
00145     
00146     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00147      *  @param topic - the topic to publish to
00148      *  @param payload - the data to send
00149      *  @param payloadlen - the length of the data
00150      *  @param qos - the QoS to send the publish at
00151      *  @param retained - whether the message should be retained
00152      *  @return success code -
00153      */
00154     int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00155     
00156     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00157      *  @param topic - the topic to publish to
00158      *  @param payload - the data to send
00159      *  @param payloadlen - the length of the data
00160      *  @param id - the packet id used - returned 
00161      *  @param qos - the QoS to send the publish at
00162      *  @param retained - whether the message should be retained
00163      *  @return success code -
00164      */
00165     int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00166     
00167     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00168      *  @param topicFilter - a topic pattern which can include wildcards
00169      *  @param qos - the MQTT QoS to subscribe at
00170      *  @param mh - the callback function to be invoked when a message is received for this subscription
00171      *  @return success code -
00172      */
00173     int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler mh);
00174 
00175     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00176      *  @param topicFilter - a topic pattern which can include wildcards
00177      *  @return success code -
00178      */
00179     int unsubscribe(MQTTSN_topicid& topicFilter);
00180 
00181     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00182      *  @param duration - used for sleeping clients, 0 means no duration
00183      *  @return success code -
00184      */
00185     int disconnect(unsigned short duration = 0);
00186 
00187     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00188      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00189      *  received.
00190      *  @param timeout_ms the time to wait, in milliseconds
00191      *  @return success code - on failure, this means the client has disconnected
00192      */
00193     int yield(unsigned long timeout_ms = 1000L);
00194 
00195     /** Is the client connected?
00196      *  @return flag - is the client connected or not?
00197      */
00198     bool isConnected()
00199     {
00200         return isconnected;
00201     }
00202     
00203 protected:
00204 
00205     int cycle(Timer& timer);
00206     int waitfor(int packet_type, Timer& timer);
00207 
00208 private:
00209 
00210     int keepalive();
00211     int publish(int len, Timer& timer, enum QoS qos);
00212 
00213     int readPacket(Timer& timer);
00214     int sendPacket(int length, Timer& timer);
00215     int deliverMessage(MQTTSN_topicid& topic, Message& message);
00216     bool isTopicMatched(char* topicFilter, MQTTSNString& topicName);
00217 
00218     Network& ipstack;
00219     unsigned long command_timeout_ms;
00220 
00221     unsigned char sendbuf[MAX_PACKET_SIZE];
00222     unsigned char readbuf[MAX_PACKET_SIZE];
00223 
00224     Timer last_sent, last_received;
00225     unsigned short duration;
00226     bool ping_outstanding;
00227     bool cleansession;
00228 
00229     PacketId packetid;
00230 
00231     struct MessageHandlers
00232     {
00233         MQTTSN_topicid* topicFilter;
00234         FP<void, MessageData&> fp;
00235     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00236 
00237     FP<void, MessageData&> defaultMessageHandler;
00238 
00239     bool isconnected;
00240     
00241     struct Registrations
00242     {
00243         unsigned short id;
00244         char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH];
00245     } registrations[MAX_REGISTRATIONS];
00246 
00247 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00248     unsigned char pubbuf[MAX_PACKET_SIZE];  // store the last publish for sending on reconnect
00249     int inflightLen;
00250     unsigned short inflightMsgid;
00251     enum QoS inflightQoS;
00252 #endif
00253 
00254 #if MQTTSNCLIENT_QOS2
00255     bool pubrel;
00256     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00257     bool isQoS2msgidFree(unsigned short id);
00258     bool useQoS2msgid(unsigned short id);
00259 #endif
00260 
00261 };
00262 
00263 }
00264 
00265 
00266 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00267 MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00268 {
00269     last_sent = Timer();
00270     last_received = Timer();
00271     ping_outstanding = false;
00272     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00273         messageHandlers[i].topicFilter = 0;
00274     this->command_timeout_ms = command_timeout_ms;
00275     isconnected = false;
00276     
00277 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00278     inflightMsgid = 0;
00279     inflightQoS = QOS0;
00280 #endif
00281 
00282     
00283 #if MQTTSNCLIENT_QOS2
00284     pubrel = false;
00285     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00286         incomingQoS2messages[i] = 0;
00287 #endif
00288 }
00289 
00290 #if MQTTSNCLIENT_QOS2
00291 template<class Network, class Timer, int a, int b>
00292 bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00293 {
00294     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00295     {
00296         if (incomingQoS2messages[i] == id)
00297             return false;
00298     }
00299     return true;
00300 }
00301 
00302 
00303 template<class Network, class Timer, int a, int b>
00304 bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00305 {
00306     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00307     {
00308         if (incomingQoS2messages[i] == 0)
00309         {
00310             incomingQoS2messages[i] = id;
00311             return true;
00312         }
00313     }
00314     return false;
00315 }
00316 #endif
00317 
00318 
00319 template<class Network, class Timer, int a, int b>
00320 int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00321 {
00322     int rc = FAILURE,
00323         sent = 0;
00324 
00325     do
00326     {
00327         sent = ipstack.write(sendbuf, length, timer.left_ms());
00328         printf("sendPacket, rc %d from write of %d bytes\n", sent, length);
00329         if (sent < 0)  // there was an error writing the data
00330             break;
00331     }
00332     while (sent != length && !timer.expired());
00333     
00334     if (sent == length)
00335     {
00336         if (this->duration > 0)
00337             last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet
00338         rc = SUCCESS;
00339     }
00340     else
00341         rc = FAILURE;
00342         
00343 #if defined(MQTT_DEBUG)
00344     char printbuf[50];
00345     DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
00346 #endif
00347     return rc;
00348 }
00349 
00350 
00351 /**
00352  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00353  * the packets can be retried.
00354  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00355  * @return the MQTT packet type, or -1 if none
00356  */
00357 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00358 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer)
00359 {
00360     int rc = FAILURE;
00361     int len = 0;  // the length of the whole packet including length field 
00362     int lenlen = 0;
00363     int datalen = 0;
00364 
00365     #define MQTTSN_MIN_PACKET_LENGTH 3
00366     // 1. read the packet, datagram style 
00367     if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH)
00368         goto exit;
00369         
00370     // 2. read the length.  This is variable in itself 
00371     lenlen = MQTTSNPacket_decode(readbuf, len, &datalen);
00372     if (datalen != len)
00373         goto exit; // there was an error 
00374         
00375     rc = readbuf[lenlen];
00376     if (this->duration > 0)
00377         last_received.countdown(this->duration); // record the fact that we have successfully received a packet
00378 exit:
00379         
00380 #if defined(MQTT_DEBUG)
00381     char printbuf[50];
00382     DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
00383 #endif
00384     return rc;
00385 }
00386 
00387 
00388 // assume topic filter and name is in correct format
00389 // # can only be at end
00390 // + and # can only be next to separator
00391 template<class Network, class Timer, int a, int b>
00392 bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName)
00393 {
00394     char* curf = topicFilter;
00395     char* curn = topicName.lenstring.data;
00396     char* curn_end = curn + topicName.lenstring.len;
00397 
00398     while (*curf && curn < curn_end)
00399     {
00400         if (*curn == '/' && *curf != '/')
00401             break;
00402         if (*curf != '+' && *curf != '#' && *curf != *curn)
00403             break;
00404         if (*curf == '+')
00405         {   // skip until we meet the next separator, or end of string
00406             char* nextpos = curn + 1;
00407             while (nextpos < curn_end && *nextpos != '/')
00408                 nextpos = ++curn + 1;
00409         }
00410         else if (*curf == '#')
00411             curn = curn_end - 1;    // skip until end of string
00412         curf++;
00413         curn++;
00414     };
00415 
00416     return (curn == curn_end) && (*curf == '\0');
00417 }
00418 
00419 
00420 
00421 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00422 int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message)
00423 {
00424     int rc = FAILURE;
00425     printf("deliverMessage topic id is %d\n", topic.data.id);
00426 
00427     // we have to find the right message handler - indexed by topic
00428     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00429     {
00430         if (messageHandlers[i].topicFilter != 0)
00431             printf("messageHandler %d topic id is %d\n", i, messageHandlers[i].topicFilter->data.id);
00432         if (messageHandlers[i].topicFilter != 0 && (topic.data.id == messageHandlers[i].topicFilter->data.id))
00433         //MQTTSNtopic_equals(&topic, messageHandlers[i].topicFilter) ||
00434          //       isTopicMatched(messageHandlers[i].topicFilter, topic)))
00435         {
00436             if (messageHandlers[i].fp.attached())
00437             {
00438                 MessageData md(topic, message);
00439                 messageHandlers[i].fp(md);
00440                 rc = SUCCESS;
00441             }
00442         }
00443     }
00444 
00445     if (rc == FAILURE && defaultMessageHandler.attached())
00446     {
00447         MessageData md(topic, message);
00448         defaultMessageHandler(md);
00449         rc = SUCCESS;
00450     }
00451 
00452     return rc;
00453 }
00454 
00455 
00456 
00457 template<class Network, class Timer, int a, int b>
00458 int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00459 {
00460     int rc = SUCCESS;
00461     Timer timer = Timer();
00462 
00463     timer.countdown_ms(timeout_ms);
00464     while (!timer.expired())
00465     {
00466         if (cycle(timer) == FAILURE)
00467         {
00468             rc = FAILURE;
00469             break;
00470         }
00471     }
00472 
00473     return rc;
00474 }
00475 
00476 
00477 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00478 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer)
00479 {
00480     /* get one piece of work off the wire and one pass through */
00481 
00482     // read the socket, see what work is due
00483     unsigned short packet_type = readPacket(timer);
00484 
00485     int len = 0;
00486     int rc = SUCCESS;
00487 
00488     switch (packet_type)
00489     {
00490         case MQTTSN_CONNACK:
00491         case MQTTSN_PUBACK:
00492         case MQTTSN_SUBACK:
00493         case MQTTSN_REGACK:
00494             break;
00495         case MQTTSN_REGISTER:
00496         {
00497             unsigned short topicid, packetid;
00498             MQTTSNString topicName;
00499             unsigned char reg_rc = MQTTSN_RC_ACCEPTED;
00500             if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1)
00501                 goto exit;
00502             len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, reg_rc);
00503             if (len <= 0)
00504                 rc = FAILURE;
00505             else
00506                 rc = sendPacket(len, timer);
00507             break;
00508         }
00509         case MQTTSN_PUBLISH:
00510             MQTTSN_topicid topicid;
00511             Message msg;
00512             if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, &msg.id, &topicid,
00513                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1)
00514                 goto exit;
00515 #if MQTTSNCLIENT_QOS2
00516             if (msg.qos != QOS2)
00517 #endif
00518                 deliverMessage(topicid, msg);
00519 #if MQTTSNCLIENT_QOS2
00520             else if (isQoS2msgidFree(msg.id))
00521             {
00522                 if (useQoS2msgid(msg.id))
00523                     deliverMessage(topicid, msg);
00524                 else
00525                     WARN("Maximum number of incoming QoS2 messages exceeded");
00526             }   
00527 #endif
00528 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00529             if (msg.qos != QOS0)
00530             {
00531                 if (msg.qos == QOS1)
00532                     len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0);
00533                 else if (msg.qos == QOS2)
00534                     len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id);
00535                 if (len <= 0)
00536                     rc = FAILURE;
00537                 else
00538                 {
00539                     printf("sending puback len %d\n", len);
00540                     rc = sendPacket(len, timer);
00541                     printf("rc %d from sending puback\n", rc);
00542                 }
00543                 if (rc == FAILURE)
00544                     goto exit; // there was a problem
00545             }
00546             break;
00547 #endif
00548 #if MQTTSNCLIENT_QOS2
00549         case MQTTSN_PUBREC:
00550             unsigned short mypacketid;
00551             unsigned char type;
00552             if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00553                 rc = FAILURE;
00554             else if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, mypacketid)) <= 0)
00555                 rc = FAILURE;
00556             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00557                 rc = FAILURE; // there was a problem
00558             if (rc == FAILURE)
00559                 goto exit; // there was a problem
00560             break;
00561         case MQTTSN_PUBCOMP:
00562             break;
00563 #endif
00564         case MQTTSN_PINGRESP:
00565             ping_outstanding = false;
00566             break;
00567     }
00568     keepalive();
00569 exit:
00570     if (rc == SUCCESS)
00571         rc = packet_type;
00572     return rc;
00573 }
00574 
00575 
00576 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00577 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive()
00578 {
00579     int rc = FAILURE;
00580 
00581     if (duration == 0)
00582     {
00583         rc = SUCCESS;
00584         goto exit;
00585     }
00586 
00587     if (last_sent.expired() || last_received.expired())
00588     {
00589         if (!ping_outstanding)
00590         {
00591             MQTTSNString clientid = MQTTSNString_initializer;
00592             Timer timer = Timer(1000);
00593             int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid);
00594             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00595                 ping_outstanding = true;
00596         }
00597     }
00598 
00599 exit:
00600     return rc;
00601 }
00602 
00603 
00604 // only used in single-threaded mode where one command at a time is in process
00605 template<class Network, class Timer, int a, int b>
00606 int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00607 {
00608     int rc = FAILURE;
00609 
00610     do
00611     {
00612         if (timer.expired())
00613             break; // we timed out
00614     }
00615     while ((rc = cycle(timer)) != packet_type);
00616 
00617     return rc;
00618 }
00619 
00620 
00621 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00622 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options)
00623 {
00624     Timer connect_timer = Timer(command_timeout_ms);
00625     int rc = FAILURE;
00626     int len = 0;
00627 
00628     if (isconnected) // don't send connect packet again if we are already connected
00629         goto exit;
00630 
00631     this->duration = options.duration;
00632     this->cleansession = options.cleansession;
00633     if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0)
00634         goto exit;
00635     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00636         goto exit; // there was a problem
00637 
00638     if (this->duration > 0)
00639         last_received.countdown(this->duration);
00640     // this will be a blocking call, wait for the connack
00641     if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK)
00642     {
00643         //unsigned char connack_rc = 255;
00644         int connack_rc = 255;
00645         if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1)
00646             rc = connack_rc;
00647         else
00648             rc = FAILURE;
00649     }
00650     else
00651         rc = FAILURE;
00652         
00653 #if MQTTSNCLIENT_QOS2
00654     // resend an inflight publish
00655     if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
00656     {
00657         if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, inflightMsgid)) <= 0)
00658             rc = FAILURE;
00659         else
00660             rc = publish(len, connect_timer, inflightQoS);
00661     }
00662     else
00663 #endif
00664 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00665     if (inflightMsgid > 0)
00666     {
00667         memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE);
00668         rc = publish(inflightLen, connect_timer, inflightQoS);
00669     }
00670 #endif
00671 
00672 exit:
00673     if (rc == SUCCESS)
00674         isconnected = true;
00675     return rc;
00676 }
00677 
00678 
00679 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00680 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect()
00681 {
00682     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00683     return connect(default_options);
00684 }
00685 
00686 
00687 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00688 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler messageHandler)
00689 {
00690     int rc = FAILURE;
00691     Timer timer = Timer(command_timeout_ms);
00692     int len = 0;
00693 
00694     if (!isconnected)
00695         goto exit;
00696         
00697     bool freeHandler = false;
00698     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00699     {
00700         if (messageHandlers[i].topicFilter == 0)
00701         {
00702             freeHandler = true;
00703             break;
00704         }
00705     }
00706     if (!freeHandler)
00707     {                                 // No message handler free
00708         rc = MAX_SUBSCRIPTIONS_EXCEEDED;
00709         goto exit; 
00710     }
00711 
00712     len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter);
00713     if (len <= 0)
00714         goto exit;
00715     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00716         goto exit;             // there was a problem
00717 
00718     if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK)      // wait for suback
00719     {
00720         unsigned short mypacketid;
00721         unsigned char suback_rc;
00722         if (MQTTSNDeserialize_suback((int*)&grantedQoS, &topicFilter.data.id, &mypacketid, &suback_rc, readbuf, MAX_PACKET_SIZE) != 1)
00723             rc = FAILURE;
00724         else
00725             rc = suback_rc;
00726         if (suback_rc == MQTTSN_RC_ACCEPTED)
00727         {
00728             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00729             {
00730                 if (messageHandlers[i].topicFilter == 0)
00731                 {
00732                     messageHandlers[i].topicFilter = &topicFilter;
00733                     messageHandlers[i].fp.attach(messageHandler);
00734                     rc = 0;
00735                     break;
00736                 }
00737             }
00738         }
00739     }
00740     else
00741         rc = FAILURE;
00742 
00743 exit:
00744     if (rc != SUCCESS)
00745         isconnected = false;
00746     return rc;
00747 }
00748 
00749 
00750 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00751 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter)
00752 {
00753     int rc = FAILURE;
00754     Timer timer = Timer(command_timeout_ms);
00755     int len = 0;
00756 
00757     if (!isconnected)
00758         goto exit;
00759 
00760     if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0)
00761         goto exit;
00762     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00763         goto exit; // there was a problem
00764 
00765     if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK)
00766     {
00767         unsigned short mypacketid;  // should be the same as the packetid above
00768         if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1)
00769             rc = 0;
00770     }
00771     else
00772         rc = FAILURE;
00773 
00774 exit:
00775     if (rc != SUCCESS)
00776         isconnected = false;
00777     return rc;
00778 }
00779 
00780 
00781 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00782 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00783 {
00784     int rc;
00785     
00786     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00787         goto exit; // there was a problem
00788 
00789 #if MQTTSNCLIENT_QOS1 
00790     if (qos == QOS1)
00791     {
00792         if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK)
00793         {
00794             unsigned short mypacketid;
00795             unsigned char type;
00796             if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00797                 rc = FAILURE;
00798             else if (inflightMsgid == mypacketid)
00799                 inflightMsgid = 0;
00800         }
00801         else
00802             rc = FAILURE;
00803     }
00804 #elif MQTTSNCLIENT_QOS2
00805     else if (qos == QOS2)
00806     {
00807         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00808         {
00809             unsigned short mypacketid;
00810             unsigned char type;
00811             if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00812                 rc = FAILURE;
00813             else if (inflightMsgid == mypacketid)
00814                 inflightMsgid = 0;
00815         }
00816         else
00817             rc = FAILURE;
00818     }
00819 #endif
00820 
00821 exit:
00822     if (rc != SUCCESS)
00823         isconnected = false;
00824     return rc;
00825 }
00826 
00827 
00828 
00829 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00830 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00831 {
00832     int rc = FAILURE;
00833     Timer timer = Timer(command_timeout_ms);
00834     int len = 0;
00835 
00836     if (!isconnected)
00837         goto exit;
00838 
00839 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00840     if (qos == QOS1 || qos == QOS2)
00841         id = packetid.getNext();
00842 #endif
00843 
00844     len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id,
00845               topic, (unsigned char*)payload, payloadlen);
00846     if (len <= 0)
00847         goto exit;
00848         
00849 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00850     if (!cleansession)
00851     {
00852         memcpy(pubbuf, sendbuf, len);
00853         inflightMsgid = id;
00854         inflightLen = len;
00855         inflightQoS = qos;
00856 #if MQTTSNCLIENT_QOS2
00857         pubrel = false;
00858 #endif
00859     }
00860 #endif
00861         
00862     rc = publish(len, timer, qos);
00863 exit:
00864     return rc;
00865 }
00866 
00867 
00868 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00869 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00870 {
00871     unsigned short id = 0;  // dummy - not used for anything
00872     return publish(topicName, payload, payloadlen, id, qos, retained);
00873 }
00874 
00875 
00876 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00877 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message)
00878 {
00879     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00880 }
00881 
00882 
00883 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00884 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration)
00885 {
00886     int rc = FAILURE;
00887     Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
00888     int int_duration = (duration == 0) ? -1 : (int)duration;
00889     int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration);
00890     if (len > 0)
00891         rc = sendPacket(len, timer);            // send the disconnect packet
00892 
00893     isconnected = false;
00894     return rc;
00895 }
00896 
00897 
00898 #endif