A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
25:326f00faa092
Parent:
23:06fac173529e
Child:
26:4b21de8043a5
--- a/MQTTThreadedClient.h	Sun Mar 26 04:38:36 2017 +0000
+++ b/MQTTThreadedClient.h	Mon Mar 27 03:53:18 2017 +0000
@@ -7,6 +7,7 @@
 #include "NetworkInterface.h"
 #include "FP.h"
 
+
 #include <cstdio>
 #include <string>
 #include <map>
@@ -76,24 +77,52 @@
 class MQTTThreadedClient
 {
 public:
-    MQTTThreadedClient(NetworkInterface * aNetwork)
+    MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
         : network(aNetwork),
+          ssl_ca_pem(pem),
           queue(32 * EVENTS_EVENT_SIZE),
-          isConnected(false) {
+          isConnected(false) 
+    {
+        DRBG_PERS = "mbed TLS MQTT client";
         tcpSocket = new TCPSocket();
+        setupTLS();
+    }
+    
+    ~MQTTThreadedClient()
+    {
+        // TODO: signal the thread to shutdown
+        freeTLS();
+           
+        if (isConnected)
+            disconnect();
+                
     }
 
+
     int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
+    void disconnect();
+    
     int publish(PubMessage& message);
+    
+    void addTopicHandler(const char * topic, void (*function)(MessageData &));
+    template<typename T>
+    void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
+    {
+        FP<void,MessageData &> fp;
+        fp.attach(object, member);
+
+        topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));        
+    }
+    
     int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
-    
     template<typename T>
     int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
         int rc = FAILURE;
         int len = 0;
 
         MQTTString topic = {(char*)topicstr, {0, 0}};
-
+        printf("Subscribing to topic [%s]\r\n", topicstr);
+        
         if (!isConnected) {
             printf("Session already connected!!\r\n");
             return rc;
@@ -109,7 +138,7 @@
             printf("Error sending subscribe packet [%d]\r\n", rc);
             return rc;
         }
-
+        printf("Waiting for subscription ack ...\r\n");
         // Wait for SUBACK, dropping packets read along the way ...
         if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
             int count = 0, grantedQoS = -1;
@@ -126,11 +155,14 @@
 
                 // Reset connection timers here ...
                 resetConnectionTimer();
-
+                printf("Successfully subscribed to %s ...\r\n", topicstr);
                 rc = SUCCESS;
             }
         } else
+        {
+            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
             rc = FAILURE;
+        }
 
         return rc;
     }
@@ -145,8 +177,17 @@
 
 private:
     NetworkInterface * network;
+    const char * ssl_ca_pem;
     TCPSocket * tcpSocket;
     PacketId packetid;
+    const char *DRBG_PERS;
+    nsapi_error_t _error;    
+    // Connection options
+    std::string host;
+    uint16_t port;
+    MQTTPacket_connectData connect_options;
+
+    
     // Event queue
     EventQueue queue;
     bool isConnected;
@@ -157,13 +198,21 @@
     // In the future, use a vector instead of maps to allow multiple
     // handlers for the same topic.
     std::map<std::string, FP<void, MessageData &> > topicCBMap;
-
+    std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
+    
     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
 
     unsigned int keepAliveInterval;
     Timer comTimer;
 
+    // SSL/TLS functions
+    void setupTLS();
+    int initTLS();    
+    void freeTLS();
+    int doTLSHandshake();
+    
+    int processSubscriptions();
     int readPacket();
     int sendPacket(size_t length);
     int readPacketLength(int* value);