A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
26:4b21de8043a5
Parent:
25:326f00faa092
Child:
27:c90092f35d79
--- a/MQTTThreadedClient.cpp	Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.cpp	Mon Mar 27 13:45:26 2017 +0000
@@ -89,12 +89,12 @@
     char *buf = new char[buf_size];
     (void) data;
 
-    if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth);
+    if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
     mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
     if (_debug) mbedtls_printf("%s", buf);
 
     if (*flags == 0)
-        if (_debug) mbedtls_printf("No verification issue for this certificate\n");
+        if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
         else {
             mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
             if (_debug) mbedtls_printf("%s\n", buf);
@@ -108,7 +108,7 @@
 
 void MQTTThreadedClient::setupTLS()    
 {
-        if (ssl_ca_pem != NULL)
+        if (useTLS)
         {
             mbedtls_entropy_init(&_entropy);
             mbedtls_ctr_drbg_init(&_ctr_drbg);
@@ -121,7 +121,7 @@
 
 void MQTTThreadedClient::freeTLS()
 {
-        if (ssl_ca_pem != NULL)
+        if (useTLS)
         {
             mbedtls_entropy_free(&_entropy);
             mbedtls_ctr_drbg_free(&_ctr_drbg);
@@ -140,14 +140,14 @@
         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
                           (const unsigned char *) DRBG_PERS,
                           sizeof (DRBG_PERS))) != 0) {
-            printf("error [%d] mbedtls_crt_drbg_init", ret);
+            mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
         printf("mbedtls_x509_crt_parse ...\r\n");
         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
                            strlen(ssl_ca_pem) + 1)) != 0) {
-            printf("error [%d] mbedtls_x509_crt_parse", ret);
+            mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -157,7 +157,7 @@
                         MBEDTLS_SSL_IS_CLIENT,
                         MBEDTLS_SSL_TRANSPORT_STREAM,
                         MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
-            printf("error [%d] mbedtls_ssl_config_defaults", ret);
+            mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -181,7 +181,7 @@
 
         printf("mbedtls_ssl_setup ...\r\n");         
         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
-            printf("error [%d] mbedtls_ssl_setup", ret);
+            mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
             _error = ret;
             return -1;
         }
@@ -200,17 +200,13 @@
         {
             if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
                 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
-            {
-                    printf("error [%d] mbedtls_ssl_handshake", ret);
-                    tcpSocket->close();
-                    _error = -1;
-            }
+                    mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
             else 
             {
                 // do not close the socket if timed out
-                _error = ret;
+                ret = TIMEOUT;
             }
-            return -1;
+            return ret;
         }
 
         /* Handshake done, time to print info */
@@ -241,11 +237,14 @@
         // TODO: Save the session here for reconnect.
         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
         {
-            printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+            mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+            hasSavedSession = false;
             return -1;
         }  
         
         printf("Session saved for reconnect ...\r\n");     
+        hasSavedSession = true;
+        
         return 0;
 }
 
@@ -256,7 +255,7 @@
     if (tcpSocket == NULL)
         return -1;
 
-    if (ssl_ca_pem != NULL) 
+    if (useTLS) 
     {
         // Do SSL/TLS read
         rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
@@ -284,7 +283,7 @@
     if (tcpSocket == NULL)
         return -1;
     
-    if (ssl_ca_pem != NULL) {
+    if (useTLS) {
         // Do SSL/TLS write
         rc =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
         if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
@@ -430,27 +429,27 @@
 }
 
 
-int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+int MQTTThreadedClient::login()
 {
     int rc = FAILURE;
     int len = 0;
 
-    if (isConnected)
+    if (!isConnected)
     {
-        printf("Session already connected! \r\n");
+        printf("Session not connected! \r\n");
         return rc;
     }
         
     // Copy the keepAliveInterval value to local
     // MQTT specifies in seconds, we have to multiply that
     // amount for our 32 bit timers which accepts ms.
-    keepAliveInterval = (options.keepAliveInterval * 1000);
+    keepAliveInterval = (connect_options.keepAliveInterval * 1000);
     
-    printf("Connecting with: \r\n");
-    printf("\tUsername: [%s]\r\n", options.username.cstring);
-    printf("\tPassword: [%s]\r\n", options.password.cstring);
+    printf("Login with: \r\n");
+    printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
+    printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
     
-    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
     {
         printf("Error serializing connect packet ...\r\n");
         return rc;
@@ -478,11 +477,7 @@
     if (rc == SUCCESS)
     {
         printf("Connected!!! ... starting connection timers ...\r\n");
-        isConnected = true;
         resetConnectionTimer();
-    }else
-    {
-        // TODO: Call socket->disconnect()?
     }
     
     printf("Returning with rc = %d\r\n", rc);
@@ -495,24 +490,44 @@
 {
     if (isConnected)
     {
-        // TODO: Send unsubscribe message ...
+        if( useTLS 
+            && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
+           )
+        {
+            printf( "Session reset returned an error \r\n");
+        }        
         
         isConnected = false;
         tcpSocket->close();      
     }
 }
 
-int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+int MQTTThreadedClient::connect()
 {
     int ret = FAILURE;
+
+    if ((network == NULL) || (tcpSocket == NULL)
+        || host.empty())
+    {
+        printf("Network settings not set! \r\n");
+        return ret;
+    }
     
-    // Copy the settings for reconnection
-    host = chost;
-    port = cport;
-    connect_options = options;
-    
+    if (useTLS) 
+    {
+        if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
+            mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
+            return ret;
+        }
+
+        if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
+            mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
+            return ret;
+        }
+    }
+        
     tcpSocket->open(network);
-    if (ssl_ca_pem != NULL)
+    if (useTLS)
     {
         printf("mbedtls_ssl_set_hostname ...\r\n");         
         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
@@ -523,18 +538,31 @@
     
     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
     {
-         
          printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
          return ret;
-    } 
+    }else
+         isConnected = true;
     
-    if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0))
+    if (useTLS) 
     {
-         printf("TLS Handshake failed! \r\n");
-         return FAILURE;
+        
+        if (doTLSHandshake() < 0)
+        {
+            printf("TLS Handshake failed! \r\n");
+            return FAILURE;
+        }else
+            printf("TLS Handshake complete!! \r\n");
     }
     
-    return connect(connect_options);
+    return login();
+}
+
+void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+{
+    // Copy the settings for reconnection
+    host = chost;
+    port = cport;
+    connect_options = options;    
 }
 
 int MQTTThreadedClient::publish(PubMessage& msg)
