Jim Flynn / MQTT
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2017 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
00020  *    Ian Craggs - fix for bug 475749 - packetid modified twice
00021  *    Ian Craggs - add ability to set message handler separately #6
00022  *******************************************************************************/
00023 
00024 #if !defined(MQTTCLIENT_H)
00025 #define MQTTCLIENT_H
00026 
00027 #include "MQTTPacket.h"
00028 #include <stdio.h>
00029 #include "MQTTLogging.h"
00030 
00031 #if !defined(MQTTCLIENT_QOS1)
00032     #define MQTTCLIENT_QOS1 1
00033 #endif
00034 #if !defined(MQTTCLIENT_QOS2)
00035     #define MQTTCLIENT_QOS2 0
00036 #endif
00037 
00038 namespace MQTT
00039 {
00040 
00041 
// MQTT quality-of-service levels; the enumerators have the values 0, 1 and 2.
enum QoS { QOS0, QOS1, QOS2 };

// Status codes returned by the client API.
// all failure return codes must be negative
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00046 
00047 
// An application message: the publish flags, packet id and payload.
// The struct stores only a pointer to the payload; it does not own or copy the data.
struct Message
{
    enum QoS qos;           // quality of service of the publish
    bool retained;          // MQTT "retain" flag
    bool dup;               // MQTT "duplicate delivery" flag
    unsigned short id;      // packet identifier
    void *payload;          // start of the payload bytes (not owned)
    size_t payloadlen;      // number of payload bytes
};
00057 
00058 
// Argument passed to message handlers: a message together with the topic it arrived on.
// Both members are references, so the referenced objects must outlive this struct.
struct MessageData
{
    MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
    { }

    struct Message &message;    // the received message
    MQTTString &topicName;      // topic the message was published to
};
00067 
00068 
// Values extracted from a CONNACK packet by connect().
struct connackData
{
    int rc;                 // connect return code reported by the server
    bool sessionPresent;    // "session present" flag reported by the server
};
00074 
00075 
// Values extracted from a SUBACK packet by subscribe().
struct subackData
{
    int grantedQoS;         // QoS granted by the server; subscribe() treats 0x80 as "subscription refused"
};
00080 
00081 
// Generator of MQTT packet identifiers: hands out 1, 2, ..., 65535 and then
// starts again at 1, so 0 is never returned.
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    // Advance to the following identifier and return it.
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;       // wrap around, skipping 0
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // most recently issued identifier (0 before the first call)
};
00099 
00100 
/**
 * @class Client
 * @brief blocking, non-threaded MQTT client API
 *
 * This version of the API blocks on all method calls, until they are complete.  This means that only one
 * MQTT request can be in process at any one time.
 * @param Network a network class; this file calls read(buffer, length, timeout) and
 *      write(buffer, length, timeout) on it
 * @param Timer a timer class; this file uses a default constructor, a constructor taking a duration
 *      (presumably milliseconds - confirm against the Timer implementation), countdown(),
 *      countdown_ms(), left_ms() and expired()
 * @param MAX_MQTT_PACKET_SIZE size in bytes of each of the send, receive and in-flight publish buffers
 * @param MAX_MESSAGE_HANDLERS number of per-subscription message handler slots
 */
template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
class Client
{

public:

    /** Signature of a message callback */
    typedef void (*messageHandler)(MessageData&);

    /** Construct the client
     *  @param network - reference to an instance of the Network class - must be connected to the endpoint
     *      before calling MQTT connect
     *  @param command_timeout_ms - time allowed for each blocking command, in milliseconds
     */
    Client(Network& network, unsigned int command_timeout_ms = 30000);

    /** Set the default message handling callback - used for any message which does not match a subscription message handler
     *  @param mh - pointer to the callback function.  Set to 0 to remove.
     */
    void setDefaultMessageHandler(messageHandler mh)
    {
        if (mh != 0)
            defaultMessageHandler=mh;
        else
            defaultMessageHandler=NULL;
    }

