A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Files at this revision

API Documentation at this revision

Comitter:
vpcola
Date:
Mon Mar 27 13:45:26 2017 +0000
Parent:
25:326f00faa092
Child:
27:c90092f35d79
Commit message:
TLS now working

Changed in this revision

MQTTNetwork.h Show diff for this revision Revisions of this file
MQTTThreadedClient.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTThreadedClient.h Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTNetwork.h	Mon Mar 27 03:53:18 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,43 +0,0 @@
-#ifndef _MQTTNETWORK_H_
-#define _MQTTNETWORK_H_
-
-#include "NetworkInterface.h"
-
-class MQTTNetwork {
-public:
-    MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork) 
-    {
-        socket = new TCPSocket();
-        //socket->set_blocking(false);
-    }
-
-    ~MQTTNetwork() {
-        delete socket;
-    }
-
-    int read(unsigned char* buffer, int len, int timeout) {
-        socket->set_blocking(false);
-        socket->set_timeout(timeout);
-        return socket->recv(buffer, len);
-    }
-
-    int write(unsigned char* buffer, int len, int timeout) {
-        socket->set_blocking(true);
-        return socket->send(buffer, len);
-    }
-
-    int connect(const char* hostname, int port) {
-        socket->open(network);
-        return socket->connect(hostname, port);
-    }
-
-    void disconnect() {
-
-    }
-
-private:
-    NetworkInterface* network;
-    TCPSocket* socket;
-};
-
-#endif // _MQTTNETWORK_H_
--- a/MQTTThreadedClient.cpp	Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.cpp	Mon Mar 27 13:45:26 2017 +0000
@@ -89,12 +89,12 @@
     char *buf = new char[buf_size];
     (void) data;
 
-    if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth);
+    if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
     mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
     if (_debug) mbedtls_printf("%s", buf);
 
     if (*flags == 0)
-        if (_debug) mbedtls_printf("No verification issue for this certificate\n");
+        if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
         else {
             mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
             if (_debug) mbedtls_printf("%s\n", buf);
@@ -108,7 +108,7 @@
 
 void MQTTThreadedClient::setupTLS()    
 {
-        if (ssl_ca_pem != NULL)
+        if (useTLS)
         {
             mbedtls_entropy_init(&_entropy);
             mbedtls_ctr_drbg_init(&_ctr_drbg);
@@ -121,7 +121,7 @@
 
 void MQTTThreadedClient::freeTLS()
 {
-        if (ssl_ca_pem != NULL)
+        if (useTLS)
         {
             mbedtls_entropy_free(&_entropy);
             mbedtls_ctr_drbg_free(&_ctr_drbg);
@@ -140,14 +140,14 @@
         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
                           (const unsigned char *) DRBG_PERS,
                           sizeof (DRBG_PERS))) != 0) {
-            printf("error [%d] mbedtls_crt_drbg_init", ret);
+            mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
         printf("mbedtls_x509_crt_parse ...\r\n");
         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
                            strlen(ssl_ca_pem) + 1)) != 0) {
-            printf("error [%d] mbedtls_x509_crt_parse", ret);
+            mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -157,7 +157,7 @@
                         MBEDTLS_SSL_IS_CLIENT,
                         MBEDTLS_SSL_TRANSPORT_STREAM,
                         MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
-            printf("error [%d] mbedtls_ssl_config_defaults", ret);
+            mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -181,7 +181,7 @@
 
         printf("mbedtls_ssl_setup ...\r\n");         
         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
-            printf("error [%d] mbedtls_ssl_setup", ret);
+            mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -200,17 +200,13 @@
         {
             if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
                 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
-            {
-                    printf("error [%d] mbedtls_ssl_handshake", ret);
-                    tcpSocket->close();
-                    _error = -1;
-            }
+                    mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
             else 
             {
                 // do not close the socket if timed out
-                _error = ret;
+                ret = TIMEOUT;
             }
-            return -1;
+            return ret;
         }
 
         /* Handshake done, time to print info */
@@ -241,11 +237,14 @@
         // TODO: Save the session here for reconnect.
         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
         {
-            printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+            mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+            hasSavedSession = false;
             return -1;
         }  
         
         printf("Session saved for reconnect ...\r\n");     
+        hasSavedSession = true;
+        
         return 0;
 }
 
@@ -256,7 +255,7 @@
     if (tcpSocket == NULL)
         return -1;
 
