Fork of Hello MQTT, using mbed TLS for secure mqtt transport
Fork of HelloMQTT by
main.cpp
- Committer:
- vpcola
- Date:
- 2017-03-18
- Revision:
- 22:4d0628d13870
- Parent:
- 21:4534812bb94f
File content as of revision 22:4d0628d13870:
/******************************************************************************* * Copyright (c) 2014, 2015 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - make sure QoS2 processing works, and add device headers *******************************************************************************/ /** This is a sample program to illustrate the use of the MQTT Client library on the mbed platform. The Client class requires two classes which mediate access to system interfaces for networking and timing. As long as these two classes provide the required public programming interfaces, it does not matter what facilities they use underneath. In this program, they use the mbed system libraries. */ // change this to 0 to output messages to serial instead of LCD #define MQTTCLIENT_QOS2 1 #include "mbed.h" #include "rtos.h" #include "easy-connect.h" #include "MQTTSNetwork.h" #include "MQTTmbed.h" #include "MQTTClient.h" /* List of trusted root CA certificates * currently only "letsencrypt", the CA for mbedhacks.com * * To add more than one root, just concatenate them. * * TODO: Move this certificate file onto the SD card. */ static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n" "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n" "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n" "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n" "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n" "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n" "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n" "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n" "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n" "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n" "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n" "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n" "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n" "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n" "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n" "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n" "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n" "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n" "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n" "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n" "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n" "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n" "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n" "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n" "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n" "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n" "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n" "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n" "I5hmKgE=\n" "-----END CERTIFICATE-----\n" "-----BEGIN CERTIFICATE-----\n" "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n" "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n" "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n" "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n" "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n" "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n" "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n" "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n" "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n" "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n" "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n" "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n" "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n" "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n" "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n" "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n" "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n" "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n" "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n" "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n" "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n" "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n" "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n" "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n" "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n" "-----END CERTIFICATE-----"; Serial pc(USBTX, USBRX, 115200); int arrivedcount = 0; Thread thdMQTT; MQTTSNetwork mqttNetwork; MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork); static MemoryPool<MQTT::Message, 16> pool; static Queue<MQTT::Message, 16> queue; static const char* topic = "mbed-sample"; typedef void (*messageHandler)(MQTT::MessageData&); void messageArrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); pc.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); ++arrivedcount; } int mqttsConnect(const char * hostname, uint16_t port, const char * clientID, const char * username, const char * password) { int rc; pc.printf("Connecting to %s:%d\r\n", hostname, port); rc = mqttNetwork.connect(hostname, port); if (rc != 0) { pc.printf("rc from TCP connect is %d\r\n", rc); return -1; } else pc.printf("RC passed!\r\n"); pc.printf("Creating data connection ...\r\n"); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.MQTTVersion = 3; data.clientID.cstring = (char *) clientID; data.username.cstring = (char *) username; data.password.cstring = (char *) password; pc.printf("Connecting client ...\r\n"); if ((rc = client.connect(data)) != 0) { printf("rc from MQTT connect is %d\r\n", rc); return -1; } pc.printf("Subscribing to topic ...\r\n"); if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0) { pc.printf("rc from MQTT subscribe is %d\r\n", rc); return -1; } return rc; } int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler) { int rc; pc.printf("Subscribing to topic ...\r\n"); if ((rc = client.subscribe(topic, os, handler)) != 0) pc.printf("rc from MQTT subscribe is %d\r\n", rc); return rc; } // This function is used to pass data from // the main thread to the MQTT listener thread void postMQTTUpdate(MQTT::Message msg) { MQTT::Message * message = pool.alloc(); // Simple copy, I think this is via (memcpy) *message = msg; // Push the data to the consumer thread pc.printf("Pushing data to MQTTS Listener thread ...\r\n"); queue.put(message); } void mqttListener(void) { pc.printf("MQTT listener thread started ...\r\n"); while(true) { // Wait for data in the queue, timeout at 10ms osEvent evt = queue.get(10); if (evt.status == osEventMessage) { pc.printf("Message arrived from main thread ...\r\n"); // Unpack the message MQTT::Message * message = (MQTT::Message *)evt.value.p; pc.printf("Publishing message to MQTT ...\r\n"); // Push to mqtt int rc = client.publish(topic, *message); if (rc < 0) pc.printf("Error sending mqtt message \r\n"); else pc.printf("Message published ...\r\n"); // Don't forget this! pool.free(message); } pc.printf("MQTT client yeild ...\r\n"); if (client.yield(100) != 0) { client.disconnect(); // TODO: reconnect TLS session. return; } pc.printf("MQTT client yeild successful ...\r\n"); } } int main(int argc, char* argv[]) { float version = 0.6; int i = 0; pc.printf("HelloMQTT: version is %.2f\r\n", version); NetworkInterface* network = easy_connect(true); if (!network) { return -1; } if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 ) { pc.printf("Failed initializing sercure MQTTS...\r\n"); return -1; } if ( mqttsConnect("mqtt.mbedhacks.com", 8883,"mbedtest_01","tinong","tatay") != 0 ) { pc.printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n"); return -1; } if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 ) { pc.printf("Failed to subscribe to a topic!\r\n"); return -1; } // Run an MQTT listener on its own thread. thdMQTT.start(mqttListener); // The main loops just sends messages to MQTT server while(true) { MQTT::Message mqtt_msg; char buff[100]; sprintf(buff, "message test %d", i); mqtt_msg.qos = MQTT::QOS0; mqtt_msg.retained = false; mqtt_msg.dup = false; mqtt_msg.payload = (void*)buff; mqtt_msg.payloadlen = strlen(buff)+1; // publish the message to mqtt postMQTTUpdate(mqtt_msg); i++; Thread::wait(2000); } //mqttNetwork.disconnect(); //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount); //return 0; }