Fork of Hello MQTT, using mbed TLS for secure mqtt transport

Dependencies:   MQTT

Fork of HelloMQTT by MQTT

Files at this revision

API Documentation at this revision

Comitter:
vpcola
Date:
Fri Mar 17 08:42:29 2017 +0000
Parent:
20:49c9daf2b0ff
Child:
22:4d0628d13870
Commit message:
First seed ...

Changed in this revision

MQTTSNetwork.h Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSNetwork.h	Fri Mar 17 08:42:29 2017 +0000
@@ -0,0 +1,373 @@
+#ifndef _MQTTSNETWORK_H_
+#define _MQTTSNETWORK_H_
+ 
+#include "NetworkInterface.h"
+#include "TCPSocket.h"
+#include "mbedtls/platform.h"
+#include "mbedtls/ssl.h"
+#include "mbedtls/entropy.h"
+#include "mbedtls/ctr_drbg.h"
+#include "mbedtls/error.h"
+
+#if DEBUG_LEVEL > 0
+#include "mbedtls/debug.h"
+#endif
+ 
+class MQTTSNetwork {
+public:
+ 
+    MQTTSNetwork() 
+        : tcpsocket(NULL)
+        ,ssl_ca_pem(NULL)
+        ,keepalive(1)
+    {
+        DRBG_PERS = "mbed TLS MQTT client";
+                
+        mbedtls_entropy_init(&_entropy);
+        mbedtls_ctr_drbg_init(&_ctr_drbg);
+        mbedtls_x509_crt_init(&_cacert);
+        mbedtls_ssl_init(&_ssl);
+        mbedtls_ssl_config_init(&_ssl_conf);        
+        memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );
+    }
+    
+    ~MQTTSNetwork() 
+    {
+
+        mbedtls_entropy_free(&_entropy);
+        mbedtls_ctr_drbg_free(&_ctr_drbg);
+        mbedtls_x509_crt_free(&_cacert);
+        mbedtls_ssl_free(&_ssl);
+        mbedtls_ssl_config_free(&_ssl_conf);        
+        
+        if (tcpsocket)
+            delete tcpsocket;
+                    
+    }
+    
+
+    
+    int setupTLS(NetworkInterface * net, const char * pem)
+    {
+        int ret;
+        
+        network = net;
+        ssl_ca_pem = pem;
+        
+        printf("Initializing TLS ...\r\n");
+        printf("mbedtls_ctr_drdbg_seed ...\r\n");
+        if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
+                          (const unsigned char *) DRBG_PERS,
+                          sizeof (DRBG_PERS))) != 0) {
+            print_mbedtls_error("mbedtls_crt_drbg_init", ret);
+            _error = ret;
+            return -1;
+        }
+        printf("mbedtls_x509_crt_parse ...\r\n");
+        if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
+                           strlen(ssl_ca_pem) + 1)) != 0) {
+            print_mbedtls_error("mbedtls_x509_crt_parse", ret);
+            _error = ret;
+            return -1;
+        }
+
+        printf("mbedtls_ssl_config_defaults ...\r\n");
+        if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
+                        MBEDTLS_SSL_IS_CLIENT,
+                        MBEDTLS_SSL_TRANSPORT_STREAM,
+                        MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
+            print_mbedtls_error("mbedtls_ssl_config_defaults", ret);
+            _error = ret;
+            return -1;
+        }
+
+        printf("mbedtls_ssl_config_ca_chain ...\r\n");
+        mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
+        printf("mbedtls_ssl_conf_rng ...\r\n");
+        mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
+
+        /* It is possible to disable authentication by passing
+         * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
+         */
+        printf("mbedtls_ssl_conf_authmode ...\r\n");         
+        mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
+
+#if DEBUG_LEVEL > 0
+        mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
+        mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
+        mbedtls_debug_set_threshold(DEBUG_LEVEL);
+#endif
+
+        printf("mbedtls_ssl_setup ...\r\n");         
+        if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
+            print_mbedtls_error("mbedtls_ssl_setup", ret);
+            _error = ret;
+            return -1;
+        }
+        
+        
+        return ret;        
+    }
+ 
+    int read(unsigned char* buffer, int len, int timeout) 
+    {
+        int ret;
+        printf("MQTTS client read ...\r\n");
+
+        printf("read set timeout ... %d\r\n", timeout);
+        tcpsocket->set_timeout(timeout);
+        
+        ret = mbedtls_ssl_read(&_ssl, buffer, len);
+        printf("mbedtls_ssl_read returned %d\r\n", ret);
+        if (ret < 0) {
+            if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
+            {
+                // Some other error we can not recover from
+                print_mbedtls_error("mbedtls_ssl_read", ret);
+                onError(tcpsocket, -1 );
+            }
+            else {
+                // timeout occurred ...
+                printf("Timed out? ...\r\n");
+                _error = ret;
+            }
+            printf("MQTTS client read returns with error!!!...\r\n");
+            return -1;
+        }
+        printf("MQTS client read successfully!! ...\r\n");
+        return ret ;
+    }
+ 
+    int write(unsigned char* buffer, int len, int timeout) 
+    {
+        int ret;
+        
+        printf("MQTTS client write ...\r\n");
+        tcpsocket->set_timeout(timeout);
+        
+        ret =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, len);
+        if (ret < 0) {
+            if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) {
+                print_mbedtls_error("mbedtls_ssl_write", ret);
+                onError(tcpsocket, -1 );
+            }
+            else {
+                _error = ret;
+            }
+            return -1;
+        }
+        
+        return ret;        
+    }
+ 
+    int connect(const char* host, int port) 
+    {
+        // Do the TLS handshake here ...
+        /* Initialize the flags */
+        /*
+         * Initialize TLS-related stuf.
+         */
+        int ret;
+        
+        // Save the hostname and port on first connect
+        // Create the socket 
+        if (tcpsocket == NULL)
+            tcpsocket = new TCPSocket(network);
+            
+        if (tcpsocket == NULL)
+            ret = -1;
+           
+
+        printf("mbedtls_ssl_set_hostname ...\r\n");         
+        mbedtls_ssl_set_hostname(&_ssl, host);
+        printf("mbedtls_ssl_set_bio ...\r\n");         
+        mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpsocket),
+                                   ssl_send, ssl_recv, NULL );
+
+        printf("Connecting to %s:%d\r\n", host, port);
+        ret = tcpsocket->connect(host, port);
+        if (ret != NSAPI_ERROR_OK) {
+            if (_debug) mbedtls_printf("Failed to connect\r\n");
+            onError(tcpsocket, -1);
+            return -1;
+        }
+
+        /* Start the handshake, the rest will be done in onReceive() */
+        printf("Starting the TLS handshake...\r\n");
+        ret = mbedtls_ssl_handshake(&_ssl);
+        if (ret < 0) {
+            if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
+                ret != MBEDTLS_ERR_SSL_WANT_WRITE) {
+                print_mbedtls_error("mbedtls_ssl_handshake", ret);
+                onError(tcpsocket, -1);
+            }
+            else {
+                _error = ret;
+            }
+            return -1;
+        }
+
+        /* Handshake done, time to print info */
+        printf("TLS connection to %s:%d established\r\n", 
+            host, port);
+
+        const uint32_t buf_size = 1024;
+        char *buf = new char[buf_size];
+        mbedtls_x509_crt_info(buf, buf_size, "\r    ",
+                        mbedtls_ssl_get_peer_cert(&_ssl));
+        printf("Server certificate:\r\n%s\r", buf);
+
+        uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
+        if( flags != 0 )
+        {
+            mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
+            printf("Certificate verification failed:\r\n%s\r\r\n", buf);
+            return -1;
+        }
+        
+        printf("Certificate verification passed\r\n\r\n");
+        
+        // TODO: Save the session here for reconnect.
+        if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
+        {
+            printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+            return -1;
+        }  
+        printf("Session saved for reconnect ...\r\n"); 
+        
+        // Set socket to non-blocking mode ...
+
+        return 0;
+    }
+ 
+    void disconnect() 
+    {
+        if (tcpsocket)
+        {
+            tcpsocket->close();
+            delete tcpsocket;
+            tcpsocket = NULL;
+        }
+    }
+    
+protected:
+    /**
+     * Helper for pretty-printing mbed TLS error codes
+     */
+    static void print_mbedtls_error(const char *name, int err) {
+        char buf[128];
+        mbedtls_strerror(err, buf, sizeof (buf));
+        mbedtls_printf("%s() failed: -0x%04x (%d): %s\r\n", name, -err, err, buf);
+    }
+
+#if DEBUG_LEVEL > 0
+    /**
+     * Debug callback for mbed TLS
+     * Just prints on the USB serial port
+     */
+    static void my_debug(void *ctx, int level, const char *file, int line,
+                         const char *str)
+    {
+        const char *p, *basename;
+        (void) ctx;
+
+        /* Extract basename from file */
+        for(p = basename = file; *p != '\0'; p++) {
+            if(*p == '/' || *p == '\\') {
+                basename = p + 1;
+            }
+        }
+
+        if (_debug) {
+            mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
+        }
+    }
+
+    /**
+     * Certificate verification callback for mbed TLS
+     * Here we only use it to display information on each cert in the chain
+     */
+    static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
+    {
+        const uint32_t buf_size = 1024;
+        char *buf = new char[buf_size];
+        (void) data;
+
+        if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth);
+        mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
+        if (_debug) mbedtls_printf("%s", buf);
+
+        if (*flags == 0)
+            if (_debug) mbedtls_printf("No verification issue for this certificate\n");
+        else
+        {
+            mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
+            if (_debug) mbedtls_printf("%s\n", buf);
+        }
+
+        delete[] buf;
+        return 0;
+    }
+#endif
+
+    /**
+     * Receive callback for mbed TLS
+     */
+    static int ssl_recv(void *ctx, unsigned char *buf, size_t len) {
+        int recv = -1;
+        TCPSocket *socket = static_cast<TCPSocket *>(ctx);
+        recv = socket->recv(buf, len);
+
+        if (NSAPI_ERROR_WOULD_BLOCK == recv) {
+            return MBEDTLS_ERR_SSL_WANT_READ;
+        }
+        else if (recv < 0) {
+            return -1;
+        }
+        else {
+            return recv;
+        }
+   }
+   
+    /**
+     * Send callback for mbed TLS
+     */
+    static int ssl_send(void *ctx, const unsigned char *buf, size_t len) {
+       int sent = -1;
+        TCPSocket *socket = static_cast<TCPSocket *>(ctx);
+        sent = socket->send(buf, len);
+
+        if(NSAPI_ERROR_WOULD_BLOCK == sent) {
+            return MBEDTLS_ERR_SSL_WANT_WRITE;
+        }
+        else if (sent < 0){
+            return -1;
+        }
+        else {
+            return sent;
+        }
+    }
+
+    void onError(TCPSocket *s, int error) {
+        s->close();
+        _error = error;
+    }
+ 
+private:
+    NetworkInterface * network;
+    TCPSocket* tcpsocket;
+    const char * ssl_ca_pem;
+    bool _debug;
+    nsapi_error_t _error;
+    const char *DRBG_PERS;
+    int keepalive;
+    mbedtls_entropy_context _entropy;
+    mbedtls_ctr_drbg_context _ctr_drbg;
+    mbedtls_x509_crt _cacert;
+    mbedtls_ssl_context _ssl;
+    mbedtls_ssl_config _ssl_conf;    
+    mbedtls_ssl_session saved_session;
+
+};
+ 
+#endif // _MQTTNETWORK_H_
\ No newline at end of file
--- a/main.cpp	Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp	Fri Mar 17 08:42:29 2017 +0000
@@ -26,114 +26,260 @@
  */
 
  // change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
 
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
 
 #define MQTTCLIENT_QOS2 1
 
