MQTT

Dependencies:   MQTTPacket FP

Revision:
12:cc7f2d62a393
Parent:
9:01b8cc7d94cc
Child:
13:fd82db992024
diff -r 68a4ada53367 -r cc7f2d62a393 MQTTClient.h
--- a/MQTTClient.h	Thu Apr 10 15:19:08 2014 +0000
+++ b/MQTTClient.h	Fri Apr 11 22:31:55 2014 +0100
@@ -33,41 +33,42 @@
     enum QoS qos;
     bool retained;
     bool dup;
-    unsigned short msgid;
+    unsigned short id;
     void *payload;
     size_t payloadlen;
 };
 
 template<class Network, class Timer, class Thread> class Client;
 
-class Result
-{
-    /* success or failure result data */
-    Client<class Network, class Timer, class Thread>* client;
-};
-
-
 class PacketId
 {
 public:
     PacketId();
     
-    int getNext();
-    
+    int getNext();
+   
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
 
-typedef void (*resultHandler)(Result*);
 typedef void (*messageHandler)(Message*);
   
 template<class Network, class Timer, class Thread> class Client
 {
     
-public:    
+public:    
+
+	struct Result
+	{
+    	/* success or failure result data */
+    	Client<Network, Timer, Thread>* client;
+		int connack_rc;
+	};
+
+	typedef void (*resultHandler)(Result*);
    
-    Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); 
+    Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); 
        
     int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
     
@@ -78,7 +79,7 @@
     
     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
     
-    int unsubscribe(char* topicFilter, resultHandler rh = 0);
+    int unsubscribe(const char* topicFilter, resultHandler rh = 0);
     
     int disconnect(int timeout, resultHandler rh = 0);
     
@@ -86,7 +87,8 @@
     
 private:
 
-    int cycle();
+    int cycle(int timeout);
+	int keepalive();
 
     int decodePacket(int* value, int timeout);
     int readPacket(int timeout = -1);
@@ -94,16 +96,18 @@
     
     Thread* thread;
     Network* ipstack;
-    Timer* timer;
+    Timer command_timer, ping_timer;
     
     char* buf; 
     int buflen;
     
     char* readbuf;
-    int readbuflen;
+    int readbuflen;
+
+    unsigned int keepAliveInterval;
+	bool ping_outstanding;
     
     int command_timeout; // max time to wait for any MQTT command to complete, in seconds
-    int keepalive;
     PacketId packetid;
     
     typedef FP<void, Result*> resultHandlerFP;    
@@ -112,24 +116,33 @@
     
     #define MAX_MESSAGE_HANDLERS 5
     typedef FP<void, Message*> messageHandlerFP;
-    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
+    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
+
+	static void threadfn(void* arg);
     
 };
 
-void threadfn(void* arg);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
 
-}
 
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)  : packetid()
+template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout)  : packetid()
 {
     
-   buf = new char[buffer_size];
-   readbuf = new char[buffer_size];
-   buflen = readbuflen = buffer_size;
+   buf = new char[MAX_MQTT_PACKET_SIZE];
+   readbuf = new char[MAX_MQTT_PACKET_SIZE];
+   buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
    this->command_timeout = command_timeout;
    this->thread = 0;
    this->ipstack = network;
-   this->timer = timer;
+   this->command_timer = Timer();
+   this->ping_timer = Timer();
+   this->ping_outstanding = 0;
 }
 
 
@@ -138,8 +151,9 @@
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length, -1);
-        
+        sent += ipstack->write(&buf[sent], length, -1);
+	if (sent == length)
+	    ping_timer.reset(); // record the fact that we have successfully sent the packet    
     return sent;
 }
 
@@ -205,69 +219,119 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
 {
-    int timeout = -1;
     /* get one piece of work off the wire and one pass through */
     
-    // 1. read the socket, see what work is due. 
+    // read the socket, see what work is due
     int packet_type = readPacket(timeout);
 
     printf("packet type %d\n", packet_type);
-    
+    
+	int len, rc;
     switch (packet_type)
     {
-        case CONNACK:
-            printf("connack received\n");
-            break;
-        case PUBLISH:
-            break;
+        case CONNACK:
+			if (this->thread)
+			{
+				Result res = {this, 0};
+            	int connack_rc = -1;
+            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
+                	;
+				connectHandler(&res);
+			}
         case PUBACK:
-            break;
         case SUBACK:
             break;
-        case PUBREC:
+        case PUBLISH:
+			MQTTString topicName;
+			Message msg;
+			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+			if (msg.qos == QOS0)
+				messageHandlers[0](&msg);
+            break;
+        case PUBREC:
+   	        int type, dup, mypacketid;
+   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+   	            ; 
+			len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
+		    rc = sendPacket(len); // send the subscribe packet
+			if (rc != len) 
+				goto exit; // there was a problem
+
             break;
         case PUBCOMP:
             break;
-        case PINGRESP:
+        case PINGRESP:
+			if (ping_outstanding)
+				ping_outstanding = false;
+			//else disconnect();
             break;
         case -1:
             break;
-    }
+    }
+	keepalive();
+exit:
     return packet_type;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
