A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 26:4b21de8043a5
- Parent:
- 25:326f00faa092
- Child:
- 27:c90092f35d79
--- a/MQTTThreadedClient.cpp Mon Mar 27 03:53:18 2017 +0000 +++ b/MQTTThreadedClient.cpp Mon Mar 27 13:45:26 2017 +0000 @@ -89,12 +89,12 @@ char *buf = new char[buf_size]; (void) data; - if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth); + if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); if (_debug) mbedtls_printf("%s", buf); if (*flags == 0) - if (_debug) mbedtls_printf("No verification issue for this certificate\n"); + if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); else { mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); if (_debug) mbedtls_printf("%s\n", buf); @@ -108,7 +108,7 @@ void MQTTThreadedClient::setupTLS() { - if (ssl_ca_pem != NULL) + if (useTLS) { mbedtls_entropy_init(&_entropy); mbedtls_ctr_drbg_init(&_ctr_drbg); @@ -121,7 +121,7 @@ void MQTTThreadedClient::freeTLS() { - if (ssl_ca_pem != NULL) + if (useTLS) { mbedtls_entropy_free(&_entropy); mbedtls_ctr_drbg_free(&_ctr_drbg); @@ -140,14 +140,14 @@ if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, (const unsigned char *) DRBG_PERS, sizeof (DRBG_PERS))) != 0) { - printf("error [%d] mbedtls_crt_drbg_init", ret); + mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); _error = ret; return -1; } printf("mbedtls_x509_crt_parse ...\r\n"); if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, strlen(ssl_ca_pem) + 1)) != 0) { - printf("error [%d] mbedtls_x509_crt_parse", ret); + mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); _error = ret; return -1; } @@ -157,7 +157,7 @@ MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM, MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { - printf("error [%d] mbedtls_ssl_config_defaults", ret); + mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); _error = ret; return -1; } @@ -181,7 +181,7 @@ printf("mbedtls_ssl_setup ...\r\n"); if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { - printf("error [%d] mbedtls_ssl_setup", ret); + mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); _error = ret; return -1; } @@ -200,17 +200,13 @@ { if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) - { - printf("error [%d] mbedtls_ssl_handshake", ret); - tcpSocket->close(); - _error = -1; - } + mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); else { // do not close the socket if timed out - _error = ret; + ret = TIMEOUT; } - return -1; + return ret; } /* Handshake done, time to print info */ @@ -241,11 +237,14 @@ // TODO: Save the session here for reconnect. if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) { - printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); + mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); + hasSavedSession = false; return -1; } printf("Session saved for reconnect ...\r\n"); + hasSavedSession = true; + return 0; } @@ -256,7 +255,7 @@ if (tcpSocket == NULL) return -1; - if (ssl_ca_pem != NULL) + if (useTLS) { // Do SSL/TLS read rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); @@ -284,7 +283,7 @@ if (tcpSocket == NULL) return -1; - if (ssl_ca_pem != NULL) { + if (useTLS) { // Do SSL/TLS write rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) @@ -430,27 +429,27 @@ } -int MQTTThreadedClient::connect(MQTTPacket_connectData& options) +int MQTTThreadedClient::login() { int rc = FAILURE; int len = 0; - if (isConnected) + if (!isConnected) { - printf("Session already connected! \r\n"); + printf("Session not connected! \r\n"); return rc; } // Copy the keepAliveInterval value to local // MQTT specifies in seconds, we have to multiply that // amount for our 32 bit timers which accepts ms. - keepAliveInterval = (options.keepAliveInterval * 1000); + keepAliveInterval = (connect_options.keepAliveInterval * 1000); - printf("Connecting with: \r\n"); - printf("\tUsername: [%s]\r\n", options.username.cstring); - printf("\tPassword: [%s]\r\n", options.password.cstring); + printf("Login with: \r\n"); + printf("\tUsername: [%s]\r\n", connect_options.username.cstring); + printf("\tPassword: [%s]\r\n", connect_options.password.cstring); - if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) + if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) { printf("Error serializing connect packet ...\r\n"); return rc; @@ -478,11 +477,7 @@ if (rc == SUCCESS) { printf("Connected!!! ... starting connection timers ...\r\n"); - isConnected = true; resetConnectionTimer(); - }else - { - // TODO: Call socket->disconnect()? } printf("Returning with rc = %d\r\n", rc); @@ -495,24 +490,44 @@ { if (isConnected) { - // TODO: Send unsubscribe message ... + if( useTLS + && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) + ) + { + printf( "Session reset returned an error \r\n"); + } isConnected = false; tcpSocket->close(); } } -int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options) +int MQTTThreadedClient::connect() { int ret = FAILURE; + + if ((network == NULL) || (tcpSocket == NULL) + || host.empty()) + { + printf("Network settings not set! \r\n"); + return ret; + } - // Copy the settings for reconnection - host = chost; - port = cport; - connect_options = options; - + if (useTLS) + { + if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { + mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); + return ret; + } + + if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { + mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); + return ret; + } + } + tcpSocket->open(network); - if (ssl_ca_pem != NULL) + if (useTLS) { printf("mbedtls_ssl_set_hostname ...\r\n"); mbedtls_ssl_set_hostname(&_ssl, host.c_str()); @@ -523,18 +538,31 @@ if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) { - printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); return ret; - } + }else + isConnected = true; - if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0)) + if (useTLS) { - printf("TLS Handshake failed! \r\n"); - return FAILURE; + + if (doTLSHandshake() < 0) + { + printf("TLS Handshake failed! \r\n"); + return FAILURE; + }else + printf("TLS Handshake complete!! \r\n"); } - return connect(connect_options); + return login(); +} + +void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) +{ + // Copy the settings for reconnection + host = chost; + port = cport; + connect_options = options; } int MQTTThreadedClient::publish(PubMessage& msg) @@ -593,7 +621,7 @@ FP<void,MessageData &> fp; fp.attach(function); - topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); + topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); } int MQTTThreadedClient::processSubscriptions() @@ -606,8 +634,10 @@ return 0; } + printf("Processing subscribed topics ....\r\n"); + std::map<std::string, FP<void, MessageData &> >::iterator it; - for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++) + for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) { int rc = FAILURE; int len = 0; @@ -653,71 +683,6 @@ return numsubscribed; } -int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &)) -{ - int rc = FAILURE; - int len = 0; - - MQTTString topic = {(char*)topicstr, {0, 0}}; - printf("Subscribing to topic [%s]\r\n", topicstr); - - if (!isConnected) - { - printf("Session already connected!!\r\n"); - return rc; - } - - len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); - if (len <= 0) - { - printf("Error serializing subscribe packet ...\r\n"); - return rc; - } - - if ((rc = sendPacket(len)) != SUCCESS) - { - printf("Error sending subscribe packet [%d]\r\n", rc); - return rc; - } - - printf("Waiting for subscription ack ...\r\n"); - // Wait for SUBACK, dropping packets read along the way ... - if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback - { - int count = 0, grantedQoS = -1; - unsigned short mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = grantedQoS; // 0, 1, 2 or 0x80 - // For as long as we do not get 0x80 .. - if (rc != 0x80) - { - // Add message handlers to the map - FP<void,MessageData &> fp; - fp.attach(function); - - topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); - - // Reset connection timers here ... - resetConnectionTimer(); - - printf("Successfully subscribed to %s ...\r\n", topicstr); - rc = SUCCESS; - }else - { - printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr); - } - } - else - { - printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); - rc = FAILURE; - } - - return rc; - -} - - bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) { char* curf = topicFilter; @@ -843,23 +808,43 @@ void MQTTThreadedClient::startListener() { int pType; + int numsubs; // Continuesly listens for packets and dispatch // message handlers ... - do { - // Connect to server ... - if (!isConnected) + if (useTLS) + { + initTLS(); + } + + while(true) + { + + // Attempt to reconnect and login + if ( connect() < 0 ) { - // Attempt to reconnect ... + disconnect(); + // Wait for a few secs and reconnect ... + Thread::wait(6000); + continue; } - while(true) { + numsubs = processSubscriptions(); + printf("Subscribed %d topics ...\r\n", numsubs); + + // loop read + while(true) + { pType = readPacket(); - switch(pType) { + switch(pType) + { case TIMEOUT: // No data available from the network ... break; case FAILURE: - goto reconnect; + { + printf("readPacket returned failure \r\n"); + goto reconnect; + } case BUFFER_OVERFLOW: { // TODO: Network error, do we disconnect and reconnect? @@ -930,8 +915,14 @@ } // end while loop reconnect: - disconnect(); - // reconnect? - } while(true); + // reconnect? + disconnect(); + + }; } - \ No newline at end of file + +void MQTTThreadedClient::stopListener() +{ + // TODO: Set a signal/flag that the running thread + // will check if its ok to stop ... +} \ No newline at end of file