Junichi Katsu / Milkcocoa-os

Dependents:   mbed-os-example-wifi-milkcocoa MilkcocoaOsSample_Eth MilkcocoaOsSample_ESP8266 MilkcocoaOsSample_Eth_DigitalIn

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.h Source File

MQTTClient.h

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - fix for bug 458512 - QoS 2 messages
00016  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
00017  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
00018  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
00019  *******************************************************************************/
00020 
00021 #if !defined(MQTTCLIENT_H)
00022 #define MQTTCLIENT_H
00023 
00024 #include "FP.h"
00025 #include "MQTTPacket.h"
00026 #include "stdio.h"
00027 #include "MQTTLogging.h"
00028 #include "OldTimer.h"
00029 
// Compile-time feature switches. Each one may be pre-defined by the build to
// override the default chosen here.
// MQTTCLIENT_QOS1: when non-zero, the QoS 1 acknowledgement handling is compiled in.
#if !defined(MQTTCLIENT_QOS1)
    #define MQTTCLIENT_QOS1 1
#endif
// MQTTCLIENT_QOS2: when non-zero, the QoS 2 handshake handling is compiled in.
#if !defined(MQTTCLIENT_QOS2)
    #define MQTTCLIENT_QOS2 0
#endif

// Debug helper: DBG(x) expands to x while the condition below is 1, and to
// nothing when it is changed to 0.
// NOTE(review): the enabled branch also declares an external RawSerial object
// named pc, so some other translation unit presumably has to define it - confirm.
#if 1
extern RawSerial pc;

#define DBG(x)  x
#else
#define DBG(x)
#endif
00044 
00045 namespace MQTT
00046 {
00047 
00048 
// MQTT quality-of-service levels; the numeric values are 0, 1 and 2.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
00050 
00051 // all failure return codes must be negative
// Result codes used throughout the client; every failure code is negative.
enum returnCode
{
    SUCCESS = 0,
    FAILURE = -1,
    BUFFER_OVERFLOW = -2
};
00053 
00054 
00055 struct Message
00056 {
00057     enum QoS qos;
00058     bool retained;
00059     bool dup;
00060     unsigned short id;
00061     void *payload;
00062     size_t payloadlen;
00063 };
00064 
00065 
00066 struct MessageData
00067 {
00068     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
00069     { }
00070 
00071     struct Message &message;
00072     MQTTString &topicName;
00073 };
00074 
00075 
/**
 * Generator of MQTT packet identifiers.
 * Identifiers run 1, 2, ... MAX_PACKET_ID and then wrap back to 1; 0 is never
 * handed out.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     * Advance to the next identifier and return it.
     * @return a value in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        // Computed with "next + 1" rather than "++next" so that `next` is
        // written exactly once in the expression.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out (0 before the first call)
};
00093 
00094 
/**
 * @class Client
 * @brief blocking, non-threaded MQTT client API
 *
 * This version of the API blocks on all method calls, until they are complete.  This means that only one
 * MQTT request can be in process at any one time.
 * @param Network a network class which supports read(buffer, length, timeout) and
 *      write(buffer, length, timeout), each returning the number of bytes transferred
 * @param OldTimer a timer class which is default-constructible, constructible from a timeout value,
 *      and provides the methods expired(), left_ms(), countdown() and countdown_ms()
 * @param MAX_MQTT_PACKET_SIZE the size in bytes of each internal packet buffer
 * @param MAX_MESSAGE_HANDLERS the number of subscription message handlers which can be registered
 */
template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE = 256, int MAX_MESSAGE_HANDLERS = 5>
class Client
{

public:

    /** Signature of a message callback */
    typedef void (*messageHandler)(MessageData&);

    /** Construct the client
     *  @param network - reference to an instance of the Network class - must be connected to the endpoint
     *      before calling MQTT connect
     *  @param command_timeout_ms - the timeout, in milliseconds, used for the blocking commands
     */
    Client(Network& network, unsigned int command_timeout_ms = 30000);

    /** Set the default message handling callback - used for any message which does not match a subscription message handler
     *  @param mh - pointer to the callback function
     */
    void setDefaultMessageHandler(messageHandler mh)
    {
        defaultMessageHandler.attach(mh);
    }

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  Default connect options are used
     *  @return success code - 0 on success, non-zero otherwise
     */
    int connect();

    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
     *  The network object must be connected to the network endpoint before calling this
     *  @param options - connect options
     *  @return success code - 0 on success, non-zero otherwise
     */
    int connect(MQTTPacket_connectData& options);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param message - the message to send
     *  @return success code - 0 on success, non-zero otherwise
     */
    int publish(const char* topicName, Message& message);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code - 0 on success, non-zero otherwise
     */
    int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
     *  @param topicName - the topic to publish to
     *  @param payload - the data to send
     *  @param payloadlen - the length of the data
     *  @param id - the packet id used - returned
     *  @param qos - the QoS to send the publish at
     *  @param retained - whether the message should be retained
     *  @return success code - 0 on success, non-zero otherwise
     */
    int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);

    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
     *  @param topicFilter - a topic pattern which can include wildcards; the pointer itself is stored
     *      (the string is not copied), so it must stay valid for as long as the subscription exists
     *  @param qos - the MQTT QoS to subscribe at
     *  @param mh - the callback function to be invoked when a message is received for this subscription
     *  @return success code - 0 on success, non-zero otherwise
     */
    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);

    /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
     *  @param topicFilter - a topic pattern which can include wildcards
     *  @return success code - 0 on success, non-zero otherwise
     */
    int unsubscribe(const char* topicFilter);

    /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
     *  @return success code - the result of sending the disconnect packet
     */
    int disconnect();

    /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
     *  received.
     *  @param timeout_ms the time to wait, in milliseconds
     *  @return success code - on failure, this means the client has disconnected
     */
    int yield(unsigned long timeout_ms = 1000L);

    /** Is the client connected?
     *  @return flag - is the client connected or not?
     */
    bool isConnected()
    {
        return isconnected;
    }

