Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: FP MQTTPacket
Fork of MQTTS by
Diff: MQTTAsync.h
- Revision:
- 21:e918525e529d
- Parent:
- 20:cad3d54d7ecf
- Child:
- 22:aadb79d29330
diff -r cad3d54d7ecf -r e918525e529d MQTTAsync.h
--- a/MQTTAsync.h Mon Apr 28 16:07:51 2014 +0000
+++ b/MQTTAsync.h Tue Apr 29 16:04:55 2014 +0000
@@ -14,8 +14,8 @@
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
-#if !defined(MQTTCLIENT_H)
-#define MQTTCLIENT_H
+#if !defined(MQTTASYNC_H)
+#define MQTTASYNC_H
#include "FP.h"
#include "MQTTPacket.h"
@@ -58,19 +58,19 @@
int MAX_MQTT_PACKET_SIZE; //
int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler
int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
- int command_timeout;
+ int command_timeout_ms;
limits()
{
MAX_MQTT_PACKET_SIZE = 100;
MAX_MESSAGE_HANDLERS = 5;
MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
- command_timeout = 30;
+ command_timeout_ms = 30000;
}
} Limits;
-template<class Network, class Timer, class Thread, class Mutex> class Client
+template<class Network, class Timer, class Thread, class Mutex> class Async
{
public:
@@ -78,28 +78,50 @@
struct Result
{
/* success or failure result data */
- Client<Network, Timer, Thread, Mutex>* client;
- int connack_rc;
+ Async<Network, Timer, Thread, Mutex>* client;
+ int rc;
};
typedef void (*resultHandler)(Result*);
- Client(Network* network, const Limits limits = Limits());
+ Async(Network* network, const Limits limits = Limits());
+
+ typedef struct
+ {
+ Async* client;
+ Network* network;
+ } connectionLostInfo;
+
+ typedef int (*connectionLostHandlers)(connectionLostInfo*);
+
+ /** Set the connection lost callback - called whenever the connection is lost and we should be connected
+ * @param clh - pointer to the callback function
+ */
+ void setConnectionLostHandler(connectionLostHandlers clh)
+ {
+ connectionLostHandler.attach(clh);
+ }
+
+ /** Set the default message handling callback - used for any message which does not match a subscription message handler
+ * @param mh - pointer to the callback function
+ */
+ void setDefaultMessageHandler(messageHandler mh)
+ {
+ defaultMessageHandler.attach(mh);
+ }
- int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
+ int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
template<class T>
- int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function
+ int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function
- int publish(const char* topic, Message* message, resultHandler rh = 0);
-
- int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
+ int publish(resultHandler rh, const char* topic, Message* message);
- int unsubscribe(const char* topicFilter, resultHandler rh = 0);
+ int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
- int disconnect(int timeout, resultHandler rh = 0);
+ int unsubscribe(resultHandler rh, const char* topicFilter);
- void yield(int timeout);
+ int disconnect(resultHandler rh);
private:
@@ -149,19 +171,25 @@
} *operations; // result handlers are indexed by packet ids
static void threadfn(void* arg);
+
+ messageHandlerFP defaultMessageHandler;
+
+ typedef FP<int, connectionLostInfo*> connectionLostFP;
+
+ connectionLostFP connectionLostHandler;
};
}
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
{
- ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
+ ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
}
-template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits) : limits(limits), packetid()
+template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid()
{
this->thread = 0;
this->ipstack = network;
@@ -180,7 +208,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
{
int sent = 0;
@@ -192,7 +220,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
{
char c;
int multiplier = 1;
@@ -226,7 +254,7 @@
* @param timeout the max time to wait for the packet read to complete, in milliseconds
* @return the MQTT packet type, or -1 if none
*/
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout)
{
int rc = -1;
MQTTHeader header = {0};
@@ -253,7 +281,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
{
int rc = -1;
@@ -273,17 +301,7 @@
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout)
-{
- Timer atimer = Timer();
-
- atimer.countdown_ms(timeout);
- while (!atimer.expired())
- cycle(atimer.left_ms());
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
{
/* get one piece of work off the wire and one pass through */
@@ -297,7 +315,7 @@
if (this->thread)
{
Result res = {this, 0};
- if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+ if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
;
connectHandler(&res);
connectHandler.detach(); // only invoke the callback once
@@ -339,7 +357,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
{
int rc = 0;
@@ -366,7 +384,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
{
while (true)
cycle(ping_timer.left_ms());
@@ -374,7 +392,7 @@
// only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
{
int rc = -1;
@@ -389,9 +407,9 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
{
- connect_timer.countdown(limits.command_timeout);
+ connect_timer.countdown(limits.command_timeout_ms);
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
if (options == 0)
@@ -420,7 +438,7 @@
connectHandler.attach(resultHandler);
// start background thread
- this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
+ this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
}
exit:
@@ -428,7 +446,7 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
{
int found = -1;
for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
@@ -443,14 +461,14 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int index = 0;
if (this->thread)
index = findFreeOperation();
Timer& atimer = operations[index].timer;
- atimer.countdown(limits.command_timeout);
+ atimer.countdown(limits.command_timeout_ms);
MQTTString topic = {(char*)topicFilter, 0, 0};
int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
@@ -493,14 +511,14 @@
}
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
{
int index = 0;
if (this->thread)
index = findFreeOperation();
Timer& atimer = operations[index].timer;
- atimer.countdown(limits.command_timeout);
+ atimer.countdown(limits.command_timeout_ms);
MQTTString topic = {(char*)topicFilter, 0, 0};
int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
@@ -508,22 +526,8 @@
if (rc != len)
goto exit; // there was a problem
- /* wait for unsuback */
- if (resultHandler == 0)
- {
- // this will block
- if (waitfor(UNSUBACK) == UNSUBACK)
- {
- int mypacketid;
- if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
- rc = 0;
- }
- }
- else
- {
- // set unsubscribe response callback function
+ // set unsubscribe response callback function
- }
exit:
return rc;
@@ -531,14 +535,14 @@
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
{
int index = 0;
if (this->thread)
index = findFreeOperation();
Timer& atimer = operations[index].timer;
- atimer.countdown(limits.command_timeout);
+ atimer.countdown(limits.command_timeout_ms);
MQTTString topic = {(char*)topicName, 0, 0};
if (message->qos == QOS1 || message->qos == QOS2)
@@ -583,4 +587,15 @@
}
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
+{
+ Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete
+ int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
+ int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet
+
+ return (rc == len) ? 0 : -1;
+}
+
+
+
#endif
