d

Dependencies:   MQTTPacket FP

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2017 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
00020  *    Ian Craggs - fix for bug 475749 - packetid modified twice
00021  *    Ian Craggs - add ability to set message handler separately #6
00022  *******************************************************************************/
00023 
00024 #if !defined(MQTTCLIENT_H)
00025 #define MQTTCLIENT_H
00026 
00027 #include "FP.h"
00028 #include "MQTTPacket.h"
00029 #include <stdio.h>
00030 #include "MQTTLogging.h"
00031 
00032 #if !defined(MQTTCLIENT_QOS1)
00033     #define MQTTCLIENT_QOS1 1
00034 #endif
00035 #if !defined(MQTTCLIENT_QOS2)
00036     #define MQTTCLIENT_QOS2 0
00037 #endif
00038 
00039 namespace MQTT
00040 {
00041 
00042 
00043 enum QoS { QOS0, QOS1, QOS2 };
00044 
00045 // all failure return codes must be negative
00046 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00047 
00048 
00049 struct Message
00050 {
00051     enum QoS qos;
00052     bool retained;
00053     bool dup;
00054     unsigned short id;
00055     void *payload;
00056     size_t payloadlen;
00057 };
00058 
00059 
00060 struct MessageData
00061 {
00062     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00063     { }
00064 
00065     struct Message &message;
00066     MQTTString &topicName;
00067 };
00068 
00069 
00070 struct connackData
00071 {
00072     int rc;
00073     bool sessionPresent;
00074 };
00075 
00076 
00077 struct subackData
00078 {
00079     int grantedQoS;
00080 };
00081 
00082 
00083 class PacketId
00084 {
00085 public:
00086     PacketId()
00087     {
00088         next = 0;
00089     }
00090 
00091     int getNext()
00092     {
00093         return next = (next == MAX_PACKET_ID) ? 1 : next + 1;
00094     }
00095 
00096 private:
00097     static const int MAX_PACKET_ID = 65535;
00098     int next;
00099 };
00100 
00101 
00102 /**
00103  * @class Client
00104  * @brief blocking, non-threaded MQTT client API
00105  *
00106  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00107  * MQTT request can be in process at any one time.
00108  * @param Network a network class which supports send, receive
00109  * @param Timer a timer class with the methods:
00110  */
00111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 11>
00112 class Client
00113 {
00114 
00115 public:
00116 
00117     typedef void (*messageHandler)(MessageData&);
00118 
00119     /** Construct the client
00120      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00121      *      before calling MQTT connect
00122      *  @param limits an instance of the Limit class - to alter limits as required
00123      */
00124     Client(Network& network, unsigned int command_timeout_ms = 30000);
00125 
00126     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00127      *  @param mh - pointer to the callback function.  Set to 0 to remove.
00128      */
00129     void setDefaultMessageHandler(messageHandler mh)
00130     {
00131         if (mh != 0)
00132             defaultMessageHandler.attach(mh);
00133         else
00134             defaultMessageHandler.detach();
00135     }
00136 
00137     /** Set a message handling callback.  This can be used outside of the the subscribe method.
00138      *  @param topicFilter - a topic pattern which can include wildcards
00139      *  @param mh - pointer to the callback function. If 0, removes the callback if any
00140      */
00141     int setMessageHandler(const char* topicFilter, messageHandler mh);
00142 
00143     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00144      *  The nework object must be connected to the network endpoint before calling this
00145      *  Default connect options are used
00146      *  @return success code -
00147      */
00148     int connect();
00149 
00150     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00151      *  The nework object must be connected to the network endpoint before calling this
00152      *  @param options - connect options
00153      *  @return success code -
00154      */
00155     int connect(MQTTPacket_connectData& options);
00156 
00157     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00158      *  The nework object must be connected to the network endpoint before calling this
00159      *  @param options - connect options
00160      *  @param connackData - connack data to be returned
00161      *  @return success code -
00162      */
00163     int connect(MQTTPacket_connectData& options, connackData& data);
00164 
00165     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00166      *  @param topic - the topic to publish to
00167      *  @param message - the message to send
00168      *  @return success code -
00169      */
00170     int publish(const char* topicName, Message& message);
00171 
00172     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00173      *  @param topic - the topic to publish to
00174      *  @param payload - the data to send
00175      *  @param payloadlen - the length of the data
00176      *  @param qos - the QoS to send the publish at
00177      *  @param retained - whether the message should be retained
00178      *  @return success code -
00179      */
00180     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00181 
00182     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00183      *  @param topic - the topic to publish to
00184      *  @param payload - the data to send
00185      *  @param payloadlen - the length of the data
00186      *  @param id - the packet id used - returned
00187      *  @param qos - the QoS to send the publish at
00188      *  @param retained - whether the message should be retained
00189      *  @return success code -
00190      */
00191     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00192 
00193     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00194      *  @param topicFilter - a topic pattern which can include wildcards
00195      *  @param qos - the MQTT QoS to subscribe at
00196      *  @param mh - the callback function to be invoked when a message is received for this subscription
00197      *  @return success code -
00198      */
00199     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00200 
00201     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00202      *  @param topicFilter - a topic pattern which can include wildcards
00203      *  @param qos - the MQTT QoS to subscribe at©
00204      *  @param mh - the callback function to be invoked when a message is received for this subscription
00205      *  @param
00206      *  @return success code -
00207      */
00208     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
00209 
00210     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00211      *  @param topicFilter - a topic pattern which can include wildcards
00212      *  @return success code -
00213      */
00214     int unsubscribe(const char* topicFilter);
00215 
00216     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00217      *  @return success code -
00218      */
00219     int disconnect();
00220 
00221     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00222      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00223      *  received.
00224      *  @param timeout_ms the time to wait, in milliseconds
00225      *  @return success code - on failure, this means the client has disconnected
00226      */
00227     int yield(unsigned long timeout_ms = 1000L);
00228 
00229     /** Is the client connected?
00230      *  @return flag - is the client connected or not?
00231      */
00232     bool isConnected()
00233     {
00234         return isconnected;
00235     }
00236 
00237 private:
00238 
00239     void closeSession();
00240     void cleanSession();
00241     int cycle(Timer& timer);
00242     int waitfor(int packet_type, Timer& timer);
00243     int keepalive();
00244     int publish(int len, Timer& timer, enum QoS qos);
00245 
00246     int decodePacket(int* value, int timeout);
00247     int readPacket(Timer& timer);
00248     int sendPacket(int length, Timer& timer);
00249     int deliverMessage(MQTTString& topicName, Message& message);
00250     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00251 
00252     Network& ipstack;
00253     unsigned long command_timeout_ms;
00254 
00255     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00256     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00257 
00258     Timer last_sent, last_received;
00259     unsigned int keepAliveInterval;
00260     bool ping_outstanding;
00261     bool cleansession;
00262 
00263     PacketId packetid;
00264 
00265     struct MessageHandlers
00266     {
00267         const char* topicFilter;
00268         FP<void, MessageData&> fp;
00269     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00270 
00271     FP<void, MessageData&> defaultMessageHandler;
00272 
00273     bool isconnected;
00274 
00275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00276     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00277     int inflightLen;
00278     unsigned short inflightMsgid;
00279     enum QoS inflightQoS;
00280 #endif
00281 
00282 #if MQTTCLIENT_QOS2
00283     bool pubrel;
00284     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00285         #define MAX_INCOMING_QOS2_MESSAGES 10
00286     #endif
00287     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00288     bool isQoS2msgidFree(unsigned short id);
00289     bool useQoS2msgid(unsigned short id);
00290     void freeQoS2msgid(unsigned short id);
00291 #endif
00292 
00293 };
00294 
00295 }
00296 
00297 
00298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
00300 {
00301     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00302         messageHandlers[i].topicFilter = 0;
00303 
00304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00305     inflightMsgid = 0;
00306     inflightQoS = QOS0;
00307 #endif
00308 
00309 #if MQTTCLIENT_QOS2
00310     pubrel = false;
00311     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00312         incomingQoS2messages[i] = 0;
00313 #endif
00314 }
00315 
00316 
00317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
00319 {
00320     ping_outstanding = false;
00321     isconnected = false;
00322     if (cleansession)
00323         cleanSession();
00324 }
00325 
00326 
00327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00329 {
00330     this->command_timeout_ms = command_timeout_ms;
00331     cleansession = true;
00332       closeSession();
00333 }
00334 
00335 
00336 #if MQTTCLIENT_QOS2
00337 template<class Network, class Timer, int a, int b>
00338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00339 {
00340     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00341     {
00342         if (incomingQoS2messages[i] == id)
00343             return false;
00344     }
00345     return true;
00346 }
00347 
00348 
00349 template<class Network, class Timer, int a, int b>
00350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00351 {
00352     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00353     {
00354         if (incomingQoS2messages[i] == 0)
00355         {
00356             incomingQoS2messages[i] = id;
00357             return true;
00358         }
00359     }
00360     return false;
00361 }
00362 
00363 
00364 template<class Network, class Timer, int a, int b>
00365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00366 {
00367     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00368     {
00369         if (incomingQoS2messages[i] == id)
00370         {
00371             incomingQoS2messages[i] = 0;
00372             return;
00373         }
00374     }
00375 }
00376 #endif
00377 
00378 
00379 template<class Network, class Timer, int a, int b>
00380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00381 {
00382     int rc = FAILURE,
00383         sent = 0;
00384 
00385     while (sent < length)
00386     {
00387         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00388         if (rc < 0)  // there was an error writing the data
00389             break;
00390         sent += rc;
00391         if (timer.expired()) // only check expiry after at least one attempt to write
00392             break;
00393     }
00394     if (sent == length)
00395     {
00396         if (this->keepAliveInterval > 0)
00397             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00398         rc = SUCCESS;
00399     }
00400     else
00401         rc = FAILURE;
00402 
00403 #if defined(MQTT_DEBUG)
00404     char printbuf[150];
00405     DEBUG("Rc %d from sending packet %s\r\n", rc, 
00406         MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00407 #endif
00408     return rc;
00409 }
00410 
00411 
00412 template<class Network, class Timer, int a, int b>
00413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00414 {
00415     unsigned char c;
00416     int multiplier = 1;
00417     int len = 0;
00418     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00419 
00420     *value = 0;
00421     do
00422     {
00423         int rc = MQTTPACKET_READ_ERROR;
00424 
00425         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00426         {
00427             rc = MQTTPACKET_READ_ERROR; /* bad data */
00428             goto exit;
00429         }
00430         rc = ipstack.read(&c, 1, timeout);
00431         //printf("decode: %d\r\n",rc);
00432         if (rc != 1)
00433             goto exit;
00434         *value += (c & 127) * multiplier;
00435         multiplier *= 128;
00436         //printf("c: %d\r\n",c);
00437         //printf("value: %d\r\n",value);
00438         //printf("multiplier: %d\r\n",multiplier);
00439     } while ((c & 128) != 0);
00440 exit:
00441     return len;
00442 }
00443 
00444 
00445 /**
00446  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00447  * the packets can be retried.
00448  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00449  * @return the MQTT packet type, 0 if none, -1 if error
00450  */
00451 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00452 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00453 {
00454     int rc = FAILURE;
00455     MQTTHeader header = {0};
00456     int len = 0;
00457     int rem_len = 0;
00458 
00459     /* 1. read the header byte.  This has the packet type in it */
00460     rc = ipstack.read(readbuf, 1, timer.left_ms());
00461     //printf("read: %d\r\n",rc);
00462     //printf("%c\r\n",readbuf[0]);
00463     if (rc != 1)
00464         goto exit;
00465 
00466     len = 1;
00467     /* 2. read the remaining length.  This is variable in itself */
00468     decodePacket(&rem_len, timer.left_ms());
00469     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00470 
00471     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00472     {
00473         rc = BUFFER_OVERFLOW;
00474         goto exit;
00475     }
00476 
00477     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00478     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00479         goto exit;
00480 
00481     header.byte = readbuf[0];
00482     rc = header.bits.type;
00483     if (this->keepAliveInterval > 0)
00484         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00485 exit:
00486 
00487 #if defined(MQTT_DEBUG)
00488     if (rc >= 0)
00489     {
00490         char printbuf[50];
00491         DEBUG("Rc %d receiving packet %s\r\n", rc, 
00492             MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00493     }
00494 #endif
00495     return rc;
00496 }
00497 
00498 
00499 // assume topic filter and name is in correct format
00500 // # can only be at end
00501 // + and # can only be next to separator
00502 template<class Network, class Timer, int a, int b>
00503 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00504 {
00505     char* curf = topicFilter;
00506     char* curn = topicName.lenstring.data;
00507     char* curn_end = curn + topicName.lenstring.len;
00508 
00509     while (*curf && curn < curn_end)
00510     {
00511         if (*curn == '/' && *curf != '/')
00512             break;
00513         if (*curf != '+' && *curf != '#' && *curf != *curn)
00514             break;
00515         if (*curf == '+')
00516         {   // skip until we meet the next separator, or end of string
00517             char* nextpos = curn + 1;
00518             while (nextpos < curn_end && *nextpos != '/')
00519                 nextpos = ++curn + 1;
00520         }
00521         else if (*curf == '#')
00522             curn = curn_end - 1;    // skip until end of string
00523         curf++;
00524         curn++;
00525     };
00526 
00527     return (curn == curn_end) && (*curf == '\0');
00528 }
00529 
00530 
00531 
00532 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00533 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00534 {
00535     int rc = FAILURE;
00536 
00537     // we have to find the right message handler - indexed by topic
00538     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00539     {
00540         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00541                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00542         {
00543             if (messageHandlers[i].fp.attached())
00544             {
00545                 MessageData md(topicName, message);
00546                 messageHandlers[i].fp(md);
00547                 rc = SUCCESS;
00548             }
00549         }
00550     }
00551 
00552     if (rc == FAILURE && defaultMessageHandler.attached())
00553     {
00554         MessageData md(topicName, message);
00555         defaultMessageHandler(md);
00556         rc = SUCCESS;
00557     }
00558 
00559     return rc;
00560 }
00561 
00562 
00563 
00564 template<class Network, class Timer, int a, int b>
00565 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00566 {
00567     int rc = SUCCESS;
00568     Timer timer;
00569 
00570     timer.countdown_ms(timeout_ms);
00571     while (!timer.expired())
00572     {
00573         //printf("while...\r\n");
00574         int ans=cycle(timer);
00575         if (ans == -3004)
00576         {
00577             rc = -3004;
00578             break;
00579         }
00580         else if (ans < 0)
00581         {
00582             rc = FAILURE;
00583             break;
00584         }
00585     }
00586 
00587     return rc;
00588 }
00589 
00590 
00591 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00592 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00593 {
00594     // get one piece of work off the wire and one pass through
00595     int len = 0,
00596         rc = SUCCESS;
00597 
00598     
00599     int packet_type = readPacket(timer);    // read the socket, see what work is due
00600     //printf("packet_type: %d\r\n",packet_type);
00601 
00602     switch (packet_type)
00603     {
00604         default:
00605             // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
00606             rc = packet_type;
00607             //printf("default\r\n");
00608             goto exit;
00609         case 0: // timed out reading packet
00610             //printf("timeout\r\n");
00611             break;
00612         case -3001: // timed out reading packet
00613             //printf("timeout\r\n");
00614             break;
00615         case CONNACK:
00616         case PUBACK:
00617         case SUBACK:
00618             break;
00619         case PUBLISH:
00620         {
00621             MQTTString topicName = MQTTString_initializer;
00622             Message msg;
00623             int intQoS;
00624             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
00625             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00626                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00627                 goto exit;
00628             msg.qos = (enum QoS)intQoS;
00629 #if MQTTCLIENT_QOS2
00630             if (msg.qos != QOS2)
00631 #endif
00632                 deliverMessage(topicName, msg);
00633 #if MQTTCLIENT_QOS2
00634             else if (isQoS2msgidFree(msg.id))
00635             {
00636                 if (useQoS2msgid(msg.id))
00637                     deliverMessage(topicName, msg);
00638                 else
00639                     WARN("Maximum number of incoming QoS2 messages exceeded");
00640             }
00641 #endif
00642 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00643             if (msg.qos != QOS0)
00644             {
00645                 if (msg.qos == QOS1)
00646                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00647                 else if (msg.qos == QOS2)
00648                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00649                 if (len <= 0)
00650                     rc = FAILURE;
00651                 else
00652                     rc = sendPacket(len, timer);
00653                 if (rc == FAILURE)
00654                     goto exit; // there was a problem
00655             }
00656             break;
00657 #endif
00658         }
00659 #if MQTTCLIENT_QOS2
00660         case PUBREC:
00661         case PUBREL:
00662             unsigned short mypacketid;
00663             unsigned char dup, type;
00664             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00665                 rc = FAILURE;
00666             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
00667                                  (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00668                 rc = FAILURE;
00669             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00670                 rc = FAILURE; // there was a problem
00671             if (rc == FAILURE)
00672                 goto exit; // there was a problem
00673             if (packet_type == PUBREL)
00674                 freeQoS2msgid(mypacketid);
00675             break;
00676 
00677         case PUBCOMP:
00678             break;
00679 #endif
00680         case PINGRESP:
00681             ping_outstanding = false;
00682             break;
00683     }
00684 
00685     if (keepalive() != SUCCESS)
00686         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
00687         rc = FAILURE;
00688 
00689 exit:
00690     if (rc == SUCCESS)
00691         rc = packet_type;
00692     else if (isconnected)
00693         closeSession();
00694     return rc;
00695 }
00696 
00697 
00698 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00699 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00700 {
00701     int rc = SUCCESS;
00702     static Timer ping_sent;
00703 
00704     if (keepAliveInterval == 0)
00705         goto exit;
00706     
00707     if (ping_outstanding)
00708     {
00709         if (ping_sent.expired())
00710         {
00711             rc = FAILURE; // session failure
00712             #if defined(MQTT_DEBUG)
00713                 DEBUG("PINGRESP not received in keepalive interval\r\n");
00714             #endif
00715         }
00716     }
00717     else if (last_sent.expired() || last_received.expired())
00718     {
00719         Timer timer(1000);
00720         int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00721         if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00722         {
00723             ping_outstanding = true;
00724             ping_sent.countdown(this->keepAliveInterval);
00725         }
00726     }
00727 exit:
00728     return rc;
00729 }
00730 
00731 
00732 // only used in single-threaded mode where one command at a time is in process
00733 template<class Network, class Timer, int a, int b>
00734 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00735 {
00736     int rc = FAILURE;
00737 
00738     do
00739     {
00740         if (timer.expired())
00741             break; // we timed out
00742         rc = cycle(timer);
00743     }
00744     while (rc != packet_type && rc >= 0);
00745 
00746     return rc;
00747 }
00748 
00749 
00750 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00751 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
00752 {
00753     Timer connect_timer(command_timeout_ms);
00754     int rc = FAILURE;
00755     int len = 0;
00756 
00757     if (isconnected) // don't send connect packet again if we are already connected
00758         goto exit;
00759 
00760     this->keepAliveInterval = options.keepAliveInterval;
00761     this->cleansession = options.cleansession;
00762     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00763         goto exit;
00764     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00765         goto exit; // there was a problem
00766 
00767     if (this->keepAliveInterval > 0)
00768         last_received.countdown(this->keepAliveInterval);
00769     // this will be a blocking call, wait for the connack
00770     if (waitfor(CONNACK, connect_timer) == CONNACK)
00771     {
00772         data.rc = 0;
00773         data.sessionPresent = false;
00774         if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
00775                             (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00776             rc = data.rc;
00777         else
00778             rc = FAILURE;
00779     }
00780     else
00781         rc = FAILURE;
00782 
00783 #if MQTTCLIENT_QOS2
00784     // resend any inflight publish
00785     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00786     {
00787         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00788             rc = FAILURE;
00789         else
00790             rc = publish(len, connect_timer, inflightQoS);
00791     }
00792     else
00793 #endif
00794 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00795     if (inflightMsgid > 0)
00796     {
00797         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00798         rc = publish(inflightLen, connect_timer, inflightQoS);
00799     }
00800 #endif
00801 
00802 exit:
00803     if (rc == SUCCESS)
00804     {
00805         isconnected = true;
00806         ping_outstanding = false;
00807     }
00808     return rc;
00809 }
00810 
00811 
00812 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00813 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00814 {
00815     connackData data;
00816     return connect(options, data);
00817 }
00818 
00819 
00820 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00821 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00822 {
00823     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00824     return connect(default_options);
00825 }
00826 
00827 
00828 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00829 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
00830 {
00831     int rc = FAILURE;
00832     int i = -1;
00833 
00834     // first check for an existing matching slot
00835     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00836     {
00837         if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00838         {
00839             if (messageHandler == 0) // remove existing
00840             {
00841                 messageHandlers[i].topicFilter = 0;
00842                 messageHandlers[i].fp.detach();
00843             }
00844             rc = SUCCESS; // return i when adding new subscription
00845             break;
00846         }
00847     }
00848     // if no existing, look for empty slot (unless we are removing)
00849     if (messageHandler != 0) {
00850         if (rc == FAILURE)
00851         {
00852             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00853             {
00854                 if (messageHandlers[i].topicFilter == 0)
00855                 {
00856                     rc = SUCCESS;
00857                     break;
00858                 }
00859             }
00860         }
00861         if (i < MAX_MESSAGE_HANDLERS)
00862         {
00863             messageHandlers[i].topicFilter = topicFilter;
00864             messageHandlers[i].fp.attach(messageHandler);
00865         }
00866     }
00867     return rc;
00868 }
00869 
00870 
00871 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00872 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
00873      enum QoS qos, messageHandler messageHandler, subackData& data)
00874 {
00875     int rc = FAILURE;
00876     Timer timer(command_timeout_ms);
00877     int len = 0;
00878     MQTTString topic = {(char*)topicFilter, {0, 0}};
00879 
00880     if (!isconnected)
00881         goto exit;
00882 
00883     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00884     if (len <= 0)
00885         goto exit;
00886     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00887         goto exit;             // there was a problem
00888 
00889     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00890     {
00891         int count = 0;
00892         unsigned short mypacketid;
00893         data.grantedQoS = 0;
00894         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00895         {
00896             if (data.grantedQoS != 0x80)
00897                 rc = setMessageHandler(topicFilter, messageHandler);
00898         }
00899     }
00900     else
00901         rc = FAILURE;
00902 
00903 exit:
00904     if (rc == FAILURE)
00905         closeSession();
00906     return rc;
00907 }
00908 
00909 
00910 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00911 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00912 {
00913     subackData data;
00914     return subscribe(topicFilter, qos, messageHandler, data);
00915 }
00916 
00917 
00918 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00919 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00920 {
00921     int rc = FAILURE;
00922     Timer timer(command_timeout_ms);
00923     MQTTString topic = {(char*)topicFilter, {0, 0}};
00924     int len = 0;
00925 
00926     if (!isconnected)
00927         goto exit;
00928 
00929     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00930         goto exit;
00931     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00932         goto exit; // there was a problem
00933 
00934     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00935     {
00936         unsigned short mypacketid;  // should be the same as the packetid above
00937         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00938         {
00939             // remove the subscription message handler associated with this topic, if there is one
00940             setMessageHandler(topicFilter, 0);
00941         }
00942     }
00943     else
00944         rc = FAILURE;
00945 
00946 exit:
00947     if (rc != SUCCESS)
00948         closeSession();
00949     return rc;
00950 }
00951 
00952 
00953 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00954 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00955 {
00956     int rc;
00957 
00958     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00959         goto exit; // there was a problem
00960 
00961 #if MQTTCLIENT_QOS1
00962     if (qos == QOS1)
00963     {
00964         if (waitfor(PUBACK, timer) == PUBACK)
00965         {
00966             unsigned short mypacketid;
00967             unsigned char dup, type;
00968             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00969                 rc = FAILURE;
00970             else if (inflightMsgid == mypacketid)
00971                 inflightMsgid = 0;
00972         }
00973         else
00974             rc = FAILURE;
00975     }
00976 #endif
00977 #if MQTTCLIENT_QOS2
00978     else if (qos == QOS2)
00979     {
00980         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00981         {
00982             unsigned short mypacketid;
00983             unsigned char dup, type;
00984             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00985                 rc = FAILURE;
00986             else if (inflightMsgid == mypacketid)
00987                 inflightMsgid = 0;
00988         }
00989         else
00990             rc = FAILURE;
00991     }
00992 #endif
00993 
00994 exit:
00995     if (rc != SUCCESS)
00996         closeSession();
00997     return rc;
00998 }
00999 
01000 
01001 
01002 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01003 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
01004 {
01005     int rc = FAILURE;
01006     Timer timer(command_timeout_ms);
01007     MQTTString topicString = MQTTString_initializer;
01008     int len = 0;
01009 
01010     if (!isconnected)
01011         goto exit;
01012 
01013     topicString.cstring = (char*)topicName;
01014 
01015 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
01016     if (qos == QOS1 || qos == QOS2)
01017         id = packetid.getNext();
01018 #endif
01019 
01020     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
01021               topicString, (unsigned char*)payload, payloadlen);
01022     if (len <= 0)
01023         goto exit;
01024 
01025 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
01026     if (!cleansession)
01027     {
01028         memcpy(pubbuf, sendbuf, len);
01029         inflightMsgid = id;
01030         inflightLen = len;
01031         inflightQoS = qos;
01032 #if MQTTCLIENT_QOS2
01033         pubrel = false;
01034 #endif
01035     }
01036 #endif
01037 
01038     rc = publish(len, timer, qos);
01039 exit:
01040     return rc;
01041 }
01042 
01043 
01044 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01045 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
01046 {
01047     unsigned short id = 0;  // dummy - not used for anything
01048     return publish(topicName, payload, payloadlen, id, qos, retained);
01049 }
01050 
01051 
01052 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01053 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
01054 {
01055     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
01056 }
01057 
01058 
01059 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
01060 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
01061 {
01062     int rc = FAILURE;
01063     Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
01064     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
01065     if (len > 0)
01066         rc = sendPacket(len, timer);            // send the disconnect packet
01067     closeSession();
01068     return rc;
01069 }
01070 
01071 #endif