Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 30:b2aed80037db
- Parent:
- 27:c90092f35d79
- Child:
- 33:38e2e7bf91eb
diff -r 45fd261e840b -r b2aed80037db MQTTThreadedClient.cpp
--- a/MQTTThreadedClient.cpp Mon Mar 27 15:16:23 2017 +0000
+++ b/MQTTThreadedClient.cpp Tue Mar 28 09:18:49 2017 +0000
@@ -7,8 +7,8 @@
#include "mbedtls/ctr_drbg.h"
#include "mbedtls/error.h"
-static MemoryPool<PubMessage, 16> mpool;
-static Queue<PubMessage, 16> mqueue;
+static MemoryPool<MQTT::PubMessage, 8> mpool;
+static Queue<MQTT::PubMessage, 8> mqueue;
// SSL/TLS variables
mbedtls_entropy_context _entropy;
@@ -18,6 +18,8 @@
mbedtls_ssl_config _ssl_conf;
mbedtls_ssl_session saved_session;
+namespace MQTT {
+
/**
* Receive callback for mbed TLS
*/
@@ -135,8 +137,8 @@
{
int ret;
- printf("Initializing TLS ...\r\n");
- printf("mbedtls_ctr_drdbg_seed ...\r\n");
+ DBG("Initializing TLS ...\r\n");
+ DBG("mbedtls_ctr_drdbg_seed ...\r\n");
if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
(const unsigned char *) DRBG_PERS,
sizeof (DRBG_PERS))) != 0) {
@@ -144,7 +146,7 @@
_error = ret;
return -1;
}
- printf("mbedtls_x509_crt_parse ...\r\n");
+ DBG("mbedtls_x509_crt_parse ...\r\n");
if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
strlen(ssl_ca_pem) + 1)) != 0) {
mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
@@ -152,7 +154,7 @@
return -1;
}
- printf("mbedtls_ssl_config_defaults ...\r\n");
+ DBG("mbedtls_ssl_config_defaults ...\r\n");
if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
@@ -162,15 +164,15 @@
return -1;
}
- printf("mbedtls_ssl_config_ca_chain ...\r\n");
+ DBG("mbedtls_ssl_config_ca_chain ...\r\n");
mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
- printf("mbedtls_ssl_conf_rng ...\r\n");
+ DBG("mbedtls_ssl_conf_rng ...\r\n");
mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
/* It is possible to disable authentication by passing
* MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
*/
- printf("mbedtls_ssl_conf_authmode ...\r\n");
+ DBG("mbedtls_ssl_conf_authmode ...\r\n");
mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
#if DEBUG_LEVEL > 0
@@ -179,7 +181,7 @@
mbedtls_debug_set_threshold(DEBUG_LEVEL);
#endif
- printf("mbedtls_ssl_setup ...\r\n");
+ DBG("mbedtls_ssl_setup ...\r\n");
if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
_error = ret;
@@ -194,7 +196,7 @@
int ret;
/* Start the handshake, the rest will be done in onReceive() */
- printf("Starting the TLS handshake...\r\n");
+ DBG("Starting the TLS handshake...\r\n");
ret = mbedtls_ssl_handshake(&_ssl);
if (ret < 0)
{
@@ -210,7 +212,7 @@
}
/* Handshake done, time to print info */
- printf("TLS connection to %s:%d established\r\n",
+ DBG("TLS connection to %s:%d established\r\n",
host.c_str(), port);
const uint32_t buf_size = 1024;
@@ -218,22 +220,23 @@
mbedtls_x509_crt_info(buf, buf_size, "\r ",
mbedtls_ssl_get_peer_cert(&_ssl));
- printf("Server certificate:\r\n%s\r", buf);
+ DBG("Server certificate:\r\n%s\r", buf);
// Verify server cert ...
uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
if( flags != 0 )
{
mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
- printf("Certificate verification failed:\r\n%s\r\r\n", buf);
+ DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
// free server cert ... before error return
delete [] buf;
return -1;
}
- printf("Certificate verification passed\r\n\r\n");
+ DBG("Certificate verification passed\r\n\r\n");
// delete server cert after verification
delete [] buf;
+#if defined(MBEDTLS_SSL_CLI_C)
// TODO: Save the session here for reconnect.
if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
{
@@ -241,8 +244,8 @@
hasSavedSession = false;
return -1;
}
-
- printf("Session saved for reconnect ...\r\n");
+#endif
+ DBG("Session saved for reconnect ...\r\n");
hasSavedSession = true;
return 0;
@@ -436,7 +439,7 @@
if (!isConnected)
{
- printf("Session not connected! \r\n");
+ DBG("Session not connected! \r\n");
return rc;
}
@@ -445,18 +448,18 @@
// amount for our 32 bit timers which accepts ms.
keepAliveInterval = (connect_options.keepAliveInterval * 1000);
- printf("Login with: \r\n");
- printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
- printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
+ DBG("Login with: \r\n");
+ DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
+ DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
{
- printf("Error serializing connect packet ...\r\n");
+ DBG("Error serializing connect packet ...\r\n");
return rc;
}
if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
{
- printf("Error sending the connect request packet ...\r\n");
+ DBG("Error sending the connect request packet ...\r\n");
return rc;
}
@@ -465,7 +468,7 @@
{
unsigned char connack_rc = 255;
bool sessionPresent = false;
- printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+ DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
rc = connack_rc;
else
@@ -476,11 +479,11 @@
if (rc == SUCCESS)
{
- printf("Connected!!! ... starting connection timers ...\r\n");
+ DBG("Connected!!! ... starting connection timers ...\r\n");
resetConnectionTimer();
}
- printf("Returning with rc = %d\r\n", rc);
+ DBG("Returning with rc = %d\r\n", rc);
return rc;
}
@@ -494,7 +497,7 @@
&& ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
)
{
- printf( "Session reset returned an error \r\n");
+ DBG( "Session reset returned an error \r\n");
}
isConnected = false;
@@ -509,7 +512,7 @@
if ((network == NULL) || (tcpSocket == NULL)
|| host.empty())
{
- printf("Network settings not set! \r\n");
+ DBG("Network settings not set! \r\n");
return ret;
}
@@ -519,26 +522,27 @@
mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
return ret;
}
-
+#if defined(MBEDTLS_SSL_CLI_C)
if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
return ret;
}
+#endif
}
tcpSocket->open(network);
if (useTLS)
{
- printf("mbedtls_ssl_set_hostname ...\r\n");
+ DBG("mbedtls_ssl_set_hostname ...\r\n");
mbedtls_ssl_set_hostname(&_ssl, host.c_str());
- printf("mbedtls_ssl_set_bio ...\r\n");
+ DBG("mbedtls_ssl_set_bio ...\r\n");
mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
ssl_send, ssl_recv, NULL );
}
if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
{
- printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
+ DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
return ret;
}else
isConnected = true;
@@ -548,10 +552,10 @@
if (doTLSHandshake() < 0)
{
- printf("TLS Handshake failed! \r\n");
+ DBG("TLS Handshake failed! \r\n");
return FAILURE;
}else
- printf("TLS Handshake complete!! \r\n");
+ DBG("TLS Handshake complete!! \r\n");
}
return login();
@@ -580,7 +584,7 @@
*message = msg;
// Push the data to the thread
- printf("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+ DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
mqueue.put(message);
return SUCCESS;
@@ -592,7 +596,7 @@
if (!isConnected)
{
- printf("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+ DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
return FAILURE;
}
@@ -601,17 +605,17 @@
topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
if (len <= 0)
{
- printf("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
return FAILURE;
}
if (sendPacket(len) == SUCCESS)
{
- printf("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
return SUCCESS;
}
- printf("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
return FAILURE;
}
@@ -630,11 +634,11 @@
if (!isConnected)
{
- printf("Session not connected!!\r\n");
+ DBG("Session not connected!!\r\n");
return 0;
}
- printf("Processing subscribed topics ....\r\n");
+ DBG("Processing subscribed topics ....\r\n");
std::map<std::string, FP<void, MessageData &> >::iterator it;
for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
@@ -645,21 +649,21 @@
QoS qos = QOS0;
MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topic.cstring);
+ DBG("Subscribing to topic [%s]\r\n", topic.cstring);
len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
if (len <= 0) {
- printf("Error serializing subscribe packet ...\r\n");
+ DBG("Error serializing subscribe packet ...\r\n");
continue;
}
if ((rc = sendPacket(len)) != SUCCESS) {
- printf("Error sending subscribe packet [%d]\r\n", rc);
+ DBG("Error sending subscribe packet [%d]\r\n", rc);
continue;
}
- printf("Waiting for subscription ack ...\r\n");
+ DBG("Waiting for subscription ack ...\r\n");
// Wait for SUBACK, dropping packets read along the way ...
if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
int count = 0, grantedQoS = -1;
@@ -671,13 +675,13 @@
{
// Reset connection timers here ...
resetConnectionTimer();
- printf("Successfully subscribed to %s ...\r\n", it->first.c_str());
+ DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
numsubscribed++;
} else {
- printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
+ DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
}
} else
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
+ DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
} // end for loop
return numsubscribed;
@@ -715,7 +719,7 @@
MQTTString topicName = MQTTString_initializer;
Message msg;
int intQoS;
- printf("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
&intQoS,
(unsigned char*)&msg.retained,
@@ -724,7 +728,7 @@
(unsigned char**)&msg.payload,
(int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
{
- printf("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
return -1;
}
@@ -735,7 +739,7 @@
}else
topic = (const char *) topicName.cstring;
- printf("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+ DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
msg.qos = (QoS) intQoS;
@@ -746,7 +750,7 @@
// Call the callback function
if (topicCBMap[topic].attached())
{
- printf("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
MessageData md(topicName, msg);
topicCBMap[topic](md);
@@ -801,7 +805,7 @@
int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
{
- printf("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
}
}
@@ -829,7 +833,7 @@
}
numsubs = processSubscriptions();
- printf("Subscribed %d topics ...\r\n", numsubs);
+ DBG("Subscribed %d topics ...\r\n", numsubs);
// loop read
while(true)
@@ -842,13 +846,13 @@
break;
case FAILURE:
{
- printf("readPacket returned failure \r\n");
+ DBG("readPacket returned failure \r\n");
goto reconnect;
}
case BUFFER_OVERFLOW:
{
// TODO: Network error, do we disconnect and reconnect?
- printf("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+ DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
MBED_ASSERT(false);
}
break;
@@ -862,22 +866,22 @@
break;
case PUBLISH:
{
- printf("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+ DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
// We receive data from the MQTT server ..
if (handlePublishMsg() < 0) {
- printf("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+ DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
break;
}
}
break;
case PINGRESP:
{
- printf("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+ DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
resetConnectionTimer();
}
break;
default:
- printf("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+ DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
}
// Check if its time to send a keepAlive packet
@@ -891,7 +895,7 @@
osEvent evt = mqueue.get(10);
if (evt.status == osEventMessage) {
- printf("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+ DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
// Unpack the message
PubMessage * message = (PubMessage *)evt.value.p;
@@ -916,7 +920,7 @@
reconnect:
// reconnect?
- printf("Client disconnected!! ... retrying ...\r\n");
+ DBG("Client disconnected!! ... retrying ...\r\n");
disconnect();
};
@@ -926,4 +930,6 @@
{
// TODO: Set a signal/flag that the running thread
// will check if its ok to stop ...
+}
+
}
\ No newline at end of file
