Junichi Katsu / Mbed OS mbed-os-example-milkcocoa

Fork of mbed-os-example-mbed5-blinky by mbed-os-examples

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *******************************************************************************/
00020 
00021 #if !defined(MQTTCLIENT_H)
00022 #define MQTTCLIENT_H
00023 
00024 #include "FP.h"
00025 #include "MQTTPacket.h"
00026 #include "stdio.h"
00027 #include "MQTTLogging.h"
00028 
00029 #if !defined(MQTTCLIENT_QOS1)
00030     #define MQTTCLIENT_QOS1 1
00031 #endif
00032 #if !defined(MQTTCLIENT_QOS2)
00033     #define MQTTCLIENT_QOS2 0
00034 #endif
00035 
00036 namespace MQTT
00037 {
00038 
00039 
/** MQTT quality-of-service levels; the numeric value of each enumerator is the QoS number itself. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
00041 
/**
 * Status codes returned by the client.
 * Every failure code must be negative: several methods return either one of these
 * codes or a (non-negative) MQTT packet type through the same int.
 */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
00044 
00045 
00046 struct Message
00047 {
00048     enum QoS qos;
00049     bool retained;
00050     bool dup;
00051     unsigned short id;
00052     void *payload;
00053     size_t payloadlen;
00054 };
00055 
00056 
00057 struct MessageData
00058 {
00059     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00060     { }
00061 
00062     struct Message &message;
00063     MQTTString &topicName;
00064 };
00065 
00066 
/**
 * Generator of MQTT packet identifiers.
 * Identifiers run 1, 2, ... MAX_PACKET_ID and then wrap back to 1; 0 is never returned.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /** Advance to, and return, the next packet identifier (in the range 1..MAX_PACKET_ID). */
    int getNext()
    {
        // Computed with 'next + 1' rather than '++next' so that 'next' is modified only
        // once per expression (the old form was undefined behaviour before C++11).
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out; 0 before the first call
};
00084 
00085 
00086 /**
00087  * @class Client
00088  * @brief blocking, non-threaded MQTT client API
00089  *
00090  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00091  * MQTT request can be in process at any one time.
00092  * @param Network a network class which supports send, receive
00093  * @param Timer a timer class with the methods:
00094  */
00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
00096 class Client
00097 {
00098 
00099 public:
00100 
00101     typedef void (*messageHandler)(MessageData&);
00102 
00103     /** Construct the client
00104      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00105      *      before calling MQTT connect
00106      *  @param limits an instance of the Limit class - to alter limits as required
00107      */
00108     Client(Network& network, unsigned int command_timeout_ms = 30000);
00109 
00110     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00111      *  @param mh - pointer to the callback function
00112      */
00113     void setDefaultMessageHandler(messageHandler mh)
00114     {
00115         defaultMessageHandler.attach(mh);
00116     }
00117 
00118     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00119      *  The nework object must be connected to the network endpoint before calling this
00120      *  Default connect options are used
00121      *  @return success code -
00122      */
00123     int connect();
00124     
00125         /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00126      *  The nework object must be connected to the network endpoint before calling this
00127      *  @param options - connect options
00128      *  @return success code -
00129      */
00130     int connect(MQTTPacket_connectData& options);
00131 
00132     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00133      *  @param topic - the topic to publish to
00134      *  @param message - the message to send
00135      *  @return success code -
00136      */
00137     int publish(const char* topicName, Message& message);
00138     
00139     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00140      *  @param topic - the topic to publish to
00141      *  @param payload - the data to send
00142      *  @param payloadlen - the length of the data
00143      *  @param qos - the QoS to send the publish at
00144      *  @param retained - whether the message should be retained
00145      *  @return success code -
00146      */
00147     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00148     
00149     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00150      *  @param topic - the topic to publish to
00151      *  @param payload - the data to send
00152      *  @param payloadlen - the length of the data
00153      *  @param id - the packet id used - returned 
00154      *  @param qos - the QoS to send the publish at
00155      *  @param retained - whether the message should be retained
00156      *  @return success code -
00157      */
00158     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00159 
00160     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00161      *  @param topicFilter - a topic pattern which can include wildcards
00162      *  @param qos - the MQTT QoS to subscribe at
00163      *  @param mh - the callback function to be invoked when a message is received for this subscription
00164      *  @return success code -
00165      */
00166     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00167 
00168     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00169      *  @param topicFilter - a topic pattern which can include wildcards
00170      *  @return success code -
00171      */
00172     int unsubscribe(const char* topicFilter);
00173 
00174     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00175      *  @return success code -
00176      */
00177     int disconnect();
00178 
00179     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00180      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00181      *  received.
00182      *  @param timeout_ms the time to wait, in milliseconds
00183      *  @return success code - on failure, this means the client has disconnected
00184      */
00185     int yield(unsigned long timeout_ms = 1000L);
00186 
00187     /** Is the client connected?
00188      *  @return flag - is the client connected or not?
00189      */
00190     bool isConnected()
00191     {
00192         return isconnected;
00193     }
00194 
00195 private:
00196 
00197     void cleanSession();
00198     int cycle(Timer& timer);
00199     int waitfor(int packet_type, Timer& timer);
00200     int keepalive();
00201     int publish(int len, Timer& timer, enum QoS qos);
00202 
00203     int decodePacket(int* value, int timeout);
00204     int readPacket(Timer& timer);
00205     int sendPacket(int length, Timer& timer);
00206     int deliverMessage(MQTTString& topicName, Message& message);
00207     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00208 
00209     Network& ipstack;
00210     unsigned long command_timeout_ms;
00211 
00212     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00213     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00214 
00215     Timer last_sent, last_received;
00216     unsigned int keepAliveInterval;
00217     bool ping_outstanding;
00218     bool cleansession;
00219 
00220     PacketId packetid;
00221 
00222     struct MessageHandlers
00223     {
00224         const char* topicFilter;
00225         FP<void, MessageData&>  fp;
00226     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00227 
00228     FP<void, MessageData&>  defaultMessageHandler;
00229 
00230     bool isconnected;
00231 
00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00233     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00234     int inflightLen;
00235     unsigned short inflightMsgid;
00236     enum QoS inflightQoS;
00237 #endif
00238 
00239 #if MQTTCLIENT_QOS2
00240     bool pubrel;
00241     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00242         #define MAX_INCOMING_QOS2_MESSAGES 10
00243     #endif
00244     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00245     bool isQoS2msgidFree(unsigned short id);
00246     bool useQoS2msgid(unsigned short id);
00247     void freeQoS2msgid(unsigned short id);
00248 #endif
00249 
00250 };
00251 
00252 }
00253 
00254 
00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00257 {
00258     ping_outstanding = false;
00259     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00260         messageHandlers[i].topicFilter = 0;
00261     isconnected = false;
00262 
00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00264     inflightMsgid = 0;
00265     inflightQoS = QOS0;
00266 #endif
00267 
00268 #if MQTTCLIENT_QOS2
00269     pubrel = false;
00270     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00271         incomingQoS2messages[i] = 0;
00272 #endif
00273 }
00274 
00275 
00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00278 {
00279     last_sent = Timer();
00280     last_received = Timer();
00281     this->command_timeout_ms = command_timeout_ms;
00282     cleanSession();
00283 }
00284 
00285 
00286 #if MQTTCLIENT_QOS2
00287 template<class Network, class Timer, int a, int b>
00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00289 {
00290     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00291     {
00292         if (incomingQoS2messages[i] == id)
00293             return false;
00294     }
00295     return true;
00296 }
00297 
00298 
00299 template<class Network, class Timer, int a, int b>
00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00301 {
00302     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00303     {
00304         if (incomingQoS2messages[i] == 0)
00305         {
00306             incomingQoS2messages[i] = id;
00307             return true;
00308         }
00309     }
00310     return false;
00311 }
00312 
00313 
00314 template<class Network, class Timer, int a, int b>
00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00316 {
00317     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00318     {
00319         if (incomingQoS2messages[i] == id)
00320         {
00321             incomingQoS2messages[i] = 0;
00322             return;
00323         }
00324     }
00325 }
00326 #endif
00327 
00328 
00329 template<class Network, class Timer, int a, int b>
00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00331 {
00332     int rc = FAILURE,
00333         sent = 0;
00334 
00335     while (sent < length && !timer.expired())
00336     {
00337         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00338         if (rc < 0)  // there was an error writing the data
00339             break;
00340         sent += rc;
00341     }
00342     if (sent == length)
00343     {
00344         if (this->keepAliveInterval > 0)
00345             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00346         rc = SUCCESS;
00347     }
00348     else
00349         rc = FAILURE;
00350         
00351 #if defined(MQTT_DEBUG)
00352     char printbuf[150];
00353     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00354 #endif
00355     return rc;
00356 }
00357 
00358 
00359 template<class Network, class Timer, int a, int b>
00360 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00361 {
00362     unsigned char c;
00363     int multiplier = 1;
00364     int len = 0;
00365     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00366 
00367     *value = 0;
00368     do
00369     {
00370         int rc = MQTTPACKET_READ_ERROR;
00371 
00372         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00373         {
00374             rc = MQTTPACKET_READ_ERROR; /* bad data */
00375             goto exit;
00376         }
00377         rc = ipstack.read(&c, 1, timeout);
00378         if (rc != 1)
00379             goto exit;
00380         *value += (c & 127) * multiplier;
00381         multiplier *= 128;
00382     } while ((c & 128) != 0);
00383 exit:
00384     return len;
00385 }
00386 
00387 
00388 /**
00389  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00390  * the packets can be retried.
00391  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00392  * @return the MQTT packet type, or -1 if none
00393  */
00394 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00395 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00396 {
00397     int rc = FAILURE;
00398     MQTTHeader header = {0};
00399     int len = 0;
00400     int rem_len = 0;
00401 
00402     /* 1. read the header byte.  This has the packet type in it */
00403     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00404         goto exit;
00405 
00406     len = 1;
00407     /* 2. read the remaining length.  This is variable in itself */
00408     decodePacket(&rem_len, timer.left_ms());
00409     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00410 
00411     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00412     {
00413         rc = BUFFER_OVERFLOW;
00414         goto exit;
00415     }
00416 
00417     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00418     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00419         goto exit;
00420 
00421     header.byte = readbuf[0];
00422     rc = header.bits.type;
00423     if (this->keepAliveInterval > 0)
00424         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00425 exit:
00426         
00427 #if defined(MQTT_DEBUG)
00428     if (rc >= 0)
00429     {
00430         char printbuf[50];
00431         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00432     }
00433 #endif
00434     return rc;
00435 }
00436 
00437 
00438 // assume topic filter and name is in correct format
00439 // # can only be at end
00440 // + and # can only be next to separator
00441 template<class Network, class Timer, int a, int b>
00442 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00443 {
00444     char* curf = topicFilter;
00445     char* curn = topicName.lenstring.data;
00446     char* curn_end = curn + topicName.lenstring.len;
00447 
00448     while (*curf && curn < curn_end)
00449     {
00450         if (*curn == '/' && *curf != '/')
00451             break;
00452         if (*curf != '+' && *curf != '#' && *curf != *curn)
00453             break;
00454         if (*curf == '+')
00455         {   // skip until we meet the next separator, or end of string
00456             char* nextpos = curn + 1;
00457             while (nextpos < curn_end && *nextpos != '/')
00458                 nextpos = ++curn + 1;
00459         }
00460         else if (*curf == '#')
00461             curn = curn_end - 1;    // skip until end of string
00462         curf++;
00463         curn++;
00464     };
00465 
00466     return (curn == curn_end) && (*curf == '\0');
00467 }
00468 
00469 
00470 
00471 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00472 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00473 {
00474     int rc = FAILURE;
00475 
00476     // we have to find the right message handler - indexed by topic
00477     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00478     {
00479         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00480                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00481         {
00482             if (messageHandlers[i].fp.attached())
00483             {
00484                 MessageData md(topicName, message);
00485                 messageHandlers[i].fp(md);
00486                 rc = SUCCESS;
00487             }
00488         }
00489     }
00490 
00491     if (rc == FAILURE && defaultMessageHandler.attached())
00492     {
00493         MessageData md(topicName, message);
00494         defaultMessageHandler(md);
00495         rc = SUCCESS;
00496     }
00497 
00498     return rc;
00499 }
00500 
00501 
00502 
00503 template<class Network, class Timer, int a, int b>
00504 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00505 {
00506     int rc = SUCCESS;
00507     Timer timer = Timer();
00508 
00509     timer.countdown_ms(timeout_ms);
00510     while (!timer.expired())
00511     {
00512         if (cycle(timer) < 0)
00513         {
00514             rc = FAILURE;
00515             break;
00516         }
00517         Thread::wait(10);
00518     }
00519 
00520     return rc;
00521 }
00522 
00523 
00524 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00525 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00526 {
00527     /* get one piece of work off the wire and one pass through */
00528 
00529     // read the socket, see what work is due
00530     int packet_type = readPacket(timer);
00531 
00532     int len = 0,
00533         rc = SUCCESS;
00534 
00535     switch (packet_type)
00536     {
00537         case FAILURE:
00538         case BUFFER_OVERFLOW:
00539             rc = packet_type;
00540             break;
00541         case CONNACK:
00542         case PUBACK:
00543         case SUBACK:
00544             break;
00545         case PUBLISH:
00546         {
00547             MQTTString topicName = MQTTString_initializer;
00548             Message msg;
00549             int intQoS;
00550             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00551                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00552                 goto exit;
00553             msg.qos = (enum QoS)intQoS;
00554 #if MQTTCLIENT_QOS2
00555             if (msg.qos != QOS2)
00556 #endif
00557                 deliverMessage(topicName, msg);
00558 #if MQTTCLIENT_QOS2
00559             else if (isQoS2msgidFree(msg.id))
00560             {
00561                 if (useQoS2msgid(msg.id))
00562                     deliverMessage(topicName, msg);
00563                 else
00564                     WARN("Maximum number of incoming QoS2 messages exceeded");
00565             }
00566 #endif
00567 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00568             if (msg.qos != QOS0)
00569             {
00570                 if (msg.qos == QOS1)
00571                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00572                 else if (msg.qos == QOS2)
00573                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00574                 if (len <= 0)
00575                     rc = FAILURE;
00576                 else
00577                     rc = sendPacket(len, timer);
00578                 if (rc == FAILURE)
00579                     goto exit; // there was a problem
00580             }
00581             break;
00582 #endif
00583         }
00584 #if MQTTCLIENT_QOS2
00585         case PUBREC:
00586         case PUBREL:
00587             unsigned short mypacketid;
00588             unsigned char dup, type;
00589             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00590                 rc = FAILURE;
00591             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00592                         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00593                 rc = FAILURE;
00594             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00595                 rc = FAILURE; // there was a problem
00596             if (rc == FAILURE)
00597                 goto exit; // there was a problem
00598             if (packet_type == PUBREL)
00599                 freeQoS2msgid(mypacketid);
00600             break;
00601             
00602         case PUBCOMP:
00603             break;
00604 #endif
00605         case PINGRESP:
00606             ping_outstanding = false;
00607             break;
00608     }
00609     keepalive();
00610 exit:
00611     if (rc == SUCCESS)
00612         rc = packet_type;
00613     return rc;
00614 }
00615 
00616 
00617 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00618 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00619 {
00620     int rc = FAILURE;
00621 
00622     if (keepAliveInterval == 0)
00623     {
00624         rc = SUCCESS;
00625         goto exit;
00626     }
00627 
00628     if (last_sent.expired() || last_received.expired())
00629     {
00630         if (!ping_outstanding)
00631         {
00632             Timer timer(1000);
00633             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00634             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00635                 ping_outstanding = true;
00636         }
00637     }
00638 
00639 exit:
00640     return rc;
00641 }
00642 
00643 
00644 // only used in single-threaded mode where one command at a time is in process
00645 template<class Network, class Timer, int a, int b>
00646 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00647 {
00648     int rc = FAILURE;
00649 
00650     do
00651     {
00652         if (timer.expired())
00653             break; // we timed out
00654     }
00655     while ((rc = cycle(timer)) != packet_type);
00656 
00657     return rc;
00658 }
00659 
00660 
00661 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00662 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00663 {
00664     Timer connect_timer(command_timeout_ms);
00665     int rc = FAILURE;
00666     int len = 0;
00667 
00668     if (isconnected) // don't send connect packet again if we are already connected
00669         goto exit;
00670 
00671     this->keepAliveInterval = options.keepAliveInterval;
00672     this->cleansession = options.cleansession;
00673     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00674         goto exit;
00675     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00676         goto exit; // there was a problem
00677 
00678     if (this->keepAliveInterval > 0)
00679         last_received.countdown(this->keepAliveInterval);
00680     // this will be a blocking call, wait for the connack
00681     if (waitfor(CONNACK, connect_timer) == CONNACK)
00682     {
00683         unsigned char connack_rc = 255;
00684         bool sessionPresent = false;
00685         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00686             rc = connack_rc;
00687         else
00688             rc = FAILURE;
00689     }
00690     else
00691         rc = FAILURE;
00692 
00693 #if MQTTCLIENT_QOS2
00694     // resend any inflight publish
00695     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00696     {
00697         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00698             rc = FAILURE;
00699         else
00700             rc = publish(len, connect_timer, inflightQoS);
00701     }
00702     else
00703 #endif
00704 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00705     if (inflightMsgid > 0)
00706     {
00707         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00708         rc = publish(inflightLen, connect_timer, inflightQoS);
00709     }
00710 #endif
00711 
00712 exit:
00713     if (rc == SUCCESS)
00714         isconnected = true;
00715     return rc;
00716 }
00717 
00718 
00719 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00720 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00721 {
00722     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00723     return connect(default_options);
00724 }
00725 
00726 
00727 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00728 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00729 {
00730     int rc = FAILURE;
00731     Timer timer(command_timeout_ms);
00732     int len = 0;
00733     MQTTString topic = {(char*)topicFilter, {0, 0}};
00734 
00735     if (!isconnected)
00736         goto exit;
00737 
00738     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00739     if (len <= 0)
00740         goto exit;
00741     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00742         goto exit;             // there was a problem
00743 
00744     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00745     {
00746         int count = 0, grantedQoS = -1;
00747         unsigned short mypacketid;
00748         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00749             rc = grantedQoS; // 0, 1, 2 or 0x80
00750         if (rc != 0x80)
00751         {
00752             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00753             {
00754                 if (messageHandlers[i].topicFilter == 0)
00755                 {
00756                     messageHandlers[i].topicFilter = topicFilter;
00757                     messageHandlers[i].fp.attach(messageHandler);
00758                     rc = 0;
00759                     break;
00760                 }
00761             }
00762         }
00763     }
00764     else
00765         rc = FAILURE;
00766 
00767 exit:
00768     if (rc != SUCCESS)
00769         cleanSession();
00770     return rc;
00771 }
00772 
00773 
00774 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00775 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00776 {
00777     int rc = FAILURE;
00778     Timer timer(command_timeout_ms);
00779     MQTTString topic = {(char*)topicFilter, {0, 0}};
00780     int len = 0;
00781 
00782     if (!isconnected)
00783         goto exit;
00784 
00785     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00786         goto exit;
00787     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00788         goto exit; // there was a problem
00789 
00790     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00791     {
00792         unsigned short mypacketid;  // should be the same as the packetid above
00793         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00794         {
00795             rc = 0;
00796 
00797             // remove the subscription message handler associated with this topic, if there is one
00798             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00799             {
00800                 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00801                 {
00802                     messageHandlers[i].topicFilter = 0;
00803                     break;
00804                 }
00805             }
00806         }
00807     }
00808     else
00809         rc = FAILURE;
00810 
00811 exit:
00812     if (rc != SUCCESS)
00813         cleanSession();
00814     return rc;
00815 }
00816 
00817 
00818 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00819 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00820 {
00821     int rc;
00822 
00823     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00824         goto exit; // there was a problem
00825 
00826 #if MQTTCLIENT_QOS1
00827     if (qos == QOS1)
00828     {
00829         if (waitfor(PUBACK, timer) == PUBACK)
00830         {
00831             unsigned short mypacketid;
00832             unsigned char dup, type;
00833             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00834                 rc = FAILURE;
00835             else if (inflightMsgid == mypacketid)
00836                 inflightMsgid = 0;
00837         }
00838         else
00839             rc = FAILURE;
00840     }
00841 #elif MQTTCLIENT_QOS2
00842     else if (qos == QOS2)
00843     {
00844         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00845         {
00846             unsigned short mypacketid;
00847             unsigned char dup, type;
00848             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00849                 rc = FAILURE;
00850             else if (inflightMsgid == mypacketid)
00851                 inflightMsgid = 0;
00852         }
00853         else
00854             rc = FAILURE;
00855     }
00856 #endif
00857 
00858 exit:
00859     if (rc != SUCCESS)
00860         cleanSession();
00861     return rc;
00862 }
00863 
00864 
00865 
00866 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00867 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00868 {
00869     int rc = FAILURE;
00870     Timer timer(command_timeout_ms);
00871     MQTTString topicString = MQTTString_initializer;
00872     int len = 0;
00873 
00874     if (!isconnected)
00875         goto exit;
00876 
00877     topicString.cstring = (char*)topicName;
00878 
00879 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00880     if (qos == QOS1 || qos == QOS2)
00881         id = packetid.getNext();
00882 #endif
00883 
00884     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00885               topicString, (unsigned char*)payload, payloadlen);
00886     if (len <= 0)
00887         goto exit;
00888 
00889 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00890     if (!cleansession)
00891     {
00892         memcpy(pubbuf, sendbuf, len);
00893         inflightMsgid = id;
00894         inflightLen = len;
00895         inflightQoS = qos;
00896 #if MQTTCLIENT_QOS2
00897         pubrel = false;
00898 #endif
00899     }
00900 #endif
00901 
00902     rc = publish(len, timer, qos);
00903 exit:
00904     return rc;
00905 }
00906 
00907 
00908 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00909 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00910 {
00911     unsigned short id = 0;  // dummy - not used for anything
00912     return publish(topicName, payload, payloadlen, id, qos, retained);
00913 }
00914 
00915 
00916 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00917 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
00918 {
00919     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00920 }
00921 
00922 
00923 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00924 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
00925 {
00926     int rc = FAILURE;
00927     Timer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
00928     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
00929     if (len > 0)
00930         rc = sendPacket(len, timer);            // send the disconnect packet
00931 
00932     if (cleansession)
00933         cleanSession();
00934     else
00935         isconnected = false;
00936     return rc;
00937 }
00938 
00939 
00940 #endif