An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Revision:
9:01b8cc7d94cc
Parent:
8:c46930bd6c82
Child:
11:db15da110a37
Child:
12:cc7f2d62a393
--- a/MQTTClient.h	Wed Apr 09 13:48:20 2014 +0000
+++ b/MQTTClient.h	Wed Apr 09 23:21:54 2014 +0000
@@ -23,8 +23,6 @@
 
 namespace MQTT
 {
-    
-const int MAX_PACKET_ID = 65535;
 
 
 enum QoS { QOS0, QOS1, QOS2 };
@@ -48,29 +46,46 @@
     Client<class Network, class Timer, class Thread>* client;
 };
 
+
+class PacketId
+{
+public:
+    PacketId();
+    
+    int getNext();
+    
+private:
+    static const int MAX_PACKET_ID = 65535;
+    int next;
+};
+
+typedef void (*resultHandler)(Result*);
+typedef void (*messageHandler)(Message*);
   
 template<class Network, class Timer, class Thread> class Client
 {
     
 public:    
    
-    Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30);  
+    Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); 
        
-    int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0);
-        
-    int publish(const char* topic, Message* message, FP<void, Result*> *resultHandler = 0);
+    int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
     
-    int subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0);
+     template<class T>
+    int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0);  // alternative to pass in pointer to member function
+        
+    int publish(const char* topic, Message* message, resultHandler rh = 0);
     
-    int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0);
+    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
     
-    int disconnect(int timeout, FP<void, Result*> *resultHandler = 0);
+    int unsubscribe(char* topicFilter, resultHandler rh = 0);
+    
+    int disconnect(int timeout, resultHandler rh = 0);
     
     void run(void const *argument);
     
 private:
 
-    int getPacketId();
     int cycle();
 
     int decodePacket(int* value, int timeout);
@@ -81,7 +96,7 @@
     Network* ipstack;
     Timer* timer;
     
-    char* buf;
+    char* buf; 
     int buflen;
     
     char* readbuf;
@@ -89,7 +104,15 @@
     
     int command_timeout; // max time to wait for any MQTT command to complete, in seconds
     int keepalive;
-    int packetid;
+    PacketId packetid;
+    
+    typedef FP<void, Result*> resultHandlerFP;    
+    // how many concurrent operations should we allow?  Each one will require a function pointer
+    resultHandlerFP connectHandler; 
+    
+    #define MAX_MESSAGE_HANDLERS 5
+    typedef FP<void, Message*> messageHandlerFP;
+    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
     
 };
 
@@ -97,7 +120,7 @@
 
 }
 
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)
+template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)  : packetid()
 {
     
    buf = new char[buffer_size];
@@ -106,17 +129,10 @@
    this->command_timeout = command_timeout;
    this->thread = 0;
    this->ipstack = network;
-   this->packetid = 0;
    this->timer = timer;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId()
-{
-    return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid;
-}
-
-
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
 {
     int sent = 0;
@@ -191,11 +207,11 @@
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
 {
-    int timeout = 1000L;
+    int timeout = -1;
     /* get one piece of work off the wire and one pass through */
     
     // 1. read the socket, see what work is due. 
-    int packet_type = readPacket(-1);
+    int packet_type = readPacket(timeout);
 
     printf("packet type %d\n", packet_type);
     
@@ -228,7 +244,7 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
 {
     int len = 0;
     int rc = -99;
@@ -261,6 +277,7 @@
     else
     {
         // set connect response callback function
+        connectHandler.attach(resultHandler);
         
         // start background thread
             
@@ -271,14 +288,14 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, 
-        FP<void, Result*> *resultHandler)
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, 
+    messageHandler messageHandler, resultHandler resultHandler)
 {
     int rc = -1, 
         len = 0;
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos);
+    len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
     rc = sendPacket(len); // send the subscribe packet
     
     /* wait for suback */
@@ -287,8 +304,8 @@
         // this will block
         if (cycle() == SUBACK)
         {
-            int count = 0, grantedQoS = -1;
-            if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
+            int count = 0, grantedQoS = -1, mypacketid;
+            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
                 rc = grantedQoS; // 0, 1, 2 or 0x80 
         }
     }