A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
26:4b21de8043a5
Parent:
25:326f00faa092
Child:
27:c90092f35d79
--- a/MQTTThreadedClient.h	Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.h	Mon Mar 27 13:45:26 2017 +0000
@@ -80,8 +80,11 @@
     MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
         : network(aNetwork),
           ssl_ca_pem(pem),
+          port((pem != NULL) ? 8883 : 1883),
           queue(32 * EVENTS_EVENT_SIZE),
-          isConnected(false) 
+          isConnected(false),          
+          hasSavedSession(false),
+          useTLS(pem != NULL)
     {
         DRBG_PERS = "mbed TLS MQTT client";
         tcpSocket = new TCPSocket();
@@ -98,10 +101,7 @@
                 
     }
 
-
-    int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
-    void disconnect();
-    
+    void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
     int publish(PubMessage& message);
     
     void addTopicHandler(const char * topic, void (*function)(MessageData &));
@@ -111,68 +111,20 @@
         FP<void,MessageData &> fp;
         fp.attach(object, member);
 
-        topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));        
+        topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
     }
     
-    int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
-    template<typename T>
-    int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
-        int rc = FAILURE;
-        int len = 0;
-
-        MQTTString topic = {(char*)topicstr, {0, 0}};
-        printf("Subscribing to topic [%s]\r\n", topicstr);
-        
-        if (!isConnected) {
-            printf("Session already connected!!\r\n");
-            return rc;
-        }
-
-        len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-        if (len <= 0) {
-            printf("Error serializing subscribe packet ...\r\n");
-            return rc;
-        }
-
-        if ((rc = sendPacket(len)) != SUCCESS) {
-            printf("Error sending subscribe packet [%d]\r\n", rc);
-            return rc;
-        }
-        printf("Waiting for subscription ack ...\r\n");
-        // Wait for SUBACK, dropping packets read along the way ...
-        if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
-            int count = 0, grantedQoS = -1;
-            unsigned short mypacketid;
-            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-                rc = grantedQoS; // 0, 1, 2 or 0x80
-            // For as long as we do not get 0x80 ..
-            if (rc != 0x80) {
-                // Add message handlers to the map
-                FP<void,MessageData &> fp;
-                fp.attach(object, member);
-
-                topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
-                // Reset connection timers here ...
-                resetConnectionTimer();
-                printf("Successfully subscribed to %s ...\r\n", topicstr);
-                rc = SUCCESS;
-            }
-        } else
-        {
-            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
-            rc = FAILURE;
-        }
-
-        return rc;
-    }
-
-    // the listener thread ...
+    // Start the listener thread and start polling 
+    // MQTT server.
     void startListener();
+    // Stop the listerner thread and closes connection
+    void stopListener();
 
 protected:
-    int connect(MQTTPacket_connectData& options);
+
     int handlePublishMsg();
+    void disconnect();  
+    int connect();      
 
 
 private:
@@ -186,11 +138,10 @@
     std::string host;
     uint16_t port;
     MQTTPacket_connectData connect_options;
-
-    
     // Event queue
     EventQueue queue;
     bool isConnected;
+    bool hasSavedSession;    
 
     // TODO: Because I'm using a map, I can only have one handler
     // for each topic (one that's mapped to the topic string).
@@ -198,7 +149,6 @@
     // In the future, use a vector instead of maps to allow multiple
     // handlers for the same topic.
     std::map<std::string, FP<void, MessageData &> > topicCBMap;
-    std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
     
     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
@@ -207,6 +157,7 @@
     Timer comTimer;
 
     // SSL/TLS functions
+    bool useTLS;
     void setupTLS();
     int initTLS();    
     void freeTLS();
@@ -224,7 +175,7 @@
     void resetConnectionTimer();
     void sendPingRequest();
     bool hasConnectionTimedOut();
-
+    int login();
 };
 
 #endif