A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Revision 23:06fac173529e, committed 2017-03-26
- Comitter:
- vpcola
- Date:
- Sun Mar 26 04:35:46 2017 +0000
- Parent:
- 22:826657a00c44
- Child:
- 24:9d5f0300d7ed
- Commit message:
- Code is stable now in both publish/subscribe ...
Changed in this revision
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/FP.lib Sun Mar 26 04:35:46 2017 +0000 @@ -0,0 +1,1 @@ +http://mbed.org/users/sam_grove/code/FP/#3c62ba1807ac
--- a/MQTT.lib Tue Mar 21 12:57:07 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -http://mbed.org/teams/mqtt/code/MQTT/#c299463ae853
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTPacket.lib Sun Mar 26 04:35:46 2017 +0000 @@ -0,0 +1,1 @@ +http://mbed.org/teams/mqtt/code/MQTTPacket/#62396c1620b6
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTThreadedClient.cpp Sun Mar 26 04:35:46 2017 +0000
@@ -0,0 +1,564 @@
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTThreadedClient.h"
+
+
+static MemoryPool<PubMessage, 16> mpool;
+static Queue<PubMessage, 16> mqueue;
+
+int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ // non-blocking socket ...
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->recv( (void *) buffer, size);
+
+ // return 0 bytes if timeout ...
+ if (NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc; // return the number of bytes received or error
+}
+
+int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ // set the write timeout
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->send(buffer, size);
+
+ if ( NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc;
+}
+
+int MQTTThreadedClient::readPacketLength(int* value)
+{
+ int rc = MQTTPACKET_READ_ERROR;
+ unsigned char c;
+ int multiplier = 1;
+ int len = 0;
+ const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
+
+ *value = 0;
+ do
+ {
+ if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+ {
+ rc = MQTTPACKET_READ_ERROR; /* bad data */
+ goto exit;
+ }
+
+ rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
+ if (rc != 1)
+ {
+ rc = MQTTPACKET_READ_ERROR;
+ goto exit;
+ }
+
+ *value += (c & 127) * multiplier;
+ multiplier *= 128;
+ } while ((c & 128) != 0);
+
+ rc = MQTTPACKET_READ_COMPLETE;
+
+exit:
+ if (rc == MQTTPACKET_READ_ERROR )
+ len = -1;
+
+ return len;
+}
+
+int MQTTThreadedClient::sendPacket(size_t length)
+{
+ int rc = FAILURE;
+ int sent = 0;
+
+ while (sent < length)
+ {
+ rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
+ if (rc < 0) // there was an error writing the data
+ break;
+ sent += rc;
+ }
+
+ if (sent == length)
+ rc = SUCCESS;
+ else
+ rc = FAILURE;
+
+ return rc;
+}
+/**
+ * Reads the entire packet to readbuf and returns
+ * the type of packet when successful, otherwise
+ * a negative error code is returned.
+ **/
+int MQTTThreadedClient::readPacket()
+{
+ int rc = FAILURE;
+ MQTTHeader header = {0};
+ int len = 0;
+ int rem_len = 0;
+
+ /* 1. read the header byte. This has the packet type in it */
+ if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
+ goto exit;
+
+ len = 1;
+ /* 2. read the remaining length. This is variable in itself */
+ if ( readPacketLength(&rem_len) < 0 )
+ goto exit;
+
+ len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
+
+ if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
+ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+ if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
+ goto exit;
+
+ // Convert the header to type
+ // and update rc
+ header.byte = readbuf[0];
+ rc = header.bits.type;
+
+exit:
+
+ return rc;
+}
+
+/**
+ * Read until a specified packet type is received, or untill the specified
+ * timeout dropping packets along the way.
+ **/
+int MQTTThreadedClient::readUntil(int packetType, int timeout)
+{
+ int pType = FAILURE;
+ Timer timer;
+
+ timer.start();
+ do {
+ pType = readPacket();
+ if (pType < 0)
+ break;
+
+ if (timer.read_ms() > timeout)
+ {
+ pType = FAILURE;
+ break;
+ }
+ }while(pType != packetType);
+
+ return pType;
+}
+
+
+int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+{
+ int rc = FAILURE;
+ int len = 0;
+
+ if (isConnected)
+ {
+ printf("Session already connected! \r\n");
+ return rc;
+ }
+
+ // Copy the keepAliveInterval value to local
+ // MQTT specifies in seconds, we have to multiply that
+ // amount for our 32 bit timers which accepts ms.
+ keepAliveInterval = (options.keepAliveInterval * 1000);
+
+ printf("Connecting with: \r\n");
+ printf("\tUsername: [%s]\r\n", options.username.cstring);
+ printf("\tPassword: [%s]\r\n", options.password.cstring);
+
+ if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+ {
+ printf("Error serializing connect packet ...\r\n");
+ return rc;
+ }
+ if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
+ {
+ printf("Error sending the connect request packet ...\r\n");
+ return rc;
+ }
+
+ // Wait for the CONNACK
+ if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
+ {
+ unsigned char connack_rc = 255;
+ bool sessionPresent = false;
+ printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+ if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = connack_rc;
+ else
+ rc = FAILURE;
+ }
+ else
+ rc = FAILURE;
+
+ if (rc == SUCCESS)
+ {
+ printf("Connected!!! ... starting connection timers ...\r\n");
+ isConnected = true;
+ resetConnectionTimer();
+ }else
+ {
+ // TODO: Call socket->disconnect()?
+ }
+
+ printf("Returning with rc = %d\r\n", rc);
+
+ return rc;
+}
+
+int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options)
+{
+ int ret;
+
+ tcpSocket->open(network);
+ if (( ret = tcpSocket->connect(host, port)) < 0 )
+ {
+
+ printf("Error connecting to %s:%d with %d\r\n", host, port, ret);
+ return ret;
+ }
+
+ return connect(options);
+}
+
+int MQTTThreadedClient::publish(PubMessage& msg)
+{
+#if 0
+ int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
+ // TODO: handle id values when the function is called later
+ if (id == 0)
+ return FAILURE;
+ else
+ return SUCCESS;
+#endif
+ PubMessage *message = mpool.alloc();
+ // Simple copy
+ *message = msg;
+
+ // Push the data to the thread
+ printf("Pushing data to consumer thread ...\r\n");
+ mqueue.put(message);
+
+ return SUCCESS;
+}
+
+int MQTTThreadedClient::sendPublish(PubMessage& message)
+{
+ MQTTString topicString = MQTTString_initializer;
+
+ if (!isConnected)
+ {
+ printf("Not connected!!! ...\r\n");
+ return FAILURE;
+ }
+
+ topicString.cstring = (char*) &message.topic[0];
+ int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
+ topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
+ if (len <= 0)
+ {
+ printf("Failed serializing message ...\r\n");
+ return FAILURE;
+ }
+
+ if (sendPacket(len) == SUCCESS)
+ {
+ printf("Successfully sent publish packet to server ...\r\n");
+ return SUCCESS;
+ }
+
+ printf("Failed to send publish packet to server ...\r\n");
+ return FAILURE;
+}
+
+int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
+{
+ int rc = FAILURE;
+ int len = 0;
+
+ MQTTString topic = {(char*)topicstr, {0, 0}};
+ printf("Subscribing to topic [%s]\r\n", topicstr);
+
+ if (!isConnected)
+ {
+ printf("Session already connected!!\r\n");
+ return rc;
+ }
+
+ len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ if (len <= 0)
+ {
+ printf("Error serializing subscribe packet ...\r\n");
+ return rc;
+ }
+
+ if ((rc = sendPacket(len)) != SUCCESS)
+ {
+ printf("Error sending subscribe packet [%d]\r\n", rc);
+ return rc;
+ }
+
+ printf("Waiting for subscription ack ...\r\n");
+ // Wait for SUBACK, dropping packets read along the way ...
+ if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback
+ {
+ int count = 0, grantedQoS = -1;
+ unsigned short mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = grantedQoS; // 0, 1, 2 or 0x80
+ // For as long as we do not get 0x80 ..
+ if (rc != 0x80)
+ {
+ // Add message handlers to the map
+ FP<void,MessageData &> fp;
+ fp.attach(function);
+
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+
+ // Reset connection timers here ...
+ resetConnectionTimer();
+
+ printf("Successfully subscribed to %s ...\r\n", topicstr);
+ rc = SUCCESS;
+ }else
+ {
+ printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
+ }
+ }
+ else
+ {
+ printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
+ rc = FAILURE;
+ }
+
+ return rc;
+
+}
+
+
+bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
+{
+ char* curf = topicFilter;
+ char* curn = topicName.lenstring.data;
+ char* curn_end = curn + topicName.lenstring.len;
+
+ while (*curf && curn < curn_end)
+ {
+ if (*curn == '/' && *curf != '/')
+ break;
+ if (*curf != '+' && *curf != '#' && *curf != *curn)
+ break;
+ if (*curf == '+')
+ { // skip until we meet the next separator, or end of string
+ char* nextpos = curn + 1;
+ while (nextpos < curn_end && *nextpos != '/')
+ nextpos = ++curn + 1;
+ }
+ else if (*curf == '#')
+ curn = curn_end - 1; // skip until end of string
+ curf++;
+ curn++;
+ };
+
+ return (curn == curn_end) && (*curf == '\0');
+}
+
+int MQTTThreadedClient::handlePublishMsg()
+{
+ MQTTString topicName = MQTTString_initializer;
+ Message msg;
+ int intQoS;
+ printf("Deserializing publish message ...\r\n");
+ if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
+ &intQoS,
+ (unsigned char*)&msg.retained,
+ (unsigned short*)&msg.id,
+ &topicName,
+ (unsigned char**)&msg.payload,
+ (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+ {
+ printf("Error deserializing published message ...\r\n");
+ return -1;
+ }
+
+ std::string topic;
+ if (topicName.lenstring.len > 0)
+ {
+ topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
+ }else
+ topic = (const char *) topicName.cstring;
+
+ printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
+
+ msg.qos = (QoS) intQoS;
+
+
+ // Call the handlers for each topic
+ if (topicCBMap.find(topic) != topicCBMap.end())
+ {
+ // Call the callback function
+ if (topicCBMap[topic].attached())
+ {
+ printf("Invoking function handler for topic ...\r\n");
+ MessageData md(topicName, msg);
+ topicCBMap[topic](md);
+
+ return 1;
+ }
+ }
+
+ // TODO: depending on the QoS
+ // we send data to the server = PUBACK or PUBREC
+ switch(intQoS)
+ {
+ case QOS0:
+ // We send back nothing ...
+ break;
+ case QOS1:
+ // TODO: implement
+ break;
+ case QOS2:
+ // TODO: implement
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+void MQTTThreadedClient::resetConnectionTimer()
+{
+ if (keepAliveInterval > 0)
+ {
+ comTimer.reset();
+ comTimer.start();
+ }
+}
+
+bool MQTTThreadedClient::hasConnectionTimedOut()
+{
+ if (keepAliveInterval > 0 ) {
+ // Check connection timer
+ if (comTimer.read_ms() > keepAliveInterval)
+ return true;
+ else
+ return false;
+ }
+
+ return false;
+}
+
+void MQTTThreadedClient::sendPingRequest()
+{
+ int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
+ if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
+ {
+ printf("Ping request sent successfully ...\r\n");
+ }
+}
+
+void MQTTThreadedClient::startListener()
+{
+ int pType;
+ // Continuesly listens for packets and dispatch
+ // message handlers ...
+ while(true)
+ {
+ pType = readPacket();
+ switch(pType)
+ {
+ case TIMEOUT:
+ // No data available from the network ...
+ break;
+ case FAILURE:
+ case BUFFER_OVERFLOW:
+ {
+ // TODO: Network error, do we disconnect and reconnect?
+ printf("Failure or buffer overflow problem ... \r\n");
+ MBED_ASSERT(false);
+ }
+ break;
+ /**
+ * The rest of the return codes below (all positive) is about MQTT
+ * response codes
+ **/
+ case CONNACK:
+ case PUBACK:
+ case SUBACK:
+ break;
+ case PUBLISH:
+ {
+ printf("Publish received!....\r\n");
+ // We receive data from the MQTT server ..
+ if (handlePublishMsg() < 0)
+ {
+ printf("Error handling PUBLISH message ... \r\n");
+ break;
+ }
+ }
+ break;
+ case PINGRESP:
+ {
+ printf("Got ping response ...\r\n");
+ resetConnectionTimer();
+ }
+ break;
+ default:
+ printf("Unknown/Not handled message from server pType[%d]\r\n", pType);
+ }
+
+ // Check if its time to send a keepAlive packet
+ if (hasConnectionTimedOut())
+ {
+ // Queue the ping request so that other
+ // pending operations queued above will go first
+ queue.call(this, &MQTTThreadedClient::sendPingRequest);
+ }
+
+ // Check if we have messages on the message queue
+ osEvent evt = mqueue.get(10);
+ if (evt.status == osEventMessage) {
+
+ printf("Got message to publish! ... \r\n");
+
+ // Unpack the message
+ PubMessage * message = (PubMessage *)evt.value.p;
+
+ // Send the packet, do not queue the call
+ // like the ping above ..
+ if ( sendPublish(*message) == SUCCESS)
+ // Reset timers if we have been able to send successfully
+ resetConnectionTimer();
+
+ // Free the message from mempool after using
+ mpool.free(message);
+ }
+
+ // Dispatch any queued events ...
+ queue.dispatch(100);
+ }
+
+}
+
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTThreadedClient.h Sun Mar 26 04:35:46 2017 +0000
@@ -0,0 +1,181 @@
+#ifndef _MQTT_THREADED_CLIENT_H_
+#define _MQTT_THREADED_CLIENT_H_
+
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTPacket.h"
+#include "NetworkInterface.h"
+#include "FP.h"
+
+#include <cstdio>
+#include <string>
+#include <map>
+
+#define COMMAND_TIMEOUT 5000
+#define DEFAULT_SOCKET_TIMEOUT 1000
+#define MAX_MQTT_PACKET_SIZE 200
+#define MAX_MQTT_PAYLOAD_SIZE 100
+
+typedef enum { QOS0, QOS1, QOS2 } QoS;
+
+// all failure return codes must be negative
+typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;
+
+
+typedef struct
+{
+ QoS qos;
+ bool retained;
+ bool dup;
+ unsigned short id;
+ void *payload;
+ size_t payloadlen;
+}Message, *pMessage;
+
+// TODO:
+// Merge this struct with the one above, in order to use the same
+// data structure for sending and receiving. I need to simplify
+// the PubMessage to not contain pointers like the one above.
+typedef struct
+{
+ char topic[100];
+ QoS qos;
+ unsigned short id;
+ size_t payloadlen;
+ char payload[MAX_MQTT_PAYLOAD_SIZE];
+}PubMessage, *pPubMessage;
+
+struct MessageData
+{
+ MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
+ { }
+ Message &message;
+ MQTTString &topicName;
+};
+
+class PacketId
+{
+public:
+ PacketId()
+ {
+ next = 0;
+ }
+
+ int getNext()
+ {
+ return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+ }
+
+private:
+ static const int MAX_PACKET_ID = 65535;
+ int next;
+};
+
+
+
+class MQTTThreadedClient
+{
+public:
+ MQTTThreadedClient(NetworkInterface * aNetwork)
+ : network(aNetwork),
+ queue(32 * EVENTS_EVENT_SIZE),
+ isConnected(false) {
+ tcpSocket = new TCPSocket();
+ }
+
+ int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
+ int publish(PubMessage& message);
+ int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
+
+ template<typename T>
+ int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
+ int rc = FAILURE;
+ int len = 0;
+
+ MQTTString topic = {(char*)topicstr, {0, 0}};
+
+ if (!isConnected) {
+ printf("Session already connected!!\r\n");
+ return rc;
+ }
+
+ len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ if (len <= 0) {
+ printf("Error serializing subscribe packet ...\r\n");
+ return rc;
+ }
+
+ if ((rc = sendPacket(len)) != SUCCESS) {
+ printf("Error sending subscribe packet [%d]\r\n", rc);
+ return rc;
+ }
+
+ // Wait for SUBACK, dropping packets read along the way ...
+ if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
+ int count = 0, grantedQoS = -1;
+ unsigned short mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = grantedQoS; // 0, 1, 2 or 0x80
+ // For as long as we do not get 0x80 ..
+ if (rc != 0x80) {
+ // Add message handlers to the map
+ FP<void,MessageData &> fp;
+ fp.attach(object, member);
+
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+
+ // Reset connection timers here ...
+ resetConnectionTimer();
+
+ rc = SUCCESS;
+ }
+ } else
+ rc = FAILURE;
+
+ return rc;
+ }
+
+ // the listener thread ...
+ void startListener();
+
+protected:
+ int connect(MQTTPacket_connectData& options);
+ int handlePublishMsg();
+
+
+private:
+ NetworkInterface * network;
+ TCPSocket * tcpSocket;
+ PacketId packetid;
+ // Event queue
+ EventQueue queue;
+ bool isConnected;
+
+ // TODO: Because I'm using a map, I can only have one handler
+ // for each topic (one that's mapped to the topic string).
+ // Attaching another handler on the same topic is not possible.
+ // In the future, use a vector instead of maps to allow multiple
+ // handlers for the same topic.
+ std::map<std::string, FP<void, MessageData &> > topicCBMap;
+
+ unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+ unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+ unsigned int keepAliveInterval;
+ Timer comTimer;
+
+ int readPacket();
+ int sendPacket(size_t length);
+ int readPacketLength(int* value);
+ int readUntil(int packetType, int timeout);
+ int readBytesToBuffer(char * buffer, size_t size, int timeout);
+ int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
+ bool isTopicMatched(char* topic, MQTTString& topicName);
+ int sendPublish(PubMessage& message);
+ void resetConnectionTimer();
+ void sendPingRequest();
+ bool hasConnectionTimedOut();
+
+};
+
+#endif
--- a/main.cpp Tue Mar 21 12:57:07 2017 +0000
+++ b/main.cpp Sun Mar 26 04:35:46 2017 +0000
@@ -29,54 +29,53 @@
#include "mbed.h"
#include "rtos.h"
#include "easy-connect.h"
-#include "MQTTLogging.h"
-#include "MQTTNetwork.h"
-#include "MQTTmbed.h"
-#include "MQTTClient.h"
+#include "MQTTThreadedClient.h"
+
Serial pc(USBTX, USBRX, 115200);
+Thread msgSender;
+
+static const char * clientID = "mbed-sample";
+static const char * userID = "mbedhacks";
+static const char * password = "qwer123";
+static const char * topic_1 = "mbed-sample";
+static const char * topic_2 = "test";
+
int arrivedcount = 0;
-Thread msgSender;
-
-static MemoryPool<MQTT::Message, 16> pool;
-static Queue<MQTT::Message, 16> queue;
-
-void messageArrived(MQTT::MessageData& md)
+void messageArrived(MessageData& md)
{
- MQTT::Message &message = md.message;
- printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
-void SendDataThread()
+class CallbackTest
{
- unsigned int i;
- while(true)
+ public:
+
+ CallbackTest()
+ : arrivedcount(0)
+ {}
+
+ void messageArrived(MessageData& md)
{
- MQTT::Message * message = pool.alloc();
- char * buff = new char[sizeof(char) * 100];
-
- sprintf(buff, "message test %d", i);
- message->qos = MQTT::QOS0;
- message->retained = false;
- message->dup = false;
- message->payload = (void*)buff;
- message->payloadlen = strlen(buff)+1;
-
- // publish the message to mqtt
- queue.put(message);
- i++;
-
- Thread::wait(2000);
- }
-}
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
+ ++arrivedcount;
+ }
+
+ private:
+
+ int arrivedcount;
+};
int main(int argc, char* argv[])
{
float version = 0.6;
- char* topic = "mbed-sample";
+ CallbackTest testcb;
printf("HelloMQTT: version is %.2f\r\n", version);
@@ -85,74 +84,49 @@
return -1;
}
- MQTTNetwork mqttNetwork(network);
+ MQTTThreadedClient mqtt(network);
- MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
- const char* hostname = "m2m.eclipse.org";
+ const char* hostname = "mqtt.mbedhacks.com";
+ // const char* hostname = "192.168.0.7";
int port = 1883;
printf("Connecting to %s:%d\r\n", hostname, port);
- int rc = mqttNetwork.connect(hostname, port);
- if (rc != 0)
- printf("rc from TCP connect is %d\r\n", rc);
+
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
- data.clientID.cstring = "mbed-sample";
- data.username.cstring = "testuser";
- data.password.cstring = "testpassword";
- if ((rc = client.connect(data)) != 0)
- printf("rc from MQTT connect is %d\r\n", rc);
+ data.clientID.cstring = (char *) clientID;
+ data.username.cstring = (char *) userID;
+ data.password.cstring = (char *) password;
+ data.keepAliveInterval = 100; // default is 60
- if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
- printf("rc from MQTT subscribe is %d\r\n", rc);
+ int rc = mqtt.connect(hostname, port, data);
+ if (rc != 0)
+ printf("rc from TCP connect is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
+ printf("rc from MQTT subscribe 1 is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
+ printf("rc from MQTT subscribe 2 is %d\r\b", rc);
// Start the data producer
- msgSender.start(SendDataThread);
+ msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));
+ int i = 0;
while(true)
{
- osEvent evt = queue.get(10);
- if (evt.status == osEventMessage) {
- //printf("Message arrived from main thread ...\r\n");
- // Unpack the message
- MQTT::Message * message = (MQTT::Message *)evt.value.p;
-
- printf("Publishing message to MQTT ...\r\n");
- // Push to mqtt
- rc = client.publish(topic, *message);
- if (rc < 0)
- printf("Error sending mqtt message rc = %d\r\n", rc);
- else
- printf("Message published ...\r\n");
-
- //printf("Deleting payload ...\r\n");
- // Delete payload
- delete [] message->payload;
-
- //printf("Deleting pool allocation ...\r\n");
- // Don't forget this!
- pool.free(message);
- }
+ PubMessage message;
+ message.qos = QOS0;
+ message.id = 123;
- //printf("MQTT client yield ...\r\n");
- if (client.yield(100) != MQTT::SUCCESS)
- {
- printf("Yield error, client disconnected? ...\r\n");
- break;
- }
- //printf("MQTT client yield successful ...\r\n");
+ strcpy(&message.topic[0], topic_1);
+ sprintf(&message.payload[0], "Testing %d", i);
+ message.payloadlen = strlen((const char *) &message.payload[0]);
+ mqtt.publish(message);
+
+ i++;
+ //TODO: Nothing here yet ...
+ Thread::wait(1000);
}
- if ((rc = client.unsubscribe(topic)) != 0)
- printf("rc from unsubscribe was %d\r\n", rc);
-
- if ((rc = client.disconnect()) != 0)
- printf("rc from disconnect was %d\r\n", rc);
-
- mqttNetwork.disconnect();
-
- printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
-
- return 0;
}
