A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 26:4b21de8043a5
- Parent:
- 25:326f00faa092
- Child:
- 27:c90092f35d79
--- a/MQTTThreadedClient.cpp Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.cpp Mon Mar 27 13:45:26 2017 +0000
@@ -89,12 +89,12 @@
char *buf = new char[buf_size];
(void) data;
- if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth);
+ if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt);
if (_debug) mbedtls_printf("%s", buf);
if (*flags == 0)
- if (_debug) mbedtls_printf("No verification issue for this certificate\n");
+ if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
else {
mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags);
if (_debug) mbedtls_printf("%s\n", buf);
@@ -108,7 +108,7 @@
void MQTTThreadedClient::setupTLS()
{
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
mbedtls_entropy_init(&_entropy);
mbedtls_ctr_drbg_init(&_ctr_drbg);
@@ -121,7 +121,7 @@
void MQTTThreadedClient::freeTLS()
{
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
mbedtls_entropy_free(&_entropy);
mbedtls_ctr_drbg_free(&_ctr_drbg);
@@ -140,14 +140,14 @@
if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
(const unsigned char *) DRBG_PERS,
sizeof (DRBG_PERS))) != 0) {
- printf("error [%d] mbedtls_crt_drbg_init", ret);
+ mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
_error = ret;
return -1;
}
printf("mbedtls_x509_crt_parse ...\r\n");
if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
strlen(ssl_ca_pem) + 1)) != 0) {
- printf("error [%d] mbedtls_x509_crt_parse", ret);
+ mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -157,7 +157,7 @@
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
- printf("error [%d] mbedtls_ssl_config_defaults", ret);
+ mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -181,7 +181,7 @@
printf("mbedtls_ssl_setup ...\r\n");
if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
- printf("error [%d] mbedtls_ssl_setup", ret);
+ mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -200,17 +200,13 @@
{
if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
ret != MBEDTLS_ERR_SSL_WANT_WRITE)
- {
- printf("error [%d] mbedtls_ssl_handshake", ret);
- tcpSocket->close();
- _error = -1;
- }
+ mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
else
{
// do not close the socket if timed out
- _error = ret;
+ ret = TIMEOUT;
}
- return -1;
+ return ret;
}
/* Handshake done, time to print info */
@@ -241,11 +237,14 @@
// TODO: Save the session here for reconnect.
if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
{
- printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+ mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+ hasSavedSession = false;
return -1;
}
printf("Session saved for reconnect ...\r\n");
+ hasSavedSession = true;
+
return 0;
}
@@ -256,7 +255,7 @@
if (tcpSocket == NULL)
return -1;
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
// Do SSL/TLS read
rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
@@ -284,7 +283,7 @@
if (tcpSocket == NULL)
return -1;
- if (ssl_ca_pem != NULL) {
+ if (useTLS) {
// Do SSL/TLS write
rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
@@ -430,27 +429,27 @@
}
-int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+int MQTTThreadedClient::login()
{
int rc = FAILURE;
int len = 0;
- if (isConnected)
+ if (!isConnected)
{
- printf("Session already connected! \r\n");
+ printf("Session not connected! \r\n");
return rc;
}
// Copy the keepAliveInterval value to local
// MQTT specifies in seconds, we have to multiply that
// amount for our 32 bit timers which accepts ms.
- keepAliveInterval = (options.keepAliveInterval * 1000);
+ keepAliveInterval = (connect_options.keepAliveInterval * 1000);
- printf("Connecting with: \r\n");
- printf("\tUsername: [%s]\r\n", options.username.cstring);
- printf("\tPassword: [%s]\r\n", options.password.cstring);
+ printf("Login with: \r\n");
+ printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
+ printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
- if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+ if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
{
printf("Error serializing connect packet ...\r\n");
return rc;
@@ -478,11 +477,7 @@
if (rc == SUCCESS)
{
printf("Connected!!! ... starting connection timers ...\r\n");
- isConnected = true;
resetConnectionTimer();
- }else
- {
- // TODO: Call socket->disconnect()?
}
printf("Returning with rc = %d\r\n", rc);
@@ -495,24 +490,44 @@
{
if (isConnected)
{
- // TODO: Send unsubscribe message ...
+ if( useTLS
+ && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
+ )
+ {
+ printf( "Session reset returned an error \r\n");
+ }
isConnected = false;
tcpSocket->close();
}
}
-int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+int MQTTThreadedClient::connect()
{
int ret = FAILURE;
+
+ if ((network == NULL) || (tcpSocket == NULL)
+ || host.empty())
+ {
+ printf("Network settings not set! \r\n");
+ return ret;
+ }
- // Copy the settings for reconnection
- host = chost;
- port = cport;
- connect_options = options;
-
+ if (useTLS)
+ {
+ if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
+ return ret;
+ }
+
+ if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
+ return ret;
+ }
+ }
+
tcpSocket->open(network);
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
printf("mbedtls_ssl_set_hostname ...\r\n");
mbedtls_ssl_set_hostname(&_ssl, host.c_str());
@@ -523,18 +538,31 @@
if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
{
-
printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
return ret;
- }
+ }else
+ isConnected = true;
- if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0))
+ if (useTLS)
{
- printf("TLS Handshake failed! \r\n");
- return FAILURE;
+
+ if (doTLSHandshake() < 0)
+ {
+ printf("TLS Handshake failed! \r\n");
+ return FAILURE;
+ }else
+ printf("TLS Handshake complete!! \r\n");
}
- return connect(connect_options);
+ return login();
+}
+
+void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+{
+ // Copy the settings for reconnection
+ host = chost;
+ port = cport;
+ connect_options = options;
}
int MQTTThreadedClient::publish(PubMessage& msg)
@@ -593,7 +621,7 @@
FP<void,MessageData &> fp;
fp.attach(function);
- topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
}
int MQTTThreadedClient::processSubscriptions()
@@ -606,8 +634,10 @@
return 0;
}
+ printf("Processing subscribed topics ....\r\n");
+
std::map<std::string, FP<void, MessageData &> >::iterator it;
- for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++)
+ for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
{
int rc = FAILURE;
int len = 0;
@@ -653,71 +683,6 @@
return numsubscribed;
}
-int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
-{
- int rc = FAILURE;
- int len = 0;
-
- MQTTString topic = {(char*)topicstr, {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topicstr);
-
- if (!isConnected)
- {
- printf("Session already connected!!\r\n");
- return rc;
- }
-
- len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- if (len <= 0)
- {
- printf("Error serializing subscribe packet ...\r\n");
- return rc;
- }
-
- if ((rc = sendPacket(len)) != SUCCESS)
- {
- printf("Error sending subscribe packet [%d]\r\n", rc);
- return rc;
- }
-
- printf("Waiting for subscription ack ...\r\n");
- // Wait for SUBACK, dropping packets read along the way ...
- if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback
- {
- int count = 0, grantedQoS = -1;
- unsigned short mypacketid;
- if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
- rc = grantedQoS; // 0, 1, 2 or 0x80
- // For as long as we do not get 0x80 ..
- if (rc != 0x80)
- {
- // Add message handlers to the map
- FP<void,MessageData &> fp;
- fp.attach(function);
-
- topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
- // Reset connection timers here ...
- resetConnectionTimer();
-
- printf("Successfully subscribed to %s ...\r\n", topicstr);
- rc = SUCCESS;
- }else
- {
- printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
- }
- }
- else
- {
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
- rc = FAILURE;
- }
-
- return rc;
-
-}
-
-
bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
{
char* curf = topicFilter;
@@ -843,23 +808,43 @@
void MQTTThreadedClient::startListener()
{
int pType;
+ int numsubs;
// Continuesly listens for packets and dispatch
// message handlers ...
- do {
- // Connect to server ...
- if (!isConnected)
+ if (useTLS)
+ {
+ initTLS();
+ }
+
+ while(true)
+ {
+
+ // Attempt to reconnect and login
+ if ( connect() < 0 )
{
- // Attempt to reconnect ...
+ disconnect();
+ // Wait for a few secs and reconnect ...
+ Thread::wait(6000);
+ continue;
}
- while(true) {
+ numsubs = processSubscriptions();
+ printf("Subscribed %d topics ...\r\n", numsubs);
+
+ // loop read
+ while(true)
+ {
pType = readPacket();
- switch(pType) {
+ switch(pType)
+ {
case TIMEOUT:
// No data available from the network ...
break;
case FAILURE:
- goto reconnect;
+ {
+ printf("readPacket returned failure \r\n");
+ goto reconnect;
+ }
case BUFFER_OVERFLOW:
{
// TODO: Network error, do we disconnect and reconnect?
@@ -930,8 +915,14 @@
} // end while loop
reconnect:
- disconnect();
- // reconnect?
- } while(true);
+ // reconnect?
+ disconnect();
+
+ };
}
-
\ No newline at end of file
+
+void MQTTThreadedClient::stopListener()
+{
+ // TODO: Set a signal/flag that the running thread
+ // will check if its ok to stop ...
+}
\ No newline at end of file
