MQTT

Dependencies:   MQTTPacket FP

Revision:
15:64a57183aa03
Parent:
13:fd82db992024
Child:
16:91c2f9a144d4
diff -r fd82db992024 -r 64a57183aa03 MQTTClient.h
--- a/MQTTClient.h	Fri Apr 11 22:46:37 2014 +0100
+++ b/MQTTClient.h	Sun Apr 13 22:32:28 2014 +0000
@@ -45,7 +45,7 @@
 public:
     PacketId();
     
-    int getNext();
+    int getNext();
    
 private:
     static const int MAX_PACKET_ID = 65535;
@@ -57,16 +57,32 @@
 template<class Network, class Timer, class Thread> class Client
 {
     
-public:    
-
+public:    
+
 	struct Result
 	{
     	/* success or failure result data */
-    	Client<Network, Timer, Thread>* client;
+    	Client<Network, Timer, Thread>* client;
 		int connack_rc;
-	};
-
+	};
+
 	typedef void (*resultHandler)(Result*);
+	
+	struct limits
+	{
+		int MAX_MQTT_PACKET_SIZE; // 
+		int MAX_MESSAGE_HANDLERS;  // 5 - each subscription requires a message handler
+		int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
+		int command_timeout;
+		
+		limits()
+		{
+			MAX_MQTT_PACKET_SIZE = 100;
+			MAX_MESSAGE_HANDLERS = 5;
+			MAX_CONCURRENT_OPERATIONS= 5;
+			command_timeout = 30;	
+		}
+	};
    
     Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); 
        
@@ -87,61 +103,74 @@
     
 private:
 
-    int cycle(int timeout);
+    int cycle(int timeout);
 	int keepalive();
 
     int decodePacket(int* value, int timeout);
     int readPacket(int timeout = -1);
     int sendPacket(int length, int timeout = -1);
+	int deliverMessage(MQTTString* topic, Message* message);
     
     Thread* thread;
     Network* ipstack;
     Timer command_timer, ping_timer;
     
-    char* buf; 
+    char buf[];
     int buflen;
     
     char* readbuf;
-    int readbuflen;
-
-    unsigned int keepAliveInterval;
+    int readbuflen;
+
+    unsigned int keepAliveInterval;
 	bool ping_outstanding;
     
     int command_timeout; // max time to wait for any MQTT command to complete, in seconds
     PacketId packetid;
     
     typedef FP<void, Result*> resultHandlerFP;    
-    // how many concurrent operations should we allow?  Each one will require a function pointer
     resultHandlerFP connectHandler; 
     
     #define MAX_MESSAGE_HANDLERS 5
     typedef FP<void, Message*> messageHandlerFP;
-    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
-
+    struct
+    {
+    	char* topic;
+    	messageHandlerFP fp;
+    } messageHandlers[MAX_MESSAGE_HANDLERS];  // Message handlers are linked to a subscription topic
+    
+    // how many concurrent operations should we allow?  Each one will require a function pointer
+    struct
+    {
+    	unsigned short id;
+    	resultHandlerFP fp;
+    	MQTTString* topic;  // if this is a publish, store topic name in case republishing is required
+    	Message* message;  // for publish, 
+    } *operations;  // result handlers are indexed by packet ids
+
 	static void threadfn(void* arg);
     
 };
 
-}
-
-
-template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
-{
-    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
 }
 
 
 template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout)  : packetid()
 {
-    
-   buf = new char[MAX_MQTT_PACKET_SIZE];
+   //buf = new char[MAX_MQTT_PACKET_SIZE];
    readbuf = new char[MAX_MQTT_PACKET_SIZE];
    buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
+   
    this->command_timeout = command_timeout;
    this->thread = 0;
    this->ipstack = network;
-   this->command_timer = Timer();
-   this->ping_timer = Timer();
+   this->command_timer = Timer();
+   this->ping_timer = Timer();
    this->ping_outstanding = 0;
 }
 
@@ -151,7 +180,7 @@
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length, -1);
+        sent += ipstack->write(&buf[sent], length, -1);
 	if (sent == length)
 	    ping_timer.reset(); // record the fact that we have successfully sent the packet    
     return sent;
@@ -163,7 +192,7 @@
     char c;
     int multiplier = 1;
     int len = 0;
-#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+	const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
 
     *value = 0;
     do
