Better timing

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Revision:
43:21da1f744243
Parent:
42:f5beda831651
Child:
44:c299463ae853
--- a/MQTTClient.h	Thu Aug 21 12:40:43 2014 +0000
+++ b/MQTTClient.h	Mon Sep 29 11:34:26 2014 +0000
@@ -13,14 +13,6 @@
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
- 
- /*
- 
- TODO: 
- 
- ensure publish packets are retried on reconnect
- 
- */
 
 #if !defined(MQTTCLIENT_H)
 #define MQTTCLIENT_H
@@ -28,7 +20,14 @@
 #include "FP.h"
 #include "MQTTPacket.h"
 #include "stdio.h"
-#include "MQTT_logging.h"
+#include "MQTTLogging.h"
+
+#if !defined(MQTTCLIENT_QOS1)
+    #define MQTTCLIENT_QOS1 1
+#endif
+#if !defined(MQTTCLIENT_QOS2)
+    #define MQTTCLIENT_QOS2 0
+#endif
 
 namespace MQTT
 {
@@ -55,7 +54,7 @@
 {
     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
     { }
-    
+
     struct Message &message;
     MQTTString &topicName;
 };
@@ -68,44 +67,33 @@
     {
         next = 0;
     }
-    
+
     int getNext()
     {
         return next = (next == MAX_PACKET_ID) ? 1 : ++next;
     }
-   
+
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
 
 
-class QoS2
-{
-public:
-
-    
-private:
-
-
-};
-  
-  
 /**
  * @class Client
  * @brief blocking, non-threaded MQTT client API
- * 
+ *
  * This version of the API blocks on all method calls, until they are complete.  This means that only one
- * MQTT request can be in process at any one time.  
+ * MQTT request can be in process at any one time.
  * @param Network a network class which supports send, receive
- * @param Timer a timer class with the methods: 
- */ 
+ * @param Timer a timer class with the methods:
+ */
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
 class Client
 {
-    
+
 public:
-   
+
     typedef void (*messageHandler)(MessageData&);
 
     /** Construct the client
@@ -113,8 +101,8 @@
      *      before calling MQTT connect
      *  @param limits an instance of the Limit class - to alter limits as required
      */
-    Client(Network& network, unsigned int command_timeout_ms = 30000); 
-    
+    Client(Network& network, unsigned int command_timeout_ms = 30000);
+
     /** Set the default message handling callback - used for any message which does not match a subscription message handler
      *  @param mh - pointer to the callback function
      */
@@ -122,95 +110,135 @@
     {
         defaultMessageHandler.attach(mh);
     }
-    
-    void setLogHandler()
-    {
-        logHandler.attach(lh);
-    }
 
+    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+     *  The nework object must be connected to the network endpoint before calling this
+     *  Default connect options are used
+     *  @return success code -
+     */
+    int connect();
     
     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
-     *  The nework object must be connected to the network endpoint before calling this 
+     *  The nework object must be connected to the network endpoint before calling this
      *  @param options - connect options
-     *  @return success code -  
-     */       
-    int connect(MQTTPacket_connectData* options = 0);
-      
+     *  @return success code -
+     */
+    int connect(MQTTPacket_connectData& options);
+
     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
      *  @param topic - the topic to publish to
      *  @param message - the message to send
-     *  @return success code -  
-     */      
-    int publish(const char* topicName, Message* message);
-   
+     *  @return success code -
+     */
+    int publish(const char* topicName, Message& message);
+    
+    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
+     *  @param topic - the topic to publish to
+     *  @param payload - the data to send
+     *  @param payloadlen - the length of the data
+     *  @param qos - the QoS to send the publish at
+     *  @param retained - whether the message should be retained
+     *  @return success code -
+     */
+    int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
+    
+    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
+     *  @param topic - the topic to publish to
+     *  @param payload - the data to send
+     *  @param payloadlen - the length of the data
+     *  @param id - the packet id used - returned 
+     *  @param qos - the QoS to send the publish at
+     *  @param retained - whether the message should be retained
+     *  @return success code -
+     */
+    int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
+
     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
      *  @param topicFilter - a topic pattern which can include wildcards
      *  @param qos - the MQTT QoS to subscribe at
      *  @param mh - the callback function to be invoked when a message is received for this subscription
-     *  @return success code -  
-     */   
+     *  @return success code -
+     */
     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
-    
+
     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
      *  @param topicFilter - a topic pattern which can include wildcards
-     *  @return success code -  
-     */   
+     *  @return success code -
+     */
     int unsubscribe(const char* topicFilter);
-    
+
     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
-     *  @return success code -  
+     *  @return success code -
      */
     int disconnect();
-    
+
     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
-     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
+     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
      *  received.
      *  @param timeout_ms the time to wait, in milliseconds
      *  @return success code - on failure, this means the client has disconnected
      */
-    int yield(int timeout_ms = 1000);
-    
+    int yield(unsigned long timeout_ms = 1000L);
+
+    /** Is the client connected?
+     *  @return flag - is the client connected or not?
+     */
+    bool isConnected()
+    {
+        return isconnected;
+    }
+
 private:
 
     int cycle(Timer& timer);
     int waitfor(int packet_type, Timer& timer);
     int keepalive();
+    int publish(int len, Timer& timer, enum QoS qos);
 
     int decodePacket(int* value, int timeout);
     int readPacket(Timer& timer);
     int sendPacket(int length, Timer& timer);
     int deliverMessage(MQTTString& topicName, Message& message);
     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
-    
+
     Network& ipstack;
-    unsigned int command_timeout_ms;
-    
-    unsigned char buf[MAX_MQTT_PACKET_SIZE];  
-    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];  
+    unsigned long command_timeout_ms;
 
-    Timer ping_timer;
+    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+    Timer last_sent, last_received;
     unsigned int keepAliveInterval;
     bool ping_outstanding;
-    
+    bool cleansession;
+
     PacketId packetid;
-    
+
     struct MessageHandlers
     {
         const char* topicFilter;
         FP<void, MessageData&> fp;
     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
-    
+
     FP<void, MessageData&> defaultMessageHandler;
-     
+
     bool isconnected;
-    
-#if 0
-    struct
-    {
-      bool used;
-      int id;  
-    } QoS2messages[MAX_QOS2_MESSAGES];
-    
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
+    int inflightLen;
+    unsigned short inflightMsgid;
+    enum QoS inflightQoS;
+#endif
+
+#if MQTTCLIENT_QOS2
+    bool pubrel;
+    #if !defined(MAX_INCOMING_QOS2_MESSAGES)
+        #define MAX_INCOMING_QOS2_MESSAGES 10
+    #endif
+    unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
+    bool isQoS2msgidFree(unsigned short id);
+    bool useQoS2msgid(unsigned short id);
 #endif
 
 };
@@ -218,45 +246,90 @@
 }
 
 
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
 {
-    ping_timer = Timer();
-    ping_outstanding = 0;
+    last_sent = Timer();
+    last_received = Timer();
+    ping_outstanding = false;
     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
         messageHandlers[i].topicFilter = 0;
-    this->command_timeout_ms = command_timeout_ms; 
+    this->command_timeout_ms = command_timeout_ms;
     isconnected = false;
+    
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    inflightMsgid = 0;
+    inflightQoS = QOS0;
+#endif
+
+    
+#if MQTTCLIENT_QOS2
+    pubrel = false;
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+        incomingQoS2messages[i] = 0;
+#endif
+}
+
+#if MQTTCLIENT_QOS2
+template<class Network, class Timer, int a, int b>
+bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
+{
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+    {
+        if (incomingQoS2messages[i] == id)
+            return false;
+    }
+    return true;
 }
 
 
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
+bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
+{
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+    {
+        if (incomingQoS2messages[i] == 0)
+        {
+            incomingQoS2messages[i] = id;
+            return true;
+        }
+    }
+    return false;
+}
+#endif
+
+
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
 {
-    int rc = FAILURE, 
+    int rc = FAILURE,
         sent = 0;
-    
+
     while (sent < length && !timer.expired())
     {
-        rc = ipstack.write(&buf[sent], length, timer.left_ms());
+        rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
         if (rc < 0)  // there was an error writing the data
             break;
         sent += rc;
     }
     if (sent == length)
     {
-        ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
+        if (this->keepAliveInterval > 0)
+            last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
         rc = SUCCESS;
-        //if (debug)
-        // Log (packet)
     }
     else
         rc = FAILURE;
+        
+#if defined(MQTT_DEBUG)
+    char printbuf[50];
+    DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
+#endif
     return rc;
 }
 
 
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
 {
     unsigned char c;
@@ -287,12 +360,12 @@
 
 /**
  * If any read fails in this method, then we should disconnect from the network, as on reconnect
- * the packets can be retried. 
+ * the packets can be retried.
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
+template<class Network, class Timer, int a, int b>
+int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
 {
     int rc = FAILURE;
     MQTTHeader header = {0};
@@ -306,15 +379,22 @@
     len = 1;
     /* 2. read the remaining length.  This is variable in itself */
     decodePacket(&rem_len, timer.left_ms());
-    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
+    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
 
     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
+    if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
         goto exit;
 
     header.byte = readbuf[0];
     rc = header.bits.type;
+    if (this->keepAliveInterval > 0)
+        last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
 exit:
+        
+#if defined(MQTT_DEBUG)
+    char printbuf[50];
+    DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
+#endif
     return rc;
 }
 
