Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2017 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
00020  *    Ian Craggs - fix for bug 475749 - packetid modified twice
00021  *    Ian Craggs - add ability to set message handler separately #6
00022  *******************************************************************************/
00023 
00024 #if !defined(MQTTCLIENT_H)
00025 #define MQTTCLIENT_H
00026 
00027 #include "FP.h"
00028 #include "MQTTPacket.h"
00029 #include <stdio.h>
00030 #include <string.h>
00031 #include "MQTTLogging.h"
00032 
00033 #if !defined(MQTTCLIENT_QOS1)
00034     #define MQTTCLIENT_QOS1 1
00035 #endif
00036 #if !defined(MQTTCLIENT_QOS2)
00037     #define MQTTCLIENT_QOS2 0
00038 #endif
00039 
00040 namespace MQTT
00041 {
00042 
00043 
00044 enum QoS { QOS0, QOS1, QOS2 };
00045 
00046 // all failure return codes must be negative
00047 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00048 
00049 
00050 struct Message
00051 {
00052     enum QoS qos;
00053     bool retained;
00054     bool dup;
00055     unsigned short id;
00056     void *payload;
00057     size_t payloadlen;
00058 };
00059 
00060 
00061 struct MessageData
00062 {
00063     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00064     { }
00065 
00066     struct Message &message;
00067     MQTTString &topicName;
00068 };
00069 
00070 
00071 struct connackData
00072 {
00073     int rc;
00074     bool sessionPresent;
00075 };
00076 
00077 
00078 struct subackData
00079 {
00080     int grantedQoS;
00081 };
00082 
00083 
00084 class PacketId
00085 {
00086 public:
00087     PacketId()
00088     {
00089         next = 0;
00090     }
00091 
00092     int getNext()
00093     {
00094         return next = (next == MAX_PACKET_ID) ? 1 : next + 1;
00095     }
00096 
00097 private:
00098     static const int MAX_PACKET_ID = 65535;
00099     int next;
00100 };
00101 
00102 
00103 /**
00104  * @class Client
00105  * @brief blocking, non-threaded MQTT client API
00106  *
00107  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00108  * MQTT request can be in process at any one time.
00109  * @param Network a network class which supports send, receive
00110  * @param Timer a timer class with the methods:
00111  */
00112 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00113 class Client
00114 {
00115 
00116 public:
00117 
00118     typedef void (*messageHandler)(MessageData&);
00119 
00120     /** Construct the client
00121      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00122      *      before calling MQTT connect
00123      *  @param limits an instance of the Limit class - to alter limits as required
00124      */
00125     Client(Network& network, unsigned int command_timeout_ms = 30000);
00126 
00127     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00128      *  @param mh - pointer to the callback function.  Set to 0 to remove.
00129      */
00130     void setDefaultMessageHandler(messageHandler mh)
00131     {
00132         if (mh != 0)
00133             defaultMessageHandler.attach(mh);
00134         else
00135             defaultMessageHandler.detach();
00136     }
00137 
00138     /** Set a message handling callback.  This can be used outside of the the subscribe method.
00139      *  @param topicFilter - a topic pattern which can include wildcards
00140      *  @param mh - pointer to the callback function. If 0, removes the callback if any
00141      */
00142     int setMessageHandler(const char* topicFilter, messageHandler mh);
00143 
00144     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00145      *  The nework object must be connected to the network endpoint before calling this
00146      *  Default connect options are used
00147      *  @return success code -
00148      */
00149     int connect();
00150 
00151     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00152      *  The nework object must be connected to the network endpoint before calling this
00153      *  @param options - connect options
00154      *  @return success code -
00155      */
00156     int connect(MQTTPacket_connectData& options);
00157 
00158     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00159      *  The nework object must be connected to the network endpoint before calling this
00160      *  @param options - connect options
00161      *  @param connackData - connack data to be returned
00162      *  @return success code -
00163      */
00164     int connect(MQTTPacket_connectData& options, connackData& data);
00165 
00166     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00167      *  @param topic - the topic to publish to
00168      *  @param message - the message to send
00169      *  @return success code -
00170      */
00171     int publish(const char* topicName, Message& message);
00172 
00173     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00174      *  @param topic - the topic to publish to
00175      *  @param payload - the data to send
00176      *  @param payloadlen - the length of the data
00177      *  @param qos - the QoS to send the publish at
00178      *  @param retained - whether the message should be retained
00179      *  @return success code -
00180      */
00181     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00182 
00183     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00184      *  @param topic - the topic to publish to
00185      *  @param payload - the data to send
00186      *  @param payloadlen - the length of the data
00187      *  @param id - the packet id used - returned
00188      *  @param qos - the QoS to send the publish at
00189      *  @param retained - whether the message should be retained
00190      *  @return success code -
00191      */
00192     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00193 
00194     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00195      *  @param topicFilter - a topic pattern which can include wildcards
00196      *  @param qos - the MQTT QoS to subscribe at
00197      *  @param mh - the callback function to be invoked when a message is received for this subscription
00198      *  @return success code -
00199      */
00200     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00201 
00202     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00203      *  @param topicFilter - a topic pattern which can include wildcards
00204      *  @param qos - the MQTT QoS to subscribe at©
00205      *  @param mh - the callback function to be invoked when a message is received for this subscription
00206      *  @param
00207      *  @return success code -
00208      */
00209     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
00210 
00211     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00212      *  @param topicFilter - a topic pattern which can include wildcards
00213      *  @return success code -
00214      */
00215     int unsubscribe(const char* topicFilter);
00216 
00217     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00218      *  @return success code -
00219      */
00220     int disconnect();
00221 
00222     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00223      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00224      *  received.
00225      *  @param timeout_ms the time to wait, in milliseconds
00226      *  @return success code - on failure, this means the client has disconnected
00227      */
00228     int yield(unsigned long timeout_ms = 1000L);
00229 
00230     /** Is the client connected?
00231      *  @return flag - is the client connected or not?
00232      */
00233     bool isConnected()
00234     {
00235         return isconnected;
00236     }
00237 
00238 private:
00239 
00240     void closeSession();
00241     void cleanSession();
00242     int cycle(Timer& timer);
00243     int waitfor(int packet_type, Timer& timer);
00244     int keepalive();
00245     int publish(int len, Timer& timer, enum QoS qos);
00246 
00247     int decodePacket(int* value, int timeout);
00248     int readPacket(Timer& timer);
00249     int sendPacket(int length, Timer& timer);
00250     int deliverMessage(MQTTString& topicName, Message& message);
00251     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00252 
00253     Network& ipstack;
00254     unsigned long command_timeout_ms;
00255 
00256     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00257     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00258 
00259     Timer last_sent, last_received;
00260     unsigned int keepAliveInterval;
00261     bool ping_outstanding;
00262     bool cleansession;
00263 
00264     PacketId packetid;
00265 
00266     struct MessageHandlers
00267     {
00268         const char* topicFilter;
00269         FP<void, MessageData&>  fp;
00270     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00271 
00272     FP<void, MessageData&>  defaultMessageHandler;
00273 
00274     bool isconnected;
00275 
00276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00277     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00278     int inflightLen;
00279     unsigned short inflightMsgid;
00280     enum QoS inflightQoS;
00281 #endif
00282 
00283 #if MQTTCLIENT_QOS2
00284     bool pubrel;
00285     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00286         #define MAX_INCOMING_QOS2_MESSAGES 10
00287     #endif
00288     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00289     bool isQoS2msgidFree(unsigned short id);
00290     bool useQoS2msgid(unsigned short id);
00291     void freeQoS2msgid(unsigned short id);
00292 #endif
00293 
00294 };
00295 
00296 }
00297 
00298 
00299 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00300 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
00301 {
00302     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00303         messageHandlers[i].topicFilter = 0;
00304 
00305 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00306     inflightMsgid = 0;
00307     inflightQoS = QOS0;
00308 #endif
00309 
00310 #if MQTTCLIENT_QOS2
00311     pubrel = false;
00312     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00313         incomingQoS2messages[i] = 0;
00314 #endif
00315 }
00316 
00317 
00318 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00319 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
00320 {
00321     ping_outstanding = false;
00322     isconnected = false;
00323     if (cleansession)
00324         cleanSession();
00325 }
00326 
00327 
00328 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00329 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00330 {
00331     this->command_timeout_ms = command_timeout_ms;
00332     cleansession = true;
00333       closeSession();
00334 }
00335 
00336 
00337 #if MQTTCLIENT_QOS2
00338 template<class Network, class Timer, int a, int b>
00339 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00340 {
00341     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00342     {
00343         if (incomingQoS2messages[i] == id)
00344             return false;
00345     }
00346     return true;
00347 }
00348 
00349 
00350 template<class Network, class Timer, int a, int b>
00351 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00352 {
00353     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00354     {
00355         if (incomingQoS2messages[i] == 0)
00356         {
00357             incomingQoS2messages[i] = id;
00358             return true;
00359         }
00360     }
00361     return false;
00362 }
00363 
00364 
00365 template<class Network, class Timer, int a, int b>
00366 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00367 {
00368     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00369     {
00370         if (incomingQoS2messages[i] == id)
00371         {
00372             incomingQoS2messages[i] = 0;
00373             return;
00374         }
00375     }
00376 }
00377 #endif
00378 
00379 
00380 template<class Network, class Timer, int a, int b>
00381 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00382 {
00383     int rc = FAILURE,
00384         sent = 0;
00385 
00386     while (sent < length)
00387     {
00388         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00389         if (rc < 0)  // there was an error writing the data
00390             break;
00391         sent += rc;
00392         if (timer.expired()) // only check expiry after at least one attempt to write
00393             break;
00394     }
00395     if (sent == length)
00396     {
00397         if (this->keepAliveInterval > 0)
00398             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00399         rc = SUCCESS;
00400     }
00401     else
00402         rc = FAILURE;
00403 
00404 #if defined(MQTT_DEBUG)
00405     char printbuf[150];
00406     DEBUG("Rc %d from sending packet %s\r\n", rc,
00407         MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00408 #endif
00409     return rc;
00410 }
00411 
00412 
00413 template<class Network, class Timer, int a, int b>
00414 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00415 {
00416     unsigned char c;
00417     int multiplier = 1;
00418     int len = 0;
00419     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00420 
00421     *value = 0;
00422     do
00423     {
00424         int rc = MQTTPACKET_READ_ERROR;
00425 
00426         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00427         {
00428             rc = MQTTPACKET_READ_ERROR; /* bad data */
00429             goto exit;
00430         }
00431         rc = ipstack.read(&c, 1, timeout);
00432         if (rc != 1)
00433             goto exit;
00434         *value += (c & 127) * multiplier;
00435         multiplier *= 128;
00436     } while ((c & 128) != 0);
00437 exit:
00438     return len;
00439 }
00440 
00441 
00442 /**
00443  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00444  * the packets can be retried.
00445  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00446  * @return the MQTT packet type, 0 if none, -1 if error
00447  */
00448 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00449 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00450 {
00451     int rc = FAILURE;
00452     MQTTHeader header = {0};
00453     int len = 0;
00454     int rem_len = 0;
00455 
00456     /* 1. read the header byte.  This has the packet type in it */
00457     rc = ipstack.read(readbuf, 1, timer.left_ms());
00458     if (rc != 1)
00459     {
00460         rc = 0; // timed out reading packet
00461         goto exit;
00462     }
00463 
00464     len = 1;
00465     /* 2. read the remaining length.  This is variable in itself */
00466     decodePacket(&rem_len, timer.left_ms());
00467     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00468 
00469     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00470     {
00471         rc = BUFFER_OVERFLOW;
00472         goto exit;
00473     }
00474 
00475     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00476     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00477     {
00478         rc = FAILURE;
00479         goto exit;
00480     }
00481 
00482     header.byte = readbuf[0];
00483     rc = header.bits.type;
00484     if (this->keepAliveInterval > 0)
00485         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00486 exit:
00487 
00488 #if defined(MQTT_DEBUG)
00489     if (rc >= 0)
00490     {
00491         char printbuf[50];
00492         DEBUG("Rc %d receiving packet %s\r\n", rc,
00493             MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00494     }
00495 #endif
00496     return rc;
00497 }
00498 
00499 
00500 // assume topic filter and name is in correct format
00501 // # can only be at end
00502 // + and # can only be next to separator
00503 template<class Network, class Timer, int a, int b>
00504 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00505 {
00506     char* curf = topicFilter;
00507     char* curn = topicName.lenstring.data;
00508     char* curn_end = curn + topicName.lenstring.len;
00509 
00510     while (*curf && curn < curn_end)
00511     {
00512         if (*curn == '/' && *curf != '/')
00513             break;
00514         if (*curf != '+' && *curf != '#' && *curf != *curn)
00515             break;
00516         if (*curf == '+')
00517         {   // skip until we meet the next separator, or end of string
00518             char* nextpos = curn + 1;
00519             while (nextpos < curn_end && *nextpos != '/')
00520                 nextpos = ++curn + 1;
00521         }
00522         else if (*curf == '#')
00523             curn = curn_end - 1;    // skip until end of string
00524         curf++;
00525         curn++;
00526     };
00527 
00528     return (curn == curn_end) && (*curf == '\0');
00529 }
00530 
00531 
00532 
00533 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00534 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00535 {
00536     int rc = FAILURE;
00537 
00538     // we have to find the right message handler - indexed by topic
00539     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00540     {
00541         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00542                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00543         {
00544             if (messageHandlers[i].fp.attached())
00545             {
00546                 MessageData md(topicName, message);
00547                 messageHandlers[i].fp(md);
00548                 rc = SUCCESS;
00549             }
00550         }
00551     }
00552 
00553     if (rc == FAILURE && defaultMessageHandler.attached())
00554     {
00555         MessageData md(topicName, message);
00556         defaultMessageHandler(md);
00557         rc = SUCCESS;
00558     }
00559 
00560     return rc;
00561 }
00562 
00563 
00564 
00565 template<class Network, class Timer, int a, int b>
00566 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00567 {
00568     int rc = SUCCESS;
00569     Timer timer;
00570 
00571     timer.countdown_ms(timeout_ms);
00572     while (!timer.expired())
00573     {
00574         if (cycle(timer) < 0)
00575         {
00576             rc = FAILURE;
00577             break;
00578         }
00579     }
00580 
00581     return rc;
00582 }
00583 
00584 
00585 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00586 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00587 {
00588     // get one piece of work off the wire and one pass through
00589     int len = 0,
00590         rc = SUCCESS;
00591 
00592     int packet_type = readPacket(timer);    // read the socket, see what work is due
00593 
00594     switch (packet_type)
00595     {
00596         default:
00597             // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
00598             rc = packet_type;
00599             goto exit;
00600         case 0: // timed out reading packet
00601             break;
00602         case CONNACK:
00603         case PUBACK:
00604         case SUBACK:
00605         case UNSUBACK:
00606             break;
00607         case PUBLISH:
00608         {
00609             MQTTString topicName = MQTTString_initializer;
00610             Message msg;
00611             int intQoS;
00612             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
00613             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00614                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) {
00615                 rc = FAILURE;
00616                 goto exit;
00617             }
00618             msg.qos = (enum QoS)intQoS;
00619 #if MQTTCLIENT_QOS2
00620             if (msg.qos != QOS2)
00621 #endif
00622                 deliverMessage(topicName, msg);
00623 #if MQTTCLIENT_QOS2
00624             else if (isQoS2msgidFree(msg.id))
00625             {
00626                 if (useQoS2msgid(msg.id))
00627                     deliverMessage(topicName, msg);
00628                 else
00629                     WARN("Maximum number of incoming QoS2 messages exceeded");
00630             }
00631 #endif
00632 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00633             if (msg.qos != QOS0)
00634             {
00635                 if (msg.qos == QOS1)
00636                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00637                 else if (msg.qos == QOS2)
00638                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00639                 if (len <= 0)
00640                     rc = FAILURE;
00641                 else
00642                     rc = sendPacket(len, timer);
00643                 if (rc == FAILURE)
00644                     goto exit; // there was a problem
00645             }
00646             break;
00647 #endif
00648         }
00649 #if MQTTCLIENT_QOS2
00650         case PUBREC:
00651         case PUBREL:
00652             unsigned short mypacketid;
00653             unsigned char dup, type;
00654             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00655                 rc = FAILURE;
00656             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
00657                                  (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00658                 rc = FAILURE;
00659             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00660                 rc = FAILURE; // there was a problem
00661             if (rc == FAILURE)
00662                 goto exit; // there was a problem
00663             if (packet_type == PUBREL)
00664                 freeQoS2msgid(mypacketid);
00665             break;
00666 
00667         case PUBCOMP:
00668             break;
00669 #endif
00670         case PINGRESP:
00671             ping_outstanding = false;
00672             break;
00673     }
00674 
00675     if (keepalive() != SUCCESS)
00676         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
00677         rc = FAILURE;
00678 
00679 exit:
00680     if (rc == SUCCESS)
00681         rc = packet_type;
00682     else if (isconnected)
00683         closeSession();
00684     return rc;
00685 }
00686 
00687 
00688 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00689 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00690 {
00691     int rc = SUCCESS;
00692     static Timer ping_sent;
00693 
00694     if (keepAliveInterval == 0)
00695         goto exit;
00696 
00697     if (ping_outstanding)
00698     {
00699         if (ping_sent.expired())
00700         {
00701             rc = FAILURE; // session failure
00702             #if defined(MQTT_DEBUG)
00703                 DEBUG("PINGRESP not received in keepalive interval\r\n");
00704             #endif
00705         }
00706     }
00707     else if (last_sent.expired() || last_received.expired())
00708     {
00709         Timer timer(1000);
00710         int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00711         if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00712         {
00713             ping_outstanding = true;
00714             ping_sent.countdown(this->keepAliveInterval);
00715         }
00716     }
00717 exit:
00718     return rc;
00719 }
00720 
00721 
00722 // only used in single-threaded mode where one command at a time is in process
00723 template<class Network, class Timer, int a, int b>
00724 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00725 {
00726     int rc = FAILURE;
00727 
00728     do
00729     {
00730         if (timer.expired())
00731             break; // we timed out
00732         rc = cycle(timer);
00733     }
00734     while (rc != packet_type && rc >= 0);
00735 
00736     return rc;
00737 }
00738 
00739 
00740 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00741 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
00742 {
00743     Timer connect_timer(command_timeout_ms);
00744     int rc = FAILURE;
00745     int len = 0;
00746 
00747     if (isconnected) // don't send connect packet again if we are already connected
00748         goto exit;
00749 
00750     this->keepAliveInterval = options.keepAliveInterval;
00751     this->cleansession = options.cleansession;
00752     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00753         goto exit;
00754     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00755         goto exit; // there was a problem
00756 
00757     if (this->keepAliveInterval > 0)
00758         last_received.countdown(this->keepAliveInterval);
00759     // this will be a blocking call, wait for the connack
00760     if (waitfor(CONNACK, connect_timer) == CONNACK)
00761     {
00762         data.rc = 0;
00763         data.sessionPresent = false;
00764         if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
00765                             (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00766             rc = data.rc;
00767         else
00768             rc = FAILURE;
00769     }
00770     else
00771         rc = FAILURE;
00772 
00773 #if MQTTCLIENT_QOS2
00774     // resend any inflight publish
00775     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00776     {
00777         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00778             rc = FAILURE;
00779         else
00780             rc = publish(len, connect_timer, inflightQoS);
00781     }
00782     else
00783 #endif
00784 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00785     if (inflightMsgid > 0)
00786     {
00787         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00788         rc = publish(inflightLen, connect_timer, inflightQoS);
00789     }
00790 #endif
00791 
00792 exit:
00793     if (rc == SUCCESS)
00794     {
00795         isconnected = true;
00796         ping_outstanding = false;
00797     }
00798     return rc;
00799 }
00800 
00801 
00802 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00803 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00804 {
00805     connackData data;
00806     return connect(options, data);
00807 }
00808 
00809 
00810 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00811 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00812 {
00813     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00814     return connect(default_options);
00815 }
00816 
00817 
00818 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00819 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
00820 {
00821     int rc = FAILURE;
00822     int i = -1;
00823 
00824     // first check for an existing matching slot
00825     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00826     {
00827         if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00828         {
00829             if (messageHandler == 0) // remove existing
00830             {
00831                 messageHandlers[i].topicFilter = 0;
00832                 messageHandlers[i].fp.detach();
00833             }
00834             rc = SUCCESS; // return i when adding new subscription
00835             break;
00836         }
00837     }
00838     // if no existing, look for empty slot (unless we are removing)
00839     if (messageHandler != 0) {
00840         if (rc == FAILURE)
00841         {
00842             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00843             {
00844                 if (messageHandlers[i].topicFilter == 0)
00845                 {
00846                     rc = SUCCESS;
00847                     break;
00848                 }
00849             }
00850         }
00851         if (i < MAX_MESSAGE_HANDLERS)
00852         {
00853             messageHandlers[i].topicFilter = topicFilter;
00854             messageHandlers[i].fp.attach(messageHandler);
00855         }
00856     }
00857     return rc;
00858 }
00859 
00860 
00861 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00862 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
00863      enum QoS qos, messageHandler messageHandler, subackData& data)
00864 {
00865     int rc = FAILURE;
00866     Timer timer(command_timeout_ms);
00867     int len = 0;
00868     MQTTString topic = {(char*)topicFilter, {0, 0}};
00869 
00870     if (!isconnected)
00871         goto exit;
00872 
00873     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00874     if (len <= 0)
00875         goto exit;
00876     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00877         goto exit;             // there was a problem
00878 
00879     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00880     {
00881         int count = 0;
00882         unsigned short mypacketid;
00883         data.grantedQoS = 0;
00884         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00885         {
00886             if (data.grantedQoS != 0x80)
00887                 rc = setMessageHandler(topicFilter, messageHandler);
00888         }
00889     }
00890     else
00891         rc = FAILURE;
00892 
00893 exit:
00894     if (rc == FAILURE)
00895         closeSession();
00896     return rc;
00897 }
00898 
00899 
00900 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00901 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00902 {
00903     subackData data;
00904     return subscribe(topicFilter, qos, messageHandler, data);
00905 }
00906 
00907 
00908 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00909 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00910 {
00911     int rc = FAILURE;
00912     Timer timer(command_timeout_ms);
00913     MQTTString topic = {(char*)topicFilter, {0, 0}};
00914     int len = 0;
00915 
00916     if (!isconnected)
00917         goto exit;
00918 
00919     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00920         goto exit;
00921     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00922         goto exit; // there was a problem
00923 
00924     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00925     {
00926         unsigned short mypacketid;  // should be the same as the packetid above
00927         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00928         {
00929             // remove the subscription message handler associated with this topic, if there is one
00930             setMessageHandler(topicFilter, 0);
00931         }
00932     }
00933     else
00934         rc = FAILURE;
00935 
00936 exit:
00937     if (rc != SUCCESS)
00938         closeSession();
00939     return rc;
00940 }
00941 
00942 
00943 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00944 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00945 {
00946     int rc;
00947 
00948     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00949         goto exit; // there was a problem
00950 
00951 #if MQTTCLIENT_QOS1
00952     if (qos == QOS1)
00953     {
00954         if (waitfor(PUBACK, timer) == PUBACK)
00955         {
00956             unsigned short mypacketid;
00957             unsigned char dup, type;
00958             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00959                 rc = FAILURE;
00960             else if (inflightMsgid == mypacketid)
00961                 inflightMsgid = 0;
00962         }
00963         else
00964             rc = FAILURE;
00965     }
00966 #endif
00967 #if MQTTCLIENT_QOS2
00968     else if (qos == QOS2)
00969     {
00970         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00971         {
00972             unsigned short mypacketid;
00973             unsigned char dup, type;
00974             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00975                 rc = FAILURE;
00976             else if (inflightMsgid == mypacketid)
00977                 inflightMsgid = 0;
00978         }
00979         else
00980             rc = FAILURE;
00981     }
00982 #endif
00983 
00984 exit:
00985     if (rc != SUCCESS)
00986         closeSession();
00987     return rc;
00988 }
00989 
00990 
00991 
00992 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00993 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00994 {
00995     int rc = FAILURE;
00996     Timer timer(command_timeout_ms);
00997     MQTTString topicString = MQTTString_initializer;
00998     int len = 0;
00999 
01000     if (!isconnected)
01001         goto exit;
01002 
01003     topicString.cstring = (char*)topicName;
01004 
01005 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
01006     if (qos == QOS1 || qos == QOS2)
01007         id = packetid.getNext();
01008 #endif
01009 
01010     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
01011               topicString, (unsigned char*)payload, payloadlen);
01012     if (len <= 0)
01013         goto exit;
01014 
01015 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
01016     if (!cleansession)
01017     {
01018         memcpy(pubbuf, sendbuf, len);
01019         inflightMsgid = id;
01020         inflightLen = len;
01021         inflightQoS = qos;
01022 #if MQTTCLIENT_QOS2
01023         pubrel = false;
01024 #endif
01025     }
01026 #endif
01027 
01028     rc = publish(len, timer, qos);
01029 exit:
01030     return rc;
01031 }
01032 
01033 
01034 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01035 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
01036 {
01037     unsigned short id = 0;  // dummy - not used for anything
01038     return publish(topicName, payload, payloadlen, id, qos, retained);
01039 }
01040 
01041 
01042 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01043 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
01044 {
01045     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
01046 }
01047 
01048 
01049 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01050 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
01051 {
01052     int rc = FAILURE;
01053     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
01054     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
01055     if (len > 0)
01056         rc = sendPacket(len, timer);            // send the disconnect packet
01057     closeSession();
01058     return rc;
01059 }
01060 
01061 #endif