+#include "mbed.h"
+#include "rtos.h"
 #include "easy-connect.h"
-#include "MQTTNetwork.h"
+#include "MQTTSNetwork.h"
 #include "MQTTmbed.h"
 #include "MQTTClient.h"
 
-int arrivedcount = 0;
 
+/* List of trusted root CA certificates
+ * currently only "letsencrypt", the CA for mbedhacks.com
+ *
+ * To add more than one root, just concatenate them.
+ *
+ * TODO: Move this certificate file onto the SD card.
+ */
+static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
+    "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
+    "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
+    "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
+    "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
+    "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
+    "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
+    "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
+    "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
+    "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
+    "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
+    "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
+    "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
+    "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
+    "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
+    "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
+    "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
+    "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
+    "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
+    "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
+    "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
+    "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
+    "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
+    "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
+    "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
+    "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
+    "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
+    "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
+    "I5hmKgE=\n"
+    "-----END CERTIFICATE-----\n"
+    "-----BEGIN CERTIFICATE-----\n"
+    "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
+    "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
+    "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
+    "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
+    "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
+    "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
+    "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
+    "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
+    "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
+    "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
+    "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
+    "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
+    "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
+    "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
+    "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
+    "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
+    "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
+    "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
+    "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
+    "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
+    "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
+    "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
+    "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
+    "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
+    "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
+    "-----END CERTIFICATE-----";
 