@@ -322,13 +402,13 @@
 // assume topic filter and name is in correct format
 // # can only be at end
 // + and # can only be next to separator
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
 {
     char* curf = topicFilter;
     char* curn = topicName.lenstring.data;
     char* curn_end = curn + topicName.lenstring.len;
-    
+
     while (*curf && curn < curn_end)
     {
         if (*curn == '/' && *curf != '/')
@@ -346,13 +426,13 @@
         curf++;
         curn++;
     };
-    
+
     return (curn == curn_end) && (*curf == '\0');
 }
 
 
 
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
 {
     int rc = FAILURE;
@@ -371,25 +451,25 @@
             }
         }
     }
-    
-    if (rc == FAILURE && defaultMessageHandler.attached()) 
+
+    if (rc == FAILURE && defaultMessageHandler.attached())
     {
         MessageData md(topicName, message);
         defaultMessageHandler(md);
         rc = SUCCESS;
-    }   
-    
+    }
+
     return rc;
 }
 
 
 
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
+template<class Network, class Timer, int a, int b>
+int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
 {
     int rc = SUCCESS;
     Timer timer = Timer();
-    
+
     timer.countdown_ms(timeout_ms);
     while (!timer.expired())
     {
@@ -399,19 +479,19 @@
             break;
         }
     }
