A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Revision 26:4b21de8043a5, committed 2017-03-27
- Comitter:
- vpcola
- Date:
- Mon Mar 27 13:45:26 2017 +0000
- Parent:
- 25:326f00faa092
- Child:
- 27:c90092f35d79
- Commit message:
- TLS now working
Changed in this revision
--- a/MQTTNetwork.h Mon Mar 27 03:53:18 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,43 +0,0 @@
-#ifndef _MQTTNETWORK_H_
-#define _MQTTNETWORK_H_
-
-#include "NetworkInterface.h"
-
-class MQTTNetwork {
-public:
- MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork)
- {
- socket = new TCPSocket();
- //socket->set_blocking(false);
- }
-
- ~MQTTNetwork() {
- delete socket;
- }
-
- int read(unsigned char* buffer, int len, int timeout) {
- socket->set_blocking(false);
- socket->set_timeout(timeout);
- return socket->recv(buffer, len);
- }
-
- int write(unsigned char* buffer, int len, int timeout) {
- socket->set_blocking(true);
- return socket->send(buffer, len);
- }
-
- int connect(const char* hostname, int port) {
- socket->open(network);
- return socket->connect(hostname, port);
- }
-
- void disconnect() {
-
- }
-
-private:
- NetworkInterface* network;
- TCPSocket* socket;
-};
-
-#endif // _MQTTNETWORK_H_
--- a/MQTTThreadedClient.cpp Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.cpp Mon Mar 27 13:45:26 2017 +0000
@@ -89,12 +89,12 @@
char *buf = new char[buf_size];
(void) data;
- if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth);
+ if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt);
if (_debug) mbedtls_printf("%s", buf);
if (*flags == 0)
- if (_debug) mbedtls_printf("No verification issue for this certificate\n");
+ if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
else {
mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags);
if (_debug) mbedtls_printf("%s\n", buf);
@@ -108,7 +108,7 @@
void MQTTThreadedClient::setupTLS()
{
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
mbedtls_entropy_init(&_entropy);
mbedtls_ctr_drbg_init(&_ctr_drbg);
@@ -121,7 +121,7 @@
void MQTTThreadedClient::freeTLS()
{
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
mbedtls_entropy_free(&_entropy);
mbedtls_ctr_drbg_free(&_ctr_drbg);
@@ -140,14 +140,14 @@
if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
(const unsigned char *) DRBG_PERS,
sizeof (DRBG_PERS))) != 0) {
- printf("error [%d] mbedtls_crt_drbg_init", ret);
+ mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
_error = ret;
return -1;
}
printf("mbedtls_x509_crt_parse ...\r\n");
if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
strlen(ssl_ca_pem) + 1)) != 0) {
- printf("error [%d] mbedtls_x509_crt_parse", ret);
+ mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -157,7 +157,7 @@
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
- printf("error [%d] mbedtls_ssl_config_defaults", ret);
+ mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -181,7 +181,7 @@
printf("mbedtls_ssl_setup ...\r\n");
if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
- printf("error [%d] mbedtls_ssl_setup", ret);
+ mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
_error = ret;
return -1;
}
@@ -200,17 +200,13 @@
{
if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
ret != MBEDTLS_ERR_SSL_WANT_WRITE)
- {
- printf("error [%d] mbedtls_ssl_handshake", ret);
- tcpSocket->close();
- _error = -1;
- }
+ mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
else
{
// do not close the socket if timed out
- _error = ret;
+ ret = TIMEOUT;
}
- return -1;
+ return ret;
}
/* Handshake done, time to print info */
@@ -241,11 +237,14 @@
// TODO: Save the session here for reconnect.
if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
{
- printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+ mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+ hasSavedSession = false;
return -1;
}
printf("Session saved for reconnect ...\r\n");
+ hasSavedSession = true;
+
return 0;
}
@@ -256,7 +255,7 @@
if (tcpSocket == NULL)
return -1;
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
// Do SSL/TLS read
rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
@@ -284,7 +283,7 @@
if (tcpSocket == NULL)
return -1;
- if (ssl_ca_pem != NULL) {
+ if (useTLS) {
// Do SSL/TLS write
rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
@@ -430,27 +429,27 @@
}
-int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+int MQTTThreadedClient::login()
{
int rc = FAILURE;
int len = 0;
- if (isConnected)
+ if (!isConnected)
{
- printf("Session already connected! \r\n");
+ printf("Session not connected! \r\n");
return rc;
}
// Copy the keepAliveInterval value to local
// MQTT specifies in seconds, we have to multiply that
// amount for our 32 bit timers which accepts ms.
- keepAliveInterval = (options.keepAliveInterval * 1000);
+ keepAliveInterval = (connect_options.keepAliveInterval * 1000);
- printf("Connecting with: \r\n");
- printf("\tUsername: [%s]\r\n", options.username.cstring);
- printf("\tPassword: [%s]\r\n", options.password.cstring);
+ printf("Login with: \r\n");
+ printf("\tUsername: [%s]\r\n", connect_options.username.cstring);
+ printf("\tPassword: [%s]\r\n", connect_options.password.cstring);
- if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+ if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
{
printf("Error serializing connect packet ...\r\n");
return rc;
@@ -478,11 +477,7 @@
if (rc == SUCCESS)
{
printf("Connected!!! ... starting connection timers ...\r\n");
- isConnected = true;
resetConnectionTimer();
- }else
- {
- // TODO: Call socket->disconnect()?
}
printf("Returning with rc = %d\r\n", rc);
@@ -495,24 +490,44 @@
{
if (isConnected)
{
- // TODO: Send unsubscribe message ...
+ if( useTLS
+ && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
+ )
+ {
+ printf( "Session reset returned an error \r\n");
+ }
isConnected = false;
tcpSocket->close();
}
}
-int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+int MQTTThreadedClient::connect()
{
int ret = FAILURE;
+
+ if ((network == NULL) || (tcpSocket == NULL)
+ || host.empty())
+ {
+ printf("Network settings not set! \r\n");
+ return ret;
+ }
- // Copy the settings for reconnection
- host = chost;
- port = cport;
- connect_options = options;
-
+ if (useTLS)
+ {
+ if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
+ return ret;
+ }
+
+ if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
+ return ret;
+ }
+ }
+
tcpSocket->open(network);
- if (ssl_ca_pem != NULL)
+ if (useTLS)
{
printf("mbedtls_ssl_set_hostname ...\r\n");
mbedtls_ssl_set_hostname(&_ssl, host.c_str());
@@ -523,18 +538,31 @@
if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
{
-
printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
return ret;
- }
+ }else
+ isConnected = true;
- if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0))
+ if (useTLS)
{
- printf("TLS Handshake failed! \r\n");
- return FAILURE;
+
+ if (doTLSHandshake() < 0)
+ {
+ printf("TLS Handshake failed! \r\n");
+ return FAILURE;
+ }else
+ printf("TLS Handshake complete!! \r\n");
}
- return connect(connect_options);
+ return login();
+}
+
+void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+{
+ // Copy the settings for reconnection
+ host = chost;
+ port = cport;
+ connect_options = options;
}
int MQTTThreadedClient::publish(PubMessage& msg)
@@ -593,7 +621,7 @@
FP<void,MessageData &> fp;
fp.attach(function);
- topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
}
int MQTTThreadedClient::processSubscriptions()
@@ -606,8 +634,10 @@
return 0;
}
+ printf("Processing subscribed topics ....\r\n");
+
std::map<std::string, FP<void, MessageData &> >::iterator it;
- for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++)
+ for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
{
int rc = FAILURE;
int len = 0;
@@ -653,71 +683,6 @@
return numsubscribed;
}
-int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
-{
- int rc = FAILURE;
- int len = 0;
-
- MQTTString topic = {(char*)topicstr, {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topicstr);
-
- if (!isConnected)
- {
- printf("Session already connected!!\r\n");
- return rc;
- }
-
- len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- if (len <= 0)
- {
- printf("Error serializing subscribe packet ...\r\n");
- return rc;
- }
-
- if ((rc = sendPacket(len)) != SUCCESS)
- {
- printf("Error sending subscribe packet [%d]\r\n", rc);
- return rc;
- }
-
- printf("Waiting for subscription ack ...\r\n");
- // Wait for SUBACK, dropping packets read along the way ...
- if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback
- {
- int count = 0, grantedQoS = -1;
- unsigned short mypacketid;
- if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
- rc = grantedQoS; // 0, 1, 2 or 0x80
- // For as long as we do not get 0x80 ..
- if (rc != 0x80)
- {
- // Add message handlers to the map
- FP<void,MessageData &> fp;
- fp.attach(function);
-
- topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
- // Reset connection timers here ...
- resetConnectionTimer();
-
- printf("Successfully subscribed to %s ...\r\n", topicstr);
- rc = SUCCESS;
- }else
- {
- printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
- }
- }
- else
- {
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
- rc = FAILURE;
- }
-
- return rc;
-
-}
-
-
bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
{
char* curf = topicFilter;
@@ -843,23 +808,43 @@
void MQTTThreadedClient::startListener()
{
int pType;
+ int numsubs;
// Continuesly listens for packets and dispatch
// message handlers ...
- do {
- // Connect to server ...
- if (!isConnected)
+ if (useTLS)
+ {
+ initTLS();
+ }
+
+ while(true)
+ {
+
+ // Attempt to reconnect and login
+ if ( connect() < 0 )
{
- // Attempt to reconnect ...
+ disconnect();
+ // Wait for a few secs and reconnect ...
+ Thread::wait(6000);
+ continue;
}
- while(true) {
+ numsubs = processSubscriptions();
+ printf("Subscribed %d topics ...\r\n", numsubs);
+
+ // loop read
+ while(true)
+ {
pType = readPacket();
- switch(pType) {
+ switch(pType)
+ {
case TIMEOUT:
// No data available from the network ...
break;
case FAILURE:
- goto reconnect;
+ {
+ printf("readPacket returned failure \r\n");
+ goto reconnect;
+ }
case BUFFER_OVERFLOW:
{
// TODO: Network error, do we disconnect and reconnect?
@@ -930,8 +915,14 @@
} // end while loop
reconnect:
- disconnect();
- // reconnect?
- } while(true);
+ // reconnect?
+ disconnect();
+
+ };
}
-
\ No newline at end of file
+
+void MQTTThreadedClient::stopListener()
+{
+ // TODO: Set a signal/flag that the running thread
+ // will check if its ok to stop ...
+}
\ No newline at end of file
--- a/MQTTThreadedClient.h Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.h Mon Mar 27 13:45:26 2017 +0000
@@ -80,8 +80,11 @@
MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
: network(aNetwork),
ssl_ca_pem(pem),
+ port((pem != NULL) ? 8883 : 1883),
queue(32 * EVENTS_EVENT_SIZE),
- isConnected(false)
+ isConnected(false),
+ hasSavedSession(false),
+ useTLS(pem != NULL)
{
DRBG_PERS = "mbed TLS MQTT client";
tcpSocket = new TCPSocket();
@@ -98,10 +101,7 @@
}
-
- int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
- void disconnect();
-
+ void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
int publish(PubMessage& message);
void addTopicHandler(const char * topic, void (*function)(MessageData &));
@@ -111,68 +111,20 @@
FP<void,MessageData &> fp;
fp.attach(object, member);
- topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));
}
- int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
- template<typename T>
- int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
- int rc = FAILURE;
- int len = 0;
-
- MQTTString topic = {(char*)topicstr, {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topicstr);
-
- if (!isConnected) {
- printf("Session already connected!!\r\n");
- return rc;
- }
-
- len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- if (len <= 0) {
- printf("Error serializing subscribe packet ...\r\n");
- return rc;
- }
-
- if ((rc = sendPacket(len)) != SUCCESS) {
- printf("Error sending subscribe packet [%d]\r\n", rc);
- return rc;
- }
- printf("Waiting for subscription ack ...\r\n");
- // Wait for SUBACK, dropping packets read along the way ...
- if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
- int count = 0, grantedQoS = -1;
- unsigned short mypacketid;
- if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
- rc = grantedQoS; // 0, 1, 2 or 0x80
- // For as long as we do not get 0x80 ..
- if (rc != 0x80) {
- // Add message handlers to the map
- FP<void,MessageData &> fp;
- fp.attach(object, member);
-
- topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
- // Reset connection timers here ...
- resetConnectionTimer();
- printf("Successfully subscribed to %s ...\r\n", topicstr);
- rc = SUCCESS;
- }
- } else
- {
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
- rc = FAILURE;
- }
-
- return rc;
- }
-
- // the listener thread ...
+ // Start the listener thread and start polling
+ // MQTT server.
void startListener();
+ // Stop the listerner thread and closes connection
+ void stopListener();
protected:
- int connect(MQTTPacket_connectData& options);
+
int handlePublishMsg();
+ void disconnect();
+ int connect();
private:
@@ -186,11 +138,10 @@
std::string host;
uint16_t port;
MQTTPacket_connectData connect_options;
-
-
// Event queue
EventQueue queue;
bool isConnected;
+ bool hasSavedSession;
// TODO: Because I'm using a map, I can only have one handler
// for each topic (one that's mapped to the topic string).
@@ -198,7 +149,6 @@
// In the future, use a vector instead of maps to allow multiple
// handlers for the same topic.
std::map<std::string, FP<void, MessageData &> > topicCBMap;
- std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
@@ -207,6 +157,7 @@
Timer comTimer;
// SSL/TLS functions
+ bool useTLS;
void setupTLS();
int initTLS();
void freeTLS();
@@ -224,7 +175,7 @@
void resetConnectionTimer();
void sendPingRequest();
bool hasConnectionTimedOut();
-
+ int login();
};
#endif
--- a/main.cpp Mon Mar 27 03:53:18 2017 +0000
+++ b/main.cpp Mon Mar 27 13:45:26 2017 +0000
@@ -33,7 +33,76 @@
Serial pc(USBTX, USBRX, 115200);
-Thread msgSender;
+Thread msgSender(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
+
+#define MQTT_USE_TLS
+
+#ifdef MQTT_USE_TLS
+/* List of trusted root CA certificates
+ * currently only "letsencrypt", the CA for mbedhacks.com
+ *
+ * To add more than one root, just concatenate them.
+ *
+ * TODO: Move this certificate file onto the SD card.
+ */
+static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
+ "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
+ "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
+ "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
+ "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
+ "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
+ "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
+ "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
+ "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
+ "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
+ "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
+ "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
+ "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
+ "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
+ "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
+ "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
+ "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
+ "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
+ "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
+ "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
+ "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
+ "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
+ "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
+ "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
+ "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
+ "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
+ "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
+ "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
+ "I5hmKgE=\n"
+ "-----END CERTIFICATE-----\n"
+ "-----BEGIN CERTIFICATE-----\n"
+ "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
+ "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
+ "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
+ "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
+ "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
+ "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
+ "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
+ "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
+ "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
+ "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
+ "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
+ "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
+ "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
+ "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
+ "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
+ "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
+ "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
+ "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
+ "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
+ "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
+ "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
+ "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
+ "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
+ "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
+ "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
+ "-----END CERTIFICATE-----";
+#endif
static const char * clientID = "mbed-sample";
static const char * userID = "mbedhacks";
@@ -84,30 +153,30 @@
return -1;
}
+#ifdef MQTT_USE_TLS
+ MQTTThreadedClient mqtt(network, SSL_CA_PEM);
+#else
MQTTThreadedClient mqtt(network);
+#endif
+
const char* hostname = "mqtt.mbedhacks.com";
// const char* hostname = "192.168.0.7";
+#ifdef MQTT_USE_TLS
+ int port = 8883;
+#else
int port = 1883;
- printf("Connecting to %s:%d\r\n", hostname, port);
-
+#endif
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.MQTTVersion = 3;
- data.clientID.cstring = (char *) clientID;
- data.username.cstring = (char *) userID;
- data.password.cstring = (char *) password;
- data.keepAliveInterval = 100; // default is 60
-
- int rc = mqtt.connect(hostname, port, data);
- if (rc != 0)
- printf("rc from TCP connect is %d\r\n", rc);
-
- if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
- printf("rc from MQTT subscribe 1 is %d\r\n", rc);
-
- if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
- printf("rc from MQTT subscribe 2 is %d\r\b", rc);
+ MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer;
+ logindata.MQTTVersion = 3;
+ logindata.clientID.cstring = (char *) clientID;
+ logindata.username.cstring = (char *) userID;
+ logindata.password.cstring = (char *) password;
+
+ mqtt.setConnectionParameters(hostname, port, logindata);
+ mqtt.addTopicHandler(topic_1, messageArrived);
+ mqtt.addTopicHandler(topic_2, &testcb, &CallbackTest::messageArrived);
// Start the data producer
msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));
