Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: MQTTClient.h
- Revision:
- 9:01b8cc7d94cc
- Parent:
- 8:c46930bd6c82
- Child:
- 11:db15da110a37
- Child:
- 12:cc7f2d62a393
diff -r c46930bd6c82 -r 01b8cc7d94cc MQTTClient.h
--- a/MQTTClient.h Wed Apr 09 13:48:20 2014 +0000
+++ b/MQTTClient.h Wed Apr 09 23:21:54 2014 +0000
@@ -23,8 +23,6 @@
namespace MQTT
{
-
-const int MAX_PACKET_ID = 65535;
enum QoS { QOS0, QOS1, QOS2 };
@@ -48,29 +46,46 @@
Client<class Network, class Timer, class Thread>* client;
};
+
+class PacketId
+{
+public:
+ PacketId();
+
+ int getNext();
+
+private:
+ static const int MAX_PACKET_ID = 65535;
+ int next;
+};
+
+typedef void (*resultHandler)(Result*);
+typedef void (*messageHandler)(Message*);
template<class Network, class Timer, class Thread> class Client
{
public:
- Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30);
+ Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30);
- int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0);
-
- int publish(const char* topic, Message* message, FP<void, Result*> *resultHandler = 0);
+ int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
- int subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0);
+ template<class T>
+ int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function
+
+ int publish(const char* topic, Message* message, resultHandler rh = 0);
- int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0);
+ int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
- int disconnect(int timeout, FP<void, Result*> *resultHandler = 0);
+ int unsubscribe(char* topicFilter, resultHandler rh = 0);
+
+ int disconnect(int timeout, resultHandler rh = 0);
void run(void const *argument);
private:
- int getPacketId();
int cycle();
int decodePacket(int* value, int timeout);
@@ -81,7 +96,7 @@
Network* ipstack;
Timer* timer;
- char* buf;
+ char* buf;
int buflen;
char* readbuf;
@@ -89,7 +104,15 @@
int command_timeout; // max time to wait for any MQTT command to complete, in seconds
int keepalive;
- int packetid;
+ PacketId packetid;
+
+ typedef FP<void, Result*> resultHandlerFP;
+ // how many concurrent operations should we allow? Each one will require a function pointer
+ resultHandlerFP connectHandler;
+
+ #define MAX_MESSAGE_HANDLERS 5
+ typedef FP<void, Message*> messageHandlerFP;
+ messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size?
};
@@ -97,7 +120,7 @@
}
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)
+template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) : packetid()
{
buf = new char[buffer_size];
@@ -106,17 +129,10 @@
this->command_timeout = command_timeout;
this->thread = 0;
this->ipstack = network;
- this->packetid = 0;
this->timer = timer;
}
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId()
-{
- return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid;
-}
-
-
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
{
int sent = 0;
@@ -191,11 +207,11 @@
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
{
- int timeout = 1000L;
+ int timeout = -1;
/* get one piece of work off the wire and one pass through */
// 1. read the socket, see what work is due.
- int packet_type = readPacket(-1);
+ int packet_type = readPacket(timeout);
printf("packet type %d\n", packet_type);
@@ -228,7 +244,7 @@
}
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
{
int len = 0;
int rc = -99;
@@ -261,6 +277,7 @@
else
{
// set connect response callback function
+ connectHandler.attach(resultHandler);
// start background thread
@@ -271,14 +288,14 @@
}
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler,
- FP<void, Result*> *resultHandler)
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos,
+ messageHandler messageHandler, resultHandler resultHandler)
{
int rc = -1,
len = 0;
MQTTString topic = {(char*)topicFilter, 0, 0};
- len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos);
+ len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
rc = sendPacket(len); // send the subscribe packet
/* wait for suback */
@@ -287,8 +304,8 @@
// this will block
if (cycle() == SUBACK)
{
- int count = 0, grantedQoS = -1;
- if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
+ int count = 0, grantedQoS = -1, mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80
}
}