-        
+
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
 {
     /* get one piece of work off the wire and one pass through */
 
     // read the socket, see what work is due
     unsigned short packet_type = readPacket(timer);
-    
+
     int len = 0,
         rc = SUCCESS;
 
@@ -427,21 +507,26 @@
             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 goto exit;
-//          if (msg.qos != QOS2) 
+#if MQTTCLIENT_QOS2
+            if (msg.qos != QOS2)
+#endif
                 deliverMessage(topicName, msg);
-#if 0
+#if MQTTCLIENT_QOS2
             else if (isQoS2msgidFree(msg.id))
             {
-                UseQoS2msgid(msg.id);
-                deliverMessage(topicName, msg);
-            }
+                if (useQoS2msgid(msg.id))
+                    deliverMessage(topicName, msg);
+                else
+                    WARN("Maximum number of incoming QoS2 messages exceeded");
+            }   
 #endif
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
+                    len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
                 else if (msg.qos == QOS2)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
+                    len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
                 if (len <= 0)
                     rc = FAILURE;
                 else
@@ -450,12 +535,14 @@
                     goto exit; // there was a problem
             }
             break;
+#endif
+#if MQTTCLIENT_QOS2
         case PUBREC:
             unsigned short mypacketid;
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
-            else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
+            else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
                 rc = FAILURE;
             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
                 rc = FAILURE; // there was a problem
@@ -464,6 +551,7 @@
             break;
         case PUBCOMP:
             break;
+#endif
         case PINGRESP:
             ping_outstanding = false;
             break;
@@ -487,12 +575,12 @@
         goto exit;
     }
 
