Vergil Cola / Mbed OS K64FMQTT

Dependencies:   MQTT

Fork of HelloMQTT by MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers main.cpp Source File

main.cpp

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2015 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Ian Craggs - make sure QoS2 processing works, and add device headers
00016  *******************************************************************************/
00017 
00018  /**
00019   This is a sample program to illustrate the use of the MQTT Client library
00020   on the mbed platform.  The Client class requires two classes which mediate
00021   access to system interfaces for networking and timing.  As long as these two
00022   classes provide the required public programming interfaces, it does not matter
00023   what facilities they use underneath. In this program, they use the mbed
00024   system libraries.
00025 
00026  */
00027 
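 // NOTE(review): stale remark kept from the sample this file was forked from:
 // "change this to 0 to output messages to serial instead of LCD".
 // No such switch exists in this file; all output goes to the serial console.
 // The MQTTCLIENT_QOS2 define below is unrelated to it.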
00029 
00030 
00031 #define MQTTCLIENT_QOS2 1
00032 
00033 #include "mbed.h"
00034 #include "rtos.h"
00035 #include "easy-connect.h"
00036 #include "MQTTSNetwork.h"
00037 #include "MQTTmbed.h"
00038 #include "MQTTClient.h"
00039 
00040 
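/* List of trusted certificates (PEM) handed to mqttNetwork.setupTLS().
 * Currently only the "letsencrypt" chain, the CA for mbedhacks.com.
 *
 * NOTE(review): despite the "root CA" wording this list used to carry, the
 * first entry appears to be the mbedhacks.com server certificate and the
 * second the "Let's Encrypt Authority X3" intermediate, not a root. Both
 * have fixed expiry dates - confirm they are still valid before relying on
 * this list.
 *
 * To add more than one certificate, just concatenate the PEM blocks.
 *
 * TODO: Move this certificate file onto the SD card.
 */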
00048 static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
00049     "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
00050     "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
00051     "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
00052     "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
00053     "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
00054     "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
00055     "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
00056     "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
00057     "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
00058     "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
00059     "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
00060     "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
00061     "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
00062     "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
00063     "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
00064     "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
00065     "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
00066     "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
00067     "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
00068     "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
00069     "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
00070     "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
00071     "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
00072     "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
00073     "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
00074     "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
00075     "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
00076     "I5hmKgE=\n"
00077     "-----END CERTIFICATE-----\n"
00078     "-----BEGIN CERTIFICATE-----\n"
00079     "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
00080     "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
00081     "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
00082     "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
00083     "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
00084     "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
00085     "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
00086     "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
00087     "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
00088     "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
00089     "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
00090     "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
00091     "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
00092     "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
00093     "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
00094     "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
00095     "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
00096     "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
00097     "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
00098     "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
00099     "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
00100     "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
00101     "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
00102     "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
00103     "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
00104     "-----END CERTIFICATE-----";
00105 
00106 Serial pc(USBTX, USBRX, 115200);
00107 int arrivedcount = 0;
00108 Thread thdMQTT;
00109 
00110 MQTTSNetwork mqttNetwork;
00111 MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork);
00112 
00113 static MemoryPool<MQTT::Message, 16> pool;
00114 static Queue<MQTT::Message, 16> queue;
00115 
00116 static const char* topic = "mbed-sample";
00117 
00118 typedef void (*messageHandler)(MQTT::MessageData&);
00119 void messageArrived(MQTT::MessageData& md)
00120 {
00121     MQTT::Message &message = md.message;
00122     pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
00123     pc.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
00124     ++arrivedcount;
00125 }
00126 
00127 int mqttsConnect(const char * hostname, uint16_t port, 
00128         const char * clientID, 
00129         const char * username, 
00130         const char * password)
00131 {
00132     int rc;
00133     
00134     pc.printf("Connecting to %s:%d\r\n", hostname, port);
00135     rc = mqttNetwork.connect(hostname, port);
00136     if (rc != 0)
00137     {
00138         pc.printf("rc from TCP connect is %d\r\n", rc);
00139         return -1;
00140     }
00141     else
00142         pc.printf("RC passed!\r\n");
00143  
00144     pc.printf("Creating data connection ...\r\n");
00145     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
00146     data.MQTTVersion = 3;
00147     data.clientID.cstring = (char *) clientID;
00148     data.username.cstring = (char *) username;
00149     data.password.cstring = (char *) password;
00150     pc.printf("Connecting client ...\r\n");
00151     if ((rc = client.connect(data)) != 0)
00152     {
00153         printf("rc from MQTT connect is %d\r\n", rc);
00154         return -1;
00155     }
00156  
00157     pc.printf("Subscribing to topic ...\r\n");
00158     if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
00159     {
00160         pc.printf("rc from MQTT subscribe is %d\r\n", rc);
00161         return -1;
00162     }
00163 
00164     return rc;    
00165 }
00166 
00167 int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler)
00168 {
00169     int rc;
00170     
00171     pc.printf("Subscribing to topic ...\r\n");
00172     if ((rc = client.subscribe(topic, os, handler)) != 0)
00173         pc.printf("rc from MQTT subscribe is %d\r\n", rc);
00174 
00175     return rc;        
00176 }
00177 
00178 // This function is used to pass data from
00179 // the main thread to the MQTT listener thread
00180 void postMQTTUpdate(MQTT::Message msg)
00181 {
00182     MQTT::Message * message = pool.alloc();
00183     // Simple copy, I think this is via (memcpy)
00184     *message = msg;
00185     
00186     // Push the data to the consumer thread
00187     pc.printf("Pushing data to MQTTS Listener thread ...\r\n");
00188     queue.put(message);       
00189 }
00190 
00191 void mqttListener(void)
00192 {
00193     pc.printf("MQTT listener thread started ...\r\n");
00194     while(true)
00195     {
00196         // Wait for data in the queue, timeout at 10ms
00197         osEvent evt = queue.get(10);
00198         if (evt.status == osEventMessage) {
00199             pc.printf("Message arrived from main thread ...\r\n");
00200             // Unpack the message
00201             MQTT::Message * message = (MQTT::Message *)evt.value.p;
00202             
00203             pc.printf("Publishing message to MQTT ...\r\n");
00204             // Push to mqtt
00205             int rc = client.publish(topic, *message);
00206             if (rc < 0)
00207                 pc.printf("Error sending mqtt message \r\n");
00208             else
00209                 pc.printf("Message published ...\r\n");
00210                 
00211             // Don't forget this!
00212             pool.free(message);
00213         }        
00214         
00215         pc.printf("MQTT client yeild ...\r\n");
00216         if (client.yield(100) != 0)
00217         {
00218             client.disconnect();
00219             // TODO: reconnect TLS session.
00220             return;
00221         }
00222         pc.printf("MQTT client yeild successful ...\r\n");
00223     }
00224 }
00225 
00226 int main(int argc, char* argv[])
00227 {
00228     float version = 0.6;
00229 
00230     int i = 0;
00231 
00232     pc.printf("HelloMQTT: version is %.2f\r\n", version);
00233 
00234     NetworkInterface* network = easy_connect(true);
00235     if (!network) {
00236         return -1;
00237     }
00238     
00239     if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 )
00240     {
00241         pc.printf("Failed initializing sercure MQTTS...\r\n");
00242         return -1;
00243     }
00244     
00245     if ( mqttsConnect("mqtt.mbedhacks.com", 
00246                         8883,"mbedtest_01","tinong","tatay") != 0 )
00247     {
00248         pc.printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n");
00249         return -1;
00250     }
00251     
00252     if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 )
00253     {
00254         pc.printf("Failed to subscribe to a topic!\r\n");
00255         return -1;
00256     }
00257     
00258     // Run an MQTT listener on its own thread.
00259     thdMQTT.start(mqttListener);
00260     
00261     // The main loops just sends messages to MQTT server
00262     while(true)
00263     {
00264         MQTT::Message mqtt_msg;
00265         char buff[100];
00266         
00267         sprintf(buff, "message test %d", i);
00268         mqtt_msg.qos = MQTT::QOS0;
00269         mqtt_msg.retained = false;
00270         mqtt_msg.dup = false;
00271         mqtt_msg.payload = (void*)buff;
00272         mqtt_msg.payloadlen = strlen(buff)+1;
00273                     
00274         // publish the message to mqtt
00275         postMQTTUpdate(mqtt_msg);
00276         i++;
00277         
00278         Thread::wait(2000);
00279     }
00280 
00281     //mqttNetwork.disconnect();
00282 
00283     //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
00284 
00285     //return 0;
00286 }