MQTT
Diff: MQTTClient.h
- Revision:
- 26:2658bb87c53d
- Parent:
- 23:05fc7de97d4a
- Child:
- 28:8b2abe9bd814
diff -r 05fc7de97d4a -r 2658bb87c53d MQTTClient.h --- a/MQTTClient.h Tue May 06 09:44:23 2014 +0000 +++ b/MQTTClient.h Sun May 11 18:19:07 2014 +0000 @@ -18,13 +18,7 @@ TODO: - log messages - use macros - - define return code constants - - call connectionLost at appropriate points - in sendPacket and readPacket - - match wildcard topics + ensure publish packets are retried on reconnect */ @@ -94,14 +88,7 @@ { public: - - typedef struct - { - Client* client; - Network* network; - } connectionLostInfo; - - typedef int (*connectionLostHandlers)(connectionLostInfo*); + typedef void (*messageHandler)(Message*); /** Construct the client @@ -111,14 +98,6 @@ */ Client(Network& network, unsigned int command_timeout_ms = 30000); - /** Set the connection lost callback - called whenever the connection is lost and we should be connected - * @param clh - pointer to the callback function - */ - void setConnectionLostHandler(connectionLostHandlers clh) - { - connectionLostHandler.attach(clh); - } - /** Set the default message handling callback - used for any message which does not match a subscription message handler * @param mh - pointer to the callback function */ @@ -164,8 +143,9 @@ * yield can be called if no other MQTT operation is needed. This will also allow messages to be * received. * @param timeout_ms the time to wait, in milliseconds + * @return success code - on failure, this means the client has disconnected */ - void yield(int timeout_ms = 1000); + int yield(int timeout_ms = 1000); private: @@ -176,7 +156,8 @@ int decodePacket(int* value, int timeout); int readPacket(Timer& timer); int sendPacket(int length, Timer& timer); - int deliverMessage(MQTTString* topic, Message* message); + int deliverMessage(MQTTString& topicName, Message& message); + bool isTopicMatched(char* topicFilter, MQTTString& topicName); Network& ipstack; unsigned int command_timeout_ms; @@ -193,15 +174,13 @@ typedef FP<void, Message*> messageHandlerFP; struct MessageHandlers { - const char* topic; + const char* topicFilter; messageHandlerFP fp; } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic messageHandlerFP defaultMessageHandler; - typedef FP<int, connectionLostInfo*> connectionLostFP; - connectionLostFP connectionLostHandler; - + bool isconnected; }; } @@ -213,8 +192,9 @@ ping_timer = Timer(); ping_outstanding = 0; for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - messageHandlers[i].topic = 0; - this->command_timeout_ms = command_timeout_ms; + messageHandlers[i].topicFilter = 0; + this->command_timeout_ms = command_timeout_ms; + isconnected = false; } @@ -227,13 +207,9 @@ while (sent < length && !timer.expired()) { rc = ipstack.write(&buf[sent], length, timer.left_ms()); - if (rc == -1) - { - connectionLostInfo info = {this, &ipstack}; - connectionLostHandler(&info); - } - else - sent += rc; + if (rc < 0) // there was an error writing the data + break; + sent += rc; } if (sent == length) { @@ -246,7 +222,8 @@ } -template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) +template<class Network, class Timer, int a, int b> +int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) { char c; int multiplier = 1; @@ -283,7 +260,7 @@ template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) { - int rc = -1; + int rc = FAILURE; MQTTHeader header = {0}; int len = 0; int rem_len = 0; @@ -308,23 +285,63 @@ } +// assume topic filter and name is in correct format +// # can only be at end +// + and # can only be next to separator +template<class Network, class Timer, int a, int b> +bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) +{ + char* curf = topicFilter; + char* curn = topicName.lenstring.data; + char* curn_end = curn + topicName.lenstring.len; + + while (*curf && curn < curn_end) + { + if (*curn == '/' && *curf != '/') + break; + if (*curf != '+' && *curf != '#' && *curf != *curn) + break; + if (*curf == '+') + { // skip until we meet the next separator, or end of string + char* nextpos = curn + 1; + while (nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } + else if (*curf == '#') + curn = curn_end - 1; // skip until end of string + curf++; + curn++; + }; + + return (curn == curn_end) && (*curf == '\0'); +} + + + template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> -int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message) +int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) { - int rc = -1; + int rc = FAILURE; // we have to find the right message handler - indexed by topic for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { - if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) + if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || + isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) { - messageHandlers[i].fp(message); - rc = 0; - break; + if (messageHandlers[i].fp.attached()) + { + messageHandlers[i].fp(&message); + rc = SUCCESS; + } } } - if (rc == -1) - defaultMessageHandler(message); + + if (rc == FAILURE && defaultMessageHandler.attached()) + { + defaultMessageHandler(&message); + rc = SUCCESS; + } return rc; } @@ -332,13 +349,22 @@ template<class Network, class Timer, int a, int b> -void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms) +int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms) { + int rc = SUCCESS; Timer timer = Timer(); timer.countdown_ms(timeout_ms); while (!timer.expired()) - cycle(timer); + { + if (cycle(timer) == FAILURE) + { + rc = FAILURE; + break; + } + } + + return rc; } @@ -350,7 +376,8 @@ // read the socket, see what work is due int packet_type = readPacket(timer); - int len, rc; + int len = 0, + rc = SUCCESS; switch (packet_type) { case CONNACK: @@ -360,27 +387,34 @@ case PUBLISH: MQTTString topicName; Message msg; - rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);; - deliverMessage(&topicName, &msg); + if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) + goto exit; + deliverMessage(topicName, msg); if (msg.qos != QOS0) { if (msg.qos == QOS1) len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); else if (msg.qos == QOS2) len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); - if ((rc = sendPacket(len, timer)) != SUCCESS) + if (len <= 0) + rc = FAILURE; + else + rc = sendPacket(len, timer); + if (rc == FAILURE) goto exit; // there was a problem } break; case PUBREC: int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - ; - len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); - if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) + rc = FAILURE; + else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0) + rc = FAILURE; + else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet + rc = FAILURE; // there was a problem + if (rc == FAILURE) goto exit; // there was a problem - break; case PUBCOMP: break; @@ -390,30 +424,30 @@ } keepalive(); exit: - return packet_type; + if (rc == SUCCESS) + rc = packet_type; + return rc; } template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() { - int rc = 0; + int rc = FAILURE; if (keepAliveInterval == 0) + { + rc = SUCCESS; goto exit; + } if (ping_timer.expired()) { - if (ping_outstanding) - rc = -1; - else + if (!ping_outstanding) { Timer timer = Timer(1000); int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE); - rc = sendPacket(len, timer); // send the ping packet - if (rc != SUCCESS) - rc = -1; // indicate there's a problem - else + if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet ping_outstanding = true; } } @@ -427,7 +461,7 @@ template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) { - int rc = -1; + int rc = FAILURE; do { @@ -444,6 +478,7 @@ int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options) { Timer connect_timer = Timer(command_timeout_ms); + int rc = FAILURE; MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) @@ -452,8 +487,9 @@ this->keepAliveInterval = options->keepAliveInterval; ping_timer.countdown(this->keepAliveInterval); int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options); - int rc = sendPacket(len, connect_timer); // send the connect packet - if (rc != SUCCESS) + if (len <= 0) + goto exit; + if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet goto exit; // there was a problem // this will be a blocking call, wait for the connack @@ -462,9 +498,15 @@ int connack_rc = -1; if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = connack_rc; + else + rc = FAILURE; } + else + rc = FAILURE; exit: + if (rc == SUCCESS) + isconnected = true; return rc; } @@ -472,17 +514,19 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) { - int len = -1; + int rc = FAILURE; Timer timer = Timer(command_timeout_ms); + int len = 0; MQTTString topic = {(char*)topicFilter, 0, 0}; - - int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); - if (rc <= 0) + if (!isconnected) goto exit; - len = rc; + + len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); + if (len <= 0) + goto exit; if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet - goto exit; // there was a problem + goto exit; // there was a problem if (waitfor(SUBACK, timer) == SUBACK) // wait for suback { @@ -493,9 +537,9 @@ { for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { - if (messageHandlers[i].topic == 0) + if (messageHandlers[i].topicFilter == 0) { - messageHandlers[i].topic = topicFilter; + messageHandlers[i].topicFilter = topicFilter; messageHandlers[i].fp.attach(messageHandler); rc = 0; break; @@ -503,8 +547,12 @@ } } } - + else + rc = FAILURE; + exit: + //if (rc == FAILURE) + // closesession(); return rc; } @@ -512,15 +560,14 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) { - int len = -1; + int rc = FAILURE; Timer timer = Timer(command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; - int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); - if (rc <= 0) + int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); + if (len <= 0) goto exit; - len = rc; if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem @@ -530,6 +577,8 @@ if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } + else + rc = FAILURE; exit: return rc; @@ -540,6 +589,7 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message) { + int rc = FAILURE; Timer timer = Timer(command_timeout_ms); MQTTString topicString = {(char*)topicName, 0, 0}; @@ -549,8 +599,9 @@ int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topicString, (char*)message->payload, message->payloadlen); - int rc = sendPacket(len, timer); // send the subscribe packet - if (rc != SUCCESS) + if (len <= 0) + goto exit; + if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem if (message->qos == QOS1) @@ -558,18 +609,22 @@ if (waitfor(PUBACK, timer) == PUBACK) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = 0; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) + rc = FAILURE; } + else + rc = FAILURE; } else if (message->qos == QOS2) { if (waitfor(PUBCOMP, timer) == PUBCOMP) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = 0; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) + rc = FAILURE; } + else + rc = FAILURE; } exit: @@ -580,10 +635,12 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() { + int rc = FAILURE; Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE); - int rc = sendPacket(len, timer); // send the disconnect packet - + if (len > 0) + rc = sendPacket(len, timer); // send the disconnect packet + return rc; }