The Cayenne MQTT mbed Library provides functions to easily connect to the Cayenne IoT project builder.

Dependents:   Cayenne-ESP8266Interface Cayenne-WIZnet_Library Cayenne-WIZnetInterface Cayenne-X-NUCLEO-IDW01M1 ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
00020  *    Ian Craggs - fix for bug 475749 - packetid modified twice
00021  *******************************************************************************/
00022 
00023 #if !defined(MQTTCLIENT_H)
00024 #define MQTTCLIENT_H
00025 
00026 #include "FP.h"
00027 #include "MQTTPacket.h"
00028 #include "stdio.h"
00029 #include "MQTTLogging.h"
00030 
00031 #if !defined(MQTTCLIENT_QOS1)
00032     #define MQTTCLIENT_QOS1 1
00033 #endif
00034 #if !defined(MQTTCLIENT_QOS2)
00035     #define MQTTCLIENT_QOS2 0
00036 #endif
00037 
00038 namespace MQTT
00039 {
00040 
00041 
/** MQTT quality-of-service levels (QOS0 is 0, QOS1 is 1, QOS2 is 2). */
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};

/** Return codes used by the client. All failure return codes must be negative. */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
00046 
00047 
00048 struct Message
00049 {
00050     enum QoS qos;
00051     bool retained;
00052     bool dup;
00053     unsigned short id;
00054     void *payload;
00055     size_t payloadlen;
00056 };
00057 
00058 
00059 struct MessageData
00060 {
00061     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00062     { }
00063 
00064     struct Message &message;
00065     MQTTString &topicName;
00066 };
00067 
00068 
/** Hands out packet identifiers, cycling through 1..MAX_PACKET_ID; 0 is never returned. */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /** Step to the following identifier and return it, wrapping from MAX_PACKET_ID back to 1. */
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // most recently issued identifier (0 before the first call)
};
00086 
00087 
00088 /**
00089  * @class Client
00090  * @brief blocking, non-threaded MQTT client API
00091  *
00092  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00093  * MQTT request can be in process at any one time.
00094  * @param Network a network class with the methods: read, write. See NetworkInterface.h for function definitions.
00095  * @param Timer a timer class with the methods: countdown_ms, countdown, left_ms, expired. See TimerInterface.h for function definitions.
00096  */
00097 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00098 class Client
00099 {
00100 
00101 public:
00102 
00103     typedef void (*messageHandler)(MessageData&);
00104 
00105     /** Construct the client
00106      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00107      *      before calling MQTT connect
00108      *  @param limits an instance of the Limit class - to alter limits as required
00109      */
00110     Client(Network& network, unsigned int command_timeout_ms = 30000);
00111 
00112     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00113      *  @param mh - pointer to the callback function
00114      */
00115     void setDefaultMessageHandler(messageHandler mh)
00116     {
00117         defaultMessageHandler.attach(mh);
00118     }
00119 
00120     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00121     *  @param time - address of initialized object
00122     *  @param mh - pointer to the callback function
00123     */
00124     template<class T>
00125     void setDefaultMessageHandler(T *item, void (T::*method)(MessageData&))
00126     {
00127         defaultMessageHandler.attach(item, method);
00128     }
00129     
00130     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00131      *  The nework object must be connected to the network endpoint before calling this
00132      *  Default connect options are used
00133      *  @return success code -
00134      */
00135     int connect();
00136     
00137         /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00138      *  The nework object must be connected to the network endpoint before calling this
00139      *  @param options - connect options
00140      *  @return success code -
00141      */
00142     int connect(MQTTPacket_connectData& options);
00143 
00144     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00145      *  @param topic - the topic to publish to
00146      *  @param message - the message to send
00147      *  @return success code -
00148      */
00149     int publish(const char* topicName, Message& message);
00150     
00151     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00152      *  @param topic - the topic to publish to
00153      *  @param payload - the data to send
00154      *  @param payloadlen - the length of the data
00155      *  @param qos - the QoS to send the publish at
00156      *  @param retained - whether the message should be retained
00157      *  @return success code -
00158      */
00159     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00160     
00161     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00162      *  @param topic - the topic to publish to
00163      *  @param payload - the data to send
00164      *  @param payloadlen - the length of the data
00165      *  @param id - the packet id used - returned 
00166      *  @param qos - the QoS to send the publish at
00167      *  @param retained - whether the message should be retained
00168      *  @return success code -
00169      */
00170     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00171 
00172     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00173      *  @param topicFilter - a topic pattern which can include wildcards
00174      *  @param qos - the MQTT QoS to subscribe at
00175      *  @param mh - the callback function to be invoked when a message is received for this subscription
00176      *  @return success code -
00177      */
00178     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00179 
00180     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00181      *  @param topicFilter - a topic pattern which can include wildcards
00182      *  @return success code -
00183      */
00184     int unsubscribe(const char* topicFilter);
00185 
00186     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00187      *  @return success code -
00188      */
00189     int disconnect();
00190 
00191     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00192      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00193      *  received.
00194      *  @param timeout_ms the time to wait, in milliseconds
00195      *  @return success code - on failure, this means the client has disconnected
00196      */
00197     int yield(unsigned long timeout_ms = 1000L);
00198 
00199     /** Is the client connected?
00200      *  @return flag - is the client connected or not?
00201      */
00202     bool isConnected()
00203     {
00204         return isconnected;
00205     }
00206 
00207 private:
00208 
00209     void cleanSession();
00210     int cycle(Timer& timer);
00211     int waitfor(int packet_type, Timer& timer);
00212     int keepalive();
00213     int publish(int len, Timer& timer, enum QoS qos);
00214 
00215     int decodePacket(int* value, int timeout);
00216     int readPacket(Timer& timer);
00217     int sendPacket(int length, Timer& timer);
00218     int deliverMessage(MQTTString& topicName, Message& message);
00219     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00220 
00221     Network& ipstack;
00222     unsigned long command_timeout_ms;
00223 
00224     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00225     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00226 
00227     Timer last_sent, last_received, ping_response;
00228     unsigned int keepAliveInterval;
00229     bool ping_outstanding;
00230     bool cleansession;
00231 
00232     PacketId packetid;
00233 
00234     struct MessageHandlers
00235     {
00236         const char* topicFilter;
00237         FP<void, MessageData&>  fp;
00238     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00239 
00240     FP<void, MessageData&>  defaultMessageHandler;
00241 
00242     bool isconnected;
00243     
00244     bool connAckReceived;
00245     bool subAckReceived;
00246     bool unsubAckReceived;
00247     bool pubAckReceived;
00248     bool pubCompReceived;
00249 
00250 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00251     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00252     int inflightLen;
00253     unsigned short inflightMsgid;
00254     enum QoS inflightQoS;
00255 #endif
00256 
00257 #if MQTTCLIENT_QOS2
00258     bool pubrel;
00259     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00260         #define MAX_INCOMING_QOS2_MESSAGES 10
00261     #endif
00262     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00263     bool isQoS2msgidFree(unsigned short id);
00264     bool useQoS2msgid(unsigned short id);
00265     void freeQoS2msgid(unsigned short id);
00266 #endif
00267 
00268 };
00269 
00270 }
00271 
00272 
00273 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00274 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00275 {
00276     ping_outstanding = false;
00277     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00278         messageHandlers[i].topicFilter = 0;
00279     isconnected = false;
00280 
00281     connAckReceived = false;
00282     subAckReceived = false;
00283     unsubAckReceived = false;
00284     pubAckReceived = false;
00285     pubCompReceived = false;
00286     
00287 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00288     inflightMsgid = 0;
00289     inflightQoS = QOS0;
00290 #endif
00291 
00292 #if MQTTCLIENT_QOS2
00293     pubrel = false;
00294     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00295         incomingQoS2messages[i] = 0;
00296 #endif
00297 }
00298 
00299 
00300 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00301 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00302 {
00303     this->command_timeout_ms = command_timeout_ms;
00304     cleanSession();
00305 }
00306 
00307 
00308 #if MQTTCLIENT_QOS2
00309 template<class Network, class Timer, int a, int b>
00310 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00311 {
00312     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00313     {
00314         if (incomingQoS2messages[i] == id)
00315             return false;
00316     }
00317     return true;
00318 }
00319 
00320 
00321 template<class Network, class Timer, int a, int b>
00322 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00323 {
00324     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00325     {
00326         if (incomingQoS2messages[i] == 0)
00327         {
00328             incomingQoS2messages[i] = id;
00329             return true;
00330         }
00331     }
00332     return false;
00333 }
00334 
00335 
00336 template<class Network, class Timer, int a, int b>
00337 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00338 {
00339     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00340     {
00341         if (incomingQoS2messages[i] == id)
00342         {
00343             incomingQoS2messages[i] = 0;
00344             return;
00345         }
00346     }
00347 }
00348 #endif
00349 
00350 
00351 template<class Network, class Timer, int a, int b>
00352 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00353 {
00354     int rc = FAILURE,
00355         sent = 0;
00356 
00357     while (sent < length && !timer.expired())
00358     {
00359         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00360         if (rc < 0)  // there was an error writing the data
00361             break;
00362         sent += rc;
00363     }
00364     if (sent == length)
00365     {
00366         if (this->keepAliveInterval > 0)
00367             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00368         rc = SUCCESS;
00369     }
00370     else
00371         rc = FAILURE;
00372         
00373 #if defined(MQTT_DEBUG)
00374     char printbuf[150];
00375     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00376 #endif
00377     return rc;
00378 }
00379 
00380 
00381 template<class Network, class Timer, int a, int b>
00382 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00383 {
00384     unsigned char c;
00385     int multiplier = 1;
00386     int len = 0;
00387     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00388 
00389     *value = 0;
00390     do
00391     {
00392         int rc = MQTTPACKET_READ_ERROR;
00393 
00394         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00395         {
00396             rc = MQTTPACKET_READ_ERROR; /* bad data */
00397             goto exit;
00398         }
00399         rc = ipstack.read(&c, 1, timeout);
00400         if (rc != 1)
00401             goto exit;
00402         *value += (c & 127) * multiplier;
00403         multiplier *= 128;
00404     } while ((c & 128) != 0);
00405 exit:
00406     return len;
00407 }
00408 
00409 
00410 /**
00411  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00412  * the packets can be retried.
00413  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00414  * @return the MQTT packet type, or -1 if none
00415  */
00416 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00417 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00418 {
00419     int rc = FAILURE;
00420     MQTTHeader header = {0};
00421     int len = 0;
00422     int rem_len = 0;
00423 
00424     /* 1. read the header byte.  This has the packet type in it */
00425     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00426         goto exit;
00427 
00428     len = 1;
00429     /* 2. read the remaining length.  This is variable in itself */
00430     decodePacket(&rem_len, timer.left_ms());
00431     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00432 
00433     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00434     {
00435         rc = BUFFER_OVERFLOW;
00436         goto exit;
00437     }
00438 
00439     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00440     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00441         goto exit;
00442 
00443     header.byte = readbuf[0];
00444     rc = header.bits.type;
00445     if (this->keepAliveInterval > 0)
00446         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00447 exit:
00448         
00449 #if defined(MQTT_DEBUG)
00450     if (rc >= 0)
00451     {
00452         char printbuf[50];
00453         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00454     }
00455 #endif
00456     return rc;
00457 }
00458 
00459 
00460 // assume topic filter and name is in correct format
00461 // # can only be at end
00462 // + and # can only be next to separator
00463 template<class Network, class Timer, int a, int b>
00464 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00465 {
00466     char* curf = topicFilter;
00467     char* curn = topicName.lenstring.data;
00468     char* curn_end = curn + topicName.lenstring.len;
00469 
00470     while (*curf && curn < curn_end)
00471     {
00472         if (*curn == '/' && *curf != '/')
00473             break;
00474         if (*curf != '+' && *curf != '#' && *curf != *curn)
00475             break;
00476         if (*curf == '+')
00477         {   // skip until we meet the next separator, or end of string
00478             char* nextpos = curn + 1;
00479             while (nextpos < curn_end && *nextpos != '/')
00480                 nextpos = ++curn + 1;
00481         }
00482         else if (*curf == '#')
00483             curn = curn_end - 1;    // skip until end of string
00484         curf++;
00485         curn++;
00486     };
00487 
00488     return (curn == curn_end) && (*curf == '\0');
00489 }
00490 
00491 
00492 
00493 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00494 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00495 {
00496     int rc = FAILURE;
00497     // we have to find the right message handler - indexed by topic
00498     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00499     {
00500         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00501                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00502         {
00503             if (messageHandlers[i].fp.attached())
00504             {
00505                 MessageData md(topicName, message);
00506                 messageHandlers[i].fp(md);
00507                 rc = SUCCESS;
00508             }
00509         }
00510     }
00511 
00512     if (rc == FAILURE && defaultMessageHandler.attached())
00513     {
00514         MessageData md(topicName, message);
00515         defaultMessageHandler(md);
00516         rc = SUCCESS;
00517     }
00518 
00519     return rc;
00520 }
00521 
00522 
00523 
00524 template<class Network, class Timer, int a, int b>
00525 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00526 {
00527     int rc = SUCCESS;
00528     Timer timer;
00529 
00530     timer.countdown_ms(timeout_ms);
00531     while (!timer.expired())
00532     {
00533         if (cycle(timer) < 0)
00534         {
00535             rc = FAILURE;
00536             break;
00537         }
00538     }
00539 
00540     return rc;
00541 }
00542 
00543 
00544 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00545 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00546 {
00547     /* get one piece of work off the wire and one pass through */
00548 
00549     // read the socket, see what work is due
00550     int packet_type = readPacket(timer);
00551 
00552     int len = 0,
00553         rc = SUCCESS;
00554 
00555     switch (packet_type)
00556     {
00557         case FAILURE:
00558         case BUFFER_OVERFLOW:
00559             rc = packet_type;
00560             break;
00561         case CONNACK_MSG:
00562             connAckReceived = true;
00563             break;
00564         case PUBACK_MSG:
00565             pubAckReceived = true;
00566             break;
00567         case SUBACK_MSG:
00568             subAckReceived = true;
00569             break;
00570         case UNSUBACK_MSG:
00571             unsubAckReceived = true;
00572             break;
00573         case PUBLISH_MSG:
00574         {
00575             MQTTString topicName = MQTTString_initializer;
00576             Message msg;
00577             int intQoS;
00578             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00579                                  (unsigned char**)&msg.payload, &msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00580                 goto exit;
00581             msg.qos = (enum QoS)intQoS;
00582 #if MQTTCLIENT_QOS2
00583             if (msg.qos != QOS2)
00584 #endif
00585                 deliverMessage(topicName, msg);
00586 #if MQTTCLIENT_QOS2
00587             else if (isQoS2msgidFree(msg.id))
00588             {
00589                 if (useQoS2msgid(msg.id))
00590                     deliverMessage(topicName, msg);
00591                 else
00592                     WARN("Maximum number of incoming QoS2 messages exceeded");
00593             }
00594 #endif
00595 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00596             if (msg.qos != QOS0)
00597             {
00598                 if (msg.qos == QOS1)
00599                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK_MSG, 0, msg.id);
00600                 else if (msg.qos == QOS2)
00601                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC_MSG, 0, msg.id);
00602                 if (len <= 0)
00603                     rc = FAILURE;
00604                 else
00605                     rc = sendPacket(len, timer);
00606                 if (rc == FAILURE)
00607                     goto exit; // there was a problem
00608             }
00609             break;
00610 #endif
00611         }
00612 #if MQTTCLIENT_QOS2
00613         case PUBREC_MSG:
00614         case PUBREL_MSG:
00615             unsigned short mypacketid;
00616             unsigned char dup, type;
00617             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00618                 rc = FAILURE;
00619             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00620                         (packet_type == PUBREC_MSG) ? PUBREL_MSG : PUBCOMP_MSG, 0, mypacketid)) <= 0)
00621                 rc = FAILURE;
00622             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL_MSG packet
00623                 rc = FAILURE; // there was a problem
00624             if (rc == FAILURE)
00625                 goto exit; // there was a problem
00626             if (packet_type == PUBREL_MSG)
00627                 freeQoS2msgid(mypacketid);
00628             break;
00629             
00630         case PUBCOMP_MSG:
00631             pubCompReceived = true;
00632             break;
00633 #endif
00634         case PINGRESP_MSG:
00635             ping_outstanding = false;
00636             break;
00637     }
00638     keepalive();
00639 exit:
00640     if (rc == SUCCESS)
00641         rc = packet_type;
00642     return rc;
00643 }
00644 
00645 
00646 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00647 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00648 {
00649     int rc = FAILURE;
00650 
00651     if (keepAliveInterval == 0)
00652     {
00653         rc = SUCCESS;
00654         goto exit;
00655     }
00656 
00657     if (last_sent.expired() || last_received.expired())
00658     {
00659         if (!ping_outstanding)
00660         {
00661             Timer timer(1000);
00662             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00663             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00664             {
00665                 ping_response.countdown(this->keepAliveInterval);
00666                 ping_outstanding = true;
00667             }
00668         }
00669         else if(ping_response.expired())
00670         {
00671             isconnected = false;
00672         }
00673     }
00674 
00675 exit:
00676     return rc;
00677 }
00678 
00679 
00680 // only used in single-threaded mode where one command at a time is in process
00681 template<class Network, class Timer, int a, int b>
00682 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00683 {
00684     int rc = FAILURE;
00685 
00686     // Use bool values to determine if a packet type has been received. This only works if waitfor is 
00687     // called once at a time per type. However, it can be called with a different type at the same 
00688     // time, for instance, while waiting for a subscription acknowledgement (SUBACK_MSG) we could 
00689     // publish a QoS1 message and wait for the acknowledgement (PUBACK_MSG).
00690     switch(packet_type)
00691     {
00692     case CONNACK_MSG:
00693         connAckReceived = false;
00694         break;
00695     case SUBACK_MSG:
00696         subAckReceived = false;
00697         break;
00698     case UNSUBACK_MSG:
00699         unsubAckReceived = false;
00700         break;
00701     case PUBACK_MSG:
00702         pubAckReceived = false;
00703         break;
00704     case PUBCOMP_MSG:
00705         pubCompReceived = false;
00706         break;
00707     }
00708         
00709     do
00710     {
00711         switch(packet_type)
00712         {
00713         case CONNACK_MSG:
00714             if(connAckReceived) {
00715                 connAckReceived = false;
00716                 return packet_type;                 
00717             }
00718             break;
00719         case SUBACK_MSG:
00720             if(subAckReceived) {
00721                 subAckReceived = false;
00722                 return packet_type;                 
00723             }
00724             break;
00725         case UNSUBACK_MSG:
00726             if(unsubAckReceived) {
00727                 unsubAckReceived = false;
00728                 return packet_type;                 
00729             }
00730             break;
00731         case PUBACK_MSG:
00732             if(pubAckReceived) {
00733                 pubAckReceived = false;
00734                 return packet_type;                 
00735             }
00736             break;
00737         case PUBCOMP_MSG:
00738             if(pubCompReceived) {
00739                 pubCompReceived = false;
00740                 return packet_type;                 
00741             }
00742             break;
00743         }
00744         if (timer.expired())
00745             break; // we timed out
00746         cycle(timer);    
00747     }
00748     while (true);
00749 
00750     return rc;
00751 }
00752 
00753 
00754 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00755 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00756 {
00757     Timer connect_timer(command_timeout_ms);
00758     int rc = FAILURE;
00759     int len = 0;
00760 
00761     if (isconnected) // don't send connect packet again if we are already connected
00762         goto exit;
00763 
00764     this->keepAliveInterval = options.keepAliveInterval;
00765     this->cleansession = (options.cleansession != 0);
00766     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00767         goto exit;
00768     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00769         goto exit; // there was a problem
00770 
00771     if (this->keepAliveInterval > 0)
00772         last_received.countdown(this->keepAliveInterval);
00773     // this will be a blocking call, wait for the connack
00774     if (waitfor(CONNACK_MSG, connect_timer) == CONNACK_MSG)
00775     {
00776         unsigned char connack_rc = 255;
00777         bool sessionPresent = false;
00778         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00779             rc = connack_rc;
00780         else
00781             rc = FAILURE;
00782     }
00783     else
00784         rc = FAILURE;
00785 
00786 #if MQTTCLIENT_QOS2
00787     // resend any inflight publish
00788     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00789     {
00790         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL_MSG, 0, inflightMsgid)) <= 0)
00791             rc = FAILURE;
00792         else
00793             rc = publish(len, connect_timer, inflightQoS);
00794     }
00795     else
00796 #endif
00797 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00798     if (inflightMsgid > 0)
00799     {
00800         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00801         rc = publish(inflightLen, connect_timer, inflightQoS);
00802     }
00803 #endif
00804 
00805 exit:
00806     if (rc == SUCCESS)
00807         isconnected = true;
00808     return rc;
00809 }
00810 
00811 
00812 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00813 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00814 {
00815     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00816     return connect(default_options);
00817 }
00818 
00819 
00820 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00821 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00822 {
00823     int rc = FAILURE;
00824     Timer timer(command_timeout_ms);
00825     int len = 0;
00826     MQTTString topic = {(char*)topicFilter, {0, 0}};
00827 
00828     if (!isconnected)
00829         goto exit;
00830 
00831     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00832     if (len <= 0)
00833         goto exit;
00834     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00835         goto exit;             // there was a problem
00836 
00837     if (waitfor(SUBACK_MSG, timer) == SUBACK_MSG)      // wait for suback
00838     {
00839         int count = 0, grantedQoS = -1;
00840         unsigned short mypacketid;
00841         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00842             rc = grantedQoS; // 0, 1, 2 or 0x80
00843         if (rc != 0x80)
00844         {
00845             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00846             {
00847                 if (messageHandlers[i].topicFilter == 0)
00848                 {
00849                     messageHandlers[i].topicFilter = topicFilter;
00850                     messageHandlers[i].fp.attach(messageHandler);
00851                     rc = 0;
00852                     break;
00853                 }
00854             }
00855         }
00856     }
00857     else
00858         rc = FAILURE;
00859 
00860 exit:
00861     if (rc != SUCCESS)
00862         cleanSession();
00863     return rc;
00864 }
00865 
00866 
00867 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00868 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00869 {
00870     int rc = FAILURE;
00871     Timer timer(command_timeout_ms);
00872     MQTTString topic = {(char*)topicFilter, {0, 0}};
00873     int len = 0;
00874 
00875     if (!isconnected)
00876         goto exit;
00877 
00878     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00879         goto exit;
00880     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00881         goto exit; // there was a problem
00882 
00883     if (waitfor(UNSUBACK_MSG, timer) == UNSUBACK_MSG)
00884     {
00885         unsigned short mypacketid;  // should be the same as the packetid above
00886         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00887         {
00888             rc = 0;
00889 
00890             // remove the subscription message handler associated with this topic, if there is one
00891             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00892             {
00893                 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00894                 {
00895                     messageHandlers[i].topicFilter = 0;
00896                     break;
00897                 }
00898             }
00899         }
00900     }
00901     else
00902         rc = FAILURE;
00903 
00904 exit:
00905     if (rc != SUCCESS)
00906         cleanSession();
00907     return rc;
00908 }
00909 
00910 
00911 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00912 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00913 {
00914     int rc;
00915 
00916     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00917         goto exit; // there was a problem
00918 
00919 #if MQTTCLIENT_QOS1
00920     if (qos == QOS1)
00921     {
00922         if (waitfor(PUBACK_MSG, timer) == PUBACK_MSG)
00923         {
00924             unsigned short mypacketid;
00925             unsigned char dup, type;
00926             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00927                 rc = FAILURE;
00928             else if (inflightMsgid == mypacketid)
00929                 inflightMsgid = 0;
00930         }
00931         else
00932             rc = FAILURE;
00933     }
00934 #endif
00935 #if MQTTCLIENT_QOS2
00936     if (qos == QOS2)
00937     {
00938         if (waitfor(PUBCOMP_MSG, timer) == PUBCOMP_MSG)
00939         {
00940             unsigned short mypacketid;
00941             unsigned char dup, type;
00942             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00943                 rc = FAILURE;
00944             else if (inflightMsgid == mypacketid)
00945                 inflightMsgid = 0;
00946         }
00947         else
00948             rc = FAILURE;
00949     }
00950 #endif
00951 
00952 exit:
00953     if (rc != SUCCESS)
00954         cleanSession();
00955     return rc;
00956 }
00957 
00958 
00959 
00960 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00961 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00962 {
00963     int rc = FAILURE;
00964     Timer timer(command_timeout_ms);
00965     MQTTString topicString = MQTTString_initializer;
00966     int len = 0;
00967 
00968     if (!isconnected)
00969         goto exit;
00970 
00971     topicString.cstring = (char*)topicName;
00972 
00973 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00974     if (qos == QOS1 || qos == QOS2)
00975         id = packetid.getNext();
00976 #endif
00977 
00978     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00979               topicString, (unsigned char*)payload, payloadlen);
00980     if (len <= 0)
00981         goto exit;
00982 
00983 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00984     if (!cleansession)
00985     {
00986         memcpy(pubbuf, sendbuf, len);
00987         inflightMsgid = id;
00988         inflightLen = len;
00989         inflightQoS = qos;
00990 #if MQTTCLIENT_QOS2
00991         pubrel = false;
00992 #endif
00993     }
00994 #endif
00995 
00996     rc = publish(len, timer, qos);
00997 exit:
00998     return rc;
00999 }
01000 
01001 
01002 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01003 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
01004 {
01005     unsigned short id = 0;  // dummy - not used for anything
01006     return publish(topicName, payload, payloadlen, id, qos, retained);
01007 }
01008 
01009 
01010 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01011 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
01012 {
01013     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
01014 }
01015 
01016 
01017 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01018 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
01019 {
01020     int rc = FAILURE;
01021     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
01022     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
01023     if (len > 0)
01024         rc = sendPacket(len, timer);            // send the disconnect packet
01025 
01026     if (cleansession)
01027         cleanSession();
01028     else
01029         isconnected = false;
01030     return rc;
01031 }
01032 
01033 
01034 #endif