Modified MQTT for Mbed OS.
Dependents: mbed-os-mqtt door_lock co657_IoT nucleo-f429zi-mbed-os-mqtt
Fork of MQTT by
Revision 31:a51dd239b78e, committed 2014-05-22
- Comitter:
- icraggs
- Date:
- Thu May 22 23:58:08 2014 +0000
- Parent:
- 30:a4e3a97dabe3
- Child:
- 32:3ad9afa63299
- Child:
- 34:e18a166198df
- Commit message:
- Create MQTTSocket.h to not use EthernetInterface
Changed in this revision
--- a/MQTTClient.h Tue May 20 15:07:11 2014 +0000 +++ b/MQTTClient.h Thu May 22 23:58:08 2014 +0000 @@ -37,6 +37,7 @@ enum QoS { QOS0, QOS1, QOS2 }; +// all failure return codes must be negative enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; @@ -53,8 +54,13 @@ struct MessageData { - struct Message message; - char* topicName; + MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) + { + + } + + struct Message &message; + MQTTString &topicName; }; @@ -75,6 +81,17 @@ static const int MAX_PACKET_ID = 65535; int next; }; + + +class QoS2 +{ +public: + + +private: + + +}; /** @@ -86,12 +103,13 @@ * @param Network a network class which supports send, receive * @param Timer a timer class with the methods: */ -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client +template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> +class Client { public: - typedef void (*messageHandler)(Message*); + typedef void (*messageHandler)(MessageData&); /** Construct the client * @param network - pointer to an instance of the Network class - must be connected to the endpoint @@ -136,7 +154,7 @@ */ int unsubscribe(const char* topicFilter); - /** MQTT Disconnect - send an MQTT disconnect packet + /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state * @return success code - */ int disconnect(); @@ -173,17 +191,24 @@ PacketId packetid; - // typedef FP<void, Message*> messageHandlerFP; struct MessageHandlers { const char* topicFilter; - //messageHandlerFP fp; typedefs not liked? - FP<void, Message*> fp; + FP<void, MessageData&> fp; } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic - FP<void, Message*> defaultMessageHandler; + FP<void, MessageData&> defaultMessageHandler; bool isconnected; + +#if 0 + struct + { + bool used; + int id; + } QoS2messages[MAX_QOS2_MESSAGES]; + +#endif }; @@ -335,7 +360,8 @@ { if (messageHandlers[i].fp.attached()) { - messageHandlers[i].fp(&message); + MessageData md(topicName, message); + messageHandlers[i].fp(md); rc = SUCCESS; } } @@ -343,7 +369,8 @@ if (rc == FAILURE && defaultMessageHandler.attached()) { - defaultMessageHandler(&message); + MessageData md(topicName, message); + defaultMessageHandler(md); rc = SUCCESS; } @@ -395,7 +422,15 @@ if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) goto exit; - deliverMessage(topicName, msg); +// if (msg.qos != QOS2) + deliverMessage(topicName, msg); +#if 0 + else if (isQoS2msgidFree(msg.id)) + { + UseQoS2msgid(msg.id); + deliverMessage(topicName, msg); + } +#endif if (msg.qos != QOS0) { if (msg.qos == QOS1) @@ -484,15 +519,18 @@ { Timer connect_timer = Timer(command_timeout_ms); int rc = FAILURE; + MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; + int len = 0; + + if (isconnected) // don't send connect packet again if we are already connected + goto exit; - MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if (options == 0) options = &default_options; // set default options if none were supplied this->keepAliveInterval = options->keepAliveInterval; ping_timer.countdown(this->keepAliveInterval); - int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options); - if (len <= 0) + if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0) goto exit; if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet goto exit; // there was a problem @@ -519,11 +557,11 @@ template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) { - int rc = FAILURE; + int rc = FAILURE; Timer timer = Timer(command_timeout_ms); int len = 0; + MQTTString topic = {(char*)topicFilter, 0, 0}; - MQTTString topic = {(char*)topicFilter, 0, 0}; if (!isconnected) goto exit; @@ -556,8 +594,6 @@ rc = FAILURE; exit: - //if (rc == FAILURE) - // closesession(); return rc; } @@ -566,12 +602,14 @@ int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) { int rc = FAILURE; - Timer timer = Timer(command_timeout_ms); - + Timer timer = Timer(command_timeout_ms); MQTTString topic = {(char*)topicFilter, 0, 0}; + int len = 0; - int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); - if (len <= 0) + if (!isconnected) + goto exit; + + if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) goto exit; if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet goto exit; // there was a problem @@ -595,14 +633,17 @@ int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message) { int rc = FAILURE; - Timer timer = Timer(command_timeout_ms); + Timer timer = Timer(command_timeout_ms); + MQTTString topicString = {(char*)topicName, 0, 0}; + int len = 0; - MQTTString topicString = {(char*)topicName, 0, 0}; + if (!isconnected) + goto exit; if (message->qos == QOS1 || message->qos == QOS2) message->id = packetid.getNext(); - int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, + len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topicString, (char*)message->payload, message->payloadlen); if (len <= 0) goto exit; @@ -645,7 +686,8 @@ int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE); if (len > 0) rc = sendPacket(len, timer); // send the disconnect packet - + + isconnected = false; return rc; }
--- a/MQTTEthernet.h Tue May 20 15:07:11 2014 +0000 +++ b/MQTTEthernet.h Thu May 22 23:58:08 2014 +0000 @@ -4,46 +4,22 @@ #include "MQTT_mbed.h" #include "EthernetInterface.h" +#include "MQTTSocket.h" -class MQTTEthernet +class MQTTEthernet : public MQTTSocket { public: MQTTEthernet() { eth.init(); // Use DHCP eth.connect(); - mysock.set_blocking(false, 1000); // 1 second Timeout - } - - int connect(char* hostname, int port) - { - return mysock.connect(hostname, port); - } - - int read(char* buffer, int len, int timeout) - { - mysock.set_blocking(false, timeout); - return mysock.receive(buffer, len); - } - - int write(char* buffer, int len, int timeout) - { - mysock.set_blocking(false, timeout); - return mysock.send(buffer, len); - } - - int disconnect() - { - return mysock.close(); } private: EthernetInterface eth; - TCPSocketConnection mysock; }; - #endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTSocket.h Thu May 22 23:58:08 2014 +0000 @@ -0,0 +1,41 @@ +#if !defined(MQTTSOCKET_H) +#define MQTTSOCKET_H + +#include "MQTT_mbed.h" +#include "TCPSocketConnection.h" + +class MQTTSocket +{ +public: + int connect(char* hostname, int port, int timeout=1000) + { + mysock.set_blocking(false, timeout); // 1 second Timeout + return mysock.connect(hostname, port); + } + + int read(char* buffer, int len, int timeout) + { + mysock.set_blocking(false, timeout); + return mysock.receive(buffer, len); + } + + int write(char* buffer, int len, int timeout) + { + mysock.set_blocking(false, timeout); + return mysock.send(buffer, len); + } + + int disconnect() + { + return mysock.close(); + } + +private: + + TCPSocketConnection mysock; + +}; + + + +#endif