A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
23:06fac173529e
Child:
25:326f00faa092
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTThreadedClient.h	Sun Mar 26 04:35:46 2017 +0000
@@ -0,0 +1,181 @@
+#ifndef _MQTT_THREADED_CLIENT_H_
+#define _MQTT_THREADED_CLIENT_H_
+
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTPacket.h"
+#include "NetworkInterface.h"
+#include "FP.h"
+
+#include <cstdio>
+#include <string>
+#include <map>
+
+#define COMMAND_TIMEOUT 5000
+#define DEFAULT_SOCKET_TIMEOUT 1000
+#define MAX_MQTT_PACKET_SIZE 200
+#define MAX_MQTT_PAYLOAD_SIZE 100
+
+typedef enum { QOS0, QOS1, QOS2 } QoS;
+
+// all failure return codes must be negative
+typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;
+
+
+typedef struct
+{
+    QoS qos;
+    bool retained;
+    bool dup;
+    unsigned short id;
+    void *payload;
+    size_t payloadlen;
+}Message, *pMessage;
+
+// TODO:
+// Merge this struct with the one above, in order to use the same
+// data structure for sending and receiving. I need to simplify
+// the PubMessage to not contain pointers like the one above.
+typedef struct
+{
+    char topic[100];
+    QoS qos;
+    unsigned short id;
+    size_t payloadlen;
+    char payload[MAX_MQTT_PAYLOAD_SIZE];
+}PubMessage, *pPubMessage;
+
+struct MessageData
+{
+    MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
+    { }
+    Message &message;
+    MQTTString &topicName;
+};
+
+class PacketId
+{
+public:
+    PacketId()
+    {
+        next = 0;
+    }
+
+    int getNext()
+    {
+        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+    }
+
+private:
+    static const int MAX_PACKET_ID = 65535;
+    int next;
+};
+
+
+
+class MQTTThreadedClient
+{
+public:
+    MQTTThreadedClient(NetworkInterface * aNetwork)
+        : network(aNetwork),
+          queue(32 * EVENTS_EVENT_SIZE),
+          isConnected(false) {
+        tcpSocket = new TCPSocket();
+    }
+
+    int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
+    int publish(PubMessage& message);
+    int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
+    
+    template<typename T>
+    int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
+        int rc = FAILURE;
+        int len = 0;
+
+        MQTTString topic = {(char*)topicstr, {0, 0}};
+
+        if (!isConnected) {
+            printf("Session already connected!!\r\n");
+            return rc;
+        }
+
+        len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+        if (len <= 0) {
+            printf("Error serializing subscribe packet ...\r\n");
+            return rc;
+        }
+
+        if ((rc = sendPacket(len)) != SUCCESS) {
+            printf("Error sending subscribe packet [%d]\r\n", rc);
+            return rc;
+        }
+
+        // Wait for SUBACK, dropping packets read along the way ...
+        if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
+            int count = 0, grantedQoS = -1;
+            unsigned short mypacketid;
+            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+                rc = grantedQoS; // 0, 1, 2 or 0x80
+            // For as long as we do not get 0x80 ..
+            if (rc != 0x80) {
+                // Add message handlers to the map
+                FP<void,MessageData &> fp;
+                fp.attach(object, member);
+
+                topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+
+                // Reset connection timers here ...
+                resetConnectionTimer();
+
+                rc = SUCCESS;
+            }
+        } else
+            rc = FAILURE;
+
+        return rc;
+    }
+
+    // the listener thread ...
+    void startListener();
+
+protected:
+    int connect(MQTTPacket_connectData& options);
+    int handlePublishMsg();
+
+
+private:
+    NetworkInterface * network;
+    TCPSocket * tcpSocket;
+    PacketId packetid;
+    // Event queue
+    EventQueue queue;
+    bool isConnected;
+
+    // TODO: Because I'm using a map, I can only have one handler
+    // for each topic (one that's mapped to the topic string).
+    // Attaching another handler on the same topic is not possible.
+    // In the future, use a vector instead of maps to allow multiple
+    // handlers for the same topic.
+    std::map<std::string, FP<void, MessageData &> > topicCBMap;
+
+    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+    unsigned int keepAliveInterval;
+    Timer comTimer;
+
+    int readPacket();
+    int sendPacket(size_t length);
+    int readPacketLength(int* value);
+    int readUntil(int packetType, int timeout);
+    int readBytesToBuffer(char * buffer, size_t size, int timeout);
+    int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
+    bool isTopicMatched(char* topic, MQTTString& topicName);
+    int  sendPublish(PubMessage& message);
+    void resetConnectionTimer();
+    void sendPingRequest();
+    bool hasConnectionTimedOut();
+
+};
+
+#endif