Fork of Hello MQTT, using mbed TLS for secure mqtt transport
Fork of HelloMQTT by
main.cpp
00001 /******************************************************************************* 00002 * Copyright (c) 2014, 2015 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Ian Craggs - make sure QoS2 processing works, and add device headers 00016 *******************************************************************************/ 00017 00018 /** 00019 This is a sample program to illustrate the use of the MQTT Client library 00020 on the mbed platform. The Client class requires two classes which mediate 00021 access to system interfaces for networking and timing. As long as these two 00022 classes provide the required public programming interfaces, it does not matter 00023 what facilities they use underneath. In this program, they use the mbed 00024 system libraries. 00025 00026 */ 00027 00028 // change this to 0 to output messages to serial instead of LCD 00029 00030 00031 #define MQTTCLIENT_QOS2 1 00032 00033 #include "mbed.h" 00034 #include "rtos.h" 00035 #include "easy-connect.h" 00036 #include "MQTTSNetwork.h" 00037 #include "MQTTmbed.h" 00038 #include "MQTTClient.h" 00039 00040 00041 /* List of trusted root CA certificates 00042 * currently only "letsencrypt", the CA for mbedhacks.com 00043 * 00044 * To add more than one root, just concatenate them. 00045 * 00046 * TODO: Move this certificate file onto the SD card. 00047 */ 00048 static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n" 00049 "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n" 00050 "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n" 00051 "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n" 00052 "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n" 00053 "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n" 00054 "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n" 00055 "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n" 00056 "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n" 00057 "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n" 00058 "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n" 00059 "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n" 00060 "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n" 00061 "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n" 00062 "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n" 00063 "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n" 00064 "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n" 00065 "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n" 00066 "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n" 00067 "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n" 00068 "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n" 00069 "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n" 00070 "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n" 00071 "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n" 00072 "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n" 00073 "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n" 00074 "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n" 00075 "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n" 00076 "I5hmKgE=\n" 00077 "-----END CERTIFICATE-----\n" 00078 "-----BEGIN CERTIFICATE-----\n" 00079 "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n" 00080 "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n" 00081 "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n" 00082 "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n" 00083 "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n" 00084 "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n" 00085 "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n" 00086 "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n" 00087 "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n" 00088 "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n" 00089 "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n" 00090 "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n" 00091 "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n" 00092 "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n" 00093 "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n" 00094 "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n" 00095 "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n" 00096 "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n" 00097 "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n" 00098 "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n" 00099 "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n" 00100 "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n" 00101 "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n" 00102 "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n" 00103 "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n" 00104 "-----END CERTIFICATE-----"; 00105 00106 Serial pc(USBTX, USBRX, 115200); 00107 int arrivedcount = 0; 00108 Thread thdMQTT; 00109 00110 MQTTSNetwork mqttNetwork; 00111 MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork); 00112 00113 static MemoryPool<MQTT::Message, 16> pool; 00114 static Queue<MQTT::Message, 16> queue; 00115 00116 static const char* topic = "mbed-sample"; 00117 00118 typedef void (*messageHandler)(MQTT::MessageData&); 00119 void messageArrived(MQTT::MessageData& md) 00120 { 00121 MQTT::Message &message = md.message; 00122 pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); 00123 pc.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); 00124 ++arrivedcount; 00125 } 00126 00127 int mqttsConnect(const char * hostname, uint16_t port, 00128 const char * clientID, 00129 const char * username, 00130 const char * password) 00131 { 00132 int rc; 00133 00134 pc.printf("Connecting to %s:%d\r\n", hostname, port); 00135 rc = mqttNetwork.connect(hostname, port); 00136 if (rc != 0) 00137 { 00138 pc.printf("rc from TCP connect is %d\r\n", rc); 00139 return -1; 00140 } 00141 else 00142 pc.printf("RC passed!\r\n"); 00143 00144 pc.printf("Creating data connection ...\r\n"); 00145 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00146 data.MQTTVersion = 3; 00147 data.clientID.cstring = (char *) clientID; 00148 data.username.cstring = (char *) username; 00149 data.password.cstring = (char *) password; 00150 pc.printf("Connecting client ...\r\n"); 00151 if ((rc = client.connect(data)) != 0) 00152 { 00153 printf("rc from MQTT connect is %d\r\n", rc); 00154 return -1; 00155 } 00156 00157 pc.printf("Subscribing to topic ...\r\n"); 00158 if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0) 00159 { 00160 pc.printf("rc from MQTT subscribe is %d\r\n", rc); 00161 return -1; 00162 } 00163 00164 return rc; 00165 } 00166 00167 int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler) 00168 { 00169 int rc; 00170 00171 pc.printf("Subscribing to topic ...\r\n"); 00172 if ((rc = client.subscribe(topic, os, handler)) != 0) 00173 pc.printf("rc from MQTT subscribe is %d\r\n", rc); 00174 00175 return rc; 00176 } 00177 00178 // This function is used to pass data from 00179 // the main thread to the MQTT listener thread 00180 void postMQTTUpdate(MQTT::Message msg) 00181 { 00182 MQTT::Message * message = pool.alloc(); 00183 // Simple copy, I think this is via (memcpy) 00184 *message = msg; 00185 00186 // Push the data to the consumer thread 00187 pc.printf("Pushing data to MQTTS Listener thread ...\r\n"); 00188 queue.put(message); 00189 } 00190 00191 void mqttListener(void) 00192 { 00193 pc.printf("MQTT listener thread started ...\r\n"); 00194 while(true) 00195 { 00196 // Wait for data in the queue, timeout at 10ms 00197 osEvent evt = queue.get(10); 00198 if (evt.status == osEventMessage) { 00199 pc.printf("Message arrived from main thread ...\r\n"); 00200 // Unpack the message 00201 MQTT::Message * message = (MQTT::Message *)evt.value.p; 00202 00203 pc.printf("Publishing message to MQTT ...\r\n"); 00204 // Push to mqtt 00205 int rc = client.publish(topic, *message); 00206 if (rc < 0) 00207 pc.printf("Error sending mqtt message \r\n"); 00208 else 00209 pc.printf("Message published ...\r\n"); 00210 00211 // Don't forget this! 00212 pool.free(message); 00213 } 00214 00215 pc.printf("MQTT client yeild ...\r\n"); 00216 if (client.yield(100) != 0) 00217 { 00218 client.disconnect(); 00219 // TODO: reconnect TLS session. 00220 return; 00221 } 00222 pc.printf("MQTT client yeild successful ...\r\n"); 00223 } 00224 } 00225 00226 int main(int argc, char* argv[]) 00227 { 00228 float version = 0.6; 00229 00230 int i = 0; 00231 00232 pc.printf("HelloMQTT: version is %.2f\r\n", version); 00233 00234 NetworkInterface* network = easy_connect(true); 00235 if (!network) { 00236 return -1; 00237 } 00238 00239 if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 ) 00240 { 00241 pc.printf("Failed initializing sercure MQTTS...\r\n"); 00242 return -1; 00243 } 00244 00245 if ( mqttsConnect("mqtt.mbedhacks.com", 00246 8883,"mbedtest_01","tinong","tatay") != 0 ) 00247 { 00248 pc.printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n"); 00249 return -1; 00250 } 00251 00252 if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 ) 00253 { 00254 pc.printf("Failed to subscribe to a topic!\r\n"); 00255 return -1; 00256 } 00257 00258 // Run an MQTT listener on its own thread. 00259 thdMQTT.start(mqttListener); 00260 00261 // The main loops just sends messages to MQTT server 00262 while(true) 00263 { 00264 MQTT::Message mqtt_msg; 00265 char buff[100]; 00266 00267 sprintf(buff, "message test %d", i); 00268 mqtt_msg.qos = MQTT::QOS0; 00269 mqtt_msg.retained = false; 00270 mqtt_msg.dup = false; 00271 mqtt_msg.payload = (void*)buff; 00272 mqtt_msg.payloadlen = strlen(buff)+1; 00273 00274 // publish the message to mqtt 00275 postMQTTUpdate(mqtt_msg); 00276 i++; 00277 00278 Thread::wait(2000); 00279 } 00280 00281 //mqttNetwork.disconnect(); 00282 00283 //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount); 00284 00285 //return 0; 00286 }
Generated on Fri Jul 15 2022 22:45:13 by
1.7.2
