my customized library
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 53:15b5a280d22d
- Parent:
- 46:e335fcc1a663
- Child:
- 54:ff9e5c4b52d0
--- a/MQTTClient.h Mon Sep 25 11:12:23 2017 +0000 +++ b/MQTTClient.h Mon Sep 25 12:06:28 2017 +0000 @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2014, 2015 IBM Corp. + * Copyright (c) 2014, 2017 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -16,6 +16,9 @@ * Ian Craggs - fix for bug 460389 - send loop uses wrong length * Ian Craggs - fix for bug 464169 - clearing subscriptions * Ian Craggs - fix for bug 464551 - enums and ints can be different size + * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer + * Ian Craggs - fix for bug 475749 - packetid modified twice + * Ian Craggs - add ability to set message handler separately #6 *******************************************************************************/ #if !defined(MQTTCLIENT_H) @@ -23,7 +26,7 @@ #include "FP.h" #include "MQTTPacket.h" -#include "stdio.h" +#include <stdio.h> #include "MQTTLogging.h" #if !defined(MQTTCLIENT_QOS1) @@ -64,6 +67,19 @@ }; +struct connackData +{ + int rc; + bool sessionPresent; +}; + + +struct subackData +{ + int grantedQoS; +}; + + class PacketId { public: @@ -74,7 +90,7 @@ int getNext() { - return next = (next == MAX_PACKET_ID) ? 1 : ++next; + return next = (next == MAX_PACKET_ID) ? 1 : next + 1; } private: @@ -108,34 +124,51 @@ Client(Network& network, unsigned int command_timeout_ms = 30000); /** Set the default message handling callback - used for any message which does not match a subscription message handler - * @param mh - pointer to the callback function + * @param mh - pointer to the callback function. Set to 0 to remove. */ void setDefaultMessageHandler(messageHandler mh) { - defaultMessageHandler.attach(mh); + if (mh != 0) + defaultMessageHandler.attach(mh); + else + defaultMessageHandler.detach(); } + /** Set a message handling callback. This can be used outside of the the subscribe method. + * @param topicFilter - a topic pattern which can include wildcards + * @param mh - pointer to the callback function. If 0, removes the callback if any + */ + int setMessageHandler(const char* topicFilter, messageHandler mh); + /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this * Default connect options are used * @return success code - */ int connect(); - - /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack + + /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this * @param options - connect options * @return success code - */ int connect(MQTTPacket_connectData& options); + /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack + * The nework object must be connected to the network endpoint before calling this + * @param options - connect options + * @param connackData - connack data to be returned + * @return success code - + */ + int connect(MQTTPacket_connectData& options, connackData& data); + /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs * @param topic - the topic to publish to * @param message - the message to send * @return success code - */ int publish(const char* topicName, Message& message); - + /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs * @param topic - the topic to publish to * @param payload - the data to send @@ -145,12 +178,12 @@ * @return success code - */ int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); - + /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs * @param topic - the topic to publish to * @param payload - the data to send * @param payloadlen - the length of the data - * @param id - the packet id used - returned + * @param id - the packet id used - returned * @param qos - the QoS to send the publish at * @param retained - whether the message should be retained * @return success code - @@ -165,6 +198,15 @@ */ int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); + /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback + * @param topicFilter - a topic pattern which can include wildcards + * @param qos - the MQTT QoS to subscribe at© + * @param mh - the callback function to be invoked when a message is received for this subscription + * @param + * @return success code - + */ + int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); + /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback * @param topicFilter - a topic pattern which can include wildcards * @return success code - @@ -194,6 +236,7 @@ private: + void closeSession(); void cleanSession(); int cycle(Timer& timer); int waitfor(int packet_type, Timer& timer); @@ -253,12 +296,10 @@ template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> -void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() +void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() { - ping_outstanding = false; for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) messageHandlers[i].topicFilter = 0; - isconnected = false; #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 inflightMsgid = 0; @@ -274,12 +315,21 @@ template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> +void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() +{ + ping_outstanding = false; + isconnected = false; + if (cleansession) + cleanSession(); +} + + +template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() { - last_sent = Timer(); - last_received = Timer(); this->command_timeout_ms = command_timeout_ms; - cleanSession(); + cleansession = true; + closeSession(); } @@ -347,7 +397,7 @@ } else rc = FAILURE; - + #if defined(MQTT_DEBUG) char printbuf[150]; DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); @@ -389,7 +439,7 @@ * If any read fails in this method, then we should disconnect from the network, as on reconnect * the packets can be retried. * @param timeout the max time to wait for the packet read to complete, in milliseconds - * @return the MQTT packet type, or -1 if none + * @return the MQTT packet type, 0 if none, -1 if error */ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) @@ -400,7 +450,8 @@ int rem_len = 0; /* 1. read the header byte. This has the packet type in it */ - if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) + rc = ipstack.read(readbuf, 1, timer.left_ms()); + if (rc != 1) goto exit; len = 1; @@ -423,12 +474,13 @@ if (this->keepAliveInterval > 0) last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet exit: - + #if defined(MQTT_DEBUG) if (rc >= 0) { char printbuf[50]; - DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); + DEBUG("Rc %d from receiving packet %s\n", rc, + MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); } #endif return rc; @@ -504,7 +556,7 @@ int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) { int rc = SUCCESS; - Timer timer = Timer(); + Timer timer; timer.countdown_ms(timeout_ms); while (!timer.expired()) @@ -523,19 +575,19 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) { - /* get one piece of work off the wire and one pass through */ - - // read the socket, see what work is due - int packet_type = readPacket(timer); - + // get one piece of work off the wire and one pass through int len = 0, rc = SUCCESS; + int packet_type = readPacket(timer); // read the socket, see what work is due + switch (packet_type) { - case FAILURE: - case BUFFER_OVERFLOW: + default: + // no more data to read, unrecoverable. Or read packet fails due to unexpected network error rc = packet_type; + goto exit; + case 0: // timed out reading packet break; case CONNACK: case PUBACK: @@ -546,6 +598,7 @@ MQTTString topicName = MQTTString_initializer; Message msg; int intQoS; + msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) goto exit; @@ -587,8 +640,8 @@ unsigned char dup, type; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) rc = FAILURE; - else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, - (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) + else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, + (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) rc = FAILURE; else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet rc = FAILURE; // there was a problem @@ -597,7 +650,7 @@ if (packet_type == PUBREL) freeQoS2msgid(mypacketid); break; - + case PUBCOMP: break; #endif @@ -605,10 +658,16 @@ ping_outstanding = false; break; } - keepalive(); + + if (keepalive() != SUCCESS) + //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT + rc = FAILURE; + exit: if (rc == SUCCESS) rc = packet_type; + else if (isconnected) + closeSession(); return rc; } @@ -616,17 +675,22 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() { - int rc = FAILURE; + int rc = SUCCESS; if (keepAliveInterval == 0) - { - rc = SUCCESS; goto exit; - } if (last_sent.expired() || last_received.expired()) { - if (!ping_outstanding) + if (ping_outstanding) + { + rc = FAILURE; // session failure + #if defined(MQTT_DEBUG) + char printbuf[150]; + DEBUG("PINGRESP not received in keepalive interval\n"); + #endif + } + else { Timer timer(1000); int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); @@ -650,15 +714,16 @@ { if (timer.expired()) break; // we timed out + rc = cycle(timer); } - while ((rc = cycle(timer)) != packet_type); + while (rc != packet_type && rc >= 0); return rc; } template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) { Timer connect_timer(command_timeout_ms); int rc = FAILURE; @@ -679,10 +744,11 @@ // this will be a blocking call, wait for the connack if (waitfor(CONNACK, connect_timer) == CONNACK) { - unsigned char connack_rc = 255; - bool sessionPresent = false; - if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = connack_rc; + data.rc = 0; + data.sessionPresent = false; + if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, + (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) + rc = data.rc; else rc = FAILURE; } @@ -710,12 +776,23 @@ exit: if (rc == SUCCESS) + { isconnected = true; + ping_outstanding = false; + } return rc; } template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) +{ + connackData data; + return connect(options, data); +} + + +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() { MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; @@ -724,7 +801,51 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) +{ + int rc = FAILURE; + int i = -1; + + // first check for an existing matching slot + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) + { + if (messageHandler == 0) // remove existing + { + messageHandlers[i].topicFilter = 0; + messageHandlers[i].fp.detach(); + } + rc = SUCCESS; // return i when adding new subscription + break; + } + } + // if no existing, look for empty slot (unless we are removing) + if (messageHandler != 0) { + if (rc == FAILURE) + { + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (messageHandlers[i].topicFilter == 0) + { + rc = SUCCESS; + break; + } + } + } + if (i < MAX_MESSAGE_HANDLERS) + { + messageHandlers[i].topicFilter = topicFilter; + messageHandlers[i].fp.attach(messageHandler); + } + } + return rc; +} + + +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, + enum QoS qos, messageHandler messageHandler, subackData& data) { int rc = FAILURE; Timer timer(command_timeout_ms); @@ -742,35 +863,34 @@ if (waitfor(SUBACK, timer) == SUBACK) // wait for suback { - int count = 0, grantedQoS = -1; + int count = 0; unsigned short mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = grantedQoS; // 0, 1, 2 or 0x80 - if (rc != 0x80) + data.grantedQoS = 0; + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) { - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topicFilter == 0) - { - messageHandlers[i].topicFilter = topicFilter; - messageHandlers[i].fp.attach(messageHandler); - rc = 0; - break; - } - } + if (data.grantedQoS != 0x80) + rc = setMessageHandler(topicFilter, messageHandler); } } else rc = FAILURE; exit: - if (rc != SUCCESS) - cleanSession(); + if (rc == FAILURE) + closeSession(); return rc; } template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> +int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) +{ + subackData data; + return subscribe(topicFilter, qos, messageHandler, data); +} + + +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) { int rc = FAILURE; @@ -791,17 +911,8 @@ unsigned short mypacketid; // should be the same as the packetid above if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) { - rc = 0; - // remove the subscription message handler associated with this topic, if there is one - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) - { - messageHandlers[i].topicFilter = 0; - break; - } - } + setMessageHandler(topicFilter, 0); } } else @@ -809,7 +920,7 @@ exit: if (rc != SUCCESS) - cleanSession(); + closeSession(); return rc; } @@ -837,7 +948,8 @@ else rc = FAILURE; } -#elif MQTTCLIENT_QOS2 +#endif +#if MQTTCLIENT_QOS2 else if (qos == QOS2) { if (waitfor(PUBCOMP, timer) == PUBCOMP) @@ -856,7 +968,7 @@ exit: if (rc != SUCCESS) - cleanSession(); + closeSession(); return rc; } @@ -923,17 +1035,12 @@ int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() { int rc = FAILURE; - Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete + Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); if (len > 0) rc = sendPacket(len, timer); // send the disconnect packet - - if (cleansession) - cleanSession(); - else - isconnected = false; + closeSession(); return rc; } - -#endif \ No newline at end of file +#endif