-    if (ssl_ca_pem != NULL) 
+    if (useTLS) 
     {
         // Do SSL/TLS read
         rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
@@ -284,7 +283,7 @@
     if (tcpSocket == NULL)
         return -1;
     
-    if (ssl_ca_pem != NULL) {
+    if (useTLS) {
         // Do SSL/TLS write
         rc =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
         if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
@@ -430,27 +429,27 @@
 }
 
 
-int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+int MQTTThreadedClient::login()
 {
     int rc = FAILURE;
     int len = 0;
 
-    if (isConnected)
+    if (!isConnected)
     {
-        printf("Session already connected! \r\n");
+        printf("Session not connected! \r\n");
         return rc;
     }
         
     // Copy the keepAliveInterval value to local
     // MQTT specifies in seconds, we have to multiply that
     // amount for our 32 bit timers which accepts ms.
-    keepAliveInterval = (options.keepAliveInterval * 1000);
+    keepAliveInterval = (connect_options.keepAliveInterval * 1000);
     
-    printf("Connecting with: \r\n");
-    printf("\tUsername: [%s]\r\n", options.username.cstring);
-    printf("\tPassword: [%s]\r\n", options.password.cstring);
+    printf("Login with: \r\n");
+    printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
+    printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
     
-    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
     {
         printf("Error serializing connect packet ...\r\n");
         return rc;
@@ -478,11 +477,7 @@
     if (rc == SUCCESS)
     {
         printf("Connected!!! ... starting connection timers ...\r\n");
-        isConnected = true;
         resetConnectionTimer();
-    }else
-    {
-        // TODO: Call socket->disconnect()?
     }
     
     printf("Returning with rc = %d\r\n", rc);
@@ -495,24 +490,44 @@
 {
     if (isConnected)
     {
-        // TODO: Send unsubscribe message ...
+        if( useTLS 
+            && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
+           )
+        {
+            printf( "Session reset returned an error \r\n");
+        }        
         
         isConnected = false;
         tcpSocket->close();      
     }
 }
 
-int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+int MQTTThreadedClient::connect()
 {
     int ret = FAILURE;
+
+    if ((network == NULL) || (tcpSocket == NULL)
+        || host.empty())
+    {
+        printf("Network settings not set! \r\n");
+        return ret;
+    }
     
-    // Copy the settings for reconnection
-    host = chost;
-    port = cport;
-    connect_options = options;
-    
+    if (useTLS) 
+    {
+        if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
+            mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
+            return ret;
+        }
+
+        if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
+            mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
+            return ret;
+        }
+    }
+        
     tcpSocket->open(network);
-    if (ssl_ca_pem != NULL)
+    if (useTLS)
     {
         printf("mbedtls_ssl_set_hostname ...\r\n");         
         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
@@ -523,18 +538,31 @@
     
     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
     {
-         
          printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
          return ret;
-    } 
+    }else
+         isConnected = true;
     
-    if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0))
+    if (useTLS) 
     {
-         printf("TLS Handshake failed! \r\n");
-         return FAILURE;
+        
+        if (doTLSHandshake() < 0)
+        {
+            printf("TLS Handshake failed! \r\n");
+            return FAILURE;
+        }else
+            printf("TLS Handshake complete!! \r\n");
     }
     
-    return connect(connect_options);
+    return login();
+}
+
+void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+{
+    // Copy the settings for reconnection
+    host = chost;
+    port = cport;
+    connect_options = options;    
 }
 
 int MQTTThreadedClient::publish(PubMessage& msg)
@@ -593,7 +621,7 @@
     FP<void,MessageData &> fp;
     fp.attach(function);
     
-    topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
+    topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
 } 
 
 int MQTTThreadedClient::processSubscriptions()
@@ -606,8 +634,10 @@
             return 0;
     }
     
+    printf("Processing subscribed topics ....\r\n");
+    
     std::map<std::string, FP<void, MessageData &> >::iterator it;
-    for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++) 
+    for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
     {
         int rc = FAILURE;
         int len = 0;
@@ -653,71 +683,6 @@
     return numsubscribed;    
 }
 
