MQTT
Diff: MQTTClient.h
- Revision:
- 15:64a57183aa03
- Parent:
- 13:fd82db992024
- Child:
- 16:91c2f9a144d4
diff -r fd82db992024 -r 64a57183aa03 MQTTClient.h --- a/MQTTClient.h Fri Apr 11 22:46:37 2014 +0100 +++ b/MQTTClient.h Sun Apr 13 22:32:28 2014 +0000 @@ -45,7 +45,7 @@ public: PacketId(); - int getNext(); + int getNext(); private: static const int MAX_PACKET_ID = 65535; @@ -57,16 +57,32 @@ template<class Network, class Timer, class Thread> class Client { -public: - +public: + struct Result { /* success or failure result data */ - Client<Network, Timer, Thread>* client; + Client<Network, Timer, Thread>* client; int connack_rc; - }; - + }; + typedef void (*resultHandler)(Result*); + + struct limits + { + int MAX_MQTT_PACKET_SIZE; // + int MAX_MESSAGE_HANDLERS; // 5 - each subscription requires a message handler + int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode + int command_timeout; + + limits() + { + MAX_MQTT_PACKET_SIZE = 100; + MAX_MESSAGE_HANDLERS = 5; + MAX_CONCURRENT_OPERATIONS= 5; + command_timeout = 30; + } + }; Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); @@ -87,61 +103,74 @@ private: - int cycle(int timeout); + int cycle(int timeout); int keepalive(); int decodePacket(int* value, int timeout); int readPacket(int timeout = -1); int sendPacket(int length, int timeout = -1); + int deliverMessage(MQTTString* topic, Message* message); Thread* thread; Network* ipstack; Timer command_timer, ping_timer; - char* buf; + char buf[]; int buflen; char* readbuf; - int readbuflen; - - unsigned int keepAliveInterval; + int readbuflen; + + unsigned int keepAliveInterval; bool ping_outstanding; int command_timeout; // max time to wait for any MQTT command to complete, in seconds PacketId packetid; typedef FP<void, Result*> resultHandlerFP; - // how many concurrent operations should we allow? Each one will require a function pointer resultHandlerFP connectHandler; #define MAX_MESSAGE_HANDLERS 5 typedef FP<void, Message*> messageHandlerFP; - messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size? - + struct + { + char* topic; + messageHandlerFP fp; + } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are linked to a subscription topic + + // how many concurrent operations should we allow? Each one will require a function pointer + struct + { + unsigned short id; + resultHandlerFP fp; + MQTTString* topic; // if this is a publish, store topic name in case republishing is required + Message* message; // for publish, + } *operations; // result handlers are indexed by packet ids + static void threadfn(void* arg); }; -} - - -template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg) -{ - ((Client<Network, Timer, Thread>*) arg)->run(NULL); +} + + +template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg) +{ + ((Client<Network, Timer, Thread>*) arg)->run(NULL); } template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid() { - - buf = new char[MAX_MQTT_PACKET_SIZE]; + //buf = new char[MAX_MQTT_PACKET_SIZE]; readbuf = new char[MAX_MQTT_PACKET_SIZE]; buflen = readbuflen = MAX_MQTT_PACKET_SIZE; + this->command_timeout = command_timeout; this->thread = 0; this->ipstack = network; - this->command_timer = Timer(); - this->ping_timer = Timer(); + this->command_timer = Timer(); + this->ping_timer = Timer(); this->ping_outstanding = 0; } @@ -151,7 +180,7 @@ int sent = 0; while (sent < length) - sent += ipstack->write(&buf[sent], length, -1); + sent += ipstack->write(&buf[sent], length, -1); if (sent == length) ping_timer.reset(); // record the fact that we have successfully sent the packet return sent; @@ -163,7 +192,7 @@ char c; int multiplier = 1; int len = 0; -#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 + const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; *value = 0; do @@ -219,6 +248,11 @@ } +template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message) +{ +} + + template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout) { /* get one piece of work off the wire and one pass through */ @@ -227,116 +261,114 @@ int packet_type = readPacket(timeout); printf("packet type %d\n", packet_type); - + int len, rc; switch (packet_type) { - case CONNACK: - if (this->thread) - { - Result res = {this, 0}; - int connack_rc = -1; + case CONNACK: + if (this->thread) + { + Result res = {this, 0}; if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1) - ; - connectHandler(&res); - } + ; + connectHandler(&res); + connectHandler.detach(); // only invoke the callback once + } case PUBACK: case SUBACK: break; - case PUBLISH: - MQTTString topicName; - Message msg; - rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen); - if (msg.qos == QOS0) - messageHandlers[0](&msg); + case PUBLISH: + MQTTString topicName; + Message msg; + rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen); + if (msg.qos == QOS0) + deliverMessage(&topicName, &msg); break; - case PUBREC: + case PUBREC: int type, dup, mypacketid; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) - ; + ; + // must lock this access against the application thread, if we are multi-threaded len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid); - rc = sendPacket(len); // send the subscribe packet - if (rc != len) - goto exit; // there was a problem + rc = sendPacket(len); // send the subscribe packet + if (rc != len) + goto exit; // there was a problem break; case PUBCOMP: break; - case PINGRESP: - if (ping_outstanding) - ping_outstanding = false; - //else disconnect(); + case PINGRESP: + ping_outstanding = false; break; - case -1: - break; - } - keepalive(); + } + keepalive(); exit: return packet_type; -} - - +} + + template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive() -{ - int rc = 0; - - if (keepAliveInterval == 0) - goto exit; - - if (ping_timer.read_ms() >= (keepAliveInterval * 1000)) - { - if (ping_outstanding) - rc = -1; - else - { - int len = MQTTSerialize_pingreq(buf, buflen); - rc = sendPacket(len); // send the connect packet - if (rc != len) - rc = -1; // indicate there's a problem - else - ping_outstanding = true; - } - } - -exit: +{ + int rc = 0; + + if (keepAliveInterval == 0) + goto exit; + + if (ping_timer.read_ms() >= (keepAliveInterval * 1000)) + { + if (ping_outstanding) + rc = -1; + else + { + int len = MQTTSerialize_pingreq(buf, buflen); + rc = sendPacket(len); // send the connect packet + if (rc != len) + rc = -1; // indicate there's a problem + else + ping_outstanding = true; + } + } + +exit: return rc; } template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument) -{ - while (true) +{ + while (true) cycle((keepAliveInterval * 1000) - ping_timer.read_ms()); } template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) -{ - command_timer.start(); +{ + command_timer.start(); MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) options = &default_options; // set default options if none were supplied - this->keepAliveInterval = options->keepAliveInterval; + this->keepAliveInterval = options->keepAliveInterval; ping_timer.start(); int len = MQTTSerialize_connect(buf, buflen, options); - int rc = sendPacket(len); // send the connect packet - if (rc != len) + int rc = sendPacket(len); // send the connect packet + if (rc != len) goto exit; // there was a problem if (resultHandler == 0) // wait until the connack is received - { - if (command_timer.read_ms() > (command_timeout * 1000)) - goto exit; // we timed out + { // this will be a blocking call, wait for the connack - if (cycle(command_timeout - command_timer.read_ms()) == CONNACK) + do { - int connack_rc = -1; - if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) - rc = connack_rc; - } + if (command_timer.read_ms() > (command_timeout * 1000)) + goto exit; // we timed out + } + while (cycle(command_timeout - command_timer.read_ms()) != CONNACK); + int connack_rc = -1; + if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) + rc = connack_rc; } else { @@ -346,23 +378,23 @@ // start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this); } - -exit: - command_timer.stop(); + +exit: + command_timer.stop(); command_timer.reset(); return rc; } template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler) -{ - command_timer.start(); +{ + command_timer.start(); MQTTString topic = {(char*)topicFilter, 0, 0}; int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); - int rc = sendPacket(len); // send the subscribe packet - if (rc != len) + int rc = sendPacket(len); // send the subscribe packet + if (rc != len) goto exit; // there was a problem /* wait for suback */ @@ -381,23 +413,23 @@ // set subscribe response callback function } - -exit: - command_timer.stop(); + +exit: + command_timer.stop(); command_timer.reset(); return rc; -} - - +} + + template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler) -{ - command_timer.start(); +{ + command_timer.start(); MQTTString topic = {(char*)topicFilter, 0, 0}; int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic); - int rc = sendPacket(len); // send the subscribe packet - if (rc != len) + int rc = sendPacket(len); // send the subscribe packet + if (rc != len) goto exit; // there was a problem /* wait for suback */ @@ -416,57 +448,49 @@ // set unsubscribe response callback function } - -exit: - command_timer.stop(); + +exit: + command_timer.stop(); command_timer.reset(); return rc; -} - - - +} + + + template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler) -{ - command_timer.start(); +{ + command_timer.start(); - MQTTString topic = {(char*)topicName, 0, 0}; - + MQTTString topic = {(char*)topicName, 0, 0}; + message->id = packetid.getNext(); - + int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen); - int rc = sendPacket(len); // send the subscribe packet - if (rc != len) + int rc = sendPacket(len); // send the subscribe packet + if (rc != len) goto exit; // there was a problem /* wait for acks */ if (resultHandler == 0) { - if (message->qos == QOS1) + if (message->qos == QOS1) { if (cycle(command_timeout - command_timer.read_ms()) == PUBACK) { int type, dup, mypacketid; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) rc = 0; - } - } - else if (message->qos == QOS2) - { - if (cycle(command_timeout - command_timer.read_ms()) == PUBREC) - { - int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) - rc = 0; - len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id); - rc = sendPacket(len); // send the subscribe packet - if (rc != len) - goto exit; // there was a problem - if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP) - { - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) - rc = 0; - } - } + } + } + else if (message->qos == QOS2) + { + if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP) + { + int type, dup, mypacketid; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + rc = 0; + } + } } else @@ -474,9 +498,9 @@ // set publish response callback function } - -exit: - command_timer.stop(); + +exit: + command_timer.stop(); command_timer.reset(); return rc; }