A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
30:b2aed80037db
Parent:
27:c90092f35d79
Child:
33:38e2e7bf91eb
--- a/MQTTThreadedClient.cpp	Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.cpp	Tue Mar 28 09:18:49 2017 +0000
@@ -7,8 +7,8 @@
 #include "mbedtls/ctr_drbg.h"
 #include "mbedtls/error.h"
 
-static MemoryPool<PubMessage, 16> mpool;
-static Queue<PubMessage, 16> mqueue;
+static MemoryPool<MQTT::PubMessage, 8> mpool;
+static Queue<MQTT::PubMessage, 8> mqueue;
 
 // SSL/TLS variables
 mbedtls_entropy_context _entropy;
@@ -18,6 +18,8 @@
 mbedtls_ssl_config _ssl_conf;    
 mbedtls_ssl_session saved_session;
 
+namespace MQTT {
+    
 /**
  * Receive callback for mbed TLS
  */
@@ -135,8 +137,8 @@
 {
         int ret;
         
-        printf("Initializing TLS ...\r\n");
-        printf("mbedtls_ctr_drdbg_seed ...\r\n");
+        DBG("Initializing TLS ...\r\n");
+        DBG("mbedtls_ctr_drdbg_seed ...\r\n");
         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
                           (const unsigned char *) DRBG_PERS,
                           sizeof (DRBG_PERS))) != 0) {
@@ -144,7 +146,7 @@
             _error = ret;
             return -1;
         }
-        printf("mbedtls_x509_crt_parse ...\r\n");
+        DBG("mbedtls_x509_crt_parse ...\r\n");
         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
                            strlen(ssl_ca_pem) + 1)) != 0) {
             mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
@@ -152,7 +154,7 @@
             return -1;
         }
 
-        printf("mbedtls_ssl_config_defaults ...\r\n");
+        DBG("mbedtls_ssl_config_defaults ...\r\n");
         if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
                         MBEDTLS_SSL_IS_CLIENT,
                         MBEDTLS_SSL_TRANSPORT_STREAM,
@@ -162,15 +164,15 @@
             return -1;
         }
 
-        printf("mbedtls_ssl_config_ca_chain ...\r\n");
+        DBG("mbedtls_ssl_config_ca_chain ...\r\n");
         mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
-        printf("mbedtls_ssl_conf_rng ...\r\n");
+        DBG("mbedtls_ssl_conf_rng ...\r\n");
         mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
 
         /* It is possible to disable authentication by passing
          * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
          */
-        printf("mbedtls_ssl_conf_authmode ...\r\n");         
+        DBG("mbedtls_ssl_conf_authmode ...\r\n");         
         mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
 
 #if DEBUG_LEVEL > 0
@@ -179,7 +181,7 @@
         mbedtls_debug_set_threshold(DEBUG_LEVEL);
 #endif
 
-        printf("mbedtls_ssl_setup ...\r\n");         
+        DBG("mbedtls_ssl_setup ...\r\n");         
         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
             mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
             _error = ret;
@@ -194,7 +196,7 @@
         int ret;
         
         /* Start the handshake, the rest will be done in onReceive() */
-        printf("Starting the TLS handshake...\r\n");
+        DBG("Starting the TLS handshake...\r\n");
         ret = mbedtls_ssl_handshake(&_ssl);
         if (ret < 0) 
         {
@@ -210,7 +212,7 @@
         }
 
         /* Handshake done, time to print info */