+int arrivedcount = 0;
+Thread thdMQTT;
+
+MQTTSNetwork mqttNetwork;
+MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork);
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
+
+static const char* topic = "mbed-sample";
+
+typedef void (*messageHandler)(MQTT::MessageData&);
 void messageArrived(MQTT::MessageData& md)
 {
     MQTT::Message &message = md.message;
-    logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
-    logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+    printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
     ++arrivedcount;
 }
 
+int mqttsConnect(const char * hostname, uint16_t port, 
+        const char * clientID, 
+        const char * username, 
+        const char * password)
+{
+    int rc;
+    
+    printf("Connecting to %s:%d\r\n", hostname, port);
+    rc = mqttNetwork.connect(hostname, port);
+    if (rc != 0)
+    {
+        printf("rc from TCP connect is %d\r\n", rc);
+        return -1;
+    }
+    else
+        printf("RC passed!\r\n");
+ 
+    printf("Creating data connection ...\r\n");
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    data.MQTTVersion = 3;
+    data.clientID.cstring = (char *) clientID;
+    data.username.cstring = (char *) username;
+    data.password.cstring = (char *) password;
+    printf("Connecting client ...\r\n");
+    if ((rc = client.connect(data)) != 0)
+    {
+        printf("rc from MQTT connect is %d\r\n", rc);
+        return -1;
+    }
+ 
+    printf("Subscribing to topic ...\r\n");
+    if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
+    {
+        printf("rc from MQTT subscribe is %d\r\n", rc);
+        return -1;
+    }
+
+    return rc;    
+}
+
+int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler)
+{
+    int rc;
+    
+    printf("Subscribing to topic ...\r\n");
+    if ((rc = client.subscribe(topic, os, handler)) != 0)
+        printf("rc from MQTT subscribe is %d\r\n", rc);
+
+    return rc;        
+}
+
+// This function is used to pass data from
+// the main thread to the MQTT listener thread
+void postMQTTUpdate(MQTT::Message msg)
+{
+    MQTT::Message * message = pool.alloc();
+    // Simple copy, I think this is via (memcpy)
+    *message = msg;
+    
+    // Push the data to the consumer thread
+    printf("Pushing data to MQTTS Listener thread ...\r\n");
+    queue.put(message);       
+}
+
+void mqttListener(void)
+{
+    printf("MQTT listener thread started ...\r\n");
+    while(true)
+    {
+        // Wait for data in the queue, timeout at 10ms
+        osEvent evt = queue.get(10);
+        if (evt.status == osEventMessage) {
+            printf("Message arrived from main thread ...\r\n");
+            // Unpack the message
+            MQTT::Message * message = (MQTT::Message *)evt.value.p;
+            
+            printf("Publishing message to MQTT ...\r\n");
+            // Push to mqtt
+            int rc = client.publish(topic, *message);
+            if (rc < 0)
+                printf("Error sending mqtt message \r\n");
+            else
+                printf("Message published ...\r\n");
+                
+            // Don't forget this!
+            pool.free(message);
+        }        
+        
+        printf("MQTT client yeild ...\r\n");
+        if (client.yield(100) != 0)
+        {
+            client.disconnect();
+            // TODO: reconnect TLS session.
+            return;
+        }
+        printf("MQTT client yeild successful ...\r\n");
+    }
+}
 
 int main(int argc, char* argv[])
 {
     float version = 0.6;
-    char* topic = "mbed-sample";
 
-    logMessage("HelloMQTT: version is %.2f\r\n", version);
+    int i = 0;
+
+    printf("HelloMQTT: version is %.2f\r\n", version);
 
     NetworkInterface* network = easy_connect(true);
     if (!network) {
         return -1;
     }
 
-    MQTTNetwork mqttNetwork(network);
-
-    MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
-    const char* hostname = "m2m.eclipse.org";
-    int port = 1883;
-    logMessage("Connecting to %s:%d\r\n", hostname, port);
-    int rc = mqttNetwork.connect(hostname, port);
-    if (rc != 0)
-        logMessage("rc from TCP connect is %d\r\n", rc);
-
-    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
-    data.MQTTVersion = 3;
-    data.clientID.cstring = "mbed-sample";
-    data.username.cstring = "testuser";
-    data.password.cstring = "testpassword";
-    if ((rc = client.connect(data)) != 0)
-        logMessage("rc from MQTT connect is %d\r\n", rc);
-
-    if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
-        logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
-    MQTT::Message message;
+    if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 )
+    {
+        printf("Failed initializing sercure MQTTS...\r\n");
+        return -1;
+    }
+    
+    if ( mqttsConnect("mqtt.mbedhacks.com", 
+                        8883,"mbedtest_01","tinong","tatay") != 0 )
+    {
+        printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n");
+        return -1;
+    }
+    
+    if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 )
+    {
+        printf("Failed to subscribe to a topic!\r\n");
+        return -1;
+    }
+    
+    // Run an MQTT listener on its own thread.
+    thdMQTT.start(mqttListener);
+    
+    // The main loops just sends messages to MQTT server
+    while(true)
+    {
+        MQTT::Message mqtt_msg;
+        char buff[100];
+        
+        sprintf(buff, "message test %d", i);
+        mqtt_msg.qos = MQTT::QOS0;
+        mqtt_msg.retained = false;
+        mqtt_msg.dup = false;
+        mqtt_msg.payload = (void*)buff;
+        mqtt_msg.payloadlen = strlen(buff)+1;
+                    
+        // publish the message to mqtt
+        postMQTTUpdate(mqtt_msg);
+        i++;
+        
+        Thread::wait(2000);
+    }
 
-    // QoS 0
-    char buf[100];
-    sprintf(buf, "Hello World!  QoS 0 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS0;
-    message.retained = false;
-    message.dup = false;
-    message.payload = (void*)buf;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 1)
-        client.yield(100);
+    //mqttNetwork.disconnect();
 
-    // QoS 1
-    sprintf(buf, "Hello World!  QoS 1 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS1;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 2)
-        client.yield(100);
-
-    // QoS 2
-    sprintf(buf, "Hello World!  QoS 2 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS2;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 3)
-        client.yield(100);
-
-    if ((rc = client.unsubscribe(topic)) != 0)
-        logMessage("rc from unsubscribe was %d\r\n", rc);
-
-    if ((rc = client.disconnect()) != 0)
-        logMessage("rc from disconnect was %d\r\n", rc);
-
-    mqttNetwork.disconnect();
-
-    logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+    //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
 
     return 0;
 }