Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: MQTTClient.h
- Revision:
- 9:01b8cc7d94cc
- Parent:
- 8:c46930bd6c82
- Child:
- 11:db15da110a37
- Child:
- 12:cc7f2d62a393
diff -r c46930bd6c82 -r 01b8cc7d94cc MQTTClient.h --- a/MQTTClient.h Wed Apr 09 13:48:20 2014 +0000 +++ b/MQTTClient.h Wed Apr 09 23:21:54 2014 +0000 @@ -23,8 +23,6 @@ namespace MQTT { - -const int MAX_PACKET_ID = 65535; enum QoS { QOS0, QOS1, QOS2 }; @@ -48,29 +46,46 @@ Client<class Network, class Timer, class Thread>* client; }; + +class PacketId +{ +public: + PacketId(); + + int getNext(); + +private: + static const int MAX_PACKET_ID = 65535; + int next; +}; + +typedef void (*resultHandler)(Result*); +typedef void (*messageHandler)(Message*); template<class Network, class Timer, class Thread> class Client { public: - Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); + Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); - int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0); - - int publish(const char* topic, Message* message, FP<void, Result*> *resultHandler = 0); + int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); - int subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0); + template<class T> + int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function + + int publish(const char* topic, Message* message, resultHandler rh = 0); - int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0); + int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0); - int disconnect(int timeout, FP<void, Result*> *resultHandler = 0); + int unsubscribe(char* topicFilter, resultHandler rh = 0); + + int disconnect(int timeout, resultHandler rh = 0); void run(void const *argument); private: - int getPacketId(); int cycle(); int decodePacket(int* value, int timeout); @@ -81,7 +96,7 @@ Network* ipstack; Timer* timer; - char* buf; + char* buf; int buflen; char* readbuf; @@ -89,7 +104,15 @@ int command_timeout; // max time to wait for any MQTT command to complete, in seconds int keepalive; - int packetid; + PacketId packetid; + + typedef FP<void, Result*> resultHandlerFP; + // how many concurrent operations should we allow? Each one will require a function pointer + resultHandlerFP connectHandler; + + #define MAX_MESSAGE_HANDLERS 5 + typedef FP<void, Message*> messageHandlerFP; + messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size? }; @@ -97,7 +120,7 @@ } -template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) +template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) : packetid() { buf = new char[buffer_size]; @@ -106,17 +129,10 @@ this->command_timeout = command_timeout; this->thread = 0; this->ipstack = network; - this->packetid = 0; this->timer = timer; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId() -{ - return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid; -} - - template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) { int sent = 0; @@ -191,11 +207,11 @@ template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle() { - int timeout = 1000L; + int timeout = -1; /* get one piece of work off the wire and one pass through */ // 1. read the socket, see what work is due. - int packet_type = readPacket(-1); + int packet_type = readPacket(timeout); printf("packet type %d\n", packet_type); @@ -228,7 +244,7 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler) +template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) { int len = 0; int rc = -99; @@ -261,6 +277,7 @@ else { // set connect response callback function + connectHandler.attach(resultHandler); // start background thread @@ -271,14 +288,14 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, - FP<void, Result*> *resultHandler) +template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, + messageHandler messageHandler, resultHandler resultHandler) { int rc = -1, len = 0; MQTTString topic = {(char*)topicFilter, 0, 0}; - len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos); + len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); rc = sendPacket(len); // send the subscribe packet /* wait for suback */ @@ -287,8 +304,8 @@ // this will block if (cycle() == SUBACK) { - int count = 0, grantedQoS = -1; - if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) + int count = 0, grantedQoS = -1, mypacketid; + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) rc = grantedQoS; // 0, 1, 2 or 0x80 } }