Fork of Hello MQTT, using mbed TLS for secure mqtt transport
Fork of HelloMQTT by
Diff: main.cpp
- Revision:
- 21:4534812bb94f
- Parent:
- 20:49c9daf2b0ff
- Child:
- 22:4d0628d13870
--- a/main.cpp Tue Jan 10 18:10:17 2017 -0600 +++ b/main.cpp Fri Mar 17 08:42:29 2017 +0000 @@ -26,114 +26,260 @@ */ // change this to 0 to output messages to serial instead of LCD -#define USE_LCD 1 -#if USE_LCD -#include "C12832.h" - -// the actual pins are defined in mbed_app.json and can be overridden per target -C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS); - -#define logMessage lcd.cls();lcd.printf - -#else - -#define logMessage printf - -#endif #define MQTTCLIENT_QOS2 1 +#include "mbed.h" +#include "rtos.h" #include "easy-connect.h" -#include "MQTTNetwork.h" +#include "MQTTSNetwork.h" #include "MQTTmbed.h" #include "MQTTClient.h" -int arrivedcount = 0; +/* List of trusted root CA certificates + * currently only "letsencrypt", the CA for mbedhacks.com + * + * To add more than one root, just concatenate them. + * + * TODO: Move this certificate file onto the SD card. + */ +static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n" + "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n" + "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n" + "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n" + "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n" + "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n" + "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n" + "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n" + "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n" + "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n" + "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n" + "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n" + "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n" + "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n" + "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n" + "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n" + "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n" + "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n" + "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n" + "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n" + "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n" + "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n" + "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n" + "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n" + "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n" + "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n" + "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n" + "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n" + "I5hmKgE=\n" + "-----END CERTIFICATE-----\n" + "-----BEGIN CERTIFICATE-----\n" + "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n" + "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n" + "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n" + "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n" + "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n" + "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n" + "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n" + "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n" + "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n" + "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n" + "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n" + "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n" + "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n" + "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n" + "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n" + "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n" + "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n" + "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n" + "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n" + "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n" + "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n" + "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n" + "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n" + "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n" + "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n" + "-----END CERTIFICATE-----"; +int arrivedcount = 0; +Thread thdMQTT; + +MQTTSNetwork mqttNetwork; +MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork); + +static MemoryPool<MQTT::Message, 16> pool; +static Queue<MQTT::Message, 16> queue; + +static const char* topic = "mbed-sample"; + +typedef void (*messageHandler)(MQTT::MessageData&); void messageArrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; - logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); - logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); + printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); + printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); ++arrivedcount; } +int mqttsConnect(const char * hostname, uint16_t port, + const char * clientID, + const char * username, + const char * password) +{ + int rc; + + printf("Connecting to %s:%d\r\n", hostname, port); + rc = mqttNetwork.connect(hostname, port); + if (rc != 0) + { + printf("rc from TCP connect is %d\r\n", rc); + return -1; + } + else + printf("RC passed!\r\n"); + + printf("Creating data connection ...\r\n"); + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 3; + data.clientID.cstring = (char *) clientID; + data.username.cstring = (char *) username; + data.password.cstring = (char *) password; + printf("Connecting client ...\r\n"); + if ((rc = client.connect(data)) != 0) + { + printf("rc from MQTT connect is %d\r\n", rc); + return -1; + } + + printf("Subscribing to topic ...\r\n"); + if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0) + { + printf("rc from MQTT subscribe is %d\r\n", rc); + return -1; + } + + return rc; +} + +int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler) +{ + int rc; + + printf("Subscribing to topic ...\r\n"); + if ((rc = client.subscribe(topic, os, handler)) != 0) + printf("rc from MQTT subscribe is %d\r\n", rc); + + return rc; +} + +// This function is used to pass data from +// the main thread to the MQTT listener thread +void postMQTTUpdate(MQTT::Message msg) +{ + MQTT::Message * message = pool.alloc(); + // Simple copy, I think this is via (memcpy) + *message = msg; + + // Push the data to the consumer thread + printf("Pushing data to MQTTS Listener thread ...\r\n"); + queue.put(message); +} + +void mqttListener(void) +{ + printf("MQTT listener thread started ...\r\n"); + while(true) + { + // Wait for data in the queue, timeout at 10ms + osEvent evt = queue.get(10); + if (evt.status == osEventMessage) { + printf("Message arrived from main thread ...\r\n"); + // Unpack the message + MQTT::Message * message = (MQTT::Message *)evt.value.p; + + printf("Publishing message to MQTT ...\r\n"); + // Push to mqtt + int rc = client.publish(topic, *message); + if (rc < 0) + printf("Error sending mqtt message \r\n"); + else + printf("Message published ...\r\n"); + + // Don't forget this! + pool.free(message); + } + + printf("MQTT client yeild ...\r\n"); + if (client.yield(100) != 0) + { + client.disconnect(); + // TODO: reconnect TLS session. + return; + } + printf("MQTT client yeild successful ...\r\n"); + } +} int main(int argc, char* argv[]) { float version = 0.6; - char* topic = "mbed-sample"; - logMessage("HelloMQTT: version is %.2f\r\n", version); + int i = 0; + + printf("HelloMQTT: version is %.2f\r\n", version); NetworkInterface* network = easy_connect(true); if (!network) { return -1; } - MQTTNetwork mqttNetwork(network); - - MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork); - - const char* hostname = "m2m.eclipse.org"; - int port = 1883; - logMessage("Connecting to %s:%d\r\n", hostname, port); - int rc = mqttNetwork.connect(hostname, port); - if (rc != 0) - logMessage("rc from TCP connect is %d\r\n", rc); - - MQTTPacket_connectData data = MQTTPacket_connectData_initializer; - data.MQTTVersion = 3; - data.clientID.cstring = "mbed-sample"; - data.username.cstring = "testuser"; - data.password.cstring = "testpassword"; - if ((rc = client.connect(data)) != 0) - logMessage("rc from MQTT connect is %d\r\n", rc); - - if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0) - logMessage("rc from MQTT subscribe is %d\r\n", rc); - - MQTT::Message message; + if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 ) + { + printf("Failed initializing sercure MQTTS...\r\n"); + return -1; + } + + if ( mqttsConnect("mqtt.mbedhacks.com", + 8883,"mbedtest_01","tinong","tatay") != 0 ) + { + printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n"); + return -1; + } + + if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 ) + { + printf("Failed to subscribe to a topic!\r\n"); + return -1; + } + + // Run an MQTT listener on its own thread. + thdMQTT.start(mqttListener); + + // The main loops just sends messages to MQTT server + while(true) + { + MQTT::Message mqtt_msg; + char buff[100]; + + sprintf(buff, "message test %d", i); + mqtt_msg.qos = MQTT::QOS0; + mqtt_msg.retained = false; + mqtt_msg.dup = false; + mqtt_msg.payload = (void*)buff; + mqtt_msg.payloadlen = strlen(buff)+1; + + // publish the message to mqtt + postMQTTUpdate(mqtt_msg); + i++; + + Thread::wait(2000); + } - // QoS 0 - char buf[100]; - sprintf(buf, "Hello World! QoS 0 message from app version %f\r\n", version); - message.qos = MQTT::QOS0; - message.retained = false; - message.dup = false; - message.payload = (void*)buf; - message.payloadlen = strlen(buf)+1; - rc = client.publish(topic, message); - while (arrivedcount < 1) - client.yield(100); + //mqttNetwork.disconnect(); - // QoS 1 - sprintf(buf, "Hello World! QoS 1 message from app version %f\r\n", version); - message.qos = MQTT::QOS1; - message.payloadlen = strlen(buf)+1; - rc = client.publish(topic, message); - while (arrivedcount < 2) - client.yield(100); - - // QoS 2 - sprintf(buf, "Hello World! QoS 2 message from app version %f\r\n", version); - message.qos = MQTT::QOS2; - message.payloadlen = strlen(buf)+1; - rc = client.publish(topic, message); - while (arrivedcount < 3) - client.yield(100); - - if ((rc = client.unsubscribe(topic)) != 0) - logMessage("rc from unsubscribe was %d\r\n", rc); - - if ((rc = client.disconnect()) != 0) - logMessage("rc from disconnect was %d\r\n", rc); - - mqttNetwork.disconnect(); - - logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount); + //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount); return 0; }