An API for using MQTT over multiple transports
Dependents: Water_Monitor_clone_v1 Cloud_IBM_MbedOS ble-star-mbed
Fork of MQTT by
Revision 31:a51dd239b78e, committed 2014-05-22
- Comitter:
- icraggs
- Date:
- Thu May 22 23:58:08 2014 +0000
- Parent:
- 30:a4e3a97dabe3
- Child:
- 32:3ad9afa63299
- Child:
- 34:e18a166198df
- Commit message:
- Create MQTTSocket.h to not use EthernetInterface
Changed in this revision
--- a/MQTTClient.h Tue May 20 15:07:11 2014 +0000
+++ b/MQTTClient.h Thu May 22 23:58:08 2014 +0000
@@ -37,6 +37,7 @@
enum QoS { QOS0, QOS1, QOS2 };
+// all failure return codes must be negative
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
@@ -53,8 +54,13 @@
struct MessageData
{
- struct Message message;
- char* topicName;
+ MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
+ {
+
+ }
+
+ struct Message &message;
+ MQTTString &topicName;
};
@@ -75,6 +81,17 @@
static const int MAX_PACKET_ID = 65535;
int next;
};
+
+
+class QoS2
+{
+public:
+
+
+private:
+
+
+};
/**
@@ -86,12 +103,13 @@
* @param Network a network class which supports send, receive
* @param Timer a timer class with the methods:
*/
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
+class Client
{
public:
- typedef void (*messageHandler)(Message*);
+ typedef void (*messageHandler)(MessageData&);
/** Construct the client
* @param network - pointer to an instance of the Network class - must be connected to the endpoint
@@ -136,7 +154,7 @@
*/
int unsubscribe(const char* topicFilter);
- /** MQTT Disconnect - send an MQTT disconnect packet
+ /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
* @return success code -
*/
int disconnect();
@@ -173,17 +191,24 @@
PacketId packetid;
- // typedef FP<void, Message*> messageHandlerFP;
struct MessageHandlers
{
const char* topicFilter;
- //messageHandlerFP fp; typedefs not liked?
- FP<void, Message*> fp;
+ FP<void, MessageData&> fp;
} messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
- FP<void, Message*> defaultMessageHandler;
+ FP<void, MessageData&> defaultMessageHandler;
bool isconnected;
+
+#if 0
+ struct
+ {
+ bool used;
+ int id;
+ } QoS2messages[MAX_QOS2_MESSAGES];
+
+#endif
};
@@ -335,7 +360,8 @@
{
if (messageHandlers[i].fp.attached())
{
- messageHandlers[i].fp(&message);
+ MessageData md(topicName, message);
+ messageHandlers[i].fp(md);
rc = SUCCESS;
}
}
@@ -343,7 +369,8 @@
if (rc == FAILURE && defaultMessageHandler.attached())
{
- defaultMessageHandler(&message);
+ MessageData md(topicName, message);
+ defaultMessageHandler(md);
rc = SUCCESS;
}
@@ -395,7 +422,15 @@
if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
(char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
goto exit;
- deliverMessage(topicName, msg);
+// if (msg.qos != QOS2)
+ deliverMessage(topicName, msg);
+#if 0
+ else if (isQoS2msgidFree(msg.id))
+ {
+ UseQoS2msgid(msg.id);
+ deliverMessage(topicName, msg);
+ }
+#endif
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
@@ -484,15 +519,18 @@
{
Timer connect_timer = Timer(command_timeout_ms);
int rc = FAILURE;
+ MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+ int len = 0;
+
+ if (isconnected) // don't send connect packet again if we are already connected
+ goto exit;
- MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
if (options == 0)
options = &default_options; // set default options if none were supplied
this->keepAliveInterval = options->keepAliveInterval;
ping_timer.countdown(this->keepAliveInterval);
- int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
- if (len <= 0)
+ if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
goto exit;
if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
@@ -519,11 +557,11 @@
template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
- int rc = FAILURE;
+ int rc = FAILURE;
Timer timer = Timer(command_timeout_ms);
int len = 0;
+ MQTTString topic = {(char*)topicFilter, 0, 0};
- MQTTString topic = {(char*)topicFilter, 0, 0};
if (!isconnected)
goto exit;
@@ -556,8 +594,6 @@
rc = FAILURE;
exit:
- //if (rc == FAILURE)
- // closesession();
return rc;
}
@@ -566,12 +602,14 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
-
+ Timer timer = Timer(command_timeout_ms);
MQTTString topic = {(char*)topicFilter, 0, 0};
+ int len = 0;
- int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
- if (len <= 0)
+ if (!isconnected)
+ goto exit;
+
+ if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
@@ -595,14 +633,17 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer = Timer(command_timeout_ms);
+ MQTTString topicString = {(char*)topicName, 0, 0};
+ int len = 0;
- MQTTString topicString = {(char*)topicName, 0, 0};
+ if (!isconnected)
+ goto exit;
if (message->qos == QOS1 || message->qos == QOS2)
message->id = packetid.getNext();
- int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
+ len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
topicString, (char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
@@ -645,7 +686,8 @@
int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
if (len > 0)
rc = sendPacket(len, timer); // send the disconnect packet
-
+
+ isconnected = false;
return rc;
}
--- a/MQTTEthernet.h Tue May 20 15:07:11 2014 +0000
+++ b/MQTTEthernet.h Thu May 22 23:58:08 2014 +0000
@@ -4,46 +4,22 @@
#include "MQTT_mbed.h"
#include "EthernetInterface.h"
+#include "MQTTSocket.h"
-class MQTTEthernet
+class MQTTEthernet : public MQTTSocket
{
public:
MQTTEthernet()
{
eth.init(); // Use DHCP
eth.connect();
- mysock.set_blocking(false, 1000); // 1 second Timeout
- }
-
- int connect(char* hostname, int port)
- {
- return mysock.connect(hostname, port);
- }
-
- int read(char* buffer, int len, int timeout)
- {
- mysock.set_blocking(false, timeout);
- return mysock.receive(buffer, len);
- }
-
- int write(char* buffer, int len, int timeout)
- {
- mysock.set_blocking(false, timeout);
- return mysock.send(buffer, len);
- }
-
- int disconnect()
- {
- return mysock.close();
}
private:
EthernetInterface eth;
- TCPSocketConnection mysock;
};
-
#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSocket.h Thu May 22 23:58:08 2014 +0000
@@ -0,0 +1,41 @@
+#if !defined(MQTTSOCKET_H)
+#define MQTTSOCKET_H
+
+#include "MQTT_mbed.h"
+#include "TCPSocketConnection.h"
+
+class MQTTSocket
+{
+public:
+ int connect(char* hostname, int port, int timeout=1000)
+ {
+ mysock.set_blocking(false, timeout); // 1 second Timeout
+ return mysock.connect(hostname, port);
+ }
+
+ int read(char* buffer, int len, int timeout)
+ {
+ mysock.set_blocking(false, timeout);
+ return mysock.receive(buffer, len);
+ }
+
+ int write(char* buffer, int len, int timeout)
+ {
+ mysock.set_blocking(false, timeout);
+ return mysock.send(buffer, len);
+ }
+
+ int disconnect()
+ {
+ return mysock.close();
+ }
+
+private:
+
+ TCPSocketConnection mysock;
+
+};
+
+
+
+#endif
