Modified MQTT for Mbed OS.

Dependencies:   FP MQTTPacket

Dependents:   mbed-os-mqtt door_lock co657_IoT nucleo-f429zi-mbed-os-mqtt

Fork of MQTT by MQTT

Revision:
31:a51dd239b78e
Parent:
30:a4e3a97dabe3
Child:
33:8bbc3a992326
Child:
34:e18a166198df
--- a/MQTTClient.h	Tue May 20 15:07:11 2014 +0000
+++ b/MQTTClient.h	Thu May 22 23:58:08 2014 +0000
@@ -37,6 +37,7 @@
 
 enum QoS { QOS0, QOS1, QOS2 };
 
+// all failure return codes must be negative
 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
 
 
@@ -53,8 +54,13 @@
 
 struct MessageData
 {
-    struct Message message;
-    char* topicName;
+    MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
+    {
+
+    }
+    
+    struct Message &message;
+    MQTTString &topicName;
 };
 
 
@@ -75,6 +81,17 @@
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
+
+
+class QoS2
+{
+public:
+
+    
+private:
+
+
+};
   
   
 /**
@@ -86,12 +103,13 @@
  * @param Network a network class which supports send, receive
  * @param Timer a timer class with the methods: 
  */ 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
+class Client
 {
     
 public:
    
-    typedef void (*messageHandler)(Message*);
+    typedef void (*messageHandler)(MessageData&);
 
     /** Construct the client
      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
@@ -136,7 +154,7 @@
      */   
     int unsubscribe(const char* topicFilter);
     
-    /** MQTT Disconnect - send an MQTT disconnect packet 
+    /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
      *  @return success code -  
      */
     int disconnect();
@@ -173,17 +191,24 @@
     
     PacketId packetid;
     
-    // typedef FP<void, Message*> messageHandlerFP;
     struct MessageHandlers
     {
         const char* topicFilter;
-        //messageHandlerFP fp; typedefs not liked?
-        FP<void, Message*> fp;
+        FP<void, MessageData&> fp;
     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
     
-    FP<void, Message*> defaultMessageHandler;
+    FP<void, MessageData&> defaultMessageHandler;
      
     bool isconnected;
+    
+#if 0
+    struct
+    {
+      bool used;
+      int id;  
+    } QoS2messages[MAX_QOS2_MESSAGES];
+    
+#endif
 
 };
 
@@ -335,7 +360,8 @@
         {
             if (messageHandlers[i].fp.attached())
             {
-                messageHandlers[i].fp(&message);
+                MessageData md(topicName, message);
+                messageHandlers[i].fp(md);
                 rc = SUCCESS;
             }
         }
@@ -343,7 +369,8 @@
     
     if (rc == FAILURE && defaultMessageHandler.attached()) 
     {
-        defaultMessageHandler(&message);
+        MessageData md(topicName, message);
+        defaultMessageHandler(md);
         rc = SUCCESS;
     }   
     
@@ -395,7 +422,15 @@
             if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
                                  (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 goto exit;
-            deliverMessage(topicName, msg);
+//          if (msg.qos != QOS2) 
+                deliverMessage(topicName, msg);
+#if 0
+            else if (isQoS2msgidFree(msg.id))
+            {
+                UseQoS2msgid(msg.id);
+                deliverMessage(topicName, msg);
+            }
+#endif
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
@@ -484,15 +519,18 @@
 {
     Timer connect_timer = Timer(command_timeout_ms);
     int rc = FAILURE;
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    int len = 0;
+    
+    if (isconnected) // don't send connect packet again if we are already connected
+        goto exit;
 
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
         options = &default_options; // set default options if none were supplied
     
     this->keepAliveInterval = options->keepAliveInterval;
     ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
-    if (len <= 0)
+    if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
         goto exit; // there was a problem
@@ -519,11 +557,11 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 { 
-    int rc = FAILURE;
+    int rc = FAILURE;  
     Timer timer = Timer(command_timeout_ms);
     int len = 0;
+    MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    MQTTString topic = {(char*)topicFilter, 0, 0};
     if (!isconnected)
         goto exit;
     
@@ -556,8 +594,6 @@
         rc = FAILURE;
         
 exit:
-    //if (rc == FAILURE)
-    //   closesession();
     return rc;
 }
 
@@ -566,12 +602,14 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {   
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
-    
+    Timer timer = Timer(command_timeout_ms);    
     MQTTString topic = {(char*)topicFilter, 0, 0};
+    int len = 0;
     
-    int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
-    if (len <= 0)
+    if (!isconnected)
+        goto exit;
+    
+    if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
@@ -595,14 +633,17 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
 {
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);   
+    MQTTString topicString = {(char*)topicName, 0, 0};
+    int len = 0;
     
-    MQTTString topicString = {(char*)topicName, 0, 0};
+    if (!isconnected)
+        goto exit;
 
     if (message->qos == QOS1 || message->qos == QOS2)
         message->id = packetid.getNext();
     
-    int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
+    len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
               topicString, (char*)message->payload, message->payloadlen);
     if (len <= 0)
         goto exit;
@@ -645,7 +686,8 @@
     int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     if (len > 0)
         rc = sendPacket(len, timer);            // send the disconnect packet
-
+        
+    isconnected = false;
     return rc;
 }