A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Revision 31:d34f6adb7a53, committed 2017-04-01
- Comitter:
- vpcola
- Date:
- Sat Apr 01 12:41:29 2017 +0000
- Parent:
- 29:45fd261e840b
- Commit message:
- Testing on NUCLEO L476RG
Changed in this revision
--- a/MQTTThreadedClient.cpp Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.cpp Sat Apr 01 12:41:29 2017 +0000
@@ -7,8 +7,8 @@
#include "mbedtls/ctr_drbg.h"
#include "mbedtls/error.h"
-static MemoryPool<PubMessage, 16> mpool;
-static Queue<PubMessage, 16> mqueue;
+static MemoryPool<MQTT::PubMessage, 4> mpool;
+static Queue<MQTT::PubMessage, 4> mqueue;
// SSL/TLS variables
mbedtls_entropy_context _entropy;
@@ -18,6 +18,8 @@
mbedtls_ssl_config _ssl_conf;
mbedtls_ssl_session saved_session;
+namespace MQTT {
+
/**
* Receive callback for mbed TLS
*/
@@ -135,8 +137,8 @@
{
int ret;
- printf("Initializing TLS ...\r\n");
- printf("mbedtls_ctr_drdbg_seed ...\r\n");
+ DBG("Initializing TLS ...\r\n");
+ DBG("mbedtls_ctr_drdbg_seed ...\r\n");
if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
(const unsigned char *) DRBG_PERS,
sizeof (DRBG_PERS))) != 0) {
@@ -144,7 +146,7 @@
_error = ret;
return -1;
}
- printf("mbedtls_x509_crt_parse ...\r\n");
+ DBG("mbedtls_x509_crt_parse ...\r\n");
if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
strlen(ssl_ca_pem) + 1)) != 0) {
mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
@@ -152,7 +154,7 @@
return -1;
}
- printf("mbedtls_ssl_config_defaults ...\r\n");
+ DBG("mbedtls_ssl_config_defaults ...\r\n");
if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
@@ -162,15 +164,15 @@
return -1;
}
- printf("mbedtls_ssl_config_ca_chain ...\r\n");
+ DBG("mbedtls_ssl_config_ca_chain ...\r\n");
mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
- printf("mbedtls_ssl_conf_rng ...\r\n");
+ DBG("mbedtls_ssl_conf_rng ...\r\n");
mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
/* It is possible to disable authentication by passing
* MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
*/
- printf("mbedtls_ssl_conf_authmode ...\r\n");
+ DBG("mbedtls_ssl_conf_authmode ...\r\n");
mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
#if DEBUG_LEVEL > 0
@@ -179,7 +181,7 @@
mbedtls_debug_set_threshold(DEBUG_LEVEL);
#endif
- printf("mbedtls_ssl_setup ...\r\n");
+ DBG("mbedtls_ssl_setup ...\r\n");
if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
_error = ret;
@@ -194,7 +196,7 @@
int ret;
/* Start the handshake, the rest will be done in onReceive() */
- printf("Starting the TLS handshake...\r\n");
+ DBG("Starting the TLS handshake...\r\n");
ret = mbedtls_ssl_handshake(&_ssl);
if (ret < 0)
{
@@ -210,7 +212,7 @@
}
/* Handshake done, time to print info */
- printf("TLS connection to %s:%d established\r\n",
+ DBG("TLS connection to %s:%d established\r\n",
host.c_str(), port);
const uint32_t buf_size = 1024;
@@ -218,22 +220,23 @@
mbedtls_x509_crt_info(buf, buf_size, "\r ",
mbedtls_ssl_get_peer_cert(&_ssl));
- printf("Server certificate:\r\n%s\r", buf);
+ DBG("Server certificate:\r\n%s\r", buf);
// Verify server cert ...
uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
if( flags != 0 )
{
mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
- printf("Certificate verification failed:\r\n%s\r\r\n", buf);
+ DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
// free server cert ... before error return
delete [] buf;
return -1;
}
- printf("Certificate verification passed\r\n\r\n");
+ DBG("Certificate verification passed\r\n\r\n");
// delete server cert after verification
delete [] buf;
+#if defined(MBEDTLS_SSL_CLI_C)
// TODO: Save the session here for reconnect.
if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
{
@@ -241,8 +244,8 @@
hasSavedSession = false;
return -1;
}
-
- printf("Session saved for reconnect ...\r\n");
+#endif
+ DBG("Session saved for reconnect ...\r\n");
hasSavedSession = true;
return 0;
@@ -436,7 +439,7 @@
if (!isConnected)
{
- printf("Session not connected! \r\n");
+ DBG("Session not connected! \r\n");
return rc;
}
@@ -445,18 +448,18 @@
// amount for our 32 bit timers which accepts ms.
keepAliveInterval = (connect_options.keepAliveInterval * 1000);
- printf("Login with: \r\n");
- printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
- printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
+ DBG("Login with: \r\n");
+ DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
+ DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
{
- printf("Error serializing connect packet ...\r\n");
+ DBG("Error serializing connect packet ...\r\n");
return rc;
}
if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
{
- printf("Error sending the connect request packet ...\r\n");
+ DBG("Error sending the connect request packet ...\r\n");
return rc;
}
@@ -465,7 +468,7 @@
{
unsigned char connack_rc = 255;
bool sessionPresent = false;
- printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+ DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
rc = connack_rc;
else
@@ -476,11 +479,11 @@
if (rc == SUCCESS)
{
- printf("Connected!!! ... starting connection timers ...\r\n");
+ DBG("Connected!!! ... starting connection timers ...\r\n");
resetConnectionTimer();
}
- printf("Returning with rc = %d\r\n", rc);
+ DBG("Returning with rc = %d\r\n", rc);
return rc;
}
@@ -494,7 +497,7 @@
&& ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
)
{
- printf( "Session reset returned an error \r\n");
+ DBG( "Session reset returned an error \r\n");
}
isConnected = false;
@@ -509,7 +512,7 @@
if ((network == NULL) || (tcpSocket == NULL)
|| host.empty())
{
- printf("Network settings not set! \r\n");
+ DBG("Network settings not set! \r\n");
return ret;
}
@@ -519,26 +522,27 @@
mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
return ret;
}
-
+#if defined(MBEDTLS_SSL_CLI_C)
if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
return ret;
}
+#endif
}
tcpSocket->open(network);
if (useTLS)
{
- printf("mbedtls_ssl_set_hostname ...\r\n");
+ DBG("mbedtls_ssl_set_hostname ...\r\n");
mbedtls_ssl_set_hostname(&_ssl, host.c_str());
- printf("mbedtls_ssl_set_bio ...\r\n");
+ DBG("mbedtls_ssl_set_bio ...\r\n");
mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
ssl_send, ssl_recv, NULL );
}
if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
{
- printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
+ DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
return ret;
}else
isConnected = true;
@@ -548,10 +552,10 @@
if (doTLSHandshake() < 0)
{
- printf("TLS Handshake failed! \r\n");
+ DBG("TLS Handshake failed! \r\n");
return FAILURE;
}else
- printf("TLS Handshake complete!! \r\n");
+ DBG("TLS Handshake complete!! \r\n");
}
return login();
@@ -580,7 +584,7 @@
*message = msg;
// Push the data to the thread
- printf("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+ DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
mqueue.put(message);
return SUCCESS;
@@ -592,7 +596,7 @@
if (!isConnected)
{
- printf("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+ DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
return FAILURE;
}
@@ -601,17 +605,17 @@
topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
if (len <= 0)
{
- printf("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
return FAILURE;
}
if (sendPacket(len) == SUCCESS)
{
- printf("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
return SUCCESS;
}
- printf("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
return FAILURE;
}
@@ -630,11 +634,11 @@
if (!isConnected)
{
- printf("Session not connected!!\r\n");
+ DBG("Session not connected!!\r\n");
return 0;
}
- printf("Processing subscribed topics ....\r\n");
+ DBG("Processing subscribed topics ....\r\n");
std::map<std::string, FP<void, MessageData &> >::iterator it;
for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
@@ -645,21 +649,21 @@
QoS qos = QOS0;
MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topic.cstring);
+ DBG("Subscribing to topic [%s]\r\n", topic.cstring);
len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
if (len <= 0) {
- printf("Error serializing subscribe packet ...\r\n");
+ DBG("Error serializing subscribe packet ...\r\n");
continue;
}
if ((rc = sendPacket(len)) != SUCCESS) {
- printf("Error sending subscribe packet [%d]\r\n", rc);
+ DBG("Error sending subscribe packet [%d]\r\n", rc);
continue;
}
- printf("Waiting for subscription ack ...\r\n");
+ DBG("Waiting for subscription ack ...\r\n");
// Wait for SUBACK, dropping packets read along the way ...
if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
int count = 0, grantedQoS = -1;
@@ -671,13 +675,13 @@
{
// Reset connection timers here ...
resetConnectionTimer();
- printf("Successfully subscribed to %s ...\r\n", it->first.c_str());
+ DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
numsubscribed++;
} else {
- printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
+ DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
}
} else
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
+ DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
} // end for loop
return numsubscribed;
@@ -715,7 +719,7 @@
MQTTString topicName = MQTTString_initializer;
Message msg;
int intQoS;
- printf("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
&intQoS,
(unsigned char*)&msg.retained,
@@ -724,7 +728,7 @@
(unsigned char**)&msg.payload,
(int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
{
- printf("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
return -1;
}
@@ -735,7 +739,7 @@
}else
topic = (const char *) topicName.cstring;
- printf("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+ DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
msg.qos = (QoS) intQoS;
@@ -746,7 +750,7 @@
// Call the callback function
if (topicCBMap[topic].attached())
{
- printf("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
MessageData md(topicName, msg);
topicCBMap[topic](md);
@@ -801,7 +805,7 @@
int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
{
- printf("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+ printf("MQTT Ping request sent successfully ...\r\n");
}
}
@@ -829,7 +833,7 @@
}
numsubs = processSubscriptions();
- printf("Subscribed %d topics ...\r\n", numsubs);
+ DBG("Subscribed %d topics ...\r\n", numsubs);
// loop read
while(true)
@@ -842,13 +846,13 @@
break;
case FAILURE:
{
- printf("readPacket returned failure \r\n");
+ DBG("readPacket returned failure \r\n");
goto reconnect;
}
case BUFFER_OVERFLOW:
{
// TODO: Network error, do we disconnect and reconnect?
- printf("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+ DBG("Failure or buffer overflow problem ... \r\n");
MBED_ASSERT(false);
}
break;
@@ -862,22 +866,22 @@
break;
case PUBLISH:
{
- printf("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+ DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
// We receive data from the MQTT server ..
if (handlePublishMsg() < 0) {
- printf("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+ DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
break;
}
}
break;
case PINGRESP:
{
- printf("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+ printf("MQTT Got ping response ...\r\n");
resetConnectionTimer();
}
break;
default:
- printf("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+ DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
}
// Check if its time to send a keepAlive packet
@@ -891,7 +895,7 @@
osEvent evt = mqueue.get(10);
if (evt.status == osEventMessage) {
- printf("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+ DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
// Unpack the message
PubMessage * message = (PubMessage *)evt.value.p;
@@ -916,7 +920,7 @@
reconnect:
// reconnect?
- printf("Client disconnected!! ... retrying ...\r\n");
+ DBG("Client disconnected!! ... retrying ...\r\n");
disconnect();
};
@@ -926,4 +930,6 @@
{
// TODO: Set a signal/flag that the running thread
// will check if its ok to stop ...
+}
+
}
\ No newline at end of file
--- a/MQTTThreadedClient.h Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.h Sat Apr 01 12:41:29 2017 +0000
@@ -12,11 +12,22 @@
#include <string>
#include <map>
+//#define MQTT_DEBUG 1
+
+#ifdef MQTT_DEBUG
+#define DBG(fmt, args...) printf(fmt, ## args)
+#else
+#define DBG(fmt, args...) /* Don't do anything in release builds */
+#endif
+
#define COMMAND_TIMEOUT 5000
#define DEFAULT_SOCKET_TIMEOUT 1000
-#define MAX_MQTT_PACKET_SIZE 200
-#define MAX_MQTT_PAYLOAD_SIZE 100
+#define MAX_MQTT_PACKET_SIZE 500
+#define MAX_MQTT_PAYLOAD_SIZE 300
+namespace MQTT
+{
+
typedef enum { QOS0, QOS1, QOS2 } QoS;
// all failure return codes must be negative
@@ -187,4 +198,5 @@
int login();
};
+}
#endif
--- a/easy-connect.lib Mon Mar 27 15:16:23 2017 +0000 +++ b/easy-connect.lib Sat Apr 01 12:41:29 2017 +0000 @@ -1,1 +1,1 @@ -https://github.com/ARMmbed/easy-connect/#cb933fb19cda0a733a64d6b71d271fb6bdaf9e6d +https://github.com/ARMmbed/easy-connect/#5b9cb8cea4a11b0ab974c991b527c9b79fceae75
--- a/main.cpp Mon Mar 27 15:16:23 2017 +0000
+++ b/main.cpp Sat Apr 01 12:41:29 2017 +0000
@@ -27,9 +27,12 @@
#include "mbed.h"
#include "rtos.h"
+
+#undef MBED_CONF_APP_ESP8266_DEBUG
#include "easy-connect.h"
#include "MQTTThreadedClient.h"
+using namespace MQTT;
Serial pc(USBTX, USBRX, 115200);
Thread msgSender(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
@@ -147,7 +150,7 @@
printf("HelloMQTT: version is %.2f\r\n", version);
- NetworkInterface* network = easy_connect(true);
+ NetworkInterface* network = easy_connect(false);
if (!network) {
return -1;
}
--- a/mbed_app.json Mon Mar 27 15:16:23 2017 +0000
+++ b/mbed_app.json Sat Apr 01 12:41:29 2017 +0000
@@ -17,10 +17,10 @@
"value": "D0"
},
"esp8266-ssid": {
- "value": "\"SSID\""
+ "value": "\"VPCOLA\""
},
"esp8266-password": {
- "value": "\"Password\""
+ "value": "\"AB12CD34\""
},
"esp8266-debug": {
"value": true
@@ -64,6 +64,11 @@
"NUCLEO_F411RE": {
"esp8266-tx": "D8",
"esp8266-rx": "D2"
- }
+ },
+ "NUCLEO_L476RG": {
+ "network-interface": "WIFI_ESP8266",
+ "esp8266-tx": "A0",
+ "esp8266-rx": "A1"
+ }
}
}
