Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: MQTTClient.h
- Revision:
- 31:a51dd239b78e
- Parent:
- 30:a4e3a97dabe3
- Child:
- 33:8bbc3a992326
- Child:
- 34:e18a166198df
--- a/MQTTClient.h Tue May 20 15:07:11 2014 +0000
+++ b/MQTTClient.h Thu May 22 23:58:08 2014 +0000
@@ -37,6 +37,7 @@
enum QoS { QOS0, QOS1, QOS2 };
+// all failure return codes must be negative
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
@@ -53,8 +54,13 @@
struct MessageData
{
- struct Message message;
- char* topicName;
+ MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
+ {
+
+ }
+
+ struct Message &message;
+ MQTTString &topicName;
};
@@ -75,6 +81,17 @@
static const int MAX_PACKET_ID = 65535;
int next;
};
+
+
+class QoS2
+{
+public:
+
+
+private:
+
+
+};
/**
@@ -86,12 +103,13 @@
* @param Network a network class which supports send, receive
* @param Timer a timer class with the methods:
*/
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
+class Client
{
public:
- typedef void (*messageHandler)(Message*);
+ typedef void (*messageHandler)(MessageData&);
/** Construct the client
* @param network - pointer to an instance of the Network class - must be connected to the endpoint
@@ -136,7 +154,7 @@
*/
int unsubscribe(const char* topicFilter);
- /** MQTT Disconnect - send an MQTT disconnect packet
+ /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
* @return success code -
*/
int disconnect();
@@ -173,17 +191,24 @@
PacketId packetid;
- // typedef FP<void, Message*> messageHandlerFP;
struct MessageHandlers
{
const char* topicFilter;
- //messageHandlerFP fp; typedefs not liked?
- FP<void, Message*> fp;
+ FP<void, MessageData&> fp;
} messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
- FP<void, Message*> defaultMessageHandler;
+ FP<void, MessageData&> defaultMessageHandler;
bool isconnected;
+
+#if 0
+ struct
+ {
+ bool used;
+ int id;
+ } QoS2messages[MAX_QOS2_MESSAGES];
+
+#endif
};
@@ -335,7 +360,8 @@
{
if (messageHandlers[i].fp.attached())
{
- messageHandlers[i].fp(&message);
+ MessageData md(topicName, message);
+ messageHandlers[i].fp(md);
rc = SUCCESS;
}
}
@@ -343,7 +369,8 @@
if (rc == FAILURE && defaultMessageHandler.attached())
{
- defaultMessageHandler(&message);
+ MessageData md(topicName, message);
+ defaultMessageHandler(md);
rc = SUCCESS;
}
@@ -395,7 +422,15 @@
if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
(char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
goto exit;
- deliverMessage(topicName, msg);
+// if (msg.qos != QOS2)
+ deliverMessage(topicName, msg);
+#if 0
+ else if (isQoS2msgidFree(msg.id))
+ {
+ UseQoS2msgid(msg.id);
+ deliverMessage(topicName, msg);
+ }
+#endif
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
@@ -484,15 +519,18 @@
{
Timer connect_timer = Timer(command_timeout_ms);
int rc = FAILURE;
+ MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+ int len = 0;
+
+ if (isconnected) // don't send connect packet again if we are already connected
+ goto exit;
- MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
if (options == 0)
options = &default_options; // set default options if none were supplied
this->keepAliveInterval = options->keepAliveInterval;
ping_timer.countdown(this->keepAliveInterval);
- int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
- if (len <= 0)
+ if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
goto exit;
if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
@@ -519,11 +557,11 @@
template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
- int rc = FAILURE;
+ int rc = FAILURE;
Timer timer = Timer(command_timeout_ms);
int len = 0;
+ MQTTString topic = {(char*)topicFilter, 0, 0};
- MQTTString topic = {(char*)topicFilter, 0, 0};
if (!isconnected)
goto exit;
@@ -556,8 +594,6 @@
rc = FAILURE;
exit:
- //if (rc == FAILURE)
- // closesession();
return rc;
}
@@ -566,12 +602,14 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
-
+ Timer timer = Timer(command_timeout_ms);
MQTTString topic = {(char*)topicFilter, 0, 0};
+ int len = 0;
- int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
- if (len <= 0)
+ if (!isconnected)
+ goto exit;
+
+ if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
@@ -595,14 +633,17 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer = Timer(command_timeout_ms);
+ MQTTString topicString = {(char*)topicName, 0, 0};
+ int len = 0;
- MQTTString topicString = {(char*)topicName, 0, 0};
+ if (!isconnected)
+ goto exit;
if (message->qos == QOS1 || message->qos == QOS2)
message->id = packetid.getNext();
- int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
+ len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
topicString, (char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
@@ -645,7 +686,8 @@
int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
if (len > 0)
rc = sendPacket(len, timer); // send the disconnect packet
-
+
+ isconnected = false;
return rc;
}