..

Dependencies:   FP

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNClient.h Source File

MQTTSNClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *******************************************************************************/
00016 
00017 #if !defined(MQTTSNCLIENT_H)
00018 #define MQTTSNCLIENT_H
00019 
00020 #include "FP.h"
00021 #include "MQTTSNPacket.h"
00022 #include "MQTTLogging.h"
00023 
00024 // Data limits
00025 #if !defined(MAX_REGISTRATIONS)
00026     #define MAX_REGISTRATIONS 5
00027 #endif
00028 #if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH)
00029     #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20
00030 #endif
00031 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00032     #define MAX_INCOMING_QOS2_MESSAGES 10
00033 #endif
00034 
00035 #if !defined(MQTTSNCLIENT_QOS1)
00036     #define MQTTSNCLIENT_QOS1 1
00037 #endif
00038 #if !defined(MQTTSNCLIENT_QOS2)
00039     #define MQTTSNCLIENT_QOS2 1
00040 #endif
00041 
00042 namespace MQTTSN
00043 {
00044 
00045 
00046 enum QoS { QOS0, QOS1, QOS2 };
00047 
00048 // all failure return codes must be negative
00049 enum returnCode { MAX_SUBSCRIPTIONS_EXCEEDED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00050 
00051 
00052 struct Message
00053 {
00054     enum QoS qos;
00055     bool retained;
00056     bool dup;
00057     unsigned short id;
00058     void *payload;
00059     size_t payloadlen;
00060 };
00061 
00062 
00063 struct MessageData
00064 {
00065     MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage)  : message(aMessage), topic(aTopic)
00066     { }
00067 
00068     struct Message &message;
00069     MQTTSN_topicid &topic;
00070 };
00071 
00072 
00073 class PacketId
00074 {
00075 public:
00076     PacketId()
00077     {
00078         next = 0;
00079     }
00080 
00081     int getNext()
00082     {
00083         return next = (next == MAX_PACKET_ID) ? 1 : ++next;
00084     }
00085 
00086 private:
00087     static const int MAX_PACKET_ID = 65535;
00088     int next;
00089 };
00090 
00091 
00092 /**
00093  * @class MQTTSNClient
00094  * @brief blocking, non-threaded MQTTSN client API
00095  *
00096  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00097  * MQTT request can be in process at any one time.
00098  * @param Network a network class which supports send, receive
00099  * @param Timer a timer class with the methods:
00100  */
00101 template<class Network, class Timer, int MAX_PACKET_SIZE = 1024, int MAX_MESSAGE_HANDLERS = 5>
00102 class Client
00103 {
00104 
00105 public:
00106 
00107     typedef void (*messageHandler)(MessageData&);
00108 
00109     /** Construct the client
00110      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00111      *      before calling MQTT connect
00112      *  @param limits an instance of the Limit class - to alter limits as required
00113      */
00114     Client(Network& network, unsigned int command_timeout_ms = 30000);
00115 
00116     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00117      *  @param mh - pointer to the callback function
00118      */
00119     void setDefaultMessageHandler(messageHandler mh)
00120     {
00121         defaultMessageHandler.attach(mh);
00122     }
00123 
00124     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00125      *  The nework object must be connected to the network endpoint before calling this
00126      *  Default connect options are used
00127      *  @return success code -
00128      */
00129     int connect();
00130     
00131     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00132      *  The nework object must be connected to the network endpoint before calling this
00133      *  @param options - connect options
00134      *  @return success code -
00135      */
00136     int connect(MQTTSNPacket_connectData& options);
00137 
00138     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00139      *  @param topic - the topic to publish to
00140      *  @param message - the message to send
00141      *  @return success code -
00142      */
00143     int publish(MQTTSN_topicid& topic, Message& message);
00144     
00145     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00146      *  @param topic - the topic to publish to
00147      *  @param payload - the data to send
00148      *  @param payloadlen - the length of the data
00149      *  @param qos - the QoS to send the publish at
00150      *  @param retained - whether the message should be retained
00151      *  @return success code -
00152      */
00153     int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00154     
00155     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00156      *  @param topic - the topic to publish to
00157      *  @param payload - the data to send
00158      *  @param payloadlen - the length of the data
00159      *  @param id - the packet id used - returned 
00160      *  @param qos - the QoS to send the publish at
00161      *  @param retained - whether the message should be retained
00162      *  @return success code -
00163      */
00164     int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00165     
00166     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00167      *  @param topicFilter - a topic pattern which can include wildcards
00168      *  @param qos - the MQTT QoS to subscribe at
00169      *  @param mh - the callback function to be invoked when a message is received for this subscription
00170      *  @return success code -
00171      */
00172     int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler mh);
00173 
00174     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00175      *  @param topicFilter - a topic pattern which can include wildcards
00176      *  @return success code -
00177      */
00178     int unsubscribe(MQTTSN_topicid& topicFilter);
00179 
00180     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00181      *  @param duration - used for sleeping clients, 0 means no duration
00182      *  @return success code -
00183      */
00184     int disconnect(unsigned short duration = 0);
00185 
00186     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00187      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00188      *  received.
00189      *  @param timeout_ms the time to wait, in milliseconds
00190      *  @return success code - on failure, this means the client has disconnected
00191      */
00192     int yield(unsigned long timeout_ms = 1000L);
00193 
00194     /** Is the client connected?
00195      *  @return flag - is the client connected or not?
00196      */
00197     bool isConnected()
00198     {
00199         return isconnected;
00200     }
00201     
00202 protected:
00203 
00204     int cycle(Timer& timer);
00205     int waitfor(int packet_type, Timer& timer);
00206 
00207 private:
00208 
00209     int keepalive();
00210     int publish(int len, Timer& timer, enum QoS qos);
00211 
00212     int readPacket(Timer& timer);
00213     int sendPacket(int length, Timer& timer);
00214     int deliverMessage(MQTTSN_topicid& topic, Message& message);
00215     bool isTopicMatched(char* topicFilter, MQTTSNString& topicName);
00216 
00217     Network& ipstack;
00218     unsigned long command_timeout_ms;
00219 
00220     unsigned char sendbuf[MAX_PACKET_SIZE];
00221     unsigned char readbuf[MAX_PACKET_SIZE];
00222 
00223     Timer last_sent, last_received;
00224     unsigned short duration;
00225     bool ping_outstanding;
00226     bool cleansession;
00227 
00228     PacketId packetid;
00229 
00230     struct MessageHandlers
00231     {
00232         MQTTSN_topicid* topicFilter;
00233         FP<void, MessageData&> fp;
00234     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00235 
00236     FP<void, MessageData&> defaultMessageHandler;
00237 
00238     bool isconnected;
00239     
00240     struct Registrations
00241     {
00242         unsigned short id;
00243         char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH];
00244     } registrations[MAX_REGISTRATIONS];
00245 
00246 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00247     unsigned char pubbuf[MAX_PACKET_SIZE];  // store the last publish for sending on reconnect
00248     int inflightLen;
00249     unsigned short inflightMsgid;
00250     enum QoS inflightQoS;
00251 #endif
00252 
00253 #if MQTTSNCLIENT_QOS2
00254     bool pubrel;
00255     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00256     bool isQoS2msgidFree(unsigned short id);
00257     bool useQoS2msgid(unsigned short id);
00258 #endif
00259 
00260 };
00261 
00262 }
00263 
00264 
00265 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00266 MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00267 {
00268     last_sent = Timer();
00269     last_received = Timer();
00270     ping_outstanding = false;
00271     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00272         messageHandlers[i].topicFilter = 0;
00273     this->command_timeout_ms = command_timeout_ms;
00274     isconnected = false;
00275     
00276 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00277     inflightMsgid = 0;
00278     inflightQoS = QOS0;
00279 #endif
00280 
00281     
00282 #if MQTTSNCLIENT_QOS2
00283     pubrel = false;
00284     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00285         incomingQoS2messages[i] = 0;
00286 #endif
00287 }
00288 
00289 #if MQTTSNCLIENT_QOS2
00290 template<class Network, class Timer, int a, int b>
00291 bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00292 {
00293     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00294     {
00295         if (incomingQoS2messages[i] == id)
00296             return false;
00297     }
00298     return true;
00299 }
00300 
00301 
00302 template<class Network, class Timer, int a, int b>
00303 bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00304 {
00305     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00306     {
00307         if (incomingQoS2messages[i] == 0)
00308         {
00309             incomingQoS2messages[i] = id;
00310             return true;
00311         }
00312     }
00313     return false;
00314 }
00315 #endif
00316 
00317 
00318 template<class Network, class Timer, int a, int b>
00319 int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00320 {
00321     int rc = FAILURE,
00322         sent = 0;
00323 
00324     do
00325     {
00326         sent = ipstack.write(sendbuf, length, timer.left_ms());
00327         printf("sendPacket, rc %d from write of %d bytes\n", sent, length);
00328         if (sent < 0)  // there was an error writing the data
00329             break;
00330     }
00331     while (sent != length && !timer.expired());
00332     
00333     if (sent == length)
00334     {
00335         if (this->duration > 0)
00336             last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet
00337         rc = SUCCESS;
00338     }
00339     else
00340         rc = FAILURE;
00341         
00342 #if defined(MQTT_DEBUG)
00343     char printbuf[50];
00344     DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
00345 #endif
00346     return rc;
00347 }
00348 
00349 
00350 /**
00351  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00352  * the packets can be retried.
00353  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00354  * @return the MQTT packet type, or -1 if none
00355  */
00356 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00357 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer)
00358 {
00359     int rc = FAILURE;
00360     int len = 0;  // the length of the whole packet including length field 
00361     int lenlen = 0;
00362     int datalen = 0;
00363 
00364     #define MQTTSN_MIN_PACKET_LENGTH 3
00365     // 1. read the packet, datagram style 
00366     if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH)
00367         goto exit;
00368         
00369     // 2. read the length.  This is variable in itself 
00370     lenlen = MQTTSNPacket_decode(readbuf, len, &datalen);
00371     if (datalen != len)
00372         goto exit; // there was an error 
00373         
00374     rc = readbuf[lenlen];
00375     if (this->duration > 0)
00376         last_received.countdown(this->duration); // record the fact that we have successfully received a packet
00377 exit:
00378         
00379 #if defined(MQTT_DEBUG)
00380     char printbuf[50];
00381     DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
00382 #endif
00383     return rc;
00384 }
00385 
00386 
00387 // assume topic filter and name is in correct format
00388 // # can only be at end
00389 // + and # can only be next to separator
00390 template<class Network, class Timer, int a, int b>
00391 bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName)
00392 {
00393     char* curf = topicFilter;
00394     char* curn = topicName.lenstring.data;
00395     char* curn_end = curn + topicName.lenstring.len;
00396 
00397     while (*curf && curn < curn_end)
00398     {
00399         if (*curn == '/' && *curf != '/')
00400             break;
00401         if (*curf != '+' && *curf != '#' && *curf != *curn)
00402             break;
00403         if (*curf == '+')
00404         {   // skip until we meet the next separator, or end of string
00405             char* nextpos = curn + 1;
00406             while (nextpos < curn_end && *nextpos != '/')
00407                 nextpos = ++curn + 1;
00408         }
00409         else if (*curf == '#')
00410             curn = curn_end - 1;    // skip until end of string
00411         curf++;
00412         curn++;
00413     };
00414 
00415     return (curn == curn_end) && (*curf == '\0');
00416 }
00417 
00418 
00419 
00420 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00421 int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message)
00422 {
00423     int rc = FAILURE;
00424     printf("deliverMessage topic id is %d\n", topic.data.id);
00425 
00426     // we have to find the right message handler - indexed by topic
00427     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00428     {
00429         if (messageHandlers[i].topicFilter != 0)
00430             printf("messageHandler %d topic id is %d\n", i, messageHandlers[i].topicFilter->data.id);
00431         if (messageHandlers[i].topicFilter != 0 && (topic.data.id == messageHandlers[i].topicFilter->data.id))
00432         //MQTTSNtopic_equals(&topic, messageHandlers[i].topicFilter) ||
00433          //       isTopicMatched(messageHandlers[i].topicFilter, topic)))
00434         {
00435             if (messageHandlers[i].fp.attached())
00436             {
00437                 MessageData md(topic, message);
00438                 messageHandlers[i].fp(md);
00439                 rc = SUCCESS;
00440             }
00441         }
00442     }
00443 
00444     if (rc == FAILURE && defaultMessageHandler.attached())
00445     {
00446         MessageData md(topic, message);
00447         defaultMessageHandler(md);
00448         rc = SUCCESS;
00449     }
00450 
00451     return rc;
00452 }
00453 
00454 
00455 
00456 template<class Network, class Timer, int a, int b>
00457 int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00458 {
00459     int rc = SUCCESS;
00460     Timer timer = Timer();
00461 
00462     timer.countdown_ms(timeout_ms);
00463     while (!timer.expired())
00464     {
00465         if (cycle(timer) == FAILURE)
00466         {
00467             rc = FAILURE;
00468             break;
00469         }
00470     }
00471 
00472     return rc;
00473 }
00474 
00475 
00476 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00477 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer)
00478 {
00479     /* get one piece of work off the wire and one pass through */
00480 
00481     // read the socket, see what work is due
00482     unsigned short packet_type = readPacket(timer);
00483 
00484     int len = 0;
00485     int rc = SUCCESS;
00486 
00487     switch (packet_type)
00488     {
00489         case MQTTSN_CONNACK:
00490         case MQTTSN_PUBACK:
00491         case MQTTSN_SUBACK:
00492         case MQTTSN_REGACK:
00493             break;
00494         case MQTTSN_REGISTER:
00495         {
00496             unsigned short topicid, packetid;
00497             MQTTSNString topicName;
00498             unsigned char reg_rc = MQTTSN_RC_ACCEPTED;
00499             if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1)
00500                 goto exit;
00501             len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, reg_rc);
00502             if (len <= 0)
00503                 rc = FAILURE;
00504             else
00505                 rc = sendPacket(len, timer);
00506             break;
00507         }
00508         case MQTTSN_PUBLISH:
00509             MQTTSN_topicid topicid;
00510             Message msg;
00511             if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, &msg.id, &topicid,
00512                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1)
00513                 goto exit;
00514 #if MQTTSNCLIENT_QOS2
00515             if (msg.qos != QOS2)
00516 #endif
00517                 deliverMessage(topicid, msg);
00518 #if MQTTSNCLIENT_QOS2
00519             else if (isQoS2msgidFree(msg.id))
00520             {
00521                 if (useQoS2msgid(msg.id))
00522                     deliverMessage(topicid, msg);
00523                 else
00524                     WARN("Maximum number of incoming QoS2 messages exceeded");
00525             }   
00526 #endif
00527 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00528             if (msg.qos != QOS0)
00529             {
00530                 if (msg.qos == QOS1)
00531                     len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0);
00532                 else if (msg.qos == QOS2)
00533                     len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id);
00534                 if (len <= 0)
00535                     rc = FAILURE;
00536                 else
00537                 {
00538                     printf("sending puback len %d\n", len);
00539                     rc = sendPacket(len, timer);
00540                     printf("rc %d from sending puback\n", rc);
00541                 }
00542                 if (rc == FAILURE)
00543                     goto exit; // there was a problem
00544             }
00545             break;
00546 #endif
00547 #if MQTTSNCLIENT_QOS2
00548         case MQTTSN_PUBREC:
00549             unsigned short mypacketid;
00550             unsigned char type;
00551             if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00552                 rc = FAILURE;
00553             else if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, mypacketid)) <= 0)
00554                 rc = FAILURE;
00555             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00556                 rc = FAILURE; // there was a problem
00557             if (rc == FAILURE)
00558                 goto exit; // there was a problem
00559             break;
00560         case MQTTSN_PUBCOMP:
00561             break;
00562 #endif
00563         case MQTTSN_PINGRESP:
00564             ping_outstanding = false;
00565             break;
00566     }
00567     keepalive();
00568 exit:
00569     if (rc == SUCCESS)
00570         rc = packet_type;
00571     return rc;
00572 }
00573 
00574 
00575 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00576 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive()
00577 {
00578     int rc = FAILURE;
00579 
00580     if (duration == 0)
00581     {
00582         rc = SUCCESS;
00583         goto exit;
00584     }
00585 
00586     if (last_sent.expired() || last_received.expired())
00587     {
00588         if (!ping_outstanding)
00589         {
00590             MQTTSNString clientid = MQTTSNString_initializer;
00591             Timer timer = Timer(1000);
00592             int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid);
00593             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00594                 ping_outstanding = true;
00595         }
00596     }
00597 
00598 exit:
00599     return rc;
00600 }
00601 
00602 
00603 // only used in single-threaded mode where one command at a time is in process
00604 template<class Network, class Timer, int a, int b>
00605 int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00606 {
00607     int rc = FAILURE;
00608 
00609     do
00610     {
00611         if (timer.expired())
00612             break; // we timed out
00613     }
00614     while ((rc = cycle(timer)) != packet_type);
00615 
00616     return rc;
00617 }
00618 
00619 
00620 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00621 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options)
00622 {
00623     Timer connect_timer = Timer(command_timeout_ms);
00624     int rc = FAILURE;
00625     int len = 0;
00626 
00627     if (isconnected) // don't send connect packet again if we are already connected
00628         goto exit;
00629 
00630     this->duration = options.duration;
00631     this->cleansession = options.cleansession;
00632     if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0)
00633         goto exit;
00634     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00635         goto exit; // there was a problem
00636 
00637     if (this->duration > 0)
00638         last_received.countdown(this->duration);
00639     // this will be a blocking call, wait for the connack
00640     if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK)
00641     {
00642         //unsigned char connack_rc = 255;
00643         int connack_rc = 255;
00644         if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1)
00645             rc = connack_rc;
00646         else
00647             rc = FAILURE;
00648     }
00649     else
00650         rc = FAILURE;
00651         
00652 #if MQTTSNCLIENT_QOS2
00653     // resend an inflight publish
00654     if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
00655     {
00656         if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, inflightMsgid)) <= 0)
00657             rc = FAILURE;
00658         else
00659             rc = publish(len, connect_timer, inflightQoS);
00660     }
00661     else
00662 #endif
00663 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00664     if (inflightMsgid > 0)
00665     {
00666         memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE);
00667         rc = publish(inflightLen, connect_timer, inflightQoS);
00668     }
00669 #endif
00670 
00671 exit:
00672     if (rc == SUCCESS)
00673         isconnected = true;
00674     return rc;
00675 }
00676 
00677 
00678 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00679 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect()
00680 {
00681     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00682     return connect(default_options);
00683 }
00684 
00685 
00686 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00687 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler messageHandler)
00688 {
00689     int rc = FAILURE;
00690     Timer timer = Timer(command_timeout_ms);
00691     int len = 0;
00692 /*
00693     if (!isconnected) {
00694         printf("hello 01");
00695         goto exit;
00696     }
00697 */        
00698     bool freeHandler = false;
00699     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00700     {
00701         if (messageHandlers[i].topicFilter == 0)
00702         {
00703             freeHandler = true;
00704             break;
00705         }
00706     }
00707     if (!freeHandler)
00708     {
00709         printf("hello 02");                                 
00710         // No message handler free
00711         rc = MAX_SUBSCRIPTIONS_EXCEEDED;
00712         goto exit; 
00713     }
00714 
00715     len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter);
00716     if (len <= 0) {
00717         printf("hello 03");
00718         goto exit;
00719     }
00720     
00721     if ((rc = sendPacket(len, timer)) != SUCCESS) { // send the subscribe packet 
00722         printf("hello 04");
00723         goto exit;             // there was a problem
00724     }
00725 
00726     if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK)      // wait for suback
00727     {
00728         unsigned short mypacketid;
00729         unsigned char suback_rc;
00730         if (MQTTSNDeserialize_suback((int*)&grantedQoS, &topicFilter.data.id, &mypacketid, &suback_rc, readbuf, MAX_PACKET_SIZE) != 1) {
00731             printf("hello 06");
00732             rc = FAILURE;
00733         }
00734         else
00735             rc = suback_rc;
00736         if (suback_rc == MQTTSN_RC_ACCEPTED)
00737         {
00738             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00739             {
00740                 if (messageHandlers[i].topicFilter == 0)
00741                 {
00742                     messageHandlers[i].topicFilter = &topicFilter;
00743                     messageHandlers[i].fp.attach(messageHandler);
00744                     rc = 0;
00745                     break;
00746                 }
00747             }
00748         }
00749     }
00750     else {
00751         printf("hello 07");
00752         rc = FAILURE;
00753     }
00754 
00755 exit:
00756     if (rc != SUCCESS)
00757         isconnected = false;
00758     return rc;
00759 }
00760 
00761 
00762 template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00763 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter)
00764 {
00765     int rc = FAILURE;
00766     Timer timer = Timer(command_timeout_ms);
00767     int len = 0;
00768 
00769     if (!isconnected)
00770         goto exit;
00771 
00772     if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0)
00773         goto exit;
00774     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00775         goto exit; // there was a problem
00776 
00777     if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK)
00778     {
00779         unsigned short mypacketid;  // should be the same as the packetid above
00780         if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1)
00781             rc = 0;
00782     }
00783     else
00784         rc = FAILURE;
00785 
00786 exit:
00787     if (rc != SUCCESS)
00788         isconnected = false;
00789     return rc;
00790 }
00791 
00792 
00793 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00794 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00795 {
00796     int rc;
00797     
00798     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00799         goto exit; // there was a problem
00800 
00801 #if MQTTSNCLIENT_QOS1 
00802     if (qos == QOS1)
00803     {
00804         if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK)
00805         {
00806             unsigned short mypacketid;
00807             unsigned char type;
00808             if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00809                 rc = FAILURE;
00810             else if (inflightMsgid == mypacketid)
00811                 inflightMsgid = 0;
00812         }
00813         else
00814             rc = FAILURE;
00815     }
00816 #elif MQTTSNCLIENT_QOS2
00817     else if (qos == QOS2)
00818     {
00819         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00820         {
00821             unsigned short mypacketid;
00822             unsigned char type;
00823             if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1)
00824                 rc = FAILURE;
00825             else if (inflightMsgid == mypacketid)
00826                 inflightMsgid = 0;
00827         }
00828         else
00829             rc = FAILURE;
00830     }
00831 #endif
00832 
00833 exit:
00834     if (rc != SUCCESS)
00835         isconnected = false;
00836     return rc;
00837 }
00838 
00839 
00840 
00841 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00842 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00843 {
00844     int rc = FAILURE;
00845     Timer timer = Timer(command_timeout_ms);
00846     int len = 0;
00847 
00848 /*
00849     if (!isconnected)
00850         goto exit;
00851 */
00852 
00853 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00854     if (qos == QOS1 || qos == QOS2)
00855         id = packetid.getNext();
00856 #endif
00857 
00858     len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id,
00859               topic, (unsigned char*)payload, payloadlen);
00860     if (len <= 0)
00861         goto exit;
00862         
00863 #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2
00864     if (!cleansession)
00865     {
00866         memcpy(pubbuf, sendbuf, len);
00867         inflightMsgid = id;
00868         inflightLen = len;
00869         inflightQoS = qos;
00870 #if MQTTSNCLIENT_QOS2
00871         pubrel = false;
00872 #endif
00873     }
00874 #endif
00875         
00876     rc = publish(len, timer, qos);
00877 exit:
00878     return rc;
00879 }
00880 
00881 
00882 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00883 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00884 {
00885     unsigned short id = 0;  // dummy - not used for anything
00886     return publish(topicName, payload, payloadlen, id, qos, retained);
00887 }
00888 
00889 
00890 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00891 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message)
00892 {
00893     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00894 }
00895 
00896 
00897 template<class Network, class Timer, int MAX_PACKET_SIZE, int b>
00898 int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration)
00899 {
00900     int rc = FAILURE;
00901     Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
00902     int int_duration = (duration == 0) ? -1 : (int)duration;
00903     int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration);
00904     if (len > 0)
00905         rc = sendPacket(len, timer);            // send the disconnect packet
00906 
00907     isconnected = false;
00908     return rc;
00909 }
00910 
00911 
00912 #endif