Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
Diff: main.cpp
- Revision:
- 21:4534812bb94f
- Parent:
- 20:49c9daf2b0ff
- Child:
- 22:4d0628d13870
diff -r 49c9daf2b0ff -r 4534812bb94f main.cpp
--- a/main.cpp Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp Fri Mar 17 08:42:29 2017 +0000
@@ -26,114 +26,260 @@
*/
// change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
#define MQTTCLIENT_QOS2 1
+#include "mbed.h"
+#include "rtos.h"
#include "easy-connect.h"
-#include "MQTTNetwork.h"
+#include "MQTTSNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"
-int arrivedcount = 0;
+/* List of trusted root CA certificates
+ * currently only "letsencrypt", the CA for mbedhacks.com
+ *
+ * To add more than one root, just concatenate them.
+ *
+ * TODO: Move this certificate file onto the SD card.
+ */
+static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
+ "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
+ "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
+ "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
+ "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
+ "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
+ "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
+ "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
+ "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
+ "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
+ "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
+ "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
+ "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
+ "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
+ "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
+ "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
+ "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
+ "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
+ "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
+ "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
+ "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
+ "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
+ "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
+ "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
+ "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
+ "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
+ "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
+ "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
+ "I5hmKgE=\n"
+ "-----END CERTIFICATE-----\n"
+ "-----BEGIN CERTIFICATE-----\n"
+ "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
+ "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
+ "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
+ "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
+ "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
+ "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
+ "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
+ "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
+ "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
+ "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
+ "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
+ "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
+ "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
+ "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
+ "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
+ "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
+ "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
+ "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
+ "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
+ "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
+ "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
+ "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
+ "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
+ "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
+ "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
+ "-----END CERTIFICATE-----";
+int arrivedcount = 0;
+Thread thdMQTT;
+
+MQTTSNetwork mqttNetwork;
+MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork);
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
+
+static const char* topic = "mbed-sample";
+
+typedef void (*messageHandler)(MQTT::MessageData&);
void messageArrived(MQTT::MessageData& md)
{
MQTT::Message &message = md.message;
- logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
- logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+ printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
+int mqttsConnect(const char * hostname, uint16_t port,
+ const char * clientID,
+ const char * username,
+ const char * password)
+{
+ int rc;
+
+ printf("Connecting to %s:%d\r\n", hostname, port);
+ rc = mqttNetwork.connect(hostname, port);
+ if (rc != 0)
+ {
+ printf("rc from TCP connect is %d\r\n", rc);
+ return -1;
+ }
+ else
+ printf("RC passed!\r\n");
+
+ printf("Creating data connection ...\r\n");
+ MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+ data.MQTTVersion = 3;
+ data.clientID.cstring = (char *) clientID;
+ data.username.cstring = (char *) username;
+ data.password.cstring = (char *) password;
+ printf("Connecting client ...\r\n");
+ if ((rc = client.connect(data)) != 0)
+ {
+ printf("rc from MQTT connect is %d\r\n", rc);
+ return -1;
+ }
+
+ printf("Subscribing to topic ...\r\n");
+ if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
+ {
+ printf("rc from MQTT subscribe is %d\r\n", rc);
+ return -1;
+ }
+
+ return rc;
+}
+
+int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler)
+{
+ int rc;
+
+ printf("Subscribing to topic ...\r\n");
+ if ((rc = client.subscribe(topic, os, handler)) != 0)
+ printf("rc from MQTT subscribe is %d\r\n", rc);
+
+ return rc;
+}
+
+// This function is used to pass data from
+// the main thread to the MQTT listener thread
+void postMQTTUpdate(MQTT::Message msg)
+{
+ MQTT::Message * message = pool.alloc();
+ // Simple copy, I think this is via (memcpy)
+ *message = msg;
+
+ // Push the data to the consumer thread
+ printf("Pushing data to MQTTS Listener thread ...\r\n");
+ queue.put(message);
+}
+
+void mqttListener(void)
+{
+ printf("MQTT listener thread started ...\r\n");
+ while(true)
+ {
+ // Wait for data in the queue, timeout at 10ms
+ osEvent evt = queue.get(10);
+ if (evt.status == osEventMessage) {
+ printf("Message arrived from main thread ...\r\n");
+ // Unpack the message
+ MQTT::Message * message = (MQTT::Message *)evt.value.p;
+
+ printf("Publishing message to MQTT ...\r\n");
+ // Push to mqtt
+ int rc = client.publish(topic, *message);
+ if (rc < 0)
+ printf("Error sending mqtt message \r\n");
+ else
+ printf("Message published ...\r\n");
+
+ // Don't forget this!
+ pool.free(message);
+ }
+
+ printf("MQTT client yeild ...\r\n");
+ if (client.yield(100) != 0)
+ {
+ client.disconnect();
+ // TODO: reconnect TLS session.
+ return;
+ }
+ printf("MQTT client yeild successful ...\r\n");
+ }
+}
int main(int argc, char* argv[])
{
float version = 0.6;
- char* topic = "mbed-sample";
- logMessage("HelloMQTT: version is %.2f\r\n", version);
+ int i = 0;
+
+ printf("HelloMQTT: version is %.2f\r\n", version);
NetworkInterface* network = easy_connect(true);
if (!network) {
return -1;
}
- MQTTNetwork mqttNetwork(network);
-
- MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
- const char* hostname = "m2m.eclipse.org";
- int port = 1883;
- logMessage("Connecting to %s:%d\r\n", hostname, port);
- int rc = mqttNetwork.connect(hostname, port);
- if (rc != 0)
- logMessage("rc from TCP connect is %d\r\n", rc);
-
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.MQTTVersion = 3;
- data.clientID.cstring = "mbed-sample";
- data.username.cstring = "testuser";
- data.password.cstring = "testpassword";
- if ((rc = client.connect(data)) != 0)
- logMessage("rc from MQTT connect is %d\r\n", rc);
-
- if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
- logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
- MQTT::Message message;
+ if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 )
+ {
+ printf("Failed initializing sercure MQTTS...\r\n");
+ return -1;
+ }
+
+ if ( mqttsConnect("mqtt.mbedhacks.com",
+ 8883,"mbedtest_01","tinong","tatay") != 0 )
+ {
+ printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n");
+ return -1;
+ }
+
+ if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 )
+ {
+ printf("Failed to subscribe to a topic!\r\n");
+ return -1;
+ }
+
+ // Run an MQTT listener on its own thread.
+ thdMQTT.start(mqttListener);
+
+ // The main loops just sends messages to MQTT server
+ while(true)
+ {
+ MQTT::Message mqtt_msg;
+ char buff[100];
+
+ sprintf(buff, "message test %d", i);
+ mqtt_msg.qos = MQTT::QOS0;
+ mqtt_msg.retained = false;
+ mqtt_msg.dup = false;
+ mqtt_msg.payload = (void*)buff;
+ mqtt_msg.payloadlen = strlen(buff)+1;
+
+ // publish the message to mqtt
+ postMQTTUpdate(mqtt_msg);
+ i++;
+
+ Thread::wait(2000);
+ }
- // QoS 0
- char buf[100];
- sprintf(buf, "Hello World! QoS 0 message from app version %f\r\n", version);
- message.qos = MQTT::QOS0;
- message.retained = false;
- message.dup = false;
- message.payload = (void*)buf;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 1)
- client.yield(100);
+ //mqttNetwork.disconnect();
- // QoS 1
- sprintf(buf, "Hello World! QoS 1 message from app version %f\r\n", version);
- message.qos = MQTT::QOS1;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 2)
- client.yield(100);
-
- // QoS 2
- sprintf(buf, "Hello World! QoS 2 message from app version %f\r\n", version);
- message.qos = MQTT::QOS2;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 3)
- client.yield(100);
-
- if ((rc = client.unsubscribe(topic)) != 0)
- logMessage("rc from unsubscribe was %d\r\n", rc);
-
- if ((rc = client.disconnect()) != 0)
- logMessage("rc from disconnect was %d\r\n", rc);
-
- mqttNetwork.disconnect();
-
- logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+ //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
return 0;
}
