Better timing

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Revision:
21:e918525e529d
Parent:
20:cad3d54d7ecf
Child:
22:aadb79d29330
--- a/MQTTAsync.h	Mon Apr 28 16:07:51 2014 +0000
+++ b/MQTTAsync.h	Tue Apr 29 16:04:55 2014 +0000
@@ -14,8 +14,8 @@
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
 
-#if !defined(MQTTCLIENT_H)
-#define MQTTCLIENT_H
+#if !defined(MQTTASYNC_H)
+#define MQTTASYNC_H
 
 #include "FP.h"
 #include "MQTTPacket.h"
@@ -58,19 +58,19 @@
 	int MAX_MQTT_PACKET_SIZE; // 
 	int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
 	int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
-	int command_timeout;
+	int command_timeout_ms;
 		
 	limits()
 	{
 		MAX_MQTT_PACKET_SIZE = 100;
 		MAX_MESSAGE_HANDLERS = 5;
 		MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
-		command_timeout = 30;
+		command_timeout_ms = 30000;
 	}
 } Limits;
   
   
-template<class Network, class Timer, class Thread, class Mutex> class Client
+template<class Network, class Timer, class Thread, class Mutex> class Async
 {
     
 public:    
@@ -78,28 +78,50 @@
 	struct Result
 	{
     	/* success or failure result data */
-    	Client<Network, Timer, Thread, Mutex>* client;
-		int connack_rc;
+    	Async<Network, Timer, Thread, Mutex>* client;
+		int rc;
 	};
 
 	typedef void (*resultHandler)(Result*);	
    
-    Client(Network* network, const Limits limits = Limits()); 
+    Async(Network* network, const Limits limits = Limits()); 
+        
+    typedef struct
+    {
+        Async* client;
+        Network* network;
+    } connectionLostInfo;
+    
+    typedef int (*connectionLostHandlers)(connectionLostInfo*);
+    
+    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
+     *  @param clh - pointer to the callback function
+     */
+    void setConnectionLostHandler(connectionLostHandlers clh)
+    {
+        connectionLostHandler.attach(clh);
+    }
+    
+    /** Set the default message handling callback - used for any message which does not match a subscription message handler
+     *  @param mh - pointer to the callback function
+     */
+    void setDefaultMessageHandler(messageHandler mh)
+    {
+        defaultMessageHandler.attach(mh);
+    }
            
-    int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
+    int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
     
      template<class T>
-    int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0);  // alternative to pass in pointer to member function
+    int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0);  // alternative to pass in pointer to member function
         
-    int publish(const char* topic, Message* message, resultHandler rh = 0);
-    
-    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
+    int publish(resultHandler rh, const char* topic, Message* message);
     
-    int unsubscribe(const char* topicFilter, resultHandler rh = 0);
+    int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
     
-    int disconnect(int timeout, resultHandler rh = 0);
+    int unsubscribe(resultHandler rh, const char* topicFilter);
     
-	void yield(int timeout);
+    int disconnect(resultHandler rh);
     
 private:
 
@@ -149,19 +171,25 @@
     } *operations;           // result handlers are indexed by packet ids
 
 	static void threadfn(void* arg);
+	
+	messageHandlerFP defaultMessageHandler;
+    
+    typedef FP<int, connectionLostInfo*> connectionLostFP;
+    
+    connectionLostFP connectionLostHandler;
     
 };
 
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
 {
-    ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
+    ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits)  : limits(limits), packetid()
+template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits)  : limits(limits), packetid()
 {
 	this->thread = 0;
 	this->ipstack = network;
@@ -180,7 +208,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
 {
     int sent = 0;
     
@@ -192,7 +220,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -226,7 +254,7 @@
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
 {
     int rc = -1;
     MQTTHeader header = {0};
@@ -253,7 +281,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
 {
 	int rc = -1;
 
@@ -273,17 +301,7 @@
 
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout)
-{
-	Timer atimer = Timer();
-	
-	atimer.countdown_ms(timeout);
-	while (!atimer.expired())
-		cycle(atimer.left_ms());
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
 {
     /* get one piece of work off the wire and one pass through */
 
@@ -297,7 +315,7 @@
 			if (this->thread)
 			{
 				Result res = {this, 0};
-            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            	if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                 	;
 				connectHandler(&res);
 				connectHandler.detach(); // only invoke the callback once
@@ -339,7 +357,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
 {
 	int rc = 0;
 
@@ -366,7 +384,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
 {
 	while (true)
 		cycle(ping_timer.left_ms());
@@ -374,7 +392,7 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
 {
 	int rc = -1;
 	
@@ -389,9 +407,9 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
 {
-	connect_timer.countdown(limits.command_timeout);
+	connect_timer.countdown(limits.command_timeout_ms);
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
@@ -420,7 +438,7 @@
         connectHandler.attach(resultHandler);
         
         // start background thread            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
+        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
     }
     
 exit:
@@ -428,7 +446,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
 {
 	int found = -1;
 	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
@@ -443,14 +461,14 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 	
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
@@ -493,14 +511,14 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
@@ -508,22 +526,8 @@
 	if (rc != len) 
 		goto exit; // there was a problem
     
-    /* wait for unsuback */
-    if (resultHandler == 0)
-    {
-        // this will block
-        if (waitfor(UNSUBACK) == UNSUBACK)
-        {
-            int mypacketid;
-            if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-                rc = 0; 
-        }
-    }
-    else
-    {
-        // set unsubscribe response callback function
+    // set unsubscribe response callback function
         
-    }
     
 exit:
     return rc;
@@ -531,14 +535,14 @@
 
 
    
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicName, 0, 0};
 
 	if (message->qos == QOS1 || message->qos == QOS2)
@@ -583,4 +587,15 @@
 }
 
 
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
+{  
+    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
+    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
+    int rc = sendPacket(len, timer.left_ms());   // send the disconnect packet
+    
+    return (rc == len) ? 0 : -1;
+}
+
+
+
 #endif