@@ -219,6 +248,11 @@
 }
 
 
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message)
+{
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
 {
     /* get one piece of work off the wire and one pass through */
@@ -227,116 +261,114 @@
     int packet_type = readPacket(timeout);
 
     printf("packet type %d\n", packet_type);
-    
+    
 	int len, rc;
     switch (packet_type)
     {
-        case CONNACK:
-			if (this->thread)
-			{
-				Result res = {this, 0};
-            	int connack_rc = -1;
+        case CONNACK:
+			if (this->thread)
+			{
+				Result res = {this, 0};
             	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
-                	;
-				connectHandler(&res);
-			}
+                	;
+				connectHandler(&res);
+				connectHandler.detach(); // only invoke the callback once
+			}
         case PUBACK:
         case SUBACK:
             break;
-        case PUBLISH:
-			MQTTString topicName;
-			Message msg;
-			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
-			if (msg.qos == QOS0)
-				messageHandlers[0](&msg);
+        case PUBLISH:
+			MQTTString topicName;
+			Message msg;
+			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+			if (msg.qos == QOS0)
+				deliverMessage(&topicName, &msg);
             break;
-        case PUBREC:
+        case PUBREC:
    	        int type, dup, mypacketid;
    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-   	            ; 
+   	            ; 
+   	        // must lock this access against the application thread, if we are multi-threaded
 			len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
-		    rc = sendPacket(len); // send the subscribe packet
-			if (rc != len) 
-				goto exit; // there was a problem
+		    rc = sendPacket(len); // send the subscribe packet
+			if (rc != len) 
+				goto exit; // there was a problem
 
             break;
         case PUBCOMP:
             break;
-        case PINGRESP:
-			if (ping_outstanding)
-				ping_outstanding = false;
-			//else disconnect();
+        case PINGRESP:
+			ping_outstanding = false;
             break;
-        case -1:
-            break;
-    }
-	keepalive();
+    }
+	keepalive();
 exit:
     return packet_type;
-}
-
-
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
-{
-	int rc = 0;
-
-	if (keepAliveInterval == 0)
-		goto exit;
-
-	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
-	{
-		if (ping_outstanding)
-			rc = -1;
-		else
-		{
-			int len = MQTTSerialize_pingreq(buf, buflen);
-			rc = sendPacket(len); // send the connect packet
-			if (rc != len) 
-				rc = -1; // indicate there's a problem
-			else
-				ping_outstanding = true;
-		}
-	}
-
-exit:
+{
+	int rc = 0;
+
+	if (keepAliveInterval == 0)
+		goto exit;
+
+	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+	{
+		if (ping_outstanding)
+			rc = -1;
+		else
+		{
+			int len = MQTTSerialize_pingreq(buf, buflen);
+			rc = sendPacket(len); // send the connect packet
+			if (rc != len) 
+				rc = -1; // indicate there's a problem
+			else
+				ping_outstanding = true;
+		}
+	}
+
+exit:
 	return rc;
 }
 
 
 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
-	while (true)
+{
+	while (true)
 		cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
         options = &default_options; // set default options if none were supplied
     
-    this->keepAliveInterval = options->keepAliveInterval;
+    this->keepAliveInterval = options->keepAliveInterval;
 	ping_timer.start();
     int len = MQTTSerialize_connect(buf, buflen, options);
-    int rc = sendPacket(len); // send the connect packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the connect packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     if (resultHandler == 0)     // wait until the connack is received 
-    {
-		if (command_timer.read_ms() > (command_timeout * 1000)) 
-			goto exit; // we timed out
+    {
         // this will be a blocking call, wait for the connack
-        if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
+		do
         {
-            int connack_rc = -1;
-            if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
-                rc = connack_rc;
-        }
+			if (command_timer.read_ms() > (command_timeout * 1000)) 
+				goto exit; // we timed out
+		}
+		while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
+        int connack_rc = -1;
+        if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
+	        rc = connack_rc;
     }
     else
     {
@@ -346,23 +378,23 @@
         // start background thread            
         this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for suback */
@@ -381,23 +413,23 @@
         // set subscribe response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
-}
-
-
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for suback */
@@ -416,57 +448,49 @@
         // set unsubscribe response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
-}
-
-
-   
+}
+
+
+   
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
-    MQTTString topic = {(char*)topicName, 0, 0};
-
+    MQTTString topic = {(char*)topicName, 0, 0};
+
 	message->id = packetid.getNext();
-    
+    
 	int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for acks */
     if (resultHandler == 0)
     {
- 		if (message->qos == QOS1)
+ 		if (message->qos == QOS1)
 		{
 	        if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
     	    {
     	        int type, dup, mypacketid;
     	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
     	            rc = 0; 
-    	    }
-		}
-		else if (message->qos == QOS2)
-		{
-	        if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
-    	    {
-    	        int type, dup, mypacketid;
-    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-    	            rc = 0; 
-				len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
-			    rc = sendPacket(len); // send the subscribe packet
-				if (rc != len) 
-					goto exit; // there was a problem
-		        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
-	    	    {
-    	        	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-    	            	rc = 0; 
-				}
-    	    }
+    	    }
+		}
+		else if (message->qos == QOS2)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+	   	    {
+	   	    	int type, dup, mypacketid;
+            	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	           	rc = 0; 
+			}
+
 		}
     }
     else
@@ -474,9 +498,9 @@
         // set publish response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
 }