-int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
-{
-    int rc = FAILURE;
-    int len = 0;
-
-    MQTTString topic = {(char*)topicstr, {0, 0}};
-    printf("Subscribing to topic [%s]\r\n", topicstr);
-    
-    if (!isConnected)
-    {
-        printf("Session already connected!!\r\n");
-        return rc;
-    }
-
-    len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    if (len <= 0)
-    {
-        printf("Error serializing subscribe packet ...\r\n");
-        return rc;
-    }
-    
-    if ((rc = sendPacket(len)) != SUCCESS) 
-    {
-        printf("Error sending subscribe packet [%d]\r\n", rc);
-        return rc;
-    }
-    
-    printf("Waiting for subscription ack ...\r\n");
-    // Wait for SUBACK, dropping packets read along the way ...
-    if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK)  // wait for suback
-    {
-        int count = 0, grantedQoS = -1;
-        unsigned short mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80
-        // For as long as we do not get 0x80 .. 
-        if (rc != 0x80)
-        {
-            // Add message handlers to the map
-            FP<void,MessageData &> fp;
-            fp.attach(function);
-            
-            topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-            
-            // Reset connection timers here ...
-            resetConnectionTimer();
-            
-            printf("Successfully subscribed to %s ...\r\n", topicstr);
-            rc = SUCCESS;
-        }else
-        {
-            printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
-        }
-    }
-    else
-    {   
-        printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
-        rc = FAILURE;
-    }
-        
-    return rc;
-        
-}
-
-
 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
 {
     char* curf = topicFilter;
@@ -843,23 +808,43 @@
 void MQTTThreadedClient::startListener()
 {
     int pType;
+    int numsubs;
     // Continuesly listens for packets and dispatch
     // message handlers ...
-    do {
-        // Connect to server ...
-        if (!isConnected)
+    if (useTLS)
+    {
+        initTLS();
+    }
+            
+    while(true)
+    {
+
+        // Attempt to reconnect and login
+        if ( connect() < 0 )
         {
-            // Attempt to reconnect ...
+            disconnect();
+            // Wait for a few secs and reconnect ...
+            Thread::wait(6000);
+            continue;
         }
         
-        while(true) {
+        numsubs = processSubscriptions();
+        printf("Subscribed %d topics ...\r\n", numsubs);
+         
+        // loop read    
+        while(true) 
+        {
             pType = readPacket();
-            switch(pType) {
+            switch(pType) 
+            {
                 case TIMEOUT:
                     // No data available from the network ...
                     break;
                 case FAILURE:
-                    goto reconnect;
+                    {
+                        printf("readPacket returned failure \r\n");
+                        goto reconnect;
+                    }
                 case BUFFER_OVERFLOW: 
                     {
                         // TODO: Network error, do we disconnect and reconnect?
@@ -930,8 +915,14 @@
         } // end while loop
 
 reconnect:
-    disconnect();
-    // reconnect?
-    } while(true);
+        // reconnect?
+        disconnect();
+        
+    };
 }
-        
\ No newline at end of file
+
+void MQTTThreadedClient::stopListener()
+{
+    // TODO: Set a signal/flag that the running thread 
+    // will check if its ok to stop ...
+}
\ No newline at end of file
--- a/MQTTThreadedClient.h	Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.h	Mon Mar 27 13:45:26 2017 +0000
@@ -80,8 +80,11 @@
     MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
         : network(aNetwork),
           ssl_ca_pem(pem),
+          port((pem != NULL) ? 8883 : 1883),
           queue(32 * EVENTS_EVENT_SIZE),
-          isConnected(false) 
+          isConnected(false),          
+          hasSavedSession(false),
+          useTLS(pem != NULL)
     {
         DRBG_PERS = "mbed TLS MQTT client";
         tcpSocket = new TCPSocket();
@@ -98,10 +101,7 @@
                 
     }
 
-
-    int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
-    void disconnect();
-    
+    void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
     int publish(PubMessage& message);
     
     void addTopicHandler(const char * topic, void (*function)(MessageData &));
@@ -111,68 +111,20 @@
         FP<void,MessageData &> fp;
         fp.attach(object, member);
 
-        topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));        
+        topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
     }
     
-    int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
-    template<typename T>
-    int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
-        int rc = FAILURE;
-        int len = 0;
-
-        MQTTString topic = {(char*)topicstr, {0, 0}};
-        printf("Subscribing to topic [%s]\r\n", topicstr);
-        
-        if (!isConnected) {
-            printf("Session already connected!!\r\n");
-            return rc;
-        }
-
-        len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-        if (len <= 0) {
-            printf("Error serializing subscribe packet ...\r\n");
-            return rc;
-        }
-
-        if ((rc = sendPacket(len)) != SUCCESS) {
-            printf("Error sending subscribe packet [%d]\r\n", rc);
-            return rc;
-        }
-        printf("Waiting for subscription ack ...\r\n");
-        // Wait for SUBACK, dropping packets read along the way ...
-        if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
-            int count = 0, grantedQoS = -1;
-            unsigned short mypacketid;
-            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-                rc = grantedQoS; // 0, 1, 2 or 0x80
-            // For as long as we do not get 0x80 ..
-            if (rc != 0x80) {
-                // Add message handlers to the map
-                FP<void,MessageData &> fp;
-                fp.attach(object, member);
-
-                topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
-                // Reset connection timers here ...
-                resetConnectionTimer();
-                printf("Successfully subscribed to %s ...\r\n", topicstr);
-                rc = SUCCESS;
-            }
-        } else
-        {
-            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
-            rc = FAILURE;
-        }
-
-        return rc;
-    }
-
-    // the listener thread ...
+    // Start the listener thread and start polling 
+    // MQTT server.
     void startListener();
