MQtt to publish the data
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 23:05fc7de97d4a
- Parent:
- 22:aadb79d29330
- Child:
- 25:d13a6c558164
- Child:
- 26:2658bb87c53d
--- a/MQTTClient.h Wed Apr 30 13:03:45 2014 +0000 +++ b/MQTTClient.h Tue May 06 09:44:23 2014 +0000 @@ -19,8 +19,13 @@ TODO: log messages - use macros + define return code constants + call connectionLost at appropriate points - in sendPacket and readPacket + + match wildcard topics + */ #if !defined(MQTTCLIENT_H) @@ -36,6 +41,8 @@ enum QoS { QOS0, QOS1, QOS2 }; +enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; + struct Message { @@ -58,50 +65,36 @@ class PacketId { public: - PacketId(); + PacketId() + { + next = 0; + } - int getNext(); + int getNext() + { + return next = (next == MAX_PACKET_ID) ? 1 : ++next; + } private: static const int MAX_PACKET_ID = 65535; int next; }; - -typedef void (*messageHandler)(Message*); - -typedef struct limits -{ - int MAX_MQTT_PACKET_SIZE; // - int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler - long command_timeout_ms; - - limits() - { - MAX_MQTT_PACKET_SIZE = 100; - MAX_MESSAGE_HANDLERS = 5; - command_timeout_ms = 30000; - } -} Limits; /** * @class Client * @brief blocking, non-threaded MQTT client API + * + * This version of the API blocks on all method calls, until they are complete. This means that only one + * MQTT request can be in process at any one time. * @param Network a network class which supports send, receive * @param Timer a timer class with the methods: */ -template<class Network, class Timer> class Client +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client { public: - /** Construct the client - * @param network - pointer to an instance of the Network class - must be connected to the endpoint - * before calling MQTT connect - * @param limits an instance of the Limit class - to alter limits as required - */ - Client(Network* network, const Limits limits = Limits()); - typedef struct { Client* client; @@ -109,6 +102,14 @@ } connectionLostInfo; typedef int (*connectionLostHandlers)(connectionLostInfo*); + typedef void (*messageHandler)(Message*); + + /** Construct the client + * @param network - pointer to an instance of the Network class - must be connected to the endpoint + * before calling MQTT connect + * @param limits an instance of the Limit class - to alter limits as required + */ + Client(Network& network, unsigned int command_timeout_ms = 30000); /** Set the connection lost callback - called whenever the connection is lost and we should be connected * @param clh - pointer to the callback function @@ -162,8 +163,9 @@ /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive * yield can be called if no other MQTT operation is needed. This will also allow messages to be * received. + * @param timeout_ms the time to wait, in milliseconds */ - void yield(int timeout); + void yield(int timeout_ms = 1000); private: @@ -176,12 +178,11 @@ int sendPacket(int length, Timer& timer); int deliverMessage(MQTTString* topic, Message* message); - Network* ipstack; + Network& ipstack; + unsigned int command_timeout_ms; - Limits limits; - - char* buf; - char* readbuf; + char buf[MAX_MQTT_PACKET_SIZE]; + char readbuf[MAX_MQTT_PACKET_SIZE]; Timer ping_timer; unsigned int keepAliveInterval; @@ -194,12 +195,11 @@ { const char* topic; messageHandlerFP fp; - } *messageHandlers; // Message handlers are indexed by subscription topic + } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic messageHandlerFP defaultMessageHandler; typedef FP<int, connectionLostInfo*> connectionLostFP; - connectionLostFP connectionLostHandler; }; @@ -207,34 +207,46 @@ } -template<class Network, class Timer> MQTT::Client<Network, Timer>::Client(Network* network, Limits limits) : limits(limits), packetid() +template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> +MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() { - this->ipstack = network; - this->ping_timer = Timer(); - this->ping_outstanding = 0; - - // How to make these memory allocations portable? I was hoping to avoid the heap - buf = new char[limits.MAX_MQTT_PACKET_SIZE]; - readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; - this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + ping_timer = Timer(); + ping_outstanding = 0; + for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) messageHandlers[i].topic = 0; + this->command_timeout_ms = command_timeout_ms; } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::sendPacket(int length, Timer& timer) +template<class Network, class Timer, int a, int b> +int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) { - int sent = 0; + int rc = FAILURE, + sent = 0; - while (sent < length) - sent += ipstack->write(&buf[sent], length, timer.left_ms()); + while (sent < length && !timer.expired()) + { + rc = ipstack.write(&buf[sent], length, timer.left_ms()); + if (rc == -1) + { + connectionLostInfo info = {this, &ipstack}; + connectionLostHandler(&info); + } + else + sent += rc; + } if (sent == length) + { ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet - return sent; + rc = SUCCESS; + } + else + rc = FAILURE; + return rc; } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::decodePacket(int* value, int timeout) +template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) { char c; int multiplier = 1; @@ -251,7 +263,7 @@ rc = MQTTPACKET_READ_ERROR; /* bad data */ goto exit; } - rc = ipstack->read(&c, 1, timeout); + rc = ipstack.read(&c, 1, timeout); if (rc != 1) goto exit; *value += (c & 127) * multiplier; @@ -268,7 +280,8 @@ * @param timeout the max time to wait for the packet read to complete, in milliseconds * @return the MQTT packet type, or -1 if none */ -template<class Network, class Timer> int MQTT::Client<Network, Timer>::readPacket(Timer& timer) +template<class Network, class Timer, int a, int b> +int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) { int rc = -1; MQTTHeader header = {0}; @@ -276,7 +289,7 @@ int rem_len = 0; /* 1. read the header byte. This has the packet type in it */ - if (ipstack->read(readbuf, 1, timer.left_ms()) != 1) + if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) goto exit; len = 1; @@ -285,7 +298,7 @@ len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if (ipstack->read(readbuf + len, rem_len, timer.left_ms()) != rem_len) + if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len) goto exit; header.byte = readbuf[0]; @@ -295,12 +308,13 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::deliverMessage(MQTTString* topic, Message* message) +template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> +int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message) { int rc = -1; // we have to find the right message handler - indexed by topic - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) { @@ -317,17 +331,19 @@ -template<class Network, class Timer> void MQTT::Client<Network, Timer>::yield(int timeout) +template<class Network, class Timer, int a, int b> +void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms) { Timer timer = Timer(); - timer.countdown_ms(timeout); + timer.countdown_ms(timeout_ms); while (!timer.expired()) cycle(timer); } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::cycle(Timer& timer) +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) { /* get one piece of work off the wire and one pass through */ @@ -345,26 +361,24 @@ MQTTString topicName; Message msg; rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);; deliverMessage(&topicName, &msg); if (msg.qos != QOS0) { if (msg.qos == QOS1) - len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); + len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); else if (msg.qos == QOS2) - len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); - rc = sendPacket(len, timer); - if (rc != len) + len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); + if ((rc = sendPacket(len, timer)) != SUCCESS) goto exit; // there was a problem } break; case PUBREC: int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) ; - len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); - rc = sendPacket(len, timer); // send the PUBREL packet - if (rc != len) + len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); + if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet goto exit; // there was a problem break; @@ -380,7 +394,8 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::keepalive() +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() { int rc = 0; @@ -394,9 +409,9 @@ else { Timer timer = Timer(1000); - int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); + int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE); rc = sendPacket(len, timer); // send the ping packet - if (rc != len) + if (rc != SUCCESS) rc = -1; // indicate there's a problem else ping_outstanding = true; @@ -409,7 +424,8 @@ // only used in single-threaded mode where one command at a time is in process -template<class Network, class Timer> int MQTT::Client<Network, Timer>::waitfor(int packet_type, Timer& timer) +template<class Network, class Timer, int a, int b> +int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) { int rc = -1; @@ -424,9 +440,10 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::connect(MQTTPacket_connectData* options) +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options) { - Timer connect_timer = Timer(limits.command_timeout_ms); + Timer connect_timer = Timer(command_timeout_ms); MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) @@ -434,16 +451,16 @@ this->keepAliveInterval = options->keepAliveInterval; ping_timer.countdown(this->keepAliveInterval); - int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); + int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options); int rc = sendPacket(len, connect_timer); // send the connect packet - if (rc != len) + if (rc != SUCCESS) goto exit; // there was a problem // this will be a blocking call, wait for the connack if (waitfor(CONNACK, connect_timer) == CONNACK) { int connack_rc = -1; - if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = connack_rc; } @@ -452,28 +469,29 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) { int len = -1; - Timer timer = Timer(limits.command_timeout_ms); + Timer timer = Timer(command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; - int rc = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); + int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); if (rc <= 0) goto exit; len = rc; - if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet + if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem if (waitfor(SUBACK, timer) == SUBACK) // wait for suback { int count = 0, grantedQoS = -1, mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = grantedQoS; // 0, 1, 2 or 0x80 if (rc != 0x80) { - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) + for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if (messageHandlers[i].topic == 0) { @@ -491,24 +509,25 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::unsubscribe(const char* topicFilter) +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) { int len = -1; - Timer timer = Timer(limits.command_timeout_ms); + Timer timer = Timer(command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; - int rc = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); + int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); if (rc <= 0) goto exit; len = rc; - if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet + if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem if (waitfor(UNSUBACK, timer) == UNSUBACK) { int mypacketid; // should be the same as the packetid above - if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } @@ -518,19 +537,20 @@ -template<class Network, class Timer> int MQTT::Client<Network, Timer>::publish(const char* topicName, Message* message) +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message) { - Timer timer = Timer(limits.command_timeout_ms); + Timer timer = Timer(command_timeout_ms); MQTTString topicString = {(char*)topicName, 0, 0}; if (message->qos == QOS1 || message->qos == QOS2) message->id = packetid.getNext(); - int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, + int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topicString, (char*)message->payload, message->payloadlen); int rc = sendPacket(len, timer); // send the subscribe packet - if (rc != len) + if (rc != SUCCESS) goto exit; // there was a problem if (message->qos == QOS1) @@ -538,7 +558,7 @@ if (waitfor(PUBACK, timer) == PUBACK) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } @@ -547,7 +567,7 @@ if (waitfor(PUBCOMP, timer) == PUBCOMP) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } @@ -557,13 +577,14 @@ } -template<class Network, class Timer> int MQTT::Client<Network, Timer>::disconnect() +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() { - Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete - int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); + Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete + int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE); int rc = sendPacket(len, timer); // send the disconnect packet - return (rc == len) ? 0 : -1; + return rc; }