A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Files at this revision

API Documentation at this revision

Comitter:
vpcola
Date:
Tue Mar 28 09:18:49 2017 +0000
Parent:
29:45fd261e840b
Child:
32:16ef25cbb05c
Commit message:
Added Nucleo L476 to mbed_app.json

Changed in this revision

MQTTThreadedClient.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTThreadedClient.h Show annotated file Show diff for this revision Revisions of this file
easy-connect.lib Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed_app.json Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTThreadedClient.cpp	Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.cpp	Tue Mar 28 09:18:49 2017 +0000
@@ -7,8 +7,8 @@
 #include "mbedtls/ctr_drbg.h"
 #include "mbedtls/error.h"
 
-static MemoryPool<PubMessage, 16> mpool;
-static Queue<PubMessage, 16> mqueue;
+static MemoryPool<MQTT::PubMessage, 8> mpool;
+static Queue<MQTT::PubMessage, 8> mqueue;
 
 // SSL/TLS variables
 mbedtls_entropy_context _entropy;
@@ -18,6 +18,8 @@
 mbedtls_ssl_config _ssl_conf;    
 mbedtls_ssl_session saved_session;
 
+namespace MQTT {
+    
 /**
  * Receive callback for mbed TLS
  */
@@ -135,8 +137,8 @@
 {
         int ret;
         
-        printf("Initializing TLS ...\r\n");
-        printf("mbedtls_ctr_drdbg_seed ...\r\n");
+        DBG("Initializing TLS ...\r\n");
+        DBG("mbedtls_ctr_drdbg_seed ...\r\n");
         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
                           (const unsigned char *) DRBG_PERS,
                           sizeof (DRBG_PERS))) != 0) {
@@ -144,7 +146,7 @@
             _error = ret;
             return -1;
         }
-        printf("mbedtls_x509_crt_parse ...\r\n");
+        DBG("mbedtls_x509_crt_parse ...\r\n");
         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
                            strlen(ssl_ca_pem) + 1)) != 0) {
             mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
@@ -152,7 +154,7 @@
             return -1;
         }
 
-        printf("mbedtls_ssl_config_defaults ...\r\n");
+        DBG("mbedtls_ssl_config_defaults ...\r\n");
         if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
                         MBEDTLS_SSL_IS_CLIENT,
                         MBEDTLS_SSL_TRANSPORT_STREAM,
@@ -162,15 +164,15 @@
             return -1;
         }
 
-        printf("mbedtls_ssl_config_ca_chain ...\r\n");
+        DBG("mbedtls_ssl_config_ca_chain ...\r\n");
         mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
-        printf("mbedtls_ssl_conf_rng ...\r\n");
+        DBG("mbedtls_ssl_conf_rng ...\r\n");
         mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
 
         /* It is possible to disable authentication by passing
          * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
          */
-        printf("mbedtls_ssl_conf_authmode ...\r\n");         
+        DBG("mbedtls_ssl_conf_authmode ...\r\n");         
         mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
 
 #if DEBUG_LEVEL > 0
@@ -179,7 +181,7 @@
         mbedtls_debug_set_threshold(DEBUG_LEVEL);
 #endif
 
-        printf("mbedtls_ssl_setup ...\r\n");         
+        DBG("mbedtls_ssl_setup ...\r\n");         
         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
             mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
             _error = ret;
@@ -194,7 +196,7 @@
         int ret;
         
         /* Start the handshake, the rest will be done in onReceive() */
-        printf("Starting the TLS handshake...\r\n");
+        DBG("Starting the TLS handshake...\r\n");
         ret = mbedtls_ssl_handshake(&_ssl);
         if (ret < 0) 
         {
@@ -210,7 +212,7 @@
         }
 
         /* Handshake done, time to print info */