private:

    // Internal protocol machinery - see the definitions further down this file.
    void cleanSession();
    int cycle(OldTimer& timer);
    int waitfor(int packet_type, OldTimer& timer);
    int keepalive();
    int publish(int len, OldTimer& timer, enum QoS qos);

    int decodePacket(int* value, int timeout);
    int readPacket(OldTimer& timer);
    int sendPacket(int length, OldTimer& timer);
    int deliverMessage(MQTTString& topicName, Message& message);
    bool isTopicMatched(char* topicFilter, MQTTString& topicName);

    Network& ipstack;                   // the transport, owned by the caller
    unsigned long command_timeout_ms;   // timeout applied to each blocking command

    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];    // outgoing packet is serialized here
    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];    // incoming packet is read into here

    OldTimer last_sent, last_received;  // keep-alive countdowns, restarted on each send / receive
    unsigned int keepAliveInterval;     // taken from the connect options; 0 disables keep-alive
    bool ping_outstanding;              // a PINGREQ has been sent and no PINGRESP has been seen yet
    bool cleansession;                  // taken from the connect options

    PacketId packetid;

    struct MessageHandlers
    {
        const char* topicFilter;        // 0 marks a free slot
        _FP<void, MessageData&> fp;
    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic

    _FP<void, MessageData&> defaultMessageHandler;

    bool isconnected;

#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
    unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
    int inflightLen;                    // number of valid bytes in pubbuf
    unsigned short inflightMsgid;       // packet id of the stored publish; 0 means nothing is in flight
    enum QoS inflightQoS;
#endif

#if MQTTCLIENT_QOS2
    bool pubrel;
    #if !defined(MAX_INCOMING_QOS2_MESSAGES)
        #define MAX_INCOMING_QOS2_MESSAGES 10
    #endif
    unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];   // ids of incoming QoS 2 messages in progress; 0 marks a free slot
    bool isQoS2msgidFree(unsigned short id);
    bool useQoS2msgid(unsigned short id);
    void freeQoS2msgid(unsigned short id);
