Example of an AWS IoT connection and web dashboard using an STM32 Nucleo evaluation board and mbed OS.

Dependencies:   X_NUCLEO_IKS01A1 mbed FP MQTTPacket DnsQuery ATParser

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors: *
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *******************************************************************************/
00020 
00021 #if !defined(MQTTCLIENT_H)
00022 #define MQTTCLIENT_H
00023 
00024 #include "FP.h"
00025 #include "MQTTPacket.h"
00026 #include "stdio.h"
00027 #include "MQTTLogging.h"
00028 
00029 #if !defined(MQTTCLIENT_QOS1)
00030     #define MQTTCLIENT_QOS1 1
00031 #endif
00032 #if !defined(MQTTCLIENT_QOS2)
00033     #define MQTTCLIENT_QOS2 0
00034 #endif
00035 
00036 namespace MQTT
00037 {
00038 
00039 
/** MQTT quality-of-service levels; the enumerator values equal the numeric QoS level. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
00041 
/** Client return codes. Every failure code must be negative; success is zero. */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE         = -1,
    SUCCESS         =  0
};
00044 
00045 
00046 struct Message
00047 {
00048     enum QoS qos;
00049     bool retained;
00050     bool dup;
00051     unsigned short id;
00052     void *payload;
00053     size_t payloadlen;
00054 };
00055 
00056 
00057 struct MessageData
00058 {
00059     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00060     { }
00061 
00062     struct Message &message;
00063     MQTTString &topicName;
00064 };
00065 
00066 
/**
 * Generator of MQTT packet identifiers.
 *
 * Identifiers run from 1 to MAX_PACKET_ID and then wrap back to 1; 0 is never returned.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     * Advance to, and return, the next packet identifier.
     * @return a value in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        // Compute the successor without modifying `next` inside the expression that also
        // assigns it; the previous `next = ... ? 1 : ++next` form wrote `next` twice in
        // one expression.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out (0 before the first call)
};
00084 
00085 
00086 /**
00087  * @class Client
00088  * @brief blocking, non-threaded MQTT client API
00089  *
00090  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00091  * MQTT request can be in process at any one time.
00092  * @param Network a network class which supports send, receive
00093  * @param Timer a timer class with the methods:
00094  */
00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00096 class Client
00097 {
00098 
00099 public:
00100 
00101     typedef void (*messageHandler)(MessageData&);
00102 
00103     /** Construct the client
00104      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00105      *      before calling MQTT connect
00106      *  @param limits an instance of the Limit class - to alter limits as required
00107      */
00108     Client(Network& network, unsigned int command_timeout_ms = 30000);
00109 
00110     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00111      *  @param mh - pointer to the callback function
00112      */
00113     void setDefaultMessageHandler(messageHandler mh)
00114     {
00115         defaultMessageHandler.attach(mh);
00116     }
00117 
00118     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00119      *  The nework object must be connected to the network endpoint before calling this
00120      *  Default connect options are used
00121      *  @return success code -
00122      */
00123     int connect();
00124     
00125         /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00126      *  The nework object must be connected to the network endpoint before calling this
00127      *  @param options - connect options
00128      *  @return success code -
00129      */
00130     int connect(MQTTPacket_connectData& options);
00131 
00132     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00133      *  @param topic - the topic to publish to
00134      *  @param message - the message to send
00135      *  @return success code -
00136      */
00137     int publish(const char* topicName, Message& message);
00138     
00139     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00140      *  @param topic - the topic to publish to
00141      *  @param payload - the data to send
00142      *  @param payloadlen - the length of the data
00143      *  @param qos - the QoS to send the publish at
00144      *  @param retained - whether the message should be retained
00145      *  @return success code -
00146      */
00147     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00148     
00149     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00150      *  @param topic - the topic to publish to
00151      *  @param payload - the data to send
00152      *  @param payloadlen - the length of the data
00153      *  @param id - the packet id used - returned 
00154      *  @param qos - the QoS to send the publish at
00155      *  @param retained - whether the message should be retained
00156      *  @return success code -
00157      */
00158     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00159 
00160     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00161      *  @param topicFilter - a topic pattern which can include wildcards
00162      *  @param qos - the MQTT QoS to subscribe at
00163      *  @param mh - the callback function to be invoked when a message is received for this subscription
00164      *  @return success code -
00165      */
00166     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00167 
00168     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00169      *  @param topicFilter - a topic pattern which can include wildcards
00170      *  @return success code -
00171      */
00172     int unsubscribe(const char* topicFilter);
00173 
00174     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00175      *  @return success code -
00176      */
00177     int disconnect();
00178 
00179     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00180      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00181      *  received.
00182      *  @param timeout_ms the time to wait, in milliseconds
00183      *  @return success code - on failure, this means the client has disconnected
00184      */
00185     int yield(unsigned long timeout_ms = 1000L);
00186 
00187     /** Is the client connected?
00188      *  @return flag - is the client connected or not?
00189      */
00190     bool isConnected()
00191     {
00192         return isconnected;
00193     }
00194 
00195 private:
00196 
00197     void cleanSession();
00198     int cycle(Timer& timer);
00199     int waitfor(int packet_type, Timer& timer);
00200     int keepalive();
00201     int publish(int len, Timer& timer, enum QoS qos);
00202 
00203     int decodePacket(int* value, int timeout);
00204     int readPacket(Timer& timer);
00205     int sendPacket(int length, Timer& timer);
00206     int deliverMessage(MQTTString& topicName, Message& message);
00207     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00208 
00209     Network& ipstack;
00210     unsigned long command_timeout_ms;
00211 
00212     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00213     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00214 
00215     Timer last_sent, last_received;
00216     unsigned int keepAliveInterval;
00217     bool ping_outstanding;
00218     bool cleansession;
00219 
00220     PacketId packetid;
00221 
00222     struct MessageHandlers
00223     {
00224         const char* topicFilter;
00225         FP<void, MessageData&> fp;
00226     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00227 
00228     FP<void, MessageData&> defaultMessageHandler;
00229 
00230     bool isconnected;
00231 
00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00233     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00234     int inflightLen;
00235     unsigned short inflightMsgid;
00236     enum QoS inflightQoS;
00237 #endif
00238 
00239 #if MQTTCLIENT_QOS2
00240     bool pubrel;
00241     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00242         #define MAX_INCOMING_QOS2_MESSAGES 10
00243     #endif
00244     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00245     bool isQoS2msgidFree(unsigned short id);
00246     bool useQoS2msgid(unsigned short id);
00247     void freeQoS2msgid(unsigned short id);
00248 #endif
00249 
00250 };
00251 
00252 }
00253 
00254 
00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00257 {
00258     ping_outstanding = false;
00259     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00260         messageHandlers[i].topicFilter = 0;
00261     isconnected = false;
00262 
00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00264     inflightMsgid = 0;
00265     inflightQoS = QOS0;
00266 #endif
00267 
00268 #if MQTTCLIENT_QOS2
00269     pubrel = false;
00270     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00271         incomingQoS2messages[i] = 0;
00272 #endif
00273 }
00274 
00275 
00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00278 {
00279     last_sent = Timer();
00280     last_received = Timer();
00281     this->command_timeout_ms = command_timeout_ms;
00282     cleanSession();
00283 }
00284 
00285 
00286 #if MQTTCLIENT_QOS2
00287 template<class Network, class Timer, int a, int b>
00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00289 {
00290     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00291     {
00292         if (incomingQoS2messages[i] == id)
00293             return false;
00294     }
00295     return true;
00296 }
00297 
00298 
00299 template<class Network, class Timer, int a, int b>
00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00301 {
00302     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00303     {
00304         if (incomingQoS2messages[i] == 0)
00305         {
00306             incomingQoS2messages[i] = id;
00307             return true;
00308         }
00309     }
00310     return false;
00311 }
00312 
00313 
00314 template<class Network, class Timer, int a, int b>
00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00316 {
00317     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00318     {
00319         if (incomingQoS2messages[i] == id)
00320         {
00321             incomingQoS2messages[i] = 0;
00322             return;
00323         }
00324     }
00325 }
00326 #endif
00327 
00328 
00329 template<class Network, class Timer, int a, int b>
00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00331 {
00332     int rc = FAILURE,
00333         sent = 0;
00334 
00335     while (sent < length && !timer.expired())
00336     {
00337         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00338         if (rc < 0)  // there was an error writing the data
00339             break;
00340         sent += rc;
00341     }
00342     if (sent == length)
00343     {
00344         if (this->keepAliveInterval > 0)
00345             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00346         rc = SUCCESS;
00347     }
00348     else
00349         rc = FAILURE;
00350         
00351 #if defined(MQTT_DEBUG)
00352     char printbuf[150];
00353     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00354 #endif
00355     return rc;
00356 }
00357 
00358 
00359 template<class Network, class Timer, int a, int b>
00360 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00361 {
00362     unsigned char c;
00363     int multiplier = 1;
00364     int len = 0;
00365     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00366 
00367     *value = 0;
00368     do
00369     {
00370         int rc = MQTTPACKET_READ_ERROR;
00371 
00372         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00373         {
00374             rc = MQTTPACKET_READ_ERROR; /* bad data */
00375             goto exit;
00376         }
00377         rc = ipstack.read(&c, 1, timeout);
00378         if (rc != 1)
00379             goto exit;
00380         *value += (c & 127) * multiplier;
00381         multiplier *= 128;
00382     } while ((c & 128) != 0);
00383 exit:
00384     return len;
00385 }
00386 
00387 
00388 /**
00389  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00390  * the packets can be retried.
00391  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00392  * @return the MQTT packet type, or -1 if none
00393  */
00394 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00395 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00396 {
00397     int rc = FAILURE;
00398     MQTTHeader header = {0};
00399     int len = 0;
00400     int rem_len = 0;
00401 
00402     /* 1. read the header byte.  This has the packet type in it */
00403     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00404         goto exit;
00405 
00406     len = 1;
00407     /* 2. read the remaining length.  This is variable in itself */
00408     decodePacket(&rem_len, timer.left_ms());
00409     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00410 
00411     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00412     {
00413         rc = BUFFER_OVERFLOW;
00414         goto exit;
00415     }
00416 
00417     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00418     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00419         goto exit;
00420 
00421     header.byte = readbuf[0];
00422     rc = header.bits.type;
00423     if (this->keepAliveInterval > 0)
00424         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00425 exit:
00426         
00427 #if defined(MQTT_DEBUG)
00428     if (rc >= 0)
00429     {
00430         char printbuf[50];
00431         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00432     }
00433 #endif
00434     return rc;
00435 }
00436 
00437 
00438 // assume topic filter and name is in correct format
00439 // # can only be at end
00440 // + and # can only be next to separator
00441 template<class Network, class Timer, int a, int b>
00442 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00443 {
00444     char* curf = topicFilter;
00445     char* curn = topicName.lenstring.data;
00446     char* curn_end = curn + topicName.lenstring.len;
00447 
00448     while (*curf && curn < curn_end)
00449     {
00450         if (*curn == '/' && *curf != '/')
00451             break;
00452         if (*curf != '+' && *curf != '#' && *curf != *curn)
00453             break;
00454         if (*curf == '+')
00455         {   // skip until we meet the next separator, or end of string
00456             char* nextpos = curn + 1;
00457             while (nextpos < curn_end && *nextpos != '/')
00458                 nextpos = ++curn + 1;
00459         }
00460         else if (*curf == '#')
00461             curn = curn_end - 1;    // skip until end of string
00462         curf++;
00463         curn++;
00464     };
00465 
00466     return (curn == curn_end) && (*curf == '\0');
00467 }
00468 
00469 
00470 
00471 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00472 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00473 {
00474     int rc = FAILURE;
00475 
00476     // we have to find the right message handler - indexed by topic
00477     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00478     {
00479         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00480                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00481         {
00482             if (messageHandlers[i].fp.attached())
00483             {
00484                 MessageData md(topicName, message);
00485                 messageHandlers[i].fp(md);
00486                 rc = SUCCESS;
00487             }
00488         }
00489     }
00490 
00491     if (rc == FAILURE && defaultMessageHandler.attached())
00492     {
00493         MessageData md(topicName, message);
00494         defaultMessageHandler(md);
00495         rc = SUCCESS;
00496     }
00497 
00498     return rc;
00499 }
00500 
00501 
00502 
00503 template<class Network, class Timer, int a, int b>
00504 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00505 {
00506     int rc = SUCCESS;
00507     Timer timer = Timer();
00508 
00509     timer.countdown_ms(timeout_ms);
00510     while (!timer.expired())
00511     {
00512         if (cycle(timer) < 0)
00513         {
00514             rc = FAILURE;
00515             break;
00516         }
00517     }
00518 
00519     return rc;
00520 }
00521 
00522 
00523 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00524 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00525 {
00526     /* get one piece of work off the wire and one pass through */
00527 
00528     // read the socket, see what work is due
00529     int packet_type = readPacket(timer);
00530 
00531     int len = 0,
00532         rc = SUCCESS;
00533 
00534     switch (packet_type)
00535     {
00536         case FAILURE:
00537         case BUFFER_OVERFLOW:
00538             rc = packet_type;
00539             break;
00540         case CONNACK:
00541         case PUBACK:
00542         case SUBACK:
00543             break;
00544         case PUBLISH:
00545         {
00546             MQTTString topicName = MQTTString_initializer;
00547             Message msg;
00548             int intQoS;
00549             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00550                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00551                 goto exit;
00552             msg.qos = (enum QoS)intQoS;
00553 #if MQTTCLIENT_QOS2
00554             if (msg.qos != QOS2)
00555 #endif
00556                 deliverMessage(topicName, msg);
00557 #if MQTTCLIENT_QOS2
00558             else if (isQoS2msgidFree(msg.id))
00559             {
00560                 if (useQoS2msgid(msg.id))
00561                     deliverMessage(topicName, msg);
00562                 else
00563                     WARN("Maximum number of incoming QoS2 messages exceeded");
00564             }
00565 #endif
00566 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00567             if (msg.qos != QOS0)
00568             {
00569                 if (msg.qos == QOS1)
00570                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00571                 else if (msg.qos == QOS2)
00572                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00573                 if (len <= 0)
00574                     rc = FAILURE;
00575                 else
00576                     rc = sendPacket(len, timer);
00577                 if (rc == FAILURE)
00578                     goto exit; // there was a problem
00579             }
00580             break;
00581 #endif
00582         }
00583 #if MQTTCLIENT_QOS2
00584         case PUBREC:
00585         case PUBREL:
00586             unsigned short mypacketid;
00587             unsigned char dup, type;
00588             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00589                 rc = FAILURE;
00590             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00591                         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00592                 rc = FAILURE;
00593             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00594                 rc = FAILURE; // there was a problem
00595             if (rc == FAILURE)
00596                 goto exit; // there was a problem
00597             if (packet_type == PUBREL)
00598                 freeQoS2msgid(mypacketid);
00599             break;
00600             
00601         case PUBCOMP:
00602             break;
00603 #endif
00604         case PINGRESP:
00605             ping_outstanding = false;
00606             break;
00607     }
00608     keepalive();
00609 exit:
00610     if (rc == SUCCESS)
00611         rc = packet_type;
00612     return rc;
00613 }
00614 
00615 
00616 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00617 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00618 {
00619     int rc = FAILURE;
00620 
00621     if (keepAliveInterval == 0)
00622     {
00623         rc = SUCCESS;
00624         goto exit;
00625     }
00626 
00627     if (last_sent.expired() || last_received.expired())
00628     {
00629         if (!ping_outstanding)
00630         {
00631             Timer timer(1000);
00632             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00633             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00634                 ping_outstanding = true;
00635         }
00636     }
00637 
00638 exit:
00639     return rc;
00640 }
00641 
00642 
00643 // only used in single-threaded mode where one command at a time is in process
00644 template<class Network, class Timer, int a, int b>
00645 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00646 {
00647     int rc = FAILURE;
00648 
00649     do
00650     {
00651         if (timer.expired())
00652             break; // we timed out
00653     }
00654     while ((rc = cycle(timer)) != packet_type);
00655 
00656     return rc;
00657 }
00658 
00659 
00660 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00661 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00662 {
00663     Timer connect_timer(command_timeout_ms);
00664     int rc = FAILURE;
00665     int len = 0;
00666 
00667     if (isconnected) // don't send connect packet again if we are already connected
00668         goto exit;
00669 
00670     this->keepAliveInterval = options.keepAliveInterval;
00671     this->cleansession = options.cleansession;
00672     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00673         goto exit;
00674     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00675         goto exit; // there was a problem
00676 
00677     if (this->keepAliveInterval > 0)
00678         last_received.countdown(this->keepAliveInterval);
00679     // this will be a blocking call, wait for the connack
00680     if (waitfor(CONNACK, connect_timer) == CONNACK)
00681     {
00682         unsigned char connack_rc = 255;
00683         bool sessionPresent = false;
00684         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00685             rc = connack_rc;
00686         else
00687             rc = FAILURE;
00688     }
00689     else
00690         rc = FAILURE;
00691 
00692 #if MQTTCLIENT_QOS2
00693     // resend any inflight publish
00694     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00695     {
00696         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00697             rc = FAILURE;
00698         else
00699             rc = publish(len, connect_timer, inflightQoS);
00700     }
00701     else
00702 #endif
00703 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00704     if (inflightMsgid > 0)
00705     {
00706         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00707         rc = publish(inflightLen, connect_timer, inflightQoS);
00708     }
00709 #endif
00710 
00711 exit:
00712     if (rc == SUCCESS)
00713         isconnected = true;
00714     return rc;
00715 }
00716 
00717 
00718 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00719 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00720 {
00721     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00722     return connect(default_options);
00723 }
00724 
00725 
00726 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00727 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00728 {
00729     int rc = FAILURE;
00730     Timer timer(command_timeout_ms);
00731     int len = 0;
00732     MQTTString topic = {(char*)topicFilter, {0, 0}};
00733 
00734     if (!isconnected)
00735         goto exit;
00736 
00737     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00738     if (len <= 0)
00739         goto exit;
00740     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00741         goto exit;             // there was a problem
00742 
00743     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00744     {
00745         int count = 0, grantedQoS = -1;
00746         unsigned short mypacketid;
00747         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00748             rc = grantedQoS; // 0, 1, 2 or 0x80
00749         if (rc != 0x80)
00750         {
00751             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00752             {
00753                 if (messageHandlers[i].topicFilter == 0)
00754                 {
00755                     messageHandlers[i].topicFilter = topicFilter;
00756                     messageHandlers[i].fp.attach(messageHandler);
00757                     rc = 0;
00758                     break;
00759                 }
00760             }
00761         }
00762     }
00763     else
00764         rc = FAILURE;
00765 
00766 exit:
00767     if (rc != SUCCESS)
00768         cleanSession();
00769     return rc;
00770 }
00771 
00772 
00773 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00774 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00775 {
00776     int rc = FAILURE;
00777     Timer timer(command_timeout_ms);
00778     MQTTString topic = {(char*)topicFilter, {0, 0}};
00779     int len = 0;
00780 
00781     if (!isconnected)
00782         goto exit;
00783 
00784     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00785         goto exit;
00786     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00787         goto exit; // there was a problem
00788 
00789     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00790     {
00791         unsigned short mypacketid;  // should be the same as the packetid above
00792         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00793         {
00794             rc = 0;
00795 
00796             // remove the subscription message handler associated with this topic, if there is one
00797             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00798             {
00799                 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00800                 {
00801                     messageHandlers[i].topicFilter = 0;
00802                     break;
00803                 }
00804             }
00805         }
00806     }
00807     else
00808         rc = FAILURE;
00809 
00810 exit:
00811     if (rc != SUCCESS)
00812         cleanSession();
00813     return rc;
00814 }
00815 
00816 
00817 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00818 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00819 {
00820     int rc;
00821 
00822     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00823         goto exit; // there was a problem
00824 
00825 #if MQTTCLIENT_QOS1
00826     if (qos == QOS1)
00827     {
00828         if (waitfor(PUBACK, timer) == PUBACK)
00829         {
00830             unsigned short mypacketid;
00831             unsigned char dup, type;
00832             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00833                 rc = FAILURE;
00834             else if (inflightMsgid == mypacketid)
00835                 inflightMsgid = 0;
00836         }
00837         else
00838             rc = FAILURE;
00839     }
00840 #elif MQTTCLIENT_QOS2
00841     else if (qos == QOS2)
00842     {
00843         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00844         {
00845             unsigned short mypacketid;
00846             unsigned char dup, type;
00847             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00848                 rc = FAILURE;
00849             else if (inflightMsgid == mypacketid)
00850                 inflightMsgid = 0;
00851         }
00852         else
00853             rc = FAILURE;
00854     }
00855 #endif
00856 
00857 exit:
00858     if (rc != SUCCESS)
00859         cleanSession();
00860     return rc;
00861 }
00862 
00863 
00864 
00865 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00866 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00867 {
00868     int rc = FAILURE;
00869     Timer timer(command_timeout_ms);
00870     MQTTString topicString = MQTTString_initializer;
00871     int len = 0;
00872 
00873     if (!isconnected)
00874         goto exit;
00875 
00876     topicString.cstring = (char*)topicName;
00877 
00878 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00879     if (qos == QOS1 || qos == QOS2)
00880         id = packetid.getNext();
00881 #endif
00882 
00883     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00884               topicString, (unsigned char*)payload, payloadlen);
00885     if (len <= 0)
00886         goto exit;
00887 
00888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00889     if (!cleansession)
00890     {
00891         memcpy(pubbuf, sendbuf, len);
00892         inflightMsgid = id;
00893         inflightLen = len;
00894         inflightQoS = qos;
00895 #if MQTTCLIENT_QOS2
00896         pubrel = false;
00897 #endif
00898     }
00899 #endif
00900 
00901     rc = publish(len, timer, qos);
00902 exit:
00903     return rc;
00904 }
00905 
00906 
00907 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00908 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00909 {
00910     unsigned short id = 0;  // dummy - not used for anything
00911     return publish(topicName, payload, payloadlen, id, qos, retained);
00912 }
00913 
00914 
00915 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00916 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
00917 {
00918     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00919 }
00920 
00921 
00922 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00923 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
00924 {
00925     int rc = FAILURE;
00926     Timer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
00927     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
00928     if (len > 0)
00929         rc = sendPacket(len, timer);            // send the disconnect packet
00930 
00931     if (cleansession)
00932         cleanSession();
00933     else
00934         isconnected = false;
00935     return rc;
00936 }
00937 
00938 
00939 #endif