cc

Dependencies:   MQTTPacket FP

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *******************************************************************************/
00020 
00021 #if !defined(MQTTCLIENT_H)
00022 #define MQTTCLIENT_H
00023 
00024 #include "FP.h"
00025 #include "MQTTPacket.h"
00026 #include "stdio.h"
00027 #include "MQTTLogging.h"
00028 
00029 #if !defined(MQTTCLIENT_QOS1)
00030     #define MQTTCLIENT_QOS1 1
00031 #endif
00032 #if !defined(MQTTCLIENT_QOS2)
00033     #define MQTTCLIENT_QOS2 0
00034 #endif
00035 
00036 namespace MQTT
00037 {
00038 
00039 
/** MQTT quality of service levels; the enumerator values are the numeric QoS levels 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};

/** Result codes used by the client methods.  All failure return codes must be negative. */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
00044 
00045 
00046 struct Message
00047 {
00048     enum QoS qos;
00049     bool retained;
00050     bool dup;
00051     unsigned short id;
00052     void *payload;
00053     size_t payloadlen;
00054 };
00055 
00056 
00057 struct MessageData
00058 {
00059     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00060     { }
00061 
00062     struct Message &message;
00063     MQTTString &topicName;
00064 };
00065 
00066 
/**
 * Generator of MQTT packet identifiers.
 *
 * Identifiers run from 1 up to MAX_PACKET_ID and then wrap around to 1 again;
 * 0 is never handed out.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     * Advance to the next packet identifier.
     * @return the new identifier, in the range 1 .. MAX_PACKET_ID
     */
    int getNext()
    {
        // Written as separate statements: the former "next = ... ? 1 : ++next" modified
        // `next` twice within one expression, which is undefined behaviour before C++11.
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;
};
00084 
00085 
00086 /**
00087  * @class Client
00088  * @brief blocking, non-threaded MQTT client API
00089  *
00090  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00091  * MQTT request can be in process at any one time.
00092  * @param Network a network class which supports send, receive
00093  * @param Timer a timer class with the methods:
00094  */
00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00096 class Client
00097 {
00098 
00099 public:
00100 
00101     typedef void (*messageHandler)(MessageData&);
00102 
00103     /** Construct the client
00104      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00105      *      before calling MQTT connect
00106      *  @param limits an instance of the Limit class - to alter limits as required
00107      */
00108     Client(Network& network, unsigned int command_timeout_ms = 30000);
00109 
00110     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00111      *  @param mh - pointer to the callback function
00112      */
00113     void setDefaultMessageHandler(messageHandler mh)
00114     {
00115         defaultMessageHandler.attach(mh);
00116     }
00117 
00118     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00119      *  The nework object must be connected to the network endpoint before calling this
00120      *  Default connect options are used
00121      *  @return success code -
00122      */
00123     int connect();
00124     
00125         /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00126      *  The nework object must be connected to the network endpoint before calling this
00127      *  @param options - connect options
00128      *  @return success code -
00129      */
00130     int connect(MQTTPacket_connectData& options);
00131 
00132     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00133      *  @param topic - the topic to publish to
00134      *  @param message - the message to send
00135      *  @return success code -
00136      */
00137     int publish(const char* topicName, Message& message);
00138     
00139     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00140      *  @param topic - the topic to publish to
00141      *  @param payload - the data to send
00142      *  @param payloadlen - the length of the data
00143      *  @param qos - the QoS to send the publish at
00144      *  @param retained - whether the message should be retained
00145      *  @return success code -
00146      */
00147     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00148     
00149     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00150      *  @param topic - the topic to publish to
00151      *  @param payload - the data to send
00152      *  @param payloadlen - the length of the data
00153      *  @param id - the packet id used - returned 
00154      *  @param qos - the QoS to send the publish at
00155      *  @param retained - whether the message should be retained
00156      *  @return success code -
00157      */
00158     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00159 
00160     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00161      *  @param topicFilter - a topic pattern which can include wildcards
00162      *  @param qos - the MQTT QoS to subscribe at
00163      *  @param mh - the callback function to be invoked when a message is received for this subscription
00164      *  @return success code -
00165      */
00166     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00167 
00168     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00169      *  @param topicFilter - a topic pattern which can include wildcards
00170      *  @return success code -
00171      */
00172     int unsubscribe(const char* topicFilter);
00173 
00174     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00175      *  @return success code -
00176      */
00177     int disconnect();
00178 
00179     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00180      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00181      *  received.
00182      *  @param timeout_ms the time to wait, in milliseconds
00183      *  @return success code - on failure, this means the client has disconnected
00184      */
00185     int yield(unsigned long timeout_ms = 1000L);
00186 
00187     /** Is the client connected?
00188      *  @return flag - is the client connected or not?
00189      */
00190     bool isConnected()
00191     {
00192         return isconnected;
00193     }
00194 
00195 private:
00196 
00197     void cleanSession();
00198     int cycle(Timer& timer);
00199     int waitfor(int packet_type, Timer& timer);
00200     int keepalive();
00201     int publish(int len, Timer& timer, enum QoS qos);
00202 
00203     int decodePacket(int* value, int timeout);
00204     int readPacket(Timer& timer);
00205     int sendPacket(int length, Timer& timer);
00206     int deliverMessage(MQTTString& topicName, Message& message);
00207     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00208 
00209     Network& ipstack;
00210     unsigned long command_timeout_ms;
00211 
00212     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00213     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00214 
00215     Timer last_sent, last_received;
00216     unsigned int keepAliveInterval;
00217     bool ping_outstanding;
00218     bool cleansession;
00219 
00220     PacketId packetid;
00221 
00222     struct MessageHandlers
00223     {
00224         const char* topicFilter;
00225         FP<void, MessageData&> fp;
00226     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00227 
00228     FP<void, MessageData&> defaultMessageHandler;
00229 
00230     bool isconnected;
00231 
00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00233     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00234     int inflightLen;
00235     unsigned short inflightMsgid;
00236     enum QoS inflightQoS;
00237 #endif
00238 
00239 #if MQTTCLIENT_QOS2
00240     bool pubrel;
00241     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00242         #define MAX_INCOMING_QOS2_MESSAGES 10
00243     #endif
00244     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00245     bool isQoS2msgidFree(unsigned short id);
00246     bool useQoS2msgid(unsigned short id);
00247     void freeQoS2msgid(unsigned short id);
00248 #endif
00249 
00250 };
00251 
00252 }
00253 
00254 
00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00257 {
00258     ping_outstanding = false;
00259     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00260         messageHandlers[i].topicFilter = 0;
00261     isconnected = false;
00262 
00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00264     inflightMsgid = 0;
00265     inflightQoS = QOS0;
00266 #endif
00267 
00268 #if MQTTCLIENT_QOS2
00269     pubrel = false;
00270     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00271         incomingQoS2messages[i] = 0;
00272 #endif
00273 }
00274 
00275 
00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00278 {
00279     this->command_timeout_ms = command_timeout_ms;
00280     cleanSession();
00281 }
00282 
00283 
00284 #if MQTTCLIENT_QOS2
00285 template<class Network, class Timer, int a, int b>
00286 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00287 {
00288     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00289     {
00290         if (incomingQoS2messages[i] == id)
00291             return false;
00292     }
00293     return true;
00294 }
00295 
00296 
00297 template<class Network, class Timer, int a, int b>
00298 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00299 {
00300     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00301     {
00302         if (incomingQoS2messages[i] == 0)
00303         {
00304             incomingQoS2messages[i] = id;
00305             return true;
00306         }
00307     }
00308     return false;
00309 }
00310 
00311 
00312 template<class Network, class Timer, int a, int b>
00313 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00314 {
00315     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00316     {
00317         if (incomingQoS2messages[i] == id)
00318         {
00319             incomingQoS2messages[i] = 0;
00320             return;
00321         }
00322     }
00323 }
00324 #endif
00325 
00326 
00327 template<class Network, class Timer, int a, int b>
00328 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00329 {
00330     int rc = FAILURE,
00331         sent = 0;
00332 
00333     while (sent < length && !timer.expired())
00334     {
00335         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00336         if (rc < 0)  // there was an error writing the data
00337             break;
00338         sent += rc;
00339     }
00340     if (sent == length)
00341     {
00342         if (this->keepAliveInterval > 0)
00343             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00344         rc = SUCCESS;
00345     }
00346     else
00347         rc = FAILURE;
00348         
00349 #if defined(MQTT_DEBUG)
00350     char printbuf[150];
00351     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00352 #endif
00353     return rc;
00354 }
00355 
00356 
00357 template<class Network, class Timer, int a, int b>
00358 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00359 {
00360     unsigned char c;
00361     int multiplier = 1;
00362     int len = 0;
00363     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00364 
00365     *value = 0;
00366     do
00367     {
00368         int rc = MQTTPACKET_READ_ERROR;
00369 
00370         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00371         {
00372             rc = MQTTPACKET_READ_ERROR; /* bad data */
00373             goto exit;
00374         }
00375         rc = ipstack.read(&c, 1, timeout);
00376         if (rc != 1)
00377             goto exit;
00378         *value += (c & 127) * multiplier;
00379         multiplier *= 128;
00380     } while ((c & 128) != 0);
00381 exit:
00382     return len;
00383 }
00384 
00385 
00386 /**
00387  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00388  * the packets can be retried.
00389  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00390  * @return the MQTT packet type, or -1 if none
00391  */
00392 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00393 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00394 {
00395     int rc = FAILURE;
00396     MQTTHeader header = {0};
00397     int len = 0;
00398     int rem_len = 0;
00399 
00400     /* 1. read the header byte.  This has the packet type in it */
00401     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00402         goto exit;
00403 
00404     len = 1;
00405     /* 2. read the remaining length.  This is variable in itself */
00406     decodePacket(&rem_len, timer.left_ms());
00407     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00408 
00409     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00410     {
00411         rc = BUFFER_OVERFLOW;
00412         goto exit;
00413     }
00414 
00415     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00416     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00417         goto exit;
00418 
00419     header.byte = readbuf[0];
00420     rc = header.bits.type;
00421     if (this->keepAliveInterval > 0)
00422         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00423 exit:
00424         
00425 #if defined(MQTT_DEBUG)
00426     if (rc >= 0)
00427     {
00428         char printbuf[50];
00429         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00430     }
00431 #endif
00432     return rc;
00433 }
00434 
00435 
00436 // assume topic filter and name is in correct format
00437 // # can only be at end
00438 // + and # can only be next to separator
00439 template<class Network, class Timer, int a, int b>
00440 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00441 {
00442     char* curf = topicFilter;
00443     char* curn = topicName.lenstring.data;
00444     char* curn_end = curn + topicName.lenstring.len;
00445 
00446     while (*curf && curn < curn_end)
00447     {
00448         if (*curn == '/' && *curf != '/')
00449             break;
00450         if (*curf != '+' && *curf != '#' && *curf != *curn)
00451             break;
00452         if (*curf == '+')
00453         {   // skip until we meet the next separator, or end of string
00454             char* nextpos = curn + 1;
00455             while (nextpos < curn_end && *nextpos != '/')
00456                 nextpos = ++curn + 1;
00457         }
00458         else if (*curf == '#')
00459             curn = curn_end - 1;    // skip until end of string
00460         curf++;
00461         curn++;
00462     };
00463 
00464     return (curn == curn_end) && (*curf == '\0');
00465 }
00466 
00467 
00468 
00469 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00470 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00471 {
00472     int rc = FAILURE;
00473 
00474     // we have to find the right message handler - indexed by topic
00475     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00476     {
00477         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00478                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00479         {
00480             if (messageHandlers[i].fp.attached())
00481             {
00482                 MessageData md(topicName, message);
00483                 messageHandlers[i].fp(md);
00484                 rc = SUCCESS;
00485             }
00486         }
00487     }
00488 
00489     if (rc == FAILURE && defaultMessageHandler.attached())
00490     {
00491         MessageData md(topicName, message);
00492         defaultMessageHandler(md);
00493         rc = SUCCESS;
00494     }
00495 
00496     return rc;
00497 }
00498 
00499 
00500 
00501 template<class Network, class Timer, int a, int b>
00502 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00503 {
00504     int rc = SUCCESS;
00505     Timer timer = Timer();
00506 
00507     timer.countdown_ms(timeout_ms);
00508     while (!timer.expired())
00509     {
00510         if (cycle(timer) < 0)
00511         {
00512             rc = FAILURE;
00513             break;
00514         }
00515     }
00516 
00517     return rc;
00518 }
00519 
00520 
00521 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00522 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00523 {
00524     /* get one piece of work off the wire and one pass through */
00525 
00526     // read the socket, see what work is due
00527     int packet_type = readPacket(timer);
00528 
00529     int len = 0,
00530         rc = SUCCESS;
00531 
00532     switch (packet_type)
00533     {
00534         case FAILURE:
00535         case BUFFER_OVERFLOW:
00536             rc = packet_type;
00537             break;
00538         case CONNACK:
00539         case PUBACK:
00540         case SUBACK:
00541             break;
00542         case PUBLISH:
00543         {
00544             MQTTString topicName = MQTTString_initializer;
00545             Message msg;
00546             int intQoS;
00547             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00548                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00549                 goto exit;
00550             msg.qos = (enum QoS)intQoS;
00551 #if MQTTCLIENT_QOS2
00552             if (msg.qos != QOS2)
00553 #endif
00554                 deliverMessage(topicName, msg);
00555 #if MQTTCLIENT_QOS2
00556             else if (isQoS2msgidFree(msg.id))
00557             {
00558                 if (useQoS2msgid(msg.id))
00559                     deliverMessage(topicName, msg);
00560                 else
00561                     WARN("Maximum number of incoming QoS2 messages exceeded");
00562             }
00563 #endif
00564 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00565             if (msg.qos != QOS0)
00566             {
00567                 if (msg.qos == QOS1)
00568                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00569                 else if (msg.qos == QOS2)
00570                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00571                 if (len <= 0)
00572                     rc = FAILURE;
00573                 else
00574                     rc = sendPacket(len, timer);
00575                 if (rc == FAILURE)
00576                     goto exit; // there was a problem
00577             }
00578             break;
00579 #endif
00580         }
00581 #if MQTTCLIENT_QOS2
00582         case PUBREC:
00583         case PUBREL:
00584             unsigned short mypacketid;
00585             unsigned char dup, type;
00586             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00587                 rc = FAILURE;
00588             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00589                         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00590                 rc = FAILURE;
00591             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00592                 rc = FAILURE; // there was a problem
00593             if (rc == FAILURE)
00594                 goto exit; // there was a problem
00595             if (packet_type == PUBREL)
00596                 freeQoS2msgid(mypacketid);
00597             break;
00598             
00599         case PUBCOMP:
00600             break;
00601 #endif
00602         case PINGRESP:
00603             ping_outstanding = false;
00604             break;
00605     }
00606     keepalive();
00607 exit:
00608     if (rc == SUCCESS)
00609         rc = packet_type;
00610     return rc;
00611 }
00612 
00613 
00614 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00615 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00616 {
00617     int rc = FAILURE;
00618 
00619     if (keepAliveInterval == 0)
00620     {
00621         rc = SUCCESS;
00622         goto exit;
00623     }
00624 
00625     if (last_sent.expired() || last_received.expired())
00626     {
00627         if (!ping_outstanding)
00628         {
00629             Timer timer(1000);
00630             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00631             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00632                 ping_outstanding = true;
00633         }
00634     }
00635 
00636 exit:
00637     return rc;
00638 }
00639 
00640 
00641 // only used in single-threaded mode where one command at a time is in process
00642 template<class Network, class Timer, int a, int b>
00643 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00644 {
00645     int rc = FAILURE;
00646 
00647     do
00648     {
00649         if (timer.expired())
00650             break; // we timed out
00651     }
00652     while ((rc = cycle(timer)) != packet_type);
00653 
00654     return rc;
00655 }
00656 
00657 
00658 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00659 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00660 {
00661     Timer connect_timer(command_timeout_ms);
00662     int rc = FAILURE;
00663     int len = 0;
00664 
00665     if (isconnected) // don't send connect packet again if we are already connected
00666         goto exit;
00667 
00668     this->keepAliveInterval = options.keepAliveInterval;
00669     this->cleansession = options.cleansession;
00670     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00671         goto exit;
00672     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00673         goto exit; // there was a problem
00674 
00675     if (this->keepAliveInterval > 0)
00676         last_received.countdown(this->keepAliveInterval);
00677     // this will be a blocking call, wait for the connack
00678     if (waitfor(CONNACK, connect_timer) == CONNACK)
00679     {
00680         unsigned char connack_rc = 255;
00681         bool sessionPresent = false;
00682         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00683             rc = connack_rc;
00684         else
00685             rc = FAILURE;
00686     }
00687     else
00688         rc = FAILURE;
00689 
00690 #if MQTTCLIENT_QOS2
00691     // resend any inflight publish
00692     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00693     {
00694         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00695             rc = FAILURE;
00696         else
00697             rc = publish(len, connect_timer, inflightQoS);
00698     }
00699     else
00700 #endif
00701 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00702     if (inflightMsgid > 0)
00703     {
00704         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00705         rc = publish(inflightLen, connect_timer, inflightQoS);
00706     }
00707 #endif
00708 
00709 exit:
00710     if (rc == SUCCESS)
00711         isconnected = true;
00712     return rc;
00713 }
00714 
00715 
00716 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00717 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00718 {
00719     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00720     return connect(default_options);
00721 }
00722 
00723 
00724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00726 {
00727     int rc = FAILURE;
00728     Timer timer(command_timeout_ms);
00729     int len = 0;
00730     MQTTString topic = {(char*)topicFilter, {0, 0}};
00731 
00732     if (!isconnected)
00733         goto exit;
00734 
00735     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00736     if (len <= 0)
00737         goto exit;
00738     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00739         goto exit;             // there was a problem
00740 
00741     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00742     {
00743         int count = 0, grantedQoS = -1;
00744         unsigned short mypacketid;
00745         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00746             rc = grantedQoS; // 0, 1, 2 or 0x80
00747         if (rc != 0x80)
00748         {
00749             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00750             {
00751                 if (messageHandlers[i].topicFilter == 0)
00752                 {
00753                     messageHandlers[i].topicFilter = topicFilter;
00754                     messageHandlers[i].fp.attach(messageHandler);
00755                     rc = 0;
00756                     break;
00757                 }
00758             }
00759         }
00760     }
00761     else
00762         rc = FAILURE;
00763 
00764 exit:
00765     if (rc != SUCCESS)
00766         cleanSession();
00767     return rc;
00768 }
00769 
00770 
00771 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00772 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00773 {
00774     int rc = FAILURE;
00775     Timer timer(command_timeout_ms);
00776     MQTTString topic = {(char*)topicFilter, {0, 0}};
00777     int len = 0;
00778 
00779     if (!isconnected)
00780         goto exit;
00781 
00782     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00783         goto exit;
00784     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00785         goto exit; // there was a problem
00786 
00787     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00788     {
00789         unsigned short mypacketid;  // should be the same as the packetid above
00790         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00791         {
00792             rc = 0;
00793 
00794             // remove the subscription message handler associated with this topic, if there is one
00795             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00796             {
00797                 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00798                 {
00799                     messageHandlers[i].topicFilter = 0;
00800                     break;
00801                 }
00802             }
00803         }
00804     }
00805     else
00806         rc = FAILURE;
00807 
00808 exit:
00809     if (rc != SUCCESS)
00810         cleanSession();
00811     return rc;
00812 }
00813 
00814 
00815 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00816 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00817 {
00818     int rc;
00819 
00820     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00821         goto exit; // there was a problem
00822 
00823 #if MQTTCLIENT_QOS1
00824     if (qos == QOS1)
00825     {
00826         if (waitfor(PUBACK, timer) == PUBACK)
00827         {
00828             unsigned short mypacketid;
00829             unsigned char dup, type;
00830             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00831                 rc = FAILURE;
00832             else if (inflightMsgid == mypacketid)
00833                 inflightMsgid = 0;
00834         }
00835         else
00836             rc = FAILURE;
00837     }
00838 #elif MQTTCLIENT_QOS2
00839     else if (qos == QOS2)
00840     {
00841         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00842         {
00843             unsigned short mypacketid;
00844             unsigned char dup, type;
00845             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00846                 rc = FAILURE;
00847             else if (inflightMsgid == mypacketid)
00848                 inflightMsgid = 0;
00849         }
00850         else
00851             rc = FAILURE;
00852     }
00853 #endif
00854 
00855 exit:
00856     if (rc != SUCCESS)
00857         cleanSession();
00858     return rc;
00859 }
00860 
00861 
00862 
00863 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00864 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00865 {
00866     int rc = FAILURE;
00867     Timer timer(command_timeout_ms);
00868     MQTTString topicString = MQTTString_initializer;
00869     int len = 0;
00870 
00871     if (!isconnected)
00872         goto exit;
00873 
00874     topicString.cstring = (char*)topicName;
00875 
00876 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00877     if (qos == QOS1 || qos == QOS2)
00878         id = packetid.getNext();
00879 #endif
00880 
00881     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00882               topicString, (unsigned char*)payload, payloadlen);
00883     if (len <= 0)
00884         goto exit;
00885 
00886 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00887     if (!cleansession)
00888     {
00889         memcpy(pubbuf, sendbuf, len);
00890         inflightMsgid = id;
00891         inflightLen = len;
00892         inflightQoS = qos;
00893 #if MQTTCLIENT_QOS2
00894         pubrel = false;
00895 #endif
00896     }
00897 #endif
00898 
00899     rc = publish(len, timer, qos);
00900 exit:
00901     return rc;
00902 }
00903 
00904 
00905 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00906 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00907 {
00908     unsigned short id = 0;  // dummy - not used for anything
00909     return publish(topicName, payload, payloadlen, id, qos, retained);
00910 }
00911 
00912 
00913 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00914 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
00915 {
00916     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00917 }
00918 
00919 
00920 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00921 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
00922 {
00923     int rc = FAILURE;
00924     Timer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
00925     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
00926     if (len > 0)
00927         rc = sendPacket(len, timer);            // send the disconnect packet
00928 
00929     if (cleansession)
00930         cleanSession();
00931     else
00932         isconnected = false;
00933     return rc;
00934 }
00935 
00936 
00937 #endif