Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 16:91c2f9a144d4
- Parent:
- 15:64a57183aa03
- Child:
- 19:57f6f976e878
diff -r 64a57183aa03 -r 91c2f9a144d4 MQTTClient.h --- a/MQTTClient.h Sun Apr 13 22:32:28 2014 +0000 +++ b/MQTTClient.h Mon Apr 14 18:51:52 2014 +0000 @@ -38,7 +38,6 @@ size_t payloadlen; }; -template<class Network, class Timer, class Thread> class Client; class PacketId { @@ -53,8 +52,25 @@ }; typedef void (*messageHandler)(Message*); + +typedef struct limits +{ + int MAX_MQTT_PACKET_SIZE; // + int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler + int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode + int command_timeout; + + limits() + { + MAX_MQTT_PACKET_SIZE = 100; + MAX_MESSAGE_HANDLERS = 5; + MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode + command_timeout = 30; + } +} Limits; -template<class Network, class Timer, class Thread> class Client + +template<class Network, class Timer, class Thread, class Mutex> class Client { public: @@ -62,30 +78,14 @@ struct Result { /* success or failure result data */ - Client<Network, Timer, Thread>* client; + Client<Network, Timer, Thread, Mutex>* client; int connack_rc; }; - typedef void (*resultHandler)(Result*); - - struct limits - { - int MAX_MQTT_PACKET_SIZE; // - int MAX_MESSAGE_HANDLERS; // 5 - each subscription requires a message handler - int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode - int command_timeout; - - limits() - { - MAX_MQTT_PACKET_SIZE = 100; - MAX_MESSAGE_HANDLERS = 5; - MAX_CONCURRENT_OPERATIONS= 5; - command_timeout = 30; - } - }; + typedef void (*resultHandler)(Result*); - Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); - + Client(Network* network, const Limits limits = Limits()); + int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); template<class T> @@ -104,48 +104,48 @@ private: int cycle(int timeout); + int waitfor(int packet_type, Timer& atimer); int keepalive(); + int findFreeOperation(); int decodePacket(int* value, int timeout); - int readPacket(int timeout = -1); - int sendPacket(int length, int timeout = -1); + int readPacket(int timeout); + int sendPacket(int length, int timeout); int deliverMessage(MQTTString* topic, Message* message); Thread* thread; Network* ipstack; - Timer command_timer, ping_timer; + + Limits limits; - char buf[]; - int buflen; - + char* buf; char* readbuf; - int readbuflen; + Timer ping_timer, connect_timer; unsigned int keepAliveInterval; bool ping_outstanding; - int command_timeout; // max time to wait for any MQTT command to complete, in seconds PacketId packetid; typedef FP<void, Result*> resultHandlerFP; resultHandlerFP connectHandler; - #define MAX_MESSAGE_HANDLERS 5 typedef FP<void, Message*> messageHandlerFP; - struct + struct MessageHandlers { - char* topic; + const char* topic; messageHandlerFP fp; - } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are linked to a subscription topic + } *messageHandlers; // Message handlers are indexed by subscription topic // how many concurrent operations should we allow? Each one will require a function pointer - struct + struct Operations { unsigned short id; resultHandlerFP fp; - MQTTString* topic; // if this is a publish, store topic name in case republishing is required - Message* message; // for publish, - } *operations; // result handlers are indexed by packet ids + const char* topic; // if this is a publish, store topic name in case republishing is required + Message* message; // for publish, + Timer timer; // to check if the command has timed out + } *operations; // result handlers are indexed by packet ids static void threadfn(void* arg); @@ -154,40 +154,44 @@ } -template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg) +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg) { - ((Client<Network, Timer, Thread>*) arg)->run(NULL); + ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL); } -template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid() +template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits) : limits(limits), packetid() { - //buf = new char[MAX_MQTT_PACKET_SIZE]; - readbuf = new char[MAX_MQTT_PACKET_SIZE]; - buflen = readbuflen = MAX_MQTT_PACKET_SIZE; - - this->command_timeout = command_timeout; - this->thread = 0; - this->ipstack = network; - this->command_timer = Timer(); - this->ping_timer = Timer(); - this->ping_outstanding = 0; + this->thread = 0; + this->ipstack = network; + this->ping_timer = Timer(); + this->ping_outstanding = 0; + + // How to make these memory allocations portable? I was hoping to avoid the heap + buf = new char[limits.MAX_MQTT_PACKET_SIZE]; + readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; + this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS]; + for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) + operations[i].id = 0; + this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; + for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + messageHandlers[i].topic = 0; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) { int sent = 0; while (sent < length) - sent += ipstack->write(&buf[sent], length, -1); + sent += ipstack->write(&buf[sent], length, timeout); if (sent == length) - ping_timer.reset(); // record the fact that we have successfully sent the packet + ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet return sent; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) { char c; int multiplier = 1; @@ -221,7 +225,7 @@ * @param timeout the max time to wait for the packet read to complete, in milliseconds * @return the MQTT packet type, or -1 if none */ -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) { int rc = -1; MQTTHeader header = {0}; @@ -248,12 +252,26 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) { + int rc = -1; + + // we have to find the right message handler - indexed by topic + for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + { + if (messageHandlers[i].topic && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) + { + messageHandlers[i].fp(message); + rc = 0; + break; + } + } + + return rc; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout) { /* get one piece of work off the wire and one pass through */ @@ -269,29 +287,32 @@ if (this->thread) { Result res = {this, 0}; - if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1) + if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) ; connectHandler(&res); connectHandler.detach(); // only invoke the callback once } + break; case PUBACK: + if (this->thread) + ; //call resultHandler case SUBACK: break; case PUBLISH: MQTTString topicName; Message msg; rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen); + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE); if (msg.qos == QOS0) deliverMessage(&topicName, &msg); break; case PUBREC: int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) ; // must lock this access against the application thread, if we are multi-threaded - len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid); - rc = sendPacket(len); // send the subscribe packet + len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); + rc = sendPacket(len, timeout); // send the PUBREL packet if (rc != len) goto exit; // there was a problem @@ -308,21 +329,21 @@ } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive() +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive() { int rc = 0; if (keepAliveInterval == 0) goto exit; - if (ping_timer.read_ms() >= (keepAliveInterval * 1000)) + if (ping_timer.expired()) { if (ping_outstanding) rc = -1; else { - int len = MQTTSerialize_pingreq(buf, buflen); - rc = sendPacket(len); // send the connect packet + int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); + rc = sendPacket(len, 1000); // send the ping packet if (rc != len) rc = -1; // indicate there's a problem else @@ -335,40 +356,53 @@ } -template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument) +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument) { while (true) - cycle((keepAliveInterval * 1000) - ping_timer.read_ms()); + cycle(ping_timer.left_ms()); } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) +// only used in single-threaded mode where one command at a time is in process +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) { - command_timer.start(); + int rc = -1; + + do + { + if (atimer.expired()) + break; // we timed out + } + while ((rc = cycle(atimer.left_ms())) != packet_type); + + return rc; +} + + +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) +{ + connect_timer.countdown(limits.command_timeout); MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) options = &default_options; // set default options if none were supplied this->keepAliveInterval = options->keepAliveInterval; - ping_timer.start(); - int len = MQTTSerialize_connect(buf, buflen, options); - int rc = sendPacket(len); // send the connect packet + ping_timer.countdown(this->keepAliveInterval); + int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); + int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet if (rc != len) goto exit; // there was a problem if (resultHandler == 0) // wait until the connack is received { // this will be a blocking call, wait for the connack - do - { - if (command_timer.read_ms() > (command_timeout * 1000)) - goto exit; // we timed out - } - while (cycle(command_timeout - command_timer.read_ms()) != CONNACK); - int connack_rc = -1; - if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) - rc = connack_rc; + if (waitfor(CONNACK, connect_timer) == CONNACK) + { + int connack_rc = -1; + if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + rc = connack_rc; + } } else { @@ -376,24 +410,41 @@ connectHandler.attach(resultHandler); // start background thread - this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this); + this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this); } exit: - command_timer.stop(); - command_timer.reset(); return rc; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation() { - command_timer.start(); + int found = -1; + for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) + { + if (operations[i].id == 0) + { + found = i; + break; + } + } + return found; +} + +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler) +{ + int index = 0; + if (this->thread) + index = findFreeOperation(); + Timer& atimer = operations[index].timer; + + atimer.countdown(limits.command_timeout); MQTTString topic = {(char*)topicFilter, 0, 0}; - int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); - int rc = sendPacket(len); // send the subscribe packet + int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); + int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet if (rc != len) goto exit; // there was a problem @@ -401,11 +452,24 @@ if (resultHandler == 0) { // this will block - if (cycle(command_timeout - command_timer.read_ms()) == SUBACK) + if (waitfor(SUBACK, atimer) == SUBACK) { int count = 0, grantedQoS = -1, mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = grantedQoS; // 0, 1, 2 or 0x80 + if (rc != 0x80) + { + for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + { + if (messageHandlers[i].topic == 0) + { + messageHandlers[i].topic = topicFilter; + messageHandlers[i].fp.attach(messageHandler); + rc = 0; + break; + } + } + } } } else @@ -415,28 +479,29 @@ } exit: - command_timer.stop(); - command_timer.reset(); return rc; } -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler) { - command_timer.start(); + int index = 0; + if (this->thread) + index = findFreeOperation(); + Timer& atimer = operations[index].timer; MQTTString topic = {(char*)topicFilter, 0, 0}; int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic); - int rc = sendPacket(len); // send the subscribe packet + int rc = sendPacket(len, atimer); // send the subscribe packet if (rc != len) goto exit; // there was a problem - /* wait for suback */ + /* wait for unsuback */ if (resultHandler == 0) { // this will block - if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK) + if (waitfor(UNSUBACK) == UNSUBACK) { int mypacketid; if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1) @@ -457,7 +522,7 @@ -template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler) +template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler) { command_timer.start(); @@ -475,7 +540,7 @@ { if (message->qos == QOS1) { - if (cycle(command_timeout - command_timer.read_ms()) == PUBACK) + if (waitfor(PUBACK) == PUBACK) { int type, dup, mypacketid; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) @@ -484,7 +549,7 @@ } else if (message->qos == QOS2) { - if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP) + if (waitfor(PUBCOMP) == PUBCOMP) { int type, dup, mypacketid; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)