    /** Set a message handling callback.  This can be used outside of the subscribe method.
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @param mh - pointer to the callback function. If 0, removes the callback if any
     *  @return success code -
     */
    int setMessageHandler(const char* topicFilter, messageHandler mh);

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  Default connect options are used
     *  @return success code -
     */
    int connect();

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  @param options - connect options
     *  @return success code -
     */
    int connect(MQTTPacket_connectData& options);

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  @param options - connect options
     *  @param data - connack data to be returned
     *  @return success code -
     */
    int connect(MQTTPacket_connectData& options, connackData& data);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param message - the message to send
     *  @return success code -
     */
    int publish(const char* topicName, Message& message);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code -
     */
    int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param id - the packet id used - returned
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code -
     */
    int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);

    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @param qos - the MQTT QoS to subscribe at
     *  @param mh - the callback function to be invoked when a message is received for this subscription
     *  @return success code -
     */
    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);

    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @param qos - the MQTT QoS to subscribe at
     *  @param mh - the callback function to be invoked when a message is received for this subscription
     *  @param data - suback data to be returned
     *  @return success code -
     */
    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);

    /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @return success code -
     */
    int unsubscribe(const char* topicFilter);

    /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
     *  @return success code -
     */
    int disconnect();

    /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
     *  received.
     *  @param timeout_ms the time to wait, in milliseconds
     *  @return success code - on failure, this means the client has disconnected
     */
    int yield(unsigned long timeout_ms = 1000L);

    /** Is the client connected?
     *  @return flag - is the client connected or not?
     */
    bool isConnected()
    {
        return isconnected;
    }

private:

    void closeSession();
    void cleanSession();
    int cycle(Timer& timer);
    int waitfor(int packet_type, Timer& timer);
    int keepalive();
    int publish(int len, Timer& timer, enum QoS qos);

    int decodePacket(int* value, int timeout);
    int readPacket(Timer& timer);
    int sendPacket(int length, Timer& timer);
    int deliverMessage(MQTTString& topicName, Message& message);
    bool isTopicMatched(char* topicFilter, MQTTString& topicName);

    Network& ipstack;                   // transport used for all reads and writes
    unsigned long command_timeout_ms;   // time budget for each blocking command

    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];    // outgoing packet is serialized here
    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];    // incoming packet is assembled here

    Timer last_sent, last_received;     // restarted with keepAliveInterval after each packet sent/received
    unsigned int keepAliveInterval;     // taken from the connect options; 0 disables keepalive handling
    bool ping_outstanding;              // a PINGREQ has been sent and its PINGRESP not yet seen
    bool cleansession;                  // taken from the connect options; closeSession() then also clears state

    PacketId packetid;

    struct MessageHandlers
    {
        const char* topicFilter;            // pointer as supplied by the caller (not copied); 0 marks a free slot
        Callback<void( MessageData&)> fp;
    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic

    Callback<void(MessageData&)> defaultMessageHandler;

    bool isconnected;

#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
    unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
    int inflightLen;
    unsigned short inflightMsgid;
    enum QoS inflightQoS;
#endif

#if MQTTCLIENT_QOS2
    bool pubrel;
    #if !defined(MAX_INCOMING_QOS2_MESSAGES)
        #define MAX_INCOMING_QOS2_MESSAGES 10
    #endif
    unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];    // ids of QoS 2 messages in progress; 0 marks a free slot
    bool isQoS2msgidFree(unsigned short id);
    bool useQoS2msgid(unsigned short id);
    void freeQoS2msgid(unsigned short id);