#endif

};
00260 
00261 }
00262 
00263 
00264 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
00265 void MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
00266 {
00267     ping_outstanding = false;
00268     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00269         messageHandlers[i].topicFilter = 0;
00270     isconnected = false;
00271 
00272 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00273     inflightMsgid = 0;
00274     inflightQoS = QOS0;
00275 #endif
00276 
00277 #if MQTTCLIENT_QOS2
00278     pubrel = false;
00279     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00280         incomingQoS2messages[i] = 0;
00281 #endif
00282 }
00283 
00284 
00285 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
00286 MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
00287 {
00288     last_sent = OldTimer();
00289     last_received = OldTimer();
00290     this->command_timeout_ms = command_timeout_ms;
00291     cleanSession();
00292 }
00293 
00294 
00295 #if MQTTCLIENT_QOS2
00296 template<class Network, class OldTimer, int a, int b>
00297 bool MQTT::Client<Network, OldTimer, a, b>::isQoS2msgidFree(unsigned short id)
00298 {
00299     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00300     {
00301         if (incomingQoS2messages[i] == id)
00302             return false;
00303     }
00304     return true;
00305 }
00306 
00307 
00308 template<class Network, class OldTimer, int a, int b>
00309 bool MQTT::Client<Network, OldTimer, a, b>::useQoS2msgid(unsigned short id)
00310 {
00311     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00312     {
00313         if (incomingQoS2messages[i] == 0)
00314         {
00315             incomingQoS2messages[i] = id;
00316             return true;
00317         }
00318     }
00319     return false;
00320 }
00321 
00322 
00323 template<class Network, class OldTimer, int a, int b>
00324 void MQTT::Client<Network, OldTimer, a, b>::freeQoS2msgid(unsigned short id)
00325 {
00326     for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
00327     {
00328         if (incomingQoS2messages[i] == id)
00329         {
00330             incomingQoS2messages[i] = 0;
00331             return;
00332         }
00333     }
00334 }
00335 #endif
00336 
00337 
00338 template<class Network, class OldTimer, int a, int b>
00339 int MQTT::Client<Network, OldTimer, a, b>::sendPacket(int length, OldTimer& timer)
00340 {
00341     int rc = FAILURE,
00342         sent = 0;
00343 
00344     while (sent < length && !timer.expired())
00345     {
00346         rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
00347         if (rc < 0)  // there was an error writing the data
00348             break;
00349         sent += rc;
00350     }
00351     if (sent == length)
00352     {
00353         if (this->keepAliveInterval > 0)
00354             last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
00355         rc = SUCCESS;
00356     }
00357     else
00358         rc = FAILURE;
00359         
00360 #if defined(MQTT_DEBUG)
00361     char printbuf[150];
00362     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
00363 #endif
00364     return rc;
00365 }
00366 
00367 
00368 template<class Network, class OldTimer, int a, int b>
00369 int MQTT::Client<Network, OldTimer, a, b>::decodePacket(int* value, int timeout)
00370 {
00371     unsigned char c;
00372     int multiplier = 1;
00373     int len = 0;
00374     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00375 
00376     *value = 0;
00377     do
00378     {
00379         int rc = MQTTPACKET_READ_ERROR;
00380 
00381         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00382         {
00383             rc = MQTTPACKET_READ_ERROR; /* bad data */
00384             goto exit;
00385         }
00386         rc = ipstack.read(&c, 1, timeout);
00387         if (rc != 1)
00388             goto exit;
00389         *value += (c & 127) * multiplier;
00390         multiplier *= 128;
00391     } while ((c & 128) != 0);
00392 exit:
00393     return len;
00394 }
00395 
00396 
00397 /**
00398  * If any read fails in this method, then we should disconnect from the network, as on reconnect
00399  * the packets can be retried.
00400  * @param timeout the max time to wait for the packet read to complete, in milliseconds
00401  * @return the MQTT packet type, or -1 if none
00402  */
00403 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00404 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::readPacket(OldTimer& timer)
00405 {
00406     int rc = FAILURE;
00407     MQTTHeader header = {0};
00408     int len = 0;
00409     int rem_len = 0;
00410 
00411     /* 1. read the header byte.  This has the packet type in it */
00412     if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
00413         goto exit;
00414 
00415     len = 1;
00416     /* 2. read the remaining length.  This is variable in itself */
00417     decodePacket(&rem_len, timer.left_ms());
00418     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00419 
00420     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00421     {
00422         rc = BUFFER_OVERFLOW;
00423         goto exit;
00424     }
00425 
00426     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00427     if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
00428         goto exit;
00429 
00430     header.byte = readbuf[0];
00431     rc = header.bits.type;
00432     if (this->keepAliveInterval > 0)
00433         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
00434 exit:
00435         
00436 #if defined(MQTT_DEBUG)
00437     if (rc >= 0)
00438     {
00439         char printbuf[50];
00440         DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
00441     }
00442 #endif
00443     return rc;
00444 }
00445 
00446 
00447 // assume topic filter and name is in correct format
00448 // # can only be at end
00449 // + and # can only be next to separator
00450 template<class Network, class OldTimer, int a, int b>
00451 bool MQTT::Client<Network, OldTimer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
00452 {
00453     char* curf = topicFilter;
00454     char* curn = topicName.lenstring.data;
00455     char* curn_end = curn + topicName.lenstring.len;
00456 
00457     while (*curf && curn < curn_end)
00458     {
00459         if (*curn == '/' && *curf != '/')
00460             break;
00461         if (*curf != '+' && *curf != '#' && *curf != *curn)
00462             break;
00463         if (*curf == '+')
00464         {   // skip until we meet the next separator, or end of string
00465             char* nextpos = curn + 1;
00466             while (nextpos < curn_end && *nextpos != '/')
00467                 nextpos = ++curn + 1;
00468         }
00469         else if (*curf == '#')
00470             curn = curn_end - 1;    // skip until end of string
00471         curf++;
00472         curn++;
00473     };
00474 
00475     return (curn == curn_end) && (*curf == '\0');
00476 }
00477 
00478 
00479 
00480 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
00481 int MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
00482 {
00483     int rc = FAILURE;
00484 
00485     // we have to find the right message handler - indexed by topic
00486     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00487     {
00488         if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
00489                 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
00490         {
00491             if (messageHandlers[i].fp.attached())
00492             {
00493                 MessageData md(topicName, message);
00494                 messageHandlers[i].fp(md);
00495                 rc = SUCCESS;
00496             }
00497         }
00498     }
00499 
00500     if (rc == FAILURE && defaultMessageHandler.attached())
00501     {
00502         MessageData md(topicName, message);
00503         defaultMessageHandler(md);
00504         rc = SUCCESS;
00505     }
00506 
00507     return rc;
00508 }
00509 
00510 
00511 
00512 template<class Network, class OldTimer, int a, int b>
00513 int MQTT::Client<Network, OldTimer, a, b>::yield(unsigned long timeout_ms)
00514 {
00515     int rc = SUCCESS;
00516     OldTimer timer = OldTimer();
00517 
00518     timer.countdown_ms(timeout_ms);
00519     while (!timer.expired())
00520     {
00521         if (cycle(timer) < 0)
00522         {
00523             rc = FAILURE;
00524             break;
00525         }
00526         Thread::wait(10);
00527     }
00528 
00529     return rc;
00530 }
00531 
00532 
00533 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00534 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::cycle(OldTimer& timer)
00535 {
00536     /* get one piece of work off the wire and one pass through */
00537 
00538     // read the socket, see what work is due
00539     int packet_type = readPacket(timer);
00540 
00541     int len = 0,
00542         rc = SUCCESS;
00543 
00544     switch (packet_type)
00545     {
00546         case FAILURE:
00547         case BUFFER_OVERFLOW:
00548             rc = packet_type;
00549             break;
00550         case CONNACK:
00551         case PUBACK:
00552         case SUBACK:
00553             break;
00554         case PUBLISH:
00555         {
00556             MQTTString topicName = MQTTString_initializer;
00557             Message msg;
00558             int intQoS;
00559             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
00560                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00561                 goto exit;
00562             msg.qos = (enum QoS)intQoS;
00563 #if MQTTCLIENT_QOS2
00564             if (msg.qos != QOS2)
00565 #endif
00566                 deliverMessage(topicName, msg);
00567 #if MQTTCLIENT_QOS2
00568             else if (isQoS2msgidFree(msg.id))
00569             {
00570                 if (useQoS2msgid(msg.id))
00571                     deliverMessage(topicName, msg);
00572                 else
00573                     WARN("Maximum number of incoming QoS2 messages exceeded");
00574             }
00575 #endif
00576 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00577             if (msg.qos != QOS0)
00578             {
00579                 if (msg.qos == QOS1)
00580                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
00581                 else if (msg.qos == QOS2)
00582                     len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
00583                 if (len <= 0)
00584                     rc = FAILURE;
00585                 else
00586                     rc = sendPacket(len, timer);
00587                 if (rc == FAILURE)
00588                     goto exit; // there was a problem
00589             }
00590             break;
00591 #endif
00592         }
00593 #if MQTTCLIENT_QOS2
00594         case PUBREC:
00595         case PUBREL:
00596             unsigned short mypacketid;
00597             unsigned char dup, type;
00598             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00599                 rc = FAILURE;
00600             else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
00601                         (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00602                 rc = FAILURE;
00603             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
00604                 rc = FAILURE; // there was a problem
00605             if (rc == FAILURE)
00606                 goto exit; // there was a problem
00607             if (packet_type == PUBREL)
00608                 freeQoS2msgid(mypacketid);
00609             break;
00610             
00611         case PUBCOMP:
00612             break;
00613 #endif
00614         case PINGRESP:
00615             ping_outstanding = false;
00616             break;
00617     }
00618     keepalive();
00619 exit:
00620     if (rc == SUCCESS)
00621         rc = packet_type;
00622     return rc;
00623 }
00624 
00625 
00626 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00627 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
00628 {
00629     int rc = FAILURE;
00630 
00631     if (keepAliveInterval == 0)
00632     {
00633         rc = SUCCESS;
00634         goto exit;
00635     }
00636 
00637     if (last_sent.expired() || last_received.expired())
00638     {
00639         if (!ping_outstanding)
00640         {
00641             OldTimer timer(1000);
00642             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00643             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
00644                 ping_outstanding = true;
00645         }
00646     }
00647 
00648 exit:
00649     return rc;
00650 }
00651 
00652 
00653 // only used in single-threaded mode where one command at a time is in process
00654 template<class Network, class OldTimer, int a, int b>
00655 int MQTT::Client<Network, OldTimer, a, b>::waitfor(int packet_type, OldTimer& timer)
00656 {
00657     int rc = FAILURE;
00658 
00659     do
00660     {
00661         if (timer.expired())
00662             break; // we timed out
00663     }
00664     while ((rc = cycle(timer)) != packet_type);
00665 
00666     return rc;
00667 }
00668 
00669 
00670 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00671 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
00672 {
00673     OldTimer connect_timer(command_timeout_ms);
00674     int rc = FAILURE;
00675     int len = 0;
00676 
00677     if (isconnected) // don't send connect packet again if we are already connected
00678         goto exit;
00679 
00680     this->keepAliveInterval = options.keepAliveInterval;
00681     this->cleansession = options.cleansession;
00682     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
00683         goto exit;
00684     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
00685         goto exit; // there was a problem
00686 
00687     if (this->keepAliveInterval > 0)
00688         last_received.countdown(this->keepAliveInterval);
00689     // this will be a blocking call, wait for the connack
00690     if (waitfor(CONNACK, connect_timer) == CONNACK)
00691     {
00692         unsigned char connack_rc = 255;
00693         bool sessionPresent = false;
00694         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00695             rc = connack_rc;
00696         else
00697             rc = FAILURE;
00698     }
00699     else
00700         rc = FAILURE;
00701 
00702 #if MQTTCLIENT_QOS2
00703     // resend any inflight publish
00704     if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
00705     {
00706         if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
00707             rc = FAILURE;
00708         else
00709             rc = publish(len, connect_timer, inflightQoS);
00710     }
00711     else
00712 #endif
00713 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00714     if (inflightMsgid > 0)
00715     {
00716         memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
00717         rc = publish(inflightLen, connect_timer, inflightQoS);
00718     }
00719 #endif
00720 
00721 exit:
00722     if (rc == SUCCESS)
00723         isconnected = true;
00724     return rc;
00725 }
00726 
00727 
00728 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00729 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect()
00730 {
00731     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00732     return connect(default_options);
00733 }
00734 
00735 
00736 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00737 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
00738 {
00739     int rc = FAILURE;
00740     OldTimer timer(command_timeout_ms);
00741     int len = 0;
00742     MQTTString topic = {(char*)topicFilter, {0, 0}};
00743 
00744     if (!isconnected)
00745         goto exit;
00746 
00747     len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00748     if (len <= 0)
00749         goto exit;
00750     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
00751         goto exit;             // there was a problem
00752 
00753     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
00754     {
00755         int count = 0, grantedQoS = -1;
00756         unsigned short mypacketid;
00757         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00758             rc = grantedQoS; // 0, 1, 2 or 0x80
00759         if (rc != 0x80)
00760         {
00761             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00762             {
00763                 if (messageHandlers[i].topicFilter == 0)
00764                 {
00765                     messageHandlers[i].topicFilter = topicFilter;
00766                     messageHandlers[i].fp.attach(messageHandler);
00767                     rc = 0;
00768                     break;
00769                 }
00770             }
00771         }
00772     }
00773     else
00774         rc = FAILURE;
00775 
00776 exit:
00777     if (rc != SUCCESS)
00778         cleanSession();
00779     return rc;
00780 }
00781 
00782 
00783 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
00784 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
00785 {
00786     int rc = FAILURE;
00787     OldTimer timer(command_timeout_ms);
00788     MQTTString topic = {(char*)topicFilter, {0, 0}};
00789     int len = 0;
00790 
00791     if (!isconnected)
00792         goto exit;
00793 
00794     if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
00795         goto exit;
00796     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
00797         goto exit; // there was a problem
00798 
00799     if (waitfor(UNSUBACK, timer) == UNSUBACK)
00800     {
00801         unsigned short mypacketid;  // should be the same as the packetid above
00802         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00803         {
00804             rc = 0;
00805 
00806             // remove the subscription message handler associated with this topic, if there is one
00807             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00808             {
00809                 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
00810                 {
00811                     messageHandlers[i].topicFilter = 0;
00812                     break;
00813                 }
00814             }
00815         }
00816     }
00817     else
00818         rc = FAILURE;
00819 
00820 exit:
00821     if (rc != SUCCESS)
00822         cleanSession();
00823     return rc;
00824 }
00825 
00826 
00827 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00828 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, OldTimer& timer, enum QoS qos)
00829 {
00830     int rc;
00831 
00832     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
00833         goto exit; // there was a problem
00834 
00835 #if MQTTCLIENT_QOS1
00836     if (qos == QOS1)
00837     {
00838         if (waitfor(PUBACK, timer) == PUBACK)
00839         {
00840             unsigned short mypacketid;
00841             unsigned char dup, type;
00842             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00843                 rc = FAILURE;
00844             else if (inflightMsgid == mypacketid)
00845                 inflightMsgid = 0;
00846         }
00847         else
00848             rc = FAILURE;
00849     }
00850 #elif MQTTCLIENT_QOS2
00851     else if (qos == QOS2)
00852     {
00853         if (waitfor(PUBCOMP, timer) == PUBCOMP)
00854         {
00855             unsigned short mypacketid;
00856             unsigned char dup, type;
00857             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00858                 rc = FAILURE;
00859             else if (inflightMsgid == mypacketid)
00860                 inflightMsgid = 0;
00861         }
00862         else
00863             rc = FAILURE;
00864     }
00865 #endif
00866 
00867 exit:
00868     if (rc != SUCCESS)
00869         cleanSession();
00870     return rc;
00871 }
00872 
00873 
00874 
00875 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00876 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
00877 {
00878     int rc = FAILURE;
00879     OldTimer timer(command_timeout_ms);
00880     MQTTString topicString = MQTTString_initializer;
00881     int len = 0;
00882 
00883     if (!isconnected)
00884         goto exit;
00885 
00886     topicString.cstring = (char*)topicName;
00887 
00888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00889     if (qos == QOS1 || qos == QOS2)
00890         id = packetid.getNext();
00891 #endif
00892 
00893     len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
00894               topicString, (unsigned char*)payload, payloadlen);
00895     if (len <= 0)
00896         goto exit;
00897 
00898 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
00899     if (!cleansession)
00900     {
00901         memcpy(pubbuf, sendbuf, len);
00902         inflightMsgid = id;
00903         inflightLen = len;
00904         inflightQoS = qos;
00905 #if MQTTCLIENT_QOS2
00906         pubrel = false;
00907 #endif
00908     }
00909 #endif
00910 
00911     rc = publish(len, timer, qos);
00912 exit:
00913     return rc;
00914 }
00915 
00916 
00917 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00918 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
00919 {
00920     unsigned short id = 0;  // dummy - not used for anything
00921     return publish(topicName, payload, payloadlen, id, qos, retained);
00922 }
00923 
00924 
00925 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00926 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
00927 {
00928     return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
00929 }
00930 
00931 
00932 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
00933 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
00934 {
00935     int rc = FAILURE;
00936     OldTimer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
00937     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
00938     if (len > 0)
00939         rc = sendPacket(len, timer);            // send the disconnect packet
00940 
00941     if (cleansession)
00942         cleanSession();
00943     else
00944         isconnected = false;
00945     return rc;
00946 }
00947 
00948 
00949 #endif