Rick McConney / MQTT-JMF

Dependencies:   FP

Dependents:   WNCProximityMqtt

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *******************************************************************************/
00020 
00021 #if !defined(MQTTCLIENT_H)
00022 #define MQTTCLIENT_H
00023 
00024 #include "FP.h"
00025 #include "MQTTPacket.h"
00026 #include "stdio.h"
00027 #include "MQTTLogging.h"
00028 
00029 #if !defined(MQTTCLIENT_QOS1)
00030     #define MQTTCLIENT_QOS1 1
00031 #endif
00032 #if !defined(MQTTCLIENT_QOS2)
00033     #define MQTTCLIENT_QOS2 0
00034 #endif
00035 
00036 namespace MQTT
00037 {
00038 
00039 
00040 enum QoS { QOS0, QOS1, QOS2 };
00041 
00042 // all failure return codes must be negative
00043 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
00044 
00045 
00046 struct Message
00047 {
00048     enum QoS qos;
00049     bool retained;
00050     bool dup;
00051     unsigned short id;
00052     void *payload;
00053     size_t payloadlen;
00054 };
00055 
00056 
00057 struct MessageData
00058 {
00059     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00060     { }
00061 
00062     struct Message &message;
00063     MQTTString &topicName;
00064 };
00065 
00066 
00067 class PacketId
00068 {
00069 public:
00070     PacketId()
00071     {
00072         next = 0;
00073     }
00074 
00075     int getNext()
00076     {
00077         return next = (next == MAX_PACKET_ID) ? 1 : ++next;
00078     }
00079 
00080 private:
00081     static const int MAX_PACKET_ID = 65535;
00082     int next;
00083 };
00084 
00085 
00086 /**
00087  * @class Client
00088  * @brief blocking, non-threaded MQTT client API
00089  *
00090  * This version of the API blocks on all method calls, until they are complete.  This means that only one
00091  * MQTT request can be in process at any one time.
00092  * @param Network a network class which supports send, receive
00093  * @param Timer a timer class with the methods:
00094  */
00095 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 512, int MAX_MESSAGE_HANDLERS = 5>
00096 class Client
00097 {
00098 
00099 public:
00100 
00101     typedef void (*messageHandler)(MessageData&);
00102 
00103     /** Construct the client
00104      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
00105      *      before calling MQTT connect
00106      *  @param limits an instance of the Limit class - to alter limits as required
00107      */
00108     Client(Network& network, unsigned int command_timeout_ms = 30000);
00109 
00110     /** Set the default message handling callback - used for any message which does not match a subscription message handler
00111      *  @param mh - pointer to the callback function
00112      */
00113     void setDefaultMessageHandler(messageHandler mh)
00114     {
00115         defaultMessageHandler.attach(mh);
00116     }
00117 
00118     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00119      *  The nework object must be connected to the network endpoint before calling this
00120      *  Default connect options are used
00121      *  @return success code -
00122      */
00123     int connect();
00124     
00125         /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
00126      *  The nework object must be connected to the network endpoint before calling this
00127      *  @param options - connect options
00128      *  @return success code -
00129      */
00130     int connect(MQTTPacket_connectData& options);
00131 
00132     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00133      *  @param topic - the topic to publish to
00134      *  @param message - the message to send
00135      *  @return success code -
00136      */
00137     int publish(const char* topicName, Message& message);
00138     
00139     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00140      *  @param topic - the topic to publish to
00141      *  @param payload - the data to send
00142      *  @param payloadlen - the length of the data
00143      *  @param qos - the QoS to send the publish at
00144      *  @param retained - whether the message should be retained
00145      *  @return success code -
00146      */
00147     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
00148     
00149     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
00150      *  @param topic - the topic to publish to
00151      *  @param payload - the data to send
00152      *  @param payloadlen - the length of the data
00153      *  @param id - the packet id used - returned 
00154      *  @param qos - the QoS to send the publish at
00155      *  @param retained - whether the message should be retained
00156      *  @return success code -
00157      */
00158     int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
00159 
00160     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
00161      *  @param topicFilter - a topic pattern which can include wildcards
00162      *  @param qos - the MQTT QoS to subscribe at
00163      *  @param mh - the callback function to be invoked when a message is received for this subscription
00164      *  @return success code -
00165      */
00166     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
00167 
00168     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
00169      *  @param topicFilter - a topic pattern which can include wildcards
00170      *  @return success code -
00171      */
00172     int unsubscribe(const char* topicFilter);
00173 
00174     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
00175      *  @return success code -
00176      */
00177     int disconnect();
00178 
00179     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
00180      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
00181      *  received.
00182      *  @param timeout_ms the time to wait, in milliseconds
00183      *  @return success code - on failure, this means the client has disconnected
00184      */
00185     int yield(unsigned long timeout_ms = 1000L);
00186 
00187     /** Is the client connected?
00188      *  @return flag - is the client connected or not?
00189      */
00190     bool isConnected()
00191     {
00192         return isconnected;
00193     }
00194 
00195 private:
00196 
00197     void cleanSession();
00198     int cycle(Timer& timer);
00199     int waitfor(int packet_type, Timer& timer);
00200     int keepalive();
00201     int publish(int len, Timer& timer, enum QoS qos);
00202 
00203     int decodePacket(int* value, int timeout);
00204     int readPacket(Timer& timer);
00205     int sendPacket(int length, Timer& timer);
00206     int deliverMessage(MQTTString& topicName, Message& message);
00207     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
00208 
00209     Network& ipstack;
00210     unsigned long command_timeout_ms;
00211 
00212     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00213     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00214 
00215     Timer last_sent, last_received;
00216     unsigned int keepAliveInterval;
00217     bool ping_outstanding;
00218     bool cleansession;
00219 
00220     PacketId packetid;
00221 
00222     struct MessageHandlers
00223     {
00224         const char* topicFilter;
00225         FP<void, MessageData&> fp;
00226     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
00227 
00228     FP<void, MessageData&> defaultMessageHandler;
00229 
00230     bool isconnected;
00231 
00232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00233     unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
00234     int inflightLen;
00235     unsigned short inflightMsgid;
00236     enum QoS inflightQoS;
00237 #endif
00238 
00239 #if MQTTCLIENT_QOS2
00240     bool pubrel;
00241     #if !defined(MAX_INCOMING_QOS2_MESSAGES)
00242         #define MAX_INCOMING_QOS2_MESSAGES 10
00243     #endif
00244     unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
00245     bool isQoS2msgidFree(unsigned short id);
00246     bool useQoS2msgid(unsigned short id);
00247     void freeQoS2msgid(unsigned short id);
00248 #endif
00249 
00250 };
00251 
00252 }
00253 
00254 
00255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00257 {
00258     ping_outstanding = false;
00259     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00260         messageHandlers[i].topicFilter = 0;
00261     isconnected = false;
00262 
00263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00264     inflightMsgid = 0;
00265     inflightQoS = QOS0;
00266 #endif
00267 
00268 #if MQTTCLIENT_QOS2
00269     pubrel = false;
00270     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00271         incomingQoS2messages[i] = 0;
00272 #endif
00273 }
00274 
00275 
00276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00278 {
00279     last_sent = Timer();
00280     last_received = Timer();
00281     this->command_timeout_ms = command_timeout_ms;
00282     cleanSession();
00283 }
00284 
00285 
00286 #if MQTTCLIENT_QOS2
00287 template<class Network, class Timer, int a, int b>
00288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
00289 {
00290     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00291     {
00292         if (incomingQoS2messages[i] == id)
00293             return false;
00294     }
00295     return true;
00296 }
00297 
00298 
00299 template<class Network, class Timer, int a, int b>
00300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
00301 {
00302     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00303     {
00304         if (incomingQoS2messages[i] == 0)
00305         {
00306             incomingQoS2messages[i] = id;
00307             return true;
00308         }
00309     }
00310     return false;
00311 }
00312 
00313 
00314 template<class Network, class Timer, int a, int b>
00315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
00316 {
00317     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00318     {
00319         if (incomingQoS2messages[i] == id)
00320         {
00321             incomingQoS2messages[i] = 0;
00322             return;
00323         }
00324     }
00325 }
00326 #endif
00327 
00328 
00329 template<class Network, class Timer, int a, int b>
00330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
00331 {
00332     int rc = FAILURE,
00333         sent = 0;
00334 
00335     while (sent < length && !timer.expired())
00336     {
00337         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00338         if (rc < 0)  // there was an error writing the data
00339             break;
00340         sent += rc;
00341     }
00342     if (sent == length)
00343     {
00344         if (this->keepAliveInterval > 0)
00345             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00346         rc = SUCCESS;
00347     }
00348     else
00349         rc = FAILURE;
00350         
00351 #if defined(MQTT_DEBUG)
00352     char printbuf[150];
00353     char * MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
00354     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00355 #endif
00356     return rc;
00357 }
00358 
00359 
00360 template<class Network, class Timer, int a, int b>
00361 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
00362 {
00363     unsigned char c;
00364     int multiplier = 1;
00365     int len = 0;
00366     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00367 
00368     *value = 0;
00369     do
00370     {
00371         int rc = MQTTPACKET_READ_ERROR;
00372 
00373         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00374         {
00375             rc = MQTTPACKET_READ_ERROR; /* bad data */
00376             goto exit;
00377         }
00378         rc = ipstack.read(&c, 1, timeout);
00379         if (rc != 1)
00380             goto exit;
00381         *value += (c & 127) * multiplier;
00382         multiplier *= 128;
00383     } while ((c & 128) != 0);
00384 exit:
00385     return len;
00386 }
00387 
00388 
00389 /**
00390  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00391  * the packets can be retried.
00392  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00393  * @return the MQTT packet type, or -1 if none
00394  */
00395 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00396 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
00397 {
00398     int rc = FAILURE;
00399     MQTTHeader header = {0};
00400     int len = 0;
00401     int rem_len = 0;
00402 
00403     /* 1. read the header byte.  This has the packet type in it */
00404     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00405         goto exit;
00406 
00407     len = 1;
00408     /* 2. read the remaining length.  This is variable in itself */
00409     decodePacket(&rem_len, timer.left_ms());
00410     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00411 
00412     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00413     {
00414         rc = BUFFER_OVERFLOW;
00415         goto exit;
00416     }
00417 
00418     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00419     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00420         goto exit;
00421 
00422     header.byte = readbuf[0];
00423     rc = header.bits.type;
00424     if (this->keepAliveInterval > 0)
00425         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00426 exit:
00427         
00428 #if defined(MQTT_DEBUG)
00429     if (rc >= 0)
00430     {
00431         char printbuf[50];
00432         char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
00433         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00434     }
00435 #endif
00436     return rc;
00437 }
00438 
00439 
00440 // assume topic filter and name is in correct format
00441 // # can only be at end
00442 // + and # can only be next to separator
00443 template<class Network, class Timer, int a, int b>
00444 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00445 {
00446     char* curf = topicFilter;
00447     char* curn = topicName.lenstring.data;
00448     char* curn_end = curn + topicName.lenstring.len;
00449 
00450     while (*curf && curn < curn_end)
00451     {
00452         if (*curn == '/' && *curf != '/')
00453             break;
00454         if (*curf != '+' && *curf != '#' && *curf != *curn)
00455             break;
00456         if (*curf == '+')
00457         {   // skip until we meet the next separator, or end of string
00458             char* nextpos = curn + 1;
00459             while (nextpos < curn_end && *nextpos != '/')
00460                 nextpos = ++curn + 1;
00461         }
00462         else if (*curf == '#')
00463             curn = curn_end - 1;    // skip until end of string
00464         curf++;
00465         curn++;
00466     };
00467 
00468     return (curn == curn_end) && (*curf == '\0');
00469 }
00470 
00471 
00472 
00473 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
00474 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00475 {
00476     int rc = FAILURE;
00477 
00478     // we have to find the right message handler - indexed by topic
00479     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00480     {
00481         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00482                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00483         {
00484             if (messageHandlers[i].fp.attached())
00485             {
00486                 MessageData md(topicName, message);
00487                 messageHandlers[i].fp(md);
00488                 rc = SUCCESS;
00489             }
00490         }
00491     }
00492 
00493     if (rc == FAILURE && defaultMessageHandler.attached())
00494     {
00495         MessageData md(topicName, message);
00496         defaultMessageHandler(md);
00497         rc = SUCCESS;
00498     }
00499 
00500     return rc;
00501 }
00502 
00503 
00504 
00505 template<class Network, class Timer, int a, int b>
00506 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
00507 {
00508     int rc = SUCCESS;
00509     Timer timer = Timer();
00510 
00511     timer.countdown_ms(timeout_ms);
00512     while (!timer.expired())
00513     {
00514         if (cycle(timer) < 0)
00515         {
00516             rc = FAILURE;
00517             break;
00518         }
00519     }
00520 
00521     return rc;
00522 }
00523 
00524 
00525 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00526 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
00527 {
00528     /* get one piece of work off the wire and one pass through */
00529 
00530     // read the socket, see what work is due
00531     int packet_type = readPacket(timer);
00532 
00533     int len = 0,
00534         rc = SUCCESS;
00535 
00536     switch (packet_type)
00537     {
00538         case FAILURE:
00539         case BUFFER_OVERFLOW:
00540             rc = packet_type;
00541             break;
00542         case CONNACK:
00543         case PUBACK:
00544         case SUBACK:
00545             break;
00546         case PUBLISH:
00547         {
00548             MQTTString topicName = MQTTString_initializer;
00549             Message msg;
00550             int intQoS;
00551             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00552                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00553                 goto exit;
00554             msg.qos = (enum QoS)intQoS;
00555 #if MQTTCLIENT_QOS2
00556             if (msg.qos != QOS2)
00557 #endif
00558                 deliverMessage(topicName, msg);
00559 #if MQTTCLIENT_QOS2
00560             else if (isQoS2msgidFree(msg.id))
00561             {
00562                 if (useQoS2msgid(msg.id))
00563                     deliverMessage(topicName, msg);
00564                 else
00565                     WARN("Maximum number of incoming QoS2 messages exceeded");
00566             }
00567 #endif
00568 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00569             if (msg.qos != QOS0)
00570             {
00571                 if (msg.qos == QOS1)
00572                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00573                 else if (msg.qos == QOS2)
00574                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00575                 if (len <= 0)
00576                     rc = FAILURE;
00577                 else
00578                     rc = sendPacket(len, timer);
00579                 if (rc == FAILURE)
00580                     goto exit; // there was a problem
00581             }
00582             break;
00583 #endif
00584         }
00585 #if MQTTCLIENT_QOS2
00586         case PUBREC:
00587         case PUBREL:
00588             unsigned short mypacketid;
00589             unsigned char dup, type;
00590             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00591                 rc = FAILURE;
00592             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00593                         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00594                 rc = FAILURE;
00595             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00596                 rc = FAILURE; // there was a problem
00597             if (rc == FAILURE)
00598                 goto exit; // there was a problem
00599             if (packet_type == PUBREL)
00600                 freeQoS2msgid(mypacketid);
00601             break;
00602             
00603         case PUBCOMP:
00604             break;
00605 #endif
00606         case PINGRESP:
00607             ping_outstanding = false;
00608             break;
00609     }
00610     keepalive();
00611 exit:
00612     if (rc == SUCCESS)
00613         rc = packet_type;
00614     return rc;
00615 }
00616 
00617 
00618 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00619 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00620 {
00621     int rc = FAILURE;
00622 
00623     if (keepAliveInterval == 0)
00624     {
00625         rc = SUCCESS;
00626         goto exit;
00627     }
00628 
00629     if (last_sent.expired() || last_received.expired())
00630     {
00631         if (!ping_outstanding)
00632         {
00633             Timer timer(1000);
00634             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00635             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00636                 ping_outstanding = true;
00637         }
00638     }
00639 
00640 exit:
00641     return rc;
00642 }
00643 
00644 
00645 // only used in single-threaded mode where one command at a time is in process
00646 template<class Network, class Timer, int a, int b>
00647 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
00648 {
00649     int rc = FAILURE;
00650 
00651     do
00652     {
00653         if (timer.expired())
00654             break; // we timed out
00655     }
00656     while ((rc = cycle(timer)) != packet_type);
00657 
00658     return rc;
00659 }
00660 
00661 
00662 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00663 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00664 {
00665     Timer connect_timer(command_timeout_ms);
00666     int rc = FAILURE;
00667     int len = 0;
00668 
00669     if (isconnected) // don't send connect packet again if we are already connected
00670         goto exit;
00671 
00672     this->keepAliveInterval = options.keepAliveInterval;
00673     this->cleansession = options.cleansession;
00674     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00675         goto exit;
00676     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00677         goto exit; // there was a problem
00678 
00679     if (this->keepAliveInterval > 0)
00680         last_received.countdown(this->keepAliveInterval);
00681     // this will be a blocking call, wait for the connack
00682     if (waitfor(CONNACK, connect_timer) == CONNACK)
00683     {
00684         unsigned char connack_rc = 255;
00685         bool sessionPresent = false;
00686         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00687             rc = connack_rc;
00688         else
00689             rc = FAILURE;
00690     }
00691     else
00692         rc = FAILURE;
00693 
00694 #if MQTTCLIENT_QOS2
00695     // resend any inflight publish
00696     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00697     {
00698         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00699             rc = FAILURE;
00700         else
00701             rc = publish(len, connect_timer, inflightQoS);
00702     }
00703     else
00704 #endif
00705 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00706     if (inflightMsgid > 0)
00707     {
00708         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00709         rc = publish(inflightLen, connect_timer, inflightQoS);
00710     }
00711 #endif
00712 
00713 exit:
00714     if (rc == SUCCESS)
00715         isconnected = true;
00716     return rc;
00717 }
00718 
00719 
00720 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00721 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
00722 {
00723     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00724     return connect(default_options);
00725 }
00726 
00727 
00728 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00729 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00730 {
00731     int rc = FAILURE;
00732     Timer timer(command_timeout_ms);
00733     int len = 0;
00734     MQTTString topic = {(char*)topicFilter, {0, 0}};
00735 
00736     if (!isconnected)
00737         goto exit;
00738 
00739     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00740     if (len <= 0)
00741         goto exit;
00742     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00743         goto exit;             // there was a problem
00744 
00745     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00746     {
00747         int count = 0, grantedQoS = -1;
00748         unsigned short mypacketid;
00749         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00750             rc = grantedQoS; // 0, 1, 2 or 0x80
00751         if (rc != 0x80)
00752         {
00753             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00754             {
00755                 if (messageHandlers[i].topicFilter == 0)
00756                 {
00757                     messageHandlers[i].topicFilter = topicFilter;
00758                     messageHandlers[i].fp.attach(messageHandler);
00759                     rc = 0;
00760                     break;
00761                 }
00762             }
00763         }
00764     }
00765     else
00766         rc = FAILURE;
00767 
00768 exit:
00769     if (rc != SUCCESS)
00770         cleanSession();
00771     return rc;
00772 }
00773 
00774 
00775 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00776 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00777 {
00778     int rc = FAILURE;
00779     Timer timer(command_timeout_ms);
00780     MQTTString topic = {(char*)topicFilter, {0, 0}};
00781     int len = 0;
00782 
00783     if (!isconnected)
00784         goto exit;
00785 
00786     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00787         goto exit;
00788     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00789         goto exit; // there was a problem
00790 
00791     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00792     {
00793         unsigned short mypacketid;  // should be the same as the packetid above
00794         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00795         {
00796             rc = 0;
00797 
00798             // remove the subscription message handler associated with this topic, if there is one
00799             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00800             {
00801                 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00802                 {
00803                     messageHandlers[i].topicFilter = 0;
00804                     break;
00805                 }
00806             }
00807         }
00808     }
00809     else
00810         rc = FAILURE;
00811 
00812 exit:
00813     if (rc != SUCCESS)
00814         cleanSession();
00815     return rc;
00816 }
00817 
00818 
00819 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00820 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
00821 {
00822     int rc;
00823 
00824     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00825         goto exit; // there was a problem
00826 
00827 #if MQTTCLIENT_QOS1
00828     if (qos == QOS1)
00829     {
00830         if (waitfor(PUBACK, timer) == PUBACK)
00831         {
00832             unsigned short mypacketid;
00833             unsigned char dup, type;
00834             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00835                 rc = FAILURE;
00836             else if (inflightMsgid == mypacketid)
00837                 inflightMsgid = 0;
00838         }
00839         else
00840             rc = FAILURE;
00841     }
00842 #elif MQTTCLIENT_QOS2
00843     else if (qos == QOS2)
00844     {
00845         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00846         {
00847             unsigned short mypacketid;
00848             unsigned char dup, type;
00849             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00850                 rc = FAILURE;
00851             else if (inflightMsgid == mypacketid)
00852                 inflightMsgid = 0;
00853         }
00854         else
00855             rc = FAILURE;
00856     }
00857 #endif
00858 
00859 exit:
00860     if (rc != SUCCESS)
00861         cleanSession();
00862     return rc;
00863 }
00864 
00865 
00866 
00867 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00868 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00869 {
00870     int rc = FAILURE;
00871     Timer timer(command_timeout_ms);
00872     MQTTString topicString = MQTTString_initializer;
00873     int len = 0;
00874 
00875     if (!isconnected)
00876         goto exit;
00877 
00878     topicString.cstring = (char*)topicName;
00879 
00880 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00881     if (qos == QOS1 || qos == QOS2)
00882         id = packetid.getNext();
00883 #endif
00884 
00885     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00886               topicString, (unsigned char*)payload, payloadlen);
00887     if (len <= 0)
00888         goto exit;
00889 
00890 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00891     if (!cleansession)
00892     {
00893         memcpy(pubbuf, sendbuf, len);
00894         inflightMsgid = id;
00895         inflightLen = len;
00896         inflightQoS = qos;
00897 #if MQTTCLIENT_QOS2
00898         pubrel = false;
00899 #endif
00900     }
00901 #endif
00902 
00903     rc = publish(len, timer, qos);
00904 exit:
00905     return rc;
00906 }
00907 
00908 
00909 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00910 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00911 {
00912     unsigned short id = 0;  // dummy - not used for anything
00913     return publish(topicName, payload, payloadlen, id, qos, retained);
00914 }
00915 
00916 
00917 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00918 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
00919 {
00920     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00921 }
00922 
00923 
00924 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
00925 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
00926 {
00927     int rc = FAILURE;
00928     Timer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
00929     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
00930     if (len > 0)
00931         rc = sendPacket(len, timer);            // send the disconnect packet
00932 
00933     if (cleansession)
00934         cleanSession();
00935     else
00936         isconnected = false;
00937     return rc;
00938 }
00939 
00940 
00941 #endif