MQtt to publish the data
Fork of MQTT by
Diff: MQTTAsync.h
- Revision:
- 21:e918525e529d
- Parent:
- 20:cad3d54d7ecf
- Child:
- 22:aadb79d29330
--- a/MQTTAsync.h Mon Apr 28 16:07:51 2014 +0000 +++ b/MQTTAsync.h Tue Apr 29 16:04:55 2014 +0000 @@ -14,8 +14,8 @@ * Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ -#if !defined(MQTTCLIENT_H) -#define MQTTCLIENT_H +#if !defined(MQTTASYNC_H) +#define MQTTASYNC_H #include "FP.h" #include "MQTTPacket.h" @@ -58,19 +58,19 @@ int MAX_MQTT_PACKET_SIZE; // int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode - int command_timeout; + int command_timeout_ms; limits() { MAX_MQTT_PACKET_SIZE = 100; MAX_MESSAGE_HANDLERS = 5; MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode - command_timeout = 30; + command_timeout_ms = 30000; } } Limits; -template<class Network, class Timer, class Thread, class Mutex> class Client +template<class Network, class Timer, class Thread, class Mutex> class Async { public: @@ -78,28 +78,50 @@ struct Result { /* success or failure result data */ - Client<Network, Timer, Thread, Mutex>* client; - int connack_rc; + Async<Network, Timer, Thread, Mutex>* client; + int rc; }; typedef void (*resultHandler)(Result*); - Client(Network* network, const Limits limits = Limits()); + Async(Network* network, const Limits limits = Limits()); + + typedef struct + { + Async* client; + Network* network; + } connectionLostInfo; + + typedef int (*connectionLostHandlers)(connectionLostInfo*); + + /** Set the connection lost callback - called whenever the connection is lost and we should be connected + * @param clh - pointer to the callback function + */ + void setConnectionLostHandler(connectionLostHandlers clh) + { + connectionLostHandler.attach(clh); + } + + /** Set the default message handling callback - used for any message which does not match a subscription message handler + * @param mh - pointer to the callback function + */ + void setDefaultMessageHandler(messageHandler mh) + { + defaultMessageHandler.attach(mh); + } - int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); + int connect(resultHandler fn, MQTTPacket_connectData* options = 0); template<class T> - int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function + int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function - int publish(const char* topic, Message* message, resultHandler rh = 0); - - int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0); + int publish(resultHandler rh, const char* topic, Message* message); - int unsubscribe(const char* topicFilter, resultHandler rh = 0); + int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh); - int disconnect(int timeout, resultHandler rh = 0); + int unsubscribe(resultHandler rh, const char* topicFilter); - void yield(int timeout); + int disconnect(resultHandler rh); private: @@ -149,19 +171,25 @@ } *operations; // result handlers are indexed by packet ids static void threadfn(void* arg); + + messageHandlerFP defaultMessageHandler; + + typedef FP<int, connectionLostInfo*> connectionLostFP; + + connectionLostFP connectionLostHandler; }; } -template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg) +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg) { - ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL); + ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL); } -template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits) : limits(limits), packetid() +template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid() { this->thread = 0; this->ipstack = network; @@ -180,7 +208,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) { int sent = 0; @@ -192,7 +220,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) { char c; int multiplier = 1; @@ -226,7 +254,7 @@ * @param timeout the max time to wait for the packet read to complete, in milliseconds * @return the MQTT packet type, or -1 if none */ -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) { int rc = -1; MQTTHeader header = {0}; @@ -253,7 +281,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) { int rc = -1; @@ -273,17 +301,7 @@ -template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout) -{ - Timer atimer = Timer(); - - atimer.countdown_ms(timeout); - while (!atimer.expired()) - cycle(atimer.left_ms()); -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout) { /* get one piece of work off the wire and one pass through */ @@ -297,7 +315,7 @@ if (this->thread) { Result res = {this, 0}; - if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) ; connectHandler(&res); connectHandler.detach(); // only invoke the callback once @@ -339,7 +357,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive() +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive() { int rc = 0; @@ -366,7 +384,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument) +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument) { while (true) cycle(ping_timer.left_ms()); @@ -374,7 +392,7 @@ // only used in single-threaded mode where one command at a time is in process -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) { int rc = -1; @@ -389,9 +407,9 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options) { - connect_timer.countdown(limits.command_timeout); + connect_timer.countdown(limits.command_timeout_ms); MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) @@ -420,7 +438,7 @@ connectHandler.attach(resultHandler); // start background thread - this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this); + this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this); } exit: @@ -428,7 +446,7 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation() +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation() { int found = -1; for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) @@ -443,14 +461,14 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler) { int index = 0; if (this->thread) index = findFreeOperation(); Timer& atimer = operations[index].timer; - atimer.countdown(limits.command_timeout); + atimer.countdown(limits.command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); @@ -493,14 +511,14 @@ } -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter) { int index = 0; if (this->thread) index = findFreeOperation(); Timer& atimer = operations[index].timer; - atimer.countdown(limits.command_timeout); + atimer.countdown(limits.command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); @@ -508,22 +526,8 @@ if (rc != len) goto exit; // there was a problem - /* wait for unsuback */ - if (resultHandler == 0) - { - // this will block - if (waitfor(UNSUBACK) == UNSUBACK) - { - int mypacketid; - if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - rc = 0; - } - } - else - { - // set unsubscribe response callback function + // set unsubscribe response callback function - } exit: return rc; @@ -531,14 +535,14 @@ -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message) { int index = 0; if (this->thread) index = findFreeOperation(); Timer& atimer = operations[index].timer; - atimer.countdown(limits.command_timeout); + atimer.countdown(limits.command_timeout_ms); MQTTString topic = {(char*)topicName, 0, 0}; if (message->qos == QOS1 || message->qos == QOS2) @@ -583,4 +587,15 @@ } +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler) +{ + Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete + int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); + int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet + + return (rc == len) ? 0 : -1; +} + + + #endif