MQtt to publish the data

Dependencies:   FP MQTTPacket

Fork of MQTT by James Bruce

Revision:
23:05fc7de97d4a
Parent:
22:aadb79d29330
Child:
25:d13a6c558164
Child:
26:2658bb87c53d
--- a/MQTTClient.h	Wed Apr 30 13:03:45 2014 +0000
+++ b/MQTTClient.h	Tue May 06 09:44:23 2014 +0000
@@ -19,8 +19,13 @@
  TODO: 
  
  log messages - use macros
+ 
  define return code constants
  
+ call connectionLost at appropriate points - in sendPacket and readPacket
+ 
+ match wildcard topics
+ 
  */
 
 #if !defined(MQTTCLIENT_H)
@@ -36,6 +41,8 @@
 
 enum QoS { QOS0, QOS1, QOS2 };
 
+enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
+
 
 struct Message
 {
@@ -58,50 +65,36 @@
 class PacketId
 {
 public:
-    PacketId();
+    PacketId()
+    {
+        next = 0;
+    }
     
-    int getNext();
+    int getNext()
+    {
+        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+    }
    
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
-
-typedef void (*messageHandler)(Message*);
-
-typedef struct limits
-{
-    int MAX_MQTT_PACKET_SIZE; // 
-    int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
-    long command_timeout_ms;
-        
-    limits()
-    {
-        MAX_MQTT_PACKET_SIZE = 100;
-        MAX_MESSAGE_HANDLERS = 5;
-        command_timeout_ms = 30000;
-    }
-} Limits;
   
   
 /**
  * @class Client
  * @brief blocking, non-threaded MQTT client API
+ * 
+ * This version of the API blocks on all method calls, until they are complete.  This means that only one
+ * MQTT request can be in process at any one time.  
  * @param Network a network class which supports send, receive
  * @param Timer a timer class with the methods: 
  */ 
-template<class Network, class Timer> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
 {
     
 public:
 
-    /** Construct the client
-     *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
-     *      before calling MQTT connect
-     *  @param limits an instance of the Limit class - to alter limits as required
-     */
-    Client(Network* network, const Limits limits = Limits()); 
-    
     typedef struct
     {
         Client* client;
@@ -109,6 +102,14 @@
     } connectionLostInfo;
     
     typedef int (*connectionLostHandlers)(connectionLostInfo*);
+    typedef void (*messageHandler)(Message*);
+
+    /** Construct the client
+     *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
+     *      before calling MQTT connect
+     *  @param limits an instance of the Limit class - to alter limits as required
+     */
+    Client(Network& network, unsigned int command_timeout_ms = 30000); 
     
     /** Set the connection lost callback - called whenever the connection is lost and we should be connected
      *  @param clh - pointer to the callback function
@@ -162,8 +163,9 @@
     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
      *  received.
+     *  @param timeout_ms the time to wait, in milliseconds
      */
-    void yield(int timeout);
+    void yield(int timeout_ms = 1000);
     
 private:
 
@@ -176,12 +178,11 @@
     int sendPacket(int length, Timer& timer);
     int deliverMessage(MQTTString* topic, Message* message);
     
-    Network* ipstack;
+    Network& ipstack;
+    unsigned int command_timeout_ms;
     
-    Limits limits;
-    
-    char* buf;  
-    char* readbuf;
+    char buf[MAX_MQTT_PACKET_SIZE];  
+    char readbuf[MAX_MQTT_PACKET_SIZE];  
 
     Timer ping_timer;
     unsigned int keepAliveInterval;
@@ -194,12 +195,11 @@
     {
         const char* topic;
         messageHandlerFP fp;
-    } *messageHandlers;      // Message handlers are indexed by subscription topic
+    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
     
     messageHandlerFP defaultMessageHandler;
     
     typedef FP<int, connectionLostInfo*> connectionLostFP;
-    
     connectionLostFP connectionLostHandler;
     
 };
@@ -207,34 +207,46 @@
 }
 
 
-template<class Network, class Timer> MQTT::Client<Network, Timer>::Client(Network* network, Limits limits)  : limits(limits), packetid()
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
 {
-    this->ipstack = network;
-    this->ping_timer = Timer();
-    this->ping_outstanding = 0;
-       
-    // How to make these memory allocations portable?  I was hoping to avoid the heap
-    buf = new char[limits.MAX_MQTT_PACKET_SIZE];
-    readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
-    this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
-    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+    ping_timer = Timer();
+    ping_outstanding = 0;
+    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
         messageHandlers[i].topic = 0;
+    this->command_timeout_ms = command_timeout_ms;    
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::sendPacket(int length, Timer& timer)
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
 {
-    int sent = 0;
+    int rc = FAILURE, 
+        sent = 0;
     
-    while (sent < length)
-        sent += ipstack->write(&buf[sent], length, timer.left_ms());
+    while (sent < length && !timer.expired())
+    {
+        rc = ipstack.write(&buf[sent], length, timer.left_ms());
+        if (rc == -1)
+        {
+            connectionLostInfo info = {this, &ipstack};
+            connectionLostHandler(&info);
+        }
+        else
+            sent += rc;
+    }
     if (sent == length)
+    {
         ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
-    return sent;
+        rc = SUCCESS;
+    }
+    else
+        rc = FAILURE;
+    return rc;
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -251,7 +263,7 @@
             rc = MQTTPACKET_READ_ERROR; /* bad data */
             goto exit;
         }
-        rc = ipstack->read(&c, 1, timeout);
+        rc = ipstack.read(&c, 1, timeout);
         if (rc != 1)
             goto exit;
         *value += (c & 127) * multiplier;
@@ -268,7 +280,8 @@
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::readPacket(Timer& timer) 
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
 {
     int rc = -1;
     MQTTHeader header = {0};
@@ -276,7 +289,7 @@
     int rem_len = 0;
 
     /* 1. read the header byte.  This has the packet type in it */
-    if (ipstack->read(readbuf, 1, timer.left_ms()) != 1)
+    if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
         goto exit;
 
     len = 1;
@@ -285,7 +298,7 @@
     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
 
     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack->read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
+    if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
         goto exit;
 
     header.byte = readbuf[0];
@@ -295,12 +308,13 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message)
 {
     int rc = -1;
 
     // we have to find the right message handler - indexed by topic
-    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
     {
         if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
         {
@@ -317,17 +331,19 @@
 
 
 
-template<class Network, class Timer> void MQTT::Client<Network, Timer>::yield(int timeout)
+template<class Network, class Timer, int a, int b> 
+void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
 {
     Timer timer = Timer();
     
-    timer.countdown_ms(timeout);
+    timer.countdown_ms(timeout_ms);
     while (!timer.expired())
         cycle(timer);
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::cycle(Timer& timer)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
 {
     /* get one piece of work off the wire and one pass through */
 
@@ -345,26 +361,24 @@
             MQTTString topicName;
             Message msg;
             rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
+                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);;
             deliverMessage(&topicName, &msg);
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
-                    len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
+                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
                 else if (msg.qos == QOS2)
-                    len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
-                rc = sendPacket(len, timer); 
-                if (rc != len) 
+                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
+                if ((rc = sendPacket(len, timer)) != SUCCESS)
                     goto exit; // there was a problem
             }
             break;
         case PUBREC:
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 ; 
-            len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
-            rc = sendPacket(len, timer); // send the PUBREL packet
-            if (rc != len) 
+            len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
+            if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
                 goto exit; // there was a problem
 
             break;
@@ -380,7 +394,8 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::keepalive()
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
 {
     int rc = 0;
 
@@ -394,9 +409,9 @@
         else
         {
             Timer timer = Timer(1000);
-            int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
+            int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
             rc = sendPacket(len, timer); // send the ping packet
-            if (rc != len) 
+            if (rc != SUCCESS) 
                 rc = -1; // indicate there's a problem
             else
                 ping_outstanding = true;
@@ -409,7 +424,8 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::waitfor(int packet_type, Timer& timer)
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
 {
     int rc = -1;
     
@@ -424,9 +440,10 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::connect(MQTTPacket_connectData* options)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
 {
-    Timer connect_timer = Timer(limits.command_timeout_ms);
+    Timer connect_timer = Timer(command_timeout_ms);
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
@@ -434,16 +451,16 @@
     
     this->keepAliveInterval = options->keepAliveInterval;
     ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
+    int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
     int rc = sendPacket(len, connect_timer); // send the connect packet
-    if (rc != len) 
+    if (rc != SUCCESS) 
         goto exit; // there was a problem
     
     // this will be a blocking call, wait for the connack
     if (waitfor(CONNACK, connect_timer) == CONNACK)
     {
         int connack_rc = -1;
-        if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = connack_rc;
     }
     
@@ -452,28 +469,29 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 { 
     int len = -1;
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int rc = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
     if (rc <= 0)
         goto exit;
     len = rc;
-    if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
     {
         int count = 0, grantedQoS = -1, mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = grantedQoS; // 0, 1, 2 or 0x80 
         if (rc != 0x80)
         {
-            for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+            for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
             {
                 if (messageHandlers[i].topic == 0)
                 {
@@ -491,24 +509,25 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::unsubscribe(const char* topicFilter)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {   
     int len = -1;
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int rc = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
+    int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
     if (rc <= 0)
         goto exit;
     len = rc;
-    if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
     if (waitfor(UNSUBACK, timer) == UNSUBACK)
     {
         int mypacketid;  // should be the same as the packetid above
-        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = 0; 
     }
     
@@ -518,19 +537,20 @@
 
 
    
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::publish(const char* topicName, Message* message)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
 {
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topicString = {(char*)topicName, 0, 0};
 
     if (message->qos == QOS1 || message->qos == QOS2)
         message->id = packetid.getNext();
     
-    int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
+    int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
               topicString, (char*)message->payload, message->payloadlen);
     int rc = sendPacket(len, timer); // send the subscribe packet
-    if (rc != len) 
+    if (rc != SUCCESS) 
         goto exit; // there was a problem
     
     if (message->qos == QOS1)
@@ -538,7 +558,7 @@
         if (waitfor(PUBACK, timer) == PUBACK)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 rc = 0; 
         }
     }
@@ -547,7 +567,7 @@
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 rc = 0; 
         }
     }
@@ -557,13 +577,14 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::disconnect()
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
 {  
-    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
-    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
+    Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
+    int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     int rc = sendPacket(len, timer);   // send the disconnect packet
     
-    return (rc == len) ? 0 : -1;
+    return rc;
 }