-    if (ping_timer.expired())
+    if (last_sent.expired() || last_received.expired())
     {
         if (!ping_outstanding)
         {
             Timer timer = Timer(1000);
-            int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
+            int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
                 ping_outstanding = true;
         }
@@ -504,43 +592,41 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
 {
     int rc = FAILURE;
-    
+
     do
     {
-        if (timer.expired()) 
+        if (timer.expired())
             break; // we timed out
     }
-    while ((rc = cycle(timer)) != packet_type);  
-    
+    while ((rc = cycle(timer)) != packet_type);
+
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
 {
     Timer connect_timer = Timer(command_timeout_ms);
     int rc = FAILURE;
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     int len = 0;
-    
+
     if (isconnected) // don't send connect packet again if we are already connected
         goto exit;
 
-    if (options == 0)
-        options = &default_options; // set default options if none were supplied
-    
-    this->keepAliveInterval = options->keepAliveInterval;
-    ping_timer.countdown(this->keepAliveInterval);
-    if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
+    this->keepAliveInterval = options.keepAliveInterval;
+    this->cleansession = options.cleansession;
+    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
         goto exit; // there was a problem
-    
+
+    if (this->keepAliveInterval > 0)
+        last_received.countdown(this->keepAliveInterval);
     // this will be a blocking call, wait for the connack
     if (waitfor(CONNACK, connect_timer) == CONNACK)
     {
@@ -553,7 +639,26 @@
     }
     else
         rc = FAILURE;
-    
+        
+#if MQTTCLIENT_QOS2
+    // resend an inflight publish
+    if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
+    {
+        if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
+            rc = FAILURE;
+        else
+            rc = publish(len, connect_timer, inflightQoS);
+    }
+    else
+#endif
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (inflightMsgid > 0)
+    {
+        memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
+        rc = publish(inflightLen, connect_timer, inflightQoS);
+    }
+#endif
+
 exit:
     if (rc == SUCCESS)
         isconnected = true;
@@ -561,29 +666,37 @@
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
+{
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    return connect(default_options);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
-{ 
-    int rc = FAILURE;  
+{
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);
     int len = 0;
     MQTTString topic = {(char*)topicFilter, 0, 0};
-    
+
     if (!isconnected)
         goto exit;
-    
-    len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+
+    len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
     if (len <= 0)
         goto exit;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit;             // there was a problem
-    
-    if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
+
+    if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
     {
         int count = 0, grantedQoS = -1;
         unsigned short mypacketid;
         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80 
+            rc = grantedQoS; // 0, 1, 2 or 0x80
         if (rc != 0x80)
         {
             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
@@ -598,67 +711,58 @@
             }
         }
     }
-    else 
+    else
         rc = FAILURE;
-        
+
 exit:
+    if (rc != SUCCESS)
+        isconnected = false;
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
-{   
+{
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);    
+    Timer timer = Timer(command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     int len = 0;
-    
+
     if (!isconnected)
         goto exit;
-    
-    if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
+
+    if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
         goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
         goto exit; // there was a problem
-    
+
     if (waitfor(UNSUBACK, timer) == UNSUBACK)
     {
         unsigned short mypacketid;  // should be the same as the packetid above
         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = 0; 
+            rc = 0;
     }
     else
         rc = FAILURE;
-    
+
 exit:
+    if (rc != SUCCESS)
+        isconnected = false;
     return rc;
 }
 
 
-   
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
 {
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);   
-    MQTTString topicString = {(char*)topicName, 0, 0};
-    int len = 0;
+    int rc;
     
-    if (!isconnected)
-        goto exit;
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
+        goto exit; // there was a problem
 
-    if (message->qos == QOS1 || message->qos == QOS2)
-        message->id = packetid.getNext();
-    
-    len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
-              topicString, (unsigned char*)message->payload, message->payloadlen);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit; // there was a problem
-    
-    if (message->qos == QOS1)
+#if MQTTCLIENT_QOS1 
+    if (qos == QOS1)
     {
         if (waitfor(PUBACK, timer) == PUBACK)
         {
@@ -666,11 +770,14 @@
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
+            else if (inflightMsgid == mypacketid)
+                inflightMsgid = 0;
         }
         else
             rc = FAILURE;
     }
-    else if (message->qos == QOS2)
+#elif MQTTCLIENT_QOS2
+    else if (qos == QOS2)
     {
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
         {
@@ -678,28 +785,91 @@
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
+            else if (inflightMsgid == mypacketid)
+                inflightMsgid = 0;
         }
         else
             rc = FAILURE;
     }
-    
+#endif
+
+exit:
+    if (rc != SUCCESS)
+        isconnected = false;
+    return rc;
+}
+
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
+{
+    int rc = FAILURE;
+    Timer timer = Timer(command_timeout_ms);
+    MQTTString topicString = MQTTString_initializer;
+    int len = 0;
+
+    if (!isconnected)
+        goto exit;
+        
+    topicString.cstring = (char*)topicName;
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (qos == QOS1 || qos == QOS2)
+        id = packetid.getNext();
+#endif
+
+    len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
+              topicString, (unsigned char*)payload, payloadlen);
+    if (len <= 0)
+        goto exit;
+        
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (!cleansession)
+    {
+        memcpy(pubbuf, sendbuf, len);
+        inflightMsgid = id;
+        inflightLen = len;
+        inflightQoS = qos;
+#if MQTTCLIENT_QOS2
+        pubrel = false;
+#endif
+    }
+#endif
+        
+    rc = publish(len, timer, qos);
 exit:
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
+{
+    unsigned short id = 0;  // dummy - not used for anything
+    return publish(topicName, payload, payloadlen, id, qos, retained);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
+{
+    return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
-{  
+{
     int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
     int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     if (len > 0)
         rc = sendPacket(len, timer);            // send the disconnect packet
-        
+
     isconnected = false;
     return rc;
 }
 
 
-#endif
\ No newline at end of file
+#endif