+{
+	int rc = 0;
+
+	if (keepAliveInterval == 0)
+		goto exit;
+
+	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+	{
+		if (ping_outstanding)
+			rc = -1;
+		else
+		{
+			int len = MQTTSerialize_pingreq(buf, buflen);
+			rc = sendPacket(len); // send the connect packet
+			if (rc != len) 
+				rc = -1; // indicate there's a problem
+			else
+				ping_outstanding = true;
+		}
+	}
+
+exit:
+	return rc;
 }
 
 
 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
+{
+	while (true)
+		cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
-    int len = 0;
-    int rc = -99;
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+{
+	command_timer.start();
 
-    /* 2. if the connect was successful, send the MQTT connect packet */   
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
-    {
-        default_options.clientID.cstring = "me";
-        options = &default_options;
-    }
+        options = &default_options; // set default options if none were supplied
     
-    this->keepalive = options->keepAliveInterval;
-    len = MQTTSerialize_connect(buf, buflen, options);
-    printf("len from send is %d %d\n", len, buflen);
-    rc = sendPacket(len); // send the connect packet
-    printf("rc from send is %d\n", rc);
+    this->keepAliveInterval = options->keepAliveInterval;
+	ping_timer.start();
+    int len = MQTTSerialize_connect(buf, buflen, options);
+    int rc = sendPacket(len); // send the connect packet
+	if (rc != len) 
+		goto exit; // there was a problem
     
-    /* 3. wait until the connack is received */
-    if (resultHandler == 0)
-    {
+    if (resultHandler == 0)     // wait until the connack is received 
+    {
+		if (command_timer.read_ms() > (command_timeout * 1000)) 
+			goto exit; // we timed out
         // this will be a blocking call, wait for the connack
-        if (cycle() == CONNACK)
+        if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
         {
             int connack_rc = -1;
             if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
@@ -279,30 +343,33 @@
         // set connect response callback function
         connectHandler.attach(resultHandler);
         
-        // start background thread
-            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::threadfn, (void*)this);
+        // start background thread            
+        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
     }
-    
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
     return rc;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, 
-    messageHandler messageHandler, resultHandler resultHandler)
-{
-    int rc = -1, 
-        len = 0;
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+{
+	command_timer.start();
+
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    rc = sendPacket(len); // send the subscribe packet
+    int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
     
     /* wait for suback */
     if (resultHandler == 0)
     {
         // this will block
-        if (cycle() == SUBACK)
+        if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
         {
             int count = 0, grantedQoS = -1, mypacketid;
             if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
@@ -314,7 +381,103 @@
         // set subscribe response callback function
         
     }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+{
+	command_timer.start();
+
+    MQTTString topic = {(char*)topicFilter, 0, 0};
     
+    int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
+    
+    /* wait for suback */
+    if (resultHandler == 0)
+    {
+        // this will block
+        if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
+        {
+            int mypacketid;
+            if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
+                rc = 0; 
+        }
+    }
+    else
+    {
+        // set unsubscribe response callback function
+        
+    }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
+    return rc;
+}
+
+
+   
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+{
+	command_timer.start();
+
+    MQTTString topic = {(char*)topicName, 0, 0};
+
+	message->id = packetid.getNext();
+    
+	int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
+    
+    /* wait for acks */
+    if (resultHandler == 0)
+    {
+ 		if (message->qos == QOS1)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
+    	    {
+    	        int type, dup, mypacketid;
+    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            rc = 0; 
+    	    }
+		}
+		else if (message->qos == QOS2)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
+    	    {
+    	        int type, dup, mypacketid;
+    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            rc = 0; 
+				len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
+			    rc = sendPacket(len); // send the subscribe packet
+				if (rc != len) 
+					goto exit; // there was a problem
+		        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+	    	    {
+    	        	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            	rc = 0; 
+				}
+    	    }
+		}
+    }
+    else
+    {
+        // set publish response callback function
+        
+    }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
     return rc;
 }