-        printf("TLS connection to %s:%d established\r\n", 
+        DBG("TLS connection to %s:%d established\r\n", 
             host.c_str(), port);
 
         const uint32_t buf_size = 1024;
@@ -218,22 +220,23 @@
         mbedtls_x509_crt_info(buf, buf_size, "\r    ",
                         mbedtls_ssl_get_peer_cert(&_ssl));
                         
-        printf("Server certificate:\r\n%s\r", buf);
+        DBG("Server certificate:\r\n%s\r", buf);
         // Verify server cert ...
         uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
         if( flags != 0 )
         {
             mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
-            printf("Certificate verification failed:\r\n%s\r\r\n", buf);
+            DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
             // free server cert ... before error return
             delete [] buf;
             return -1;
         }
         
-        printf("Certificate verification passed\r\n\r\n");
+        DBG("Certificate verification passed\r\n\r\n");
         // delete server cert after verification
         delete [] buf;
         
+#if defined(MBEDTLS_SSL_CLI_C)        
         // TODO: Save the session here for reconnect.
         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
         {
@@ -241,8 +244,8 @@
             hasSavedSession = false;
             return -1;
         }  
-        
-        printf("Session saved for reconnect ...\r\n");     
+#endif        
+        DBG("Session saved for reconnect ...\r\n");     
         hasSavedSession = true;
         
         return 0;
@@ -436,7 +439,7 @@
 
     if (!isConnected)
     {
-        printf("Session not connected! \r\n");
+        DBG("Session not connected! \r\n");
         return rc;
     }
         
@@ -445,18 +448,18 @@
     // amount for our 32 bit timers which accepts ms.
     keepAliveInterval = (connect_options.keepAliveInterval * 1000);
     
-    printf("Login with: \r\n");
-    printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
-    printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
+    DBG("Login with: \r\n");
+    DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
+    DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
     
     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
     {
-        printf("Error serializing connect packet ...\r\n");
+        DBG("Error serializing connect packet ...\r\n");
         return rc;
     }
     if ((rc = sendPacket((size_t) len)) != SUCCESS)  // send the connect packet
     {
-        printf("Error sending the connect request packet ...\r\n");
+        DBG("Error sending the connect request packet ...\r\n");
         return rc; 
     }
     
@@ -465,7 +468,7 @@
     {
         unsigned char connack_rc = 255;
         bool sessionPresent = false;
-        printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+        DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = connack_rc;
         else
@@ -476,11 +479,11 @@
 
     if (rc == SUCCESS)
     {
-        printf("Connected!!! ... starting connection timers ...\r\n");
+        DBG("Connected!!! ... starting connection timers ...\r\n");
         resetConnectionTimer();
     }
     
-    printf("Returning with rc = %d\r\n", rc);
+    DBG("Returning with rc = %d\r\n", rc);
     
     return rc;    
 }
@@ -494,7 +497,7 @@
             && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
            )
         {
-            printf( "Session reset returned an error \r\n");
+            DBG( "Session reset returned an error \r\n");
         }        
         
         isConnected = false;
@@ -509,7 +512,7 @@
     if ((network == NULL) || (tcpSocket == NULL)
         || host.empty())
     {
-        printf("Network settings not set! \r\n");
+        DBG("Network settings not set! \r\n");
         return ret;
     }
     
@@ -519,26 +522,27 @@
             mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
             return ret;
         }
-
+#if defined(MBEDTLS_SSL_CLI_C)
         if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
             mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
             return ret;
         }
+#endif        
     }
         
     tcpSocket->open(network);
     if (useTLS)
     {
-        printf("mbedtls_ssl_set_hostname ...\r\n");         
+        DBG("mbedtls_ssl_set_hostname ...\r\n");         
         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
-        printf("mbedtls_ssl_set_bio ...\r\n");         
+        DBG("mbedtls_ssl_set_bio ...\r\n");         
         mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
                                    ssl_send, ssl_recv, NULL );
     }
     
     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
     {
-         printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
+         DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
          return ret;
     }else
          isConnected = true;