#endif

};
00293 
00294 }
00295 
00296 
00297 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00298 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
00299 {
00300     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00301         messageHandlers[i].topicFilter = 0;
00302 
00303 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00304     inflightMsgid = 0;
00305     inflightQoS = QOS0;
00306 #endif
00307 
00308 #if MQTTCLIENT_QOS2
00309     pubrel = false;
00310     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00311         incomingQoS2messages[i] = 0;
00312 #endif
00313 }
00314 
00315 
00316 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00317 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
00318 {
00319     ping_outstanding = false;
00320     isconnected = false;
00321     if (cleansession)
00322         cleanSession();
00323 }
00324 
00325 
00326 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00327 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00328 {
00329     this->command_timeout_ms = command_timeout_ms;
00330     cleansession = true;
00331       closeSession();
00332 }
00333 
00334 
00335 #if MQTTCLIENT_QOS2
00336 template<class Network, class Timer, int a, int b>
00337 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00338 {
00339     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00340     {
00341         if (incomingQoS2messages[i] == id)
00342             return false;
00343     }
00344     return true;
00345 }
00346 
00347 
00348 template<class Network, class Timer, int a, int b>
00349 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00350 {
00351     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00352     {
00353         if (incomingQoS2messages[i] == 0)
00354         {
00355             incomingQoS2messages[i] = id;
00356             return true;
00357         }
00358     }
00359     return false;
00360 }
00361 
00362 
00363 template<class Network, class Timer, int a, int b>
00364 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00365 {
00366     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00367     {
00368         if (incomingQoS2messages[i] == id)
00369         {
00370             incomingQoS2messages[i] = 0;
00371             return;
00372         }
00373     }
00374 }
00375 #endif
00376 
00377 
00378 template<class Network, class Timer, int a, int b>
00379 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00380 {
00381     int rc = FAILURE,
00382         sent = 0;
00383 
00384     while (sent < length)
00385     {
00386         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00387         if (rc < 0)  // there was an error writing the data
00388             break;
00389         sent += rc;
00390         if (timer.expired()) // only check expiry after at least one attempt to write
00391             break;
00392     }
00393     if (sent == length)
00394     {
00395         if (this->keepAliveInterval > 0)
00396             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00397         rc = SUCCESS;
00398     }
00399     else
00400         rc = FAILURE;
00401 
00402 #if defined(MQTT_DEBUG)
00403     char printbuf[150];
00404     DEBUG("Rc %d from sending packet %s\r\n", rc, 
00405         MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00406 #endif
00407     return rc;
00408 }
00409 
00410 
00411 template<class Network, class Timer, int a, int b>
00412 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00413 {
00414     unsigned char c;
00415     int multiplier = 1;
00416     int len = 0;
00417     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00418 
00419     *value = 0;
00420     do
00421     {
00422         int rc = MQTTPACKET_READ_ERROR;
00423 
00424         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00425         {
00426             rc = MQTTPACKET_READ_ERROR; /* bad data */
00427             goto exit;
00428         }
00429         rc = ipstack.read(&c, 1, timeout);
00430         if (rc != 1)
00431             goto exit;
00432         *value += (c & 127) * multiplier;
00433         multiplier *= 128;
00434     } while ((c & 128) != 0);
00435 exit:
00436     return len;
00437 }
00438 
00439 
00440 /**
00441  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00442  * the packets can be retried.
00443  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00444  * @return the MQTT packet type, 0 if none, -1 if error
00445  */
00446 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00447 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00448 {
00449     int rc = FAILURE;
00450     MQTTHeader header = {0};
00451     int len = 0;
00452     int rem_len = 0;
00453 
00454     /* 1. read the header byte.  This has the packet type in it */
00455     rc = ipstack.read(readbuf, 1, timer.left_ms());
00456     if (rc != 1)
00457         goto exit;
00458 
00459     len = 1;
00460     /* 2. read the remaining length.  This is variable in itself */
00461     decodePacket(&rem_len, timer.left_ms());
00462     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00463 
00464     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00465     {
00466         rc = BUFFER_OVERFLOW;
00467         goto exit;
00468     }
00469 
00470     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00471     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00472         goto exit;
00473 
00474     header.byte = readbuf[0];
00475     rc = header.bits.type;
00476     if (this->keepAliveInterval > 0)
00477         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00478 exit:
00479 
00480 #if defined(MQTT_DEBUG)
00481     if (rc >= 0)
00482     {
00483         char printbuf[50];
00484         DEBUG("Rc %d receiving packet %s\r\n", rc, 
00485             MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00486     }
00487 #endif
00488     return rc;
00489 }
00490 
00491 
00492 // assume topic filter and name is in correct format
00493 // # can only be at end
00494 // + and # can only be next to separator
00495 template<class Network, class Timer, int a, int b>
00496 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00497 {
00498     char* curf = topicFilter;
00499     char* curn = topicName.lenstring.data;
00500     char* curn_end = curn + topicName.lenstring.len;
00501 
00502     while (*curf && curn < curn_end)
00503     {
00504         if (*curn == '/' && *curf != '/')
00505             break;
00506         if (*curf != '+' && *curf != '#' && *curf != *curn)
00507             break;
00508         if (*curf == '+')
00509         {   // skip until we meet the next separator, or end of string
00510             char* nextpos = curn + 1;
00511             while (nextpos < curn_end && *nextpos != '/')
00512                 nextpos = ++curn + 1;
00513         }
00514         else if (*curf == '#')
00515             curn = curn_end - 1;    // skip until end of string
00516         curf++;
00517         curn++;
00518     };
00519 
00520     return (curn == curn_end) && (*curf == '\0');
00521 }
00522 
00523 
00524 
00525 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00526 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00527 {
00528     int rc = FAILURE;
00529 
00530     // we have to find the right message handler - indexed by topic
00531     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00532     {
00533         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00534                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00535         {
00536             if (messageHandlers[i].fp)  //test to see if it is attached
00537             {
00538                 MessageData md(topicName, message);
00539                 messageHandlers[i].fp(md);
00540                 rc = SUCCESS;
00541             }
00542         }
00543     }
00544 
00545     if (rc == FAILURE && defaultMessageHandler )
00546     {
00547         MessageData md(topicName, message);
00548         defaultMessageHandler(md);
00549         rc = SUCCESS;
00550     }
00551 
00552     return rc;
00553 }
00554 
00555 
00556 
00557 template<class Network, class Timer, int a, int b>
00558 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00559 {
00560     int rc = SUCCESS;
00561     Timer timer;
00562 
00563     timer.countdown_ms(timeout_ms);
00564     while (!timer.expired())
00565     {
00566         if (cycle(timer) < 0)
00567         {
00568             rc = FAILURE;
00569             break;
00570         }
00571     }
00572 
00573     return rc;
00574 }
00575 
00576 
00577 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00578 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00579 {
00580     // get one piece of work off the wire and one pass through
00581     int len = 0,
00582         rc = SUCCESS;
00583 
00584     int packet_type = readPacket(timer);    // read the socket, see what work is due
00585 
00586     switch (packet_type)
00587     {
00588         default:
00589             // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
00590             rc = packet_type;
00591             goto exit;
00592         case 0: // timed out reading packet
00593             break;
00594         case CONNACK:
00595         case PUBACK:
00596         case SUBACK:
00597             break;
00598         case PUBLISH:
00599         {
00600             MQTTString topicName = MQTTString_initializer;
00601             Message msg;
00602             int intQoS;
00603             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
00604             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00605                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00606                 goto exit;
00607             msg.qos = (enum QoS)intQoS;
00608 #if MQTTCLIENT_QOS2
00609             if (msg.qos != QOS2)
00610 #endif
00611                 deliverMessage(topicName, msg);
00612 #if MQTTCLIENT_QOS2
00613             else if (isQoS2msgidFree(msg.id))
00614             {
00615                 if (useQoS2msgid(msg.id))
00616                     deliverMessage(topicName, msg);
00617                 else
00618                     WARN("Maximum number of incoming QoS2 messages exceeded");
00619             }
00620 #endif
00621 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00622             if (msg.qos != QOS0)
00623             {
00624                 if (msg.qos == QOS1)
00625                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00626                 else if (msg.qos == QOS2)
00627                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00628                 if (len <= 0)
00629                     rc = FAILURE;
00630                 else
00631                     rc = sendPacket(len, timer);
00632                 if (rc == FAILURE)
00633                     goto exit; // there was a problem
00634             }
00635             break;
00636 #endif
00637         }
00638 #if MQTTCLIENT_QOS2
00639         case PUBREC:
00640         case PUBREL:
00641             unsigned short mypacketid;
00642             unsigned char dup, type;
00643             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00644                 rc = FAILURE;
00645             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
00646                                  (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00647                 rc = FAILURE;
00648             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00649                 rc = FAILURE; // there was a problem
00650             if (rc == FAILURE)
00651                 goto exit; // there was a problem
00652             if (packet_type == PUBREL)
00653                 freeQoS2msgid(mypacketid);
00654             break;
00655 
00656         case PUBCOMP:
00657             break;
00658 #endif
00659         case PINGRESP:
00660             ping_outstanding = false;
00661             break;
00662     }
00663 
00664     if (keepalive() != SUCCESS)
00665         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
00666         rc = FAILURE;
00667 
00668 exit:
00669     if (rc == SUCCESS)
00670         rc = packet_type;
00671     else if (isconnected)
00672         closeSession();
00673     return rc;
00674 }
00675 
00676 
00677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00679 {
00680     int rc = SUCCESS;
00681     static Timer ping_sent;
00682 
00683     if (keepAliveInterval == 0)
00684         goto exit;
00685     
00686     if (ping_outstanding)
00687     {
00688         if (ping_sent.expired())
00689         {
00690             rc = FAILURE; // session failure
00691             #if defined(MQTT_DEBUG)
00692                 DEBUG("PINGRESP not received in keepalive interval\r\n");
00693             #endif
00694         }
00695     }
00696     else if (last_sent.expired() || last_received.expired())
00697     {
00698         Timer timer(1000);
00699         int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00700         if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00701         {
00702             ping_outstanding = true;
00703             ping_sent.countdown(this->keepAliveInterval);
00704         }
00705     }
00706 exit:
00707     return rc;
00708 }
00709 
00710 
00711 // only used in single-threaded mode where one command at a time is in process
00712 template<class Network, class Timer, int a, int b>
00713 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00714 {
00715     int rc = FAILURE;
00716 
00717     do
00718     {
00719         if (timer.expired())
00720             break; // we timed out
00721         rc = cycle(timer);
00722     }
00723     while (rc != packet_type && rc >= 0);
00724 
00725     return rc;
00726 }
00727 
00728 
00729 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00730 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
00731 {
00732     Timer connect_timer(command_timeout_ms);
00733     int rc = FAILURE;
00734     int len = 0;
00735 
00736     if (isconnected) // don't send connect packet again if we are already connected
00737         goto exit;
00738 
00739     this->keepAliveInterval = options.keepAliveInterval;
00740     this->cleansession = options.cleansession;
00741     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00742         goto exit;
00743     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00744         goto exit; // there was a problem
00745 
00746     if (this->keepAliveInterval > 0)
00747         last_received.countdown(this->keepAliveInterval);
00748     // this will be a blocking call, wait for the connack
00749     if (waitfor(CONNACK, connect_timer) == CONNACK)
00750     {
00751         data.rc = 0;
00752         data.sessionPresent = false;
00753         if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
00754                             (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00755             rc = data.rc;
00756         else
00757             rc = FAILURE;
00758     }
00759     else
00760         rc = FAILURE;
00761 
00762 #if MQTTCLIENT_QOS2
00763     // resend any inflight publish
00764     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00765     {
00766         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00767             rc = FAILURE;
00768         else
00769             rc = publish(len, connect_timer, inflightQoS);
00770     }
00771     else
00772 #endif
00773 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00774     if (inflightMsgid > 0)
00775     {
00776         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00777         rc = publish(inflightLen, connect_timer, inflightQoS);
00778     }
00779 #endif
00780 
00781 exit:
00782     if (rc == SUCCESS)
00783     {
00784         isconnected = true;
00785         ping_outstanding = false;
00786     }
00787     return rc;
00788 }
00789 
00790 
00791 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00792 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00793 {
00794     connackData data;
00795     return connect(options, data);
00796 }
00797 
00798 
00799 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00800 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00801 {
00802     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00803     return connect(default_options);
00804 }
00805 
00806 
00807 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00808 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
00809 {
00810     int rc = FAILURE;
00811     int i = -1;
00812 
00813     // first check for an existing matching slot
00814     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00815     {
00816         if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00817         {
00818             if (messageHandler == 0) // remove existing
00819             {
00820                 messageHandlers[i].topicFilter = 0;
00821                 messageHandlers[i].fp.detach();
00822             }
00823             rc = SUCCESS; // return i when adding new subscription
00824             break;
00825         }
00826     }
00827     // if no existing, look for empty slot (unless we are removing)
00828     if (messageHandler != 0) {
00829         if (rc == FAILURE)
00830         {
00831             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00832             {
00833                 if (messageHandlers[i].topicFilter == 0)
00834                 {
00835                     rc = SUCCESS;
00836                     break;
00837                 }
00838             }
00839         }
00840         if (i < MAX_MESSAGE_HANDLERS)
00841         {
00842             messageHandlers[i].topicFilter = topicFilter;
00843             messageHandlers[i].fp.attach(messageHandler);
00844         }
00845     }
00846     return rc;
00847 }
00848 
00849 
00850 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00851 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
00852      enum QoS qos, messageHandler messageHandler, subackData& data)
00853 {
00854     int rc = FAILURE;
00855     Timer timer(command_timeout_ms);
00856     int len = 0;
00857     MQTTString topic = {(char*)topicFilter, {0, 0}};
00858 
00859     if (!isconnected)
00860         goto exit;
00861 
00862     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00863     if (len <= 0)
00864         goto exit;
00865     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00866         goto exit;             // there was a problem
00867 
00868     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00869     {
00870         int count = 0;
00871         unsigned short mypacketid;
00872         data.grantedQoS = 0;
00873         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00874         {
00875             if (data.grantedQoS != 0x80)
00876                 rc = setMessageHandler(topicFilter, messageHandler);
00877         }
00878     }
00879     else
00880         rc = FAILURE;
00881 
00882 exit:
00883     if (rc == FAILURE)
00884         closeSession();
00885     return rc;
00886 }
00887 
00888 
00889 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00890 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00891 {
00892     subackData data;
00893     return subscribe(topicFilter, qos, messageHandler, data);
00894 }
00895 
00896 
00897 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00898 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00899 {
00900     int rc = FAILURE;
00901     Timer timer(command_timeout_ms);
00902     MQTTString topic = {(char*)topicFilter, {0, 0}};
00903     int len = 0;
00904 
00905     if (!isconnected)
00906         goto exit;
00907 
00908     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00909         goto exit;
00910     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00911         goto exit; // there was a problem
00912 
00913     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00914     {
00915         unsigned short mypacketid;  // should be the same as the packetid above
00916         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00917         {
00918             // remove the subscription message handler associated with this topic, if there is one
00919             setMessageHandler(topicFilter, 0);
00920         }
00921     }
00922     else
00923         rc = FAILURE;
00924 
00925 exit:
00926     if (rc != SUCCESS)
00927         closeSession();
00928     return rc;
00929 }
00930 
00931 
00932 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00933 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00934 {
00935     int rc;
00936 
00937     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00938         goto exit; // there was a problem
00939 
00940 #if MQTTCLIENT_QOS1
00941     if (qos == QOS1)
00942     {
00943         if (waitfor(PUBACK, timer) == PUBACK)
00944         {
00945             unsigned short mypacketid;
00946             unsigned char dup, type;
00947             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00948                 rc = FAILURE;
00949             else if (inflightMsgid == mypacketid)
00950                 inflightMsgid = 0;
00951         }
00952         else
00953             rc = FAILURE;
00954     }
00955 #endif
00956 #if MQTTCLIENT_QOS2
00957     else if (qos == QOS2)
00958     {
00959         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00960         {
00961             unsigned short mypacketid;
00962             unsigned char dup, type;
00963             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00964                 rc = FAILURE;
00965             else if (inflightMsgid == mypacketid)
00966                 inflightMsgid = 0;
00967         }
00968         else
00969             rc = FAILURE;
00970     }
00971 #endif
00972 
00973 exit:
00974     if (rc != SUCCESS)
00975         closeSession();
00976     return rc;
00977 }
00978 
00979 
00980 
00981 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00982 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00983 {
00984     int rc = FAILURE;
00985     Timer timer(command_timeout_ms);
00986     MQTTString topicString = MQTTString_initializer;
00987     int len = 0;
00988 
00989     if (!isconnected)
00990         goto exit;
00991 
00992     topicString.cstring = (char*)topicName;
00993 
00994 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00995     if (qos == QOS1 || qos == QOS2)
00996         id = packetid.getNext();
00997 #endif
00998 
00999     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
01000               topicString, (unsigned char*)payload, payloadlen);
01001     if (len <= 0)
01002         goto exit;
01003 
01004 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
01005     if (!cleansession)
01006     {
01007         memcpy(pubbuf, sendbuf, len);
01008         inflightMsgid = id;
01009         inflightLen = len;
01010         inflightQoS = qos;
01011 #if MQTTCLIENT_QOS2
01012         pubrel = false;
01013 #endif
01014     }
01015 #endif
01016 
01017     rc = publish(len, timer, qos);
01018 exit:
01019     return rc;
01020 }
01021 
01022 
01023 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01024 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
01025 {
01026     unsigned short id = 0;  // dummy - not used for anything
01027     return publish(topicName, payload, payloadlen, id, qos, retained);
01028 }
01029 
01030 
01031 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01032 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
01033 {
01034     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
01035 }
01036 
01037 
01038 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01039 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
01040 {
01041     int rc = FAILURE;
01042     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
01043     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
01044     if (len > 0)
01045         rc = sendPacket(len, timer);            // send the disconnect packet
01046     closeSession();
01047     return rc;
01048 }
01049 
01050 #endif