A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 33:38e2e7bf91eb
- Parent:
- 30:b2aed80037db
--- a/MQTTThreadedClient.cpp Sat Apr 01 12:55:49 2017 +0000
+++ b/MQTTThreadedClient.cpp Sun Apr 02 09:11:53 2017 +0800
@@ -196,7 +196,7 @@
int ret;
/* Start the handshake, the rest will be done in onReceive() */
- DBG("Starting the TLS handshake...\r\n");
+ printf("Starting the TLS handshake...\r\n");
ret = mbedtls_ssl_handshake(&_ssl);
if (ret < 0)
{
@@ -212,7 +212,7 @@
}
/* Handshake done, time to print info */
- DBG("TLS connection to %s:%d established\r\n",
+ printf("TLS connection to %s:%d established\r\n",
host.c_str(), port);
const uint32_t buf_size = 1024;
@@ -220,23 +220,24 @@
mbedtls_x509_crt_info(buf, buf_size, "\r ",
mbedtls_ssl_get_peer_cert(&_ssl));
- DBG("Server certificate:\r\n%s\r", buf);
+ printf("Server certificate:\r\n%s\r", buf);
// Verify server cert ...
uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
if( flags != 0 )
{
mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
- DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
+ printf("Certificate verification failed:\r\n%s\r\r\n", buf);
// free server cert ... before error return
delete [] buf;
return -1;
}
- DBG("Certificate verification passed\r\n\r\n");
+ printf("Certificate verification passed\r\n\r\n");
// delete server cert after verification
delete [] buf;
#if defined(MBEDTLS_SSL_CLI_C)
+ printf("Saving SSL/TLS session ...\r\n");
// TODO: Save the session here for reconnect.
if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
{
@@ -244,8 +245,9 @@
hasSavedSession = false;
return -1;
}
+ printf("Session saved for reconnect ...\r\n");
#endif
- DBG("Session saved for reconnect ...\r\n");
+
hasSavedSession = true;
return 0;
@@ -584,7 +586,7 @@
*message = msg;
// Push the data to the thread
- DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+ DBG("Pushing data to consumer thread ...\r\n");
mqueue.put(message);
return SUCCESS;
@@ -596,7 +598,7 @@
if (!isConnected)
{
- DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+ DBG("Not connected!!! ...\r\n");
return FAILURE;
}
@@ -605,17 +607,17 @@
topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
if (len <= 0)
{
- DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+ DBG("Failed serializing message ...\r\n");
return FAILURE;
}
if (sendPacket(len) == SUCCESS)
{
- DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+ DBG("Successfully sent publish packet to server ...\r\n");
return SUCCESS;
}
- DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+ DBG("Failed to send publish packet to server ...\r\n");
return FAILURE;
}
@@ -719,7 +721,7 @@
MQTTString topicName = MQTTString_initializer;
Message msg;
int intQoS;
- DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+ DBG("Deserializing publish message ...\r\n");
if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
&intQoS,
(unsigned char*)&msg.retained,
@@ -728,7 +730,7 @@
(unsigned char**)&msg.payload,
(int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
{
- DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+ DBG("Error deserializing published message ...\r\n");
return -1;
}
@@ -739,7 +741,7 @@
}else
topic = (const char *) topicName.cstring;
- DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+ DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
msg.qos = (QoS) intQoS;
@@ -750,7 +752,7 @@
// Call the callback function
if (topicCBMap[topic].attached())
{
- DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+ DBG("Invoking function handler for topic ...\r\n");
MessageData md(topicName, msg);
topicCBMap[topic](md);
@@ -805,7 +807,7 @@
int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
{
- DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+ DBG("Ping request sent successfully ...\r\n");
}
}
@@ -852,7 +854,7 @@
case BUFFER_OVERFLOW:
{
// TODO: Network error, do we disconnect and reconnect?
- DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+ DBG("Failure or buffer overflow problem ... \r\n");
MBED_ASSERT(false);
}
break;
@@ -866,22 +868,22 @@
break;
case PUBLISH:
{
- DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+ DBG("Publish received!....\r\n");
// We receive data from the MQTT server ..
if (handlePublishMsg() < 0) {
- DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+ DBG("Error handling PUBLISH message ... \r\n");
break;
}
}
break;
case PINGRESP:
{
- DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+ DBG("Got ping response ...\r\n");
resetConnectionTimer();
}
break;
default:
- DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+ DBG("Unknown/Not handled message from server pType[%d]\r\n", pType);
}
// Check if its time to send a keepAlive packet
@@ -895,7 +897,7 @@
osEvent evt = mqueue.get(10);
if (evt.status == osEventMessage) {
- DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+ DBG("Got message to publish! ... \r\n");
// Unpack the message
PubMessage * message = (PubMessage *)evt.value.p;