@@ -548,10 +552,10 @@
         
         if (doTLSHandshake() < 0)
         {
-            printf("TLS Handshake failed! \r\n");
+            DBG("TLS Handshake failed! \r\n");
             return FAILURE;
         }else
-            printf("TLS Handshake complete!! \r\n");
+            DBG("TLS Handshake complete!! \r\n");
     }
     
     return login();
@@ -580,7 +584,7 @@
     *message = msg;
     
     // Push the data to the thread
-    printf("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+    DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
     mqueue.put(message);
     
     return SUCCESS;
@@ -592,7 +596,7 @@
      
      if (!isConnected) 
      {
-        printf("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+        DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
         return FAILURE;
      }
         
@@ -601,17 +605,17 @@
               topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
      if (len <= 0)
      {
-         printf("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+         DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
          return FAILURE;
      }
      
      if (sendPacket(len) == SUCCESS)
      {
-         printf("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+         DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
          return SUCCESS;
      }
     
-    printf("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+    DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
     return FAILURE;
 }
 
@@ -630,11 +634,11 @@
     
     if (!isConnected) 
     {
-            printf("Session not connected!!\r\n");
+            DBG("Session not connected!!\r\n");
             return 0;
     }
     
-    printf("Processing subscribed topics ....\r\n");
+    DBG("Processing subscribed topics ....\r\n");
     
     std::map<std::string, FP<void, MessageData &> >::iterator it;
     for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
@@ -645,21 +649,21 @@
         QoS qos = QOS0;
 
         MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
-        printf("Subscribing to topic [%s]\r\n", topic.cstring);
+        DBG("Subscribing to topic [%s]\r\n", topic.cstring);
 
 
         len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
         if (len <= 0) {
-            printf("Error serializing subscribe packet ...\r\n");
+            DBG("Error serializing subscribe packet ...\r\n");
             continue;
         }
 
         if ((rc = sendPacket(len)) != SUCCESS) {
-            printf("Error sending subscribe packet [%d]\r\n", rc);
+            DBG("Error sending subscribe packet [%d]\r\n", rc);
             continue;
         }
 
-        printf("Waiting for subscription ack ...\r\n");
+        DBG("Waiting for subscription ack ...\r\n");
         // Wait for SUBACK, dropping packets read along the way ...
         if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
             int count = 0, grantedQoS = -1;
@@ -671,13 +675,13 @@
             {
                 // Reset connection timers here ...
                 resetConnectionTimer();
-                printf("Successfully subscribed to %s ...\r\n", it->first.c_str());
+                DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
                 numsubscribed++;
             } else {
-                printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
+                DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
             }
         } else 
-            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
+            DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
     } // end for loop
     
     return numsubscribed;    
@@ -715,7 +719,7 @@
     MQTTString topicName = MQTTString_initializer;
     Message msg;
     int intQoS;
-    printf("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+    DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
     if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
             &intQoS, 
             (unsigned char*)&msg.retained, 
@@ -724,7 +728,7 @@
             (unsigned char**)&msg.payload, 
             (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
     {
-        printf("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+        DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
         return -1;
     }
 
@@ -735,7 +739,7 @@
     }else
         topic = (const char *) topicName.cstring;
     
-    printf("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+    DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
     
     msg.qos = (QoS) intQoS;
 
@@ -746,7 +750,7 @@
         // Call the callback function 
         if (topicCBMap[topic].attached())
         {
-            printf("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+            DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
             MessageData md(topicName, msg);            
             topicCBMap[topic](md);
             
@@ -801,7 +805,7 @@
     int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
     if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
     {
-        printf("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+        DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
     }
 }
 
@@ -829,7 +833,7 @@
         }
         
         numsubs = processSubscriptions();
-        printf("Subscribed %d topics ...\r\n", numsubs);
+        DBG("Subscribed %d topics ...\r\n", numsubs);
          
         // loop read    
         while(true) 
@@ -842,13 +846,13 @@
                     break;
                 case FAILURE:
                     {
-                        printf("readPacket returned failure \r\n");
+                        DBG("readPacket returned failure \r\n");
                         goto reconnect;
                     }
                 case BUFFER_OVERFLOW: 
                     {
                         // TODO: Network error, do we disconnect and reconnect?
-                        printf("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+                        DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
                         MBED_ASSERT(false);
                     }
                     break;
@@ -862,22 +866,22 @@
                     break;
                 case PUBLISH: 
                     {
-                        printf("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+                        DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
                         // We receive data from the MQTT server ..
                         if (handlePublishMsg() < 0) {
-                            printf("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+                            DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
                             break;
                         }
                     }
                     break;
                 case PINGRESP: 
                     {
-                        printf("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+                        DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
                         resetConnectionTimer();
                     }
                     break;
                 default:
-                    printf("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+                    DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
             }
 
             // Check if its time to send a keepAlive packet
@@ -891,7 +895,7 @@
             osEvent evt = mqueue.get(10);
             if (evt.status == osEventMessage) {
 
-                printf("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+                DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
 
                 // Unpack the message
                 PubMessage * message = (PubMessage *)evt.value.p;
@@ -916,7 +920,7 @@
 
 reconnect:
         // reconnect?
-        printf("Client disconnected!! ... retrying ...\r\n");
+        DBG("Client disconnected!! ... retrying ...\r\n");
         disconnect();
         
     };
@@ -926,4 +930,6 @@
 {
     // TODO: Set a signal/flag that the running thread 
     // will check if its ok to stop ...
+}
+
 }
\ No newline at end of file
--- a/MQTTThreadedClient.h	Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.h	Tue Mar 28 09:18:49 2017 +0000
@@ -7,6 +7,13 @@
 #include "NetworkInterface.h"
 #include "FP.h"
 
+#define MQTT_DEBUG 1
+
+#ifdef MQTT_DEBUG
+#define DBG(fmt, args...)    printf(fmt, ## args)
+#else
+#define DBG(fmt, args...)    /* Don't do anything in release builds */
+#endif
 
 #include <cstdio>
 #include <string>
@@ -17,6 +24,9 @@
 #define MAX_MQTT_PACKET_SIZE 200
 #define MAX_MQTT_PAYLOAD_SIZE 100
 
+namespace MQTT
+{
+    
 typedef enum { QOS0, QOS1, QOS2 } QoS;
 
 // all failure return codes must be negative
@@ -187,4 +197,5 @@
     int login();
 };
 
+}
 #endif
--- a/easy-connect.lib	Mon Mar 27 15:16:23 2017 +0000
+++ b/easy-connect.lib	Tue Mar 28 09:18:49 2017 +0000
@@ -1,1 +1,1 @@
-https://github.com/ARMmbed/easy-connect/#cb933fb19cda0a733a64d6b71d271fb6bdaf9e6d
+https://github.com/ARMmbed/easy-connect/#5b9cb8cea4a11b0ab974c991b527c9b79fceae75
--- a/main.cpp	Mon Mar 27 15:16:23 2017 +0000
+++ b/main.cpp	Tue Mar 28 09:18:49 2017 +0000
@@ -30,6 +30,7 @@
 #include "easy-connect.h"
 #include "MQTTThreadedClient.h"
 
+using namespace MQTT;
 
 Serial pc(USBTX, USBRX, 115200);
 Thread msgSender(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
--- a/mbed_app.json	Mon Mar 27 15:16:23 2017 +0000
+++ b/mbed_app.json	Tue Mar 28 09:18:49 2017 +0000
@@ -64,6 +64,11 @@
         "NUCLEO_F411RE": {
             "esp8266-tx": "D8",
             "esp8266-rx": "D2"
-        }
+        },
+        "NUCLEO_L476RG": {
+            "network-interface": "WIFI_ESP8266",
+            "esp8266-tx": "D8",
+            "esp8266-rx": "D2"
+        }        
     }
 }