@@ -593,7 +621,7 @@
     FP<void,MessageData &> fp;
     fp.attach(function);
     
-    topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
+    topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
 } 
 
 int MQTTThreadedClient::processSubscriptions()
@@ -606,8 +634,10 @@
             return 0;
     }
     
+    printf("Processing subscribed topics ....\r\n");
+    
     std::map<std::string, FP<void, MessageData &> >::iterator it;
-    for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++) 
+    for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
     {
         int rc = FAILURE;
         int len = 0;
@@ -653,71 +683,6 @@
     return numsubscribed;    
 }
 
-int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
-{
-    int rc = FAILURE;
-    int len = 0;
-
-    MQTTString topic = {(char*)topicstr, {0, 0}};
-    printf("Subscribing to topic [%s]\r\n", topicstr);
-    
-    if (!isConnected)
-    {
-        printf("Session already connected!!\r\n");
-        return rc;
-    }
-
-    len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    if (len <= 0)
-    {
-        printf("Error serializing subscribe packet ...\r\n");
-        return rc;
-    }
-    
-    if ((rc = sendPacket(len)) != SUCCESS) 
-    {
-        printf("Error sending subscribe packet [%d]\r\n", rc);
-        return rc;
-    }
-    
-    printf("Waiting for subscription ack ...\r\n");
-    // Wait for SUBACK, dropping packets read along the way ...
-    if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK)  // wait for suback
-    {
-        int count = 0, grantedQoS = -1;
-        unsigned short mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80
-        // For as long as we do not get 0x80 .. 
-        if (rc != 0x80)
-        {
-            // Add message handlers to the map
-            FP<void,MessageData &> fp;
-            fp.attach(function);
-            
-            topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-            
-            // Reset connection timers here ...
-            resetConnectionTimer();
-            
-            printf("Successfully subscribed to %s ...\r\n", topicstr);
-            rc = SUCCESS;
-        }else
-        {
-            printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
-        }
-    }
-    else
-    {   
-        printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
-        rc = FAILURE;
-    }
-        
-    return rc;
-        
-}
-
-
 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
 {
     char* curf = topicFilter;
@@ -843,23 +808,43 @@
 void MQTTThreadedClient::startListener()
 {
     int pType;
+    int numsubs;
     // Continuesly listens for packets and dispatch
     // message handlers ...
-    do {
-        // Connect to server ...
-        if (!isConnected)
+    if (useTLS)
+    {
+        initTLS();
+    }
+            
+    while(true)
+    {
+
+        // Attempt to reconnect and login
+        if ( connect() < 0 )
         {
-            // Attempt to reconnect ...
+            disconnect();
+            // Wait for a few secs and reconnect ...
+            Thread::wait(6000);
+            continue;
         }
         
-        while(true) {
+        numsubs = processSubscriptions();
+        printf("Subscribed %d topics ...\r\n", numsubs);
+         
+        // loop read    
+        while(true) 
+        {
             pType = readPacket();
-            switch(pType) {
+            switch(pType) 
+            {
                 case TIMEOUT:
                     // No data available from the network ...
                     break;
                 case FAILURE:
-                    goto reconnect;
+                    {
+                        printf("readPacket returned failure \r\n");
+                        goto reconnect;
+                    }
                 case BUFFER_OVERFLOW: 
                     {
                         // TODO: Network error, do we disconnect and reconnect?
@@ -930,8 +915,14 @@
         } // end while loop
 
 reconnect:
-    disconnect();
-    // reconnect?
-    } while(true);
+        // reconnect?
+        disconnect();
+        
+    };
 }
-        
\ No newline at end of file
+
+void MQTTThreadedClient::stopListener()
+{
+    // TODO: Set a signal/flag that the running thread 
+    // will check if its ok to stop ...
+}
\ No newline at end of file