Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 9:01b8cc7d94cc
- Parent:
- 8:c46930bd6c82
- Child:
- 11:db15da110a37
- Child:
- 12:cc7f2d62a393
--- a/MQTTClient.h Wed Apr 09 13:48:20 2014 +0000 +++ b/MQTTClient.h Wed Apr 09 23:21:54 2014 +0000 @@ -23,8 +23,6 @@ namespace MQTT { - -const int MAX_PACKET_ID = 65535; enum QoS { QOS0, QOS1, QOS2 }; @@ -48,29 +46,46 @@ Client<class Network, class Timer, class Thread>* client; }; + +class PacketId +{ +public: + PacketId(); + + int getNext(); + +private: + static const int MAX_PACKET_ID = 65535; + int next; +}; + +typedef void (*resultHandler)(Result*); +typedef void (*messageHandler)(Message*); template<class Network, class Timer, class Thread> class Client { public: - Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); + Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); - int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0); - - int publish(const char* topic, Message* message, FP<void, Result*> *resultHandler = 0); + int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); - int subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0); + template<class T> + int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function + + int publish(const char* topic, Message* message, resultHandler rh = 0); - int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0); + int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0); - int disconnect(int timeout, FP<void, Result*> *resultHandler = 0); + int unsubscribe(char* topicFilter, resultHandler rh = 0); + + int disconnect(int timeout, resultHandler rh = 0); void run(void const *argument); private: - int getPacketId(); int cycle(); int decodePacket(int* value, int timeout); @@ -81,7 +96,7 @@ Network* ipstack; Timer* timer; - char* buf; + char* buf; int buflen; char* readbuf; @@ -89,7 +104,15 @@ int command_timeout; // max time to wait for any MQTT command to complete, in seconds int keepalive; - int packetid; + PacketId packetid; + + typedef FP<void, Result*> resultHandlerFP; + // how many concurrent operations should we allow? Each one will require a function pointer + resultHandlerFP connectHandler; + + #define MAX_MESSAGE_HANDLERS 5 + typedef FP<void, Message*> messageHandlerFP; + messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size? }; @@ -97,7 +120,7 @@ } -template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) +template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) : packetid() { buf = new char[buffer_size]; @@ -106,17 +129,10 @@ this->command_timeout = command_timeout; this->thread = 0; this->ipstack = network; - this->packetid = 0; this->timer = timer; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId() -{ - return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid; -} - - template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) { int sent = 0; @@ -191,11 +207,11 @@ template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle() { - int timeout = 1000L; + int timeout = -1; /* get one piece of work off the wire and one pass through */ // 1. read the socket, see what work is due. - int packet_type = readPacket(-1); + int packet_type = readPacket(timeout); printf("packet type %d\n", packet_type); @@ -228,7 +244,7 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler) +template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) { int len = 0; int rc = -99; @@ -261,6 +277,7 @@ else { // set connect response callback function + connectHandler.attach(resultHandler); // start background thread @@ -271,14 +288,14 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, - FP<void, Result*> *resultHandler) +template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, + messageHandler messageHandler, resultHandler resultHandler) { int rc = -1, len = 0; MQTTString topic = {(char*)topicFilter, 0, 0}; - len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos); + len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); rc = sendPacket(len); // send the subscribe packet /* wait for suback */ @@ -287,8 +304,8 @@ // this will block if (cycle() == SUBACK) { - int count = 0, grantedQoS = -1; - if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) + int count = 0, grantedQoS = -1, mypacketid; + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) rc = grantedQoS; // 0, 1, 2 or 0x80 } }