+    // Stop the listerner thread and closes connection
+    void stopListener();
 
 protected:
-    int connect(MQTTPacket_connectData& options);
+
     int handlePublishMsg();
+    void disconnect();  
+    int connect();      
 
 
 private:
@@ -186,11 +138,10 @@
     std::string host;
     uint16_t port;
     MQTTPacket_connectData connect_options;
-
-    
     // Event queue
     EventQueue queue;
     bool isConnected;
+    bool hasSavedSession;    
 
     // TODO: Because I'm using a map, I can only have one handler
     // for each topic (one that's mapped to the topic string).
@@ -198,7 +149,6 @@
     // In the future, use a vector instead of maps to allow multiple
     // handlers for the same topic.
     std::map<std::string, FP<void, MessageData &> > topicCBMap;
-    std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
     
     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
@@ -207,6 +157,7 @@
     Timer comTimer;
 
     // SSL/TLS functions
+    bool useTLS;
     void setupTLS();
     int initTLS();    
     void freeTLS();
@@ -224,7 +175,7 @@
     void resetConnectionTimer();
     void sendPingRequest();
     bool hasConnectionTimedOut();
-
+    int login();
 };
 
 #endif
--- a/main.cpp	Mon Mar 27 03:53:18 2017 +0000
+++ b/main.cpp	Mon Mar 27 13:45:26 2017 +0000
@@ -33,7 +33,76 @@
 
 
 Serial pc(USBTX, USBRX, 115200);
-Thread msgSender;
+Thread msgSender(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
+
+#define MQTT_USE_TLS
+
+#ifdef MQTT_USE_TLS
+/* List of trusted root CA certificates
+ * currently only "letsencrypt", the CA for mbedhacks.com
+ *
+ * To add more than one root, just concatenate them.
+ *
+ * TODO: Move this certificate file onto the SD card.
+ */
+static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
+    "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
+    "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
+    "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
+    "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
+    "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
+    "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
+    "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
+    "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
+    "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
+    "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
+    "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
+    "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
+    "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
+    "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
+    "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
+    "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
+    "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
+    "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
+    "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
+    "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
+    "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
+    "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
+    "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
+    "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
+    "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
+    "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
+    "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
+    "I5hmKgE=\n"
+    "-----END CERTIFICATE-----\n"
+    "-----BEGIN CERTIFICATE-----\n"
+    "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
+    "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
+    "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
+    "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
+    "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
+    "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
+    "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
+    "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
+    "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
+    "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
+    "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
+    "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
+    "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
+    "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
+    "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
+    "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
+    "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
+    "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
+    "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
+    "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
+    "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
+    "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
+    "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
+    "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
+    "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
+    "-----END CERTIFICATE-----";
+#endif
 
 static const char * clientID = "mbed-sample";
 static const char * userID = "mbedhacks";
@@ -84,30 +153,30 @@
         return -1;
     }
 
+#ifdef MQTT_USE_TLS
+    MQTTThreadedClient mqtt(network, SSL_CA_PEM);
+#else
     MQTTThreadedClient mqtt(network);
+#endif
+
 
     const char* hostname = "mqtt.mbedhacks.com";
     // const char* hostname = "192.168.0.7";    
+#ifdef MQTT_USE_TLS    
+    int port = 8883;
+#else
     int port = 1883;
-    printf("Connecting to %s:%d\r\n", hostname, port);
-
+#endif
 
-    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
-    data.MQTTVersion = 3;
-    data.clientID.cstring = (char *) clientID;
-    data.username.cstring = (char *) userID;
-    data.password.cstring = (char *) password;
-    data.keepAliveInterval = 100; // default is 60
-
-    int rc = mqtt.connect(hostname, port, data);
-    if (rc != 0)
-        printf("rc from TCP connect is %d\r\n", rc);
-        
-    if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
-        printf("rc from MQTT subscribe 1 is %d\r\n", rc);
-        
-    if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
-        printf("rc from MQTT subscribe 2 is %d\r\b", rc);
+    MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer;
+    logindata.MQTTVersion = 3;
+    logindata.clientID.cstring = (char *) clientID;
+    logindata.username.cstring = (char *) userID;
+    logindata.password.cstring = (char *) password;
+    
+    mqtt.setConnectionParameters(hostname, port, logindata);
+    mqtt.addTopicHandler(topic_1, messageArrived);
+    mqtt.addTopicHandler(topic_2, &testcb, &CallbackTest::messageArrived);
 
     // Start the data producer
     msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));