-        printf("TLS connection to %s:%d established\r\n", 
+        DBG("TLS connection to %s:%d established\r\n", 
             host.c_str(), port);
 
         const uint32_t buf_size = 1024;
@@ -218,22 +220,23 @@
         mbedtls_x509_crt_info(buf, buf_size, "\r    ",
                         mbedtls_ssl_get_peer_cert(&_ssl));
                         
-        printf("Server certificate:\r\n%s\r", buf);
+        DBG("Server certificate:\r\n%s\r", buf);
         // Verify server cert ...
         uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
         if( flags != 0 )
         {
             mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
-            printf("Certificate verification failed:\r\n%s\r\r\n", buf);
+            DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
             // free server cert ... before error return
             delete [] buf;
             return -1;
         }
         
-        printf("Certificate verification passed\r\n\r\n");
+        DBG("Certificate verification passed\r\n\r\n");
         // delete server cert after verification
         delete [] buf;
         
+#if defined(MBEDTLS_SSL_CLI_C)        
         // TODO: Save the session here for reconnect.
         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
         {
@@ -241,8 +244,8 @@
             hasSavedSession = false;
             return -1;
         }  
-        
-        printf("Session saved for reconnect ...\r\n");     
+#endif        
+        DBG("Session saved for reconnect ...\r\n");     
         hasSavedSession = true;
         
         return 0;
@@ -436,7 +439,7 @@
 
     if (!isConnected)
     {
-        printf("Session not connected! \r\n");
+        DBG("Session not connected! \r\n");
         return rc;
     }
         
@@ -445,18 +448,18 @@
     // amount for our 32 bit timers which accepts ms.
     keepAliveInterval = (connect_options.keepAliveInterval * 1000);
     
-    printf("Login with: \r\n");
-    printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
-    printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
+    DBG("Login with: \r\n");
+    DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
+    DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
     
     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
     {
-        printf("Error serializing connect packet ...\r\n");
+        DBG("Error serializing connect packet ...\r\n");
         return rc;
     }
     if ((rc = sendPacket((size_t) len)) != SUCCESS)  // send the connect packet
     {
-        printf("Error sending the connect request packet ...\r\n");
+        DBG("Error sending the connect request packet ...\r\n");
         return rc; 
     }
     
@@ -465,7 +468,7 @@
     {
         unsigned char connack_rc = 255;
         bool sessionPresent = false;
-        printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+        DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = connack_rc;
         else
@@ -476,11 +479,11 @@
 
     if (rc == SUCCESS)
     {
-        printf("Connected!!! ... starting connection timers ...\r\n");
+        DBG("Connected!!! ... starting connection timers ...\r\n");
         resetConnectionTimer();
     }
     
-    printf("Returning with rc = %d\r\n", rc);
+    DBG("Returning with rc = %d\r\n", rc);
     
     return rc;    
 }
@@ -494,7 +497,7 @@
             && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
            )
         {
-            printf( "Session reset returned an error \r\n");
+            DBG( "Session reset returned an error \r\n");
         }        
         
         isConnected = false;
@@ -509,7 +512,7 @@
     if ((network == NULL) || (tcpSocket == NULL)
         || host.empty())
     {
-        printf("Network settings not set! \r\n");
+        DBG("Network settings not set! \r\n");
         return ret;
     }
     
@@ -519,26 +522,27 @@
             mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
             return ret;
         }
-
+#if defined(MBEDTLS_SSL_CLI_C)
         if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
             mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
             return ret;
         }
+#endif        
     }
         
     tcpSocket->open(network);
     if (useTLS)
     {
-        printf("mbedtls_ssl_set_hostname ...\r\n");         
+        DBG("mbedtls_ssl_set_hostname ...\r\n");         
         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
-        printf("mbedtls_ssl_set_bio ...\r\n");         
+        DBG("mbedtls_ssl_set_bio ...\r\n");         
         mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
                                    ssl_send, ssl_recv, NULL );
     }
     
     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
     {
-         printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
+         DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
          return ret;
     }else
          isConnected = true;
@@ -548,10 +552,10 @@
         
         if (doTLSHandshake() < 0)
         {
-            printf("TLS Handshake failed! \r\n");
+            DBG("TLS Handshake failed! \r\n");
             return FAILURE;
         }else
-            printf("TLS Handshake complete!! \r\n");
+            DBG("TLS Handshake complete!! \r\n");
     }
     
     return login();
@@ -580,7 +584,7 @@
     *message = msg;
     
     // Push the data to the thread
-    printf("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+    DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
     mqueue.put(message);
     
     return SUCCESS;
@@ -592,7 +596,7 @@
      
      if (!isConnected) 
      {
-        printf("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+        DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
         return FAILURE;
      }
         
@@ -601,17 +605,17 @@
               topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
      if (len <= 0)
      {
-         printf("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+         DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
          return FAILURE;
      }
      
      if (sendPacket(len) == SUCCESS)
      {
-         printf("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+         DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
          return SUCCESS;
      }
     
-    printf("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+    DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
     return FAILURE;
 }
 
@@ -630,11 +634,11 @@
     
     if (!isConnected) 
     {
-            printf("Session not connected!!\r\n");
+            DBG("Session not connected!!\r\n");
             return 0;
     }
     
-    printf("Processing subscribed topics ....\r\n");
+    DBG("Processing subscribed topics ....\r\n");
     
     std::map<std::string, FP<void, MessageData &> >::iterator it;
     for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
@@ -645,21 +649,21 @@
         QoS qos = QOS0;
 
         MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
-        printf("Subscribing to topic [%s]\r\n", topic.cstring);
+        DBG("Subscribing to topic [%s]\r\n", topic.cstring);
 
 
         len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
         if (len <= 0) {
-            printf("Error serializing subscribe packet ...\r\n");
+            DBG("Error serializing subscribe packet ...\r\n");
             continue;
         }
 
         if ((rc = sendPacket(len)) != SUCCESS) {
-            printf("Error sending subscribe packet [%d]\r\n", rc);
+            DBG("Error sending subscribe packet [%d]\r\n", rc);
             continue;
         }
 
-        printf("Waiting for subscription ack ...\r\n");
+        DBG("Waiting for subscription ack ...\r\n");
         // Wait for SUBACK, dropping packets read along the way ...
         if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
             int count = 0, grantedQoS = -1;
@@ -671,13 +675,13 @@
             {
                 // Reset connection timers here ...
                 resetConnectionTimer();
-                printf("Successfully subscribed to %s ...\r\n", it->first.c_str());
+                DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
                 numsubscribed++;
             } else {
-                printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
+                DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
             }
         } else 
-            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
+            DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
     } // end for loop
     
     return numsubscribed;    
@@ -715,7 +719,7 @@
     MQTTString topicName = MQTTString_initializer;
     Message msg;
     int intQoS;
-    printf("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+    DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
     if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
             &intQoS, 
             (unsigned char*)&msg.retained, 
@@ -724,7 +728,7 @@
             (unsigned char**)&msg.payload, 
             (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
     {
-        printf("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+        DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
         return -1;
     }
 
@@ -735,7 +739,7 @@
     }else
         topic = (const char *) topicName.cstring;
     
-    printf("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+    DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
     
     msg.qos = (QoS) intQoS;
 
@@ -746,7 +750,7 @@
         // Call the callback function 
         if (topicCBMap[topic].attached())
         {
-            printf("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+            DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
             MessageData md(topicName, msg);            
             topicCBMap[topic](md);
             
@@ -801,7 +805,7 @@
     int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
     if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
     {
-        printf("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+        DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
     }
 }
 
@@ -829,7 +833,7 @@
         }
         
         numsubs = processSubscriptions();
-        printf("Subscribed %d topics ...\r\n", numsubs);
+        DBG("Subscribed %d topics ...\r\n", numsubs);
          
         // loop read    
         while(true) 
@@ -842,13 +846,13 @@
                     break;
                 case FAILURE:
                     {
-                        printf("readPacket returned failure \r\n");
+                        DBG("readPacket returned failure \r\n");
                         goto reconnect;
                     }
                 case BUFFER_OVERFLOW: 
                     {
                         // TODO: Network error, do we disconnect and reconnect?
-                        printf("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+                        DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
                         MBED_ASSERT(false);
                     }
                     break;
@@ -862,22 +866,22 @@
                     break;
                 case PUBLISH: 
                     {
-                        printf("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+                        DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
                         // We receive data from the MQTT server ..
                         if (handlePublishMsg() < 0) {
-                            printf("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+                            DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
                             break;
                         }
                     }
                     break;
                 case PINGRESP: 
                     {
-                        printf("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+                        DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
                         resetConnectionTimer();
                     }
                     break;
                 default:
-                    printf("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+                    DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
             }
 
             // Check if its time to send a keepAlive packet
@@ -891,7 +895,7 @@
             osEvent evt = mqueue.get(10);
             if (evt.status == osEventMessage) {
 
-                printf("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+                DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
 
                 // Unpack the message
                 PubMessage * message = (PubMessage *)evt.value.p;
@@ -916,7 +920,7 @@
 
 reconnect:
         // reconnect?
-        printf("Client disconnected!! ... retrying ...\r\n");
+        DBG("Client disconnected!! ... retrying ...\r\n");
         disconnect();
         
     };
@@ -926,4 +930,6 @@
 {
     // TODO: Set a signal/flag that the running thread 
     // will check if its ok to stop ...
+}
+
 }
\ No newline at end of file