Fork of Hello MQTT, using mbed TLS for secure mqtt transport

Dependencies:   MQTT

Fork of HelloMQTT by MQTT

main.cpp

Committer:
vpcola
Date:
2017-03-17
Revision:
21:4534812bb94f
Parent:
20:49c9daf2b0ff
Child:
22:4d0628d13870

File content as of revision 21:4534812bb94f:

/*******************************************************************************
 * Copyright (c) 2014, 2015 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *    Ian Craggs - make sure QoS2 processing works, and add device headers
 *******************************************************************************/

 /**
  This is a sample program to illustrate the use of the MQTT Client library
  on the mbed platform.  The Client class requires two classes which mediate
  access to system interfaces for networking and timing.  As long as these two
  classes provide the required public programming interfaces, it does not matter
  what facilities they use underneath. In this program, they use the mbed
  system libraries.

 */

 // change this to 0 to output messages to serial instead of LCD


#define MQTTCLIENT_QOS2 1

#include "mbed.h"
#include "rtos.h"
#include "easy-connect.h"
#include "MQTTSNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"


/* List of trusted root CA certificates
 * currently only "letsencrypt", the CA for mbedhacks.com
 *
 * To add more than one root, just concatenate them.
 *
 * TODO: Move this certificate file onto the SD card.
 */
static const char SSL_CA_PEM[] = "-----BEGIN CERTIFICATE-----\n"
    "MIIFETCCA/mgAwIBAgISA2ktlb1Y6ap4GCH7dg3wS37XMA0GCSqGSIb3DQEBCwUA\n"
    "MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD\n"
    "ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xNzAzMDkwMTQ4MDBaFw0x\n"
    "NzA2MDcwMTQ4MDBaMBgxFjAUBgNVBAMTDW1iZWRoYWNrcy5jb20wggEiMA0GCSqG\n"
    "SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ppYHlH8lfB7lkWOjMSnOJGaLtCBfz57I\n"
    "VVOd1Rngsz7nE5fg3joa7lkazRY1ZqtuC2UloS+4LYoQZX4Z887dhdug/TPA4J1A\n"
    "GppA4xVCb2kUFODMjZ2r4pMLp+MjFFMBaHrL4cgx/n4aJUB+N9Z+HW0p2Yr5TsOQ\n"
    "ghIOPkNxFr2q6klm49+BMUbO98hAwFwsIISLf6IbHM93gx1ltqkvb55N87ZM1hYH\n"
    "fkq+J+YqjleiLaqRN2MVlNMNfy9MDbqM5uCyGiWGtq8eiQLaWpZkxnA2MC5zPsO/\n"
    "fzEWiVjn2uazlXZ5xZwiK22KMxVasqWMitvETtmPOl9mocRbLQdxAgMBAAGjggIh\n"
    "MIICHTAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF\n"
    "BwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCsgG+z1BTjrN3K+/tF0C4k818Yv\n"
    "MB8GA1UdIwQYMBaAFKhKamMEfd265tE5t6ZFZe/zqOyhMHAGCCsGAQUFBwEBBGQw\n"
    "YjAvBggrBgEFBQcwAYYjaHR0cDovL29jc3AuaW50LXgzLmxldHNlbmNyeXB0Lm9y\n"
    "Zy8wLwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5v\n"
    "cmcvMCsGA1UdEQQkMCKCDW1iZWRoYWNrcy5jb22CEXd3dy5tYmVkaGFja3MuY29t\n"
    "MIH+BgNVHSAEgfYwgfMwCAYGZ4EMAQIBMIHmBgsrBgEEAYLfEwEBATCB1jAmBggr\n"
    "BgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwgasGCCsGAQUFBwIC\n"
    "MIGeDIGbVGhpcyBDZXJ0aWZpY2F0ZSBtYXkgb25seSBiZSByZWxpZWQgdXBvbiBi\n"
    "eSBSZWx5aW5nIFBhcnRpZXMgYW5kIG9ubHkgaW4gYWNjb3JkYW5jZSB3aXRoIHRo\n"
    "ZSBDZXJ0aWZpY2F0ZSBQb2xpY3kgZm91bmQgYXQgaHR0cHM6Ly9sZXRzZW5jcnlw\n"
    "dC5vcmcvcmVwb3NpdG9yeS8wDQYJKoZIhvcNAQELBQADggEBABFH6YcvHh8foHeg\n"
    "NM7iR9HnYRqa5gSERcCtq6jm8PcTsAbsdQ/BNpIHK7AZSg2kk17kj+JFeyMuNJWq\n"
    "lmabV0dtzdC8ejp1d7hGb/HjuQ400th/QRayvyrDVzQPfCNyJ0C82Q2DFjeUgnqv\n"
    "oJMcV6i4ICW0boI7GUf7oeHCmrUEHKffAbeFvx3c85c39IHJEFa59UWj1linU/Tr\n"
    "g9i5AaSKB95d706u1XRA7WLV/Hu7yunhxEjlj33bfdifBb/ZLBd0LtrXPwtXi6E8\n"
    "r6obp+B+Ce89G7WEhdT9BX0ck1KTK+yP7uAC7tvvsiejxXOoCtVyBAumBJS7mRuv\n"
    "I5hmKgE=\n"
    "-----END CERTIFICATE-----\n"
    "-----BEGIN CERTIFICATE-----\n"
    "MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/\n"
    "MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT\n"
    "DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow\n"
    "SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT\n"
    "GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC\n"
    "AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF\n"
    "q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8\n"
    "SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0\n"
    "Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA\n"
    "a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj\n"
    "/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T\n"
    "AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG\n"
    "CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv\n"
    "bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k\n"
    "c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw\n"
    "VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC\n"
    "ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz\n"
    "MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu\n"
    "Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF\n"
    "AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo\n"
    "uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/\n"
    "wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu\n"
    "X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG\n"
    "PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6\n"
    "KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==\n"
    "-----END CERTIFICATE-----";

int arrivedcount = 0;
Thread thdMQTT;

MQTTSNetwork mqttNetwork;
MQTT::Client<MQTTSNetwork, Countdown> client = MQTT::Client<MQTTSNetwork, Countdown>(mqttNetwork);

static MemoryPool<MQTT::Message, 16> pool;
static Queue<MQTT::Message, 16> queue;

static const char* topic = "mbed-sample";

typedef void (*messageHandler)(MQTT::MessageData&);
void messageArrived(MQTT::MessageData& md)
{
    MQTT::Message &message = md.message;
    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
    printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
    ++arrivedcount;
}

int mqttsConnect(const char * hostname, uint16_t port, 
        const char * clientID, 
        const char * username, 
        const char * password)
{
    int rc;
    
    printf("Connecting to %s:%d\r\n", hostname, port);
    rc = mqttNetwork.connect(hostname, port);
    if (rc != 0)
    {
        printf("rc from TCP connect is %d\r\n", rc);
        return -1;
    }
    else
        printf("RC passed!\r\n");
 
    printf("Creating data connection ...\r\n");
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = (char *) clientID;
    data.username.cstring = (char *) username;
    data.password.cstring = (char *) password;
    printf("Connecting client ...\r\n");
    if ((rc = client.connect(data)) != 0)
    {
        printf("rc from MQTT connect is %d\r\n", rc);
        return -1;
    }
 
    printf("Subscribing to topic ...\r\n");
    if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
    {
        printf("rc from MQTT subscribe is %d\r\n", rc);
        return -1;
    }

    return rc;    
}

int mqttsSubscribe(const char * topic, MQTT::QoS os, messageHandler handler)
{
    int rc;
    
    printf("Subscribing to topic ...\r\n");
    if ((rc = client.subscribe(topic, os, handler)) != 0)
        printf("rc from MQTT subscribe is %d\r\n", rc);

    return rc;        
}

// This function is used to pass data from
// the main thread to the MQTT listener thread
void postMQTTUpdate(MQTT::Message msg)
{
    MQTT::Message * message = pool.alloc();
    // Simple copy, I think this is via (memcpy)
    *message = msg;
    
    // Push the data to the consumer thread
    printf("Pushing data to MQTTS Listener thread ...\r\n");
    queue.put(message);       
}

void mqttListener(void)
{
    printf("MQTT listener thread started ...\r\n");
    while(true)
    {
        // Wait for data in the queue, timeout at 10ms
        osEvent evt = queue.get(10);
        if (evt.status == osEventMessage) {
            printf("Message arrived from main thread ...\r\n");
            // Unpack the message
            MQTT::Message * message = (MQTT::Message *)evt.value.p;
            
            printf("Publishing message to MQTT ...\r\n");
            // Push to mqtt
            int rc = client.publish(topic, *message);
            if (rc < 0)
                printf("Error sending mqtt message \r\n");
            else
                printf("Message published ...\r\n");
                
            // Don't forget this!
            pool.free(message);
        }        
        
        printf("MQTT client yeild ...\r\n");
        if (client.yield(100) != 0)
        {
            client.disconnect();
            // TODO: reconnect TLS session.
            return;
        }
        printf("MQTT client yeild successful ...\r\n");
    }
}

int main(int argc, char* argv[])
{
    float version = 0.6;

    int i = 0;

    printf("HelloMQTT: version is %.2f\r\n", version);

    NetworkInterface* network = easy_connect(true);
    if (!network) {
        return -1;
    }

    if ( mqttNetwork.setupTLS(network, SSL_CA_PEM) != 0 )
    {
        printf("Failed initializing sercure MQTTS...\r\n");
        return -1;
    }
    
    if ( mqttsConnect("mqtt.mbedhacks.com", 
                        8883,"mbedtest_01","tinong","tatay") != 0 )
    {
        printf("Failed connecting to mqtt.mbedhacks.com:8883 \r\n");
        return -1;
    }
    
    if ( mqttsSubscribe(topic, MQTT::QOS2, messageArrived) != 0 )
    {
        printf("Failed to subscribe to a topic!\r\n");
        return -1;
    }
    
    // Run an MQTT listener on its own thread.
    thdMQTT.start(mqttListener);
    
    // The main loops just sends messages to MQTT server
    while(true)
    {
        MQTT::Message mqtt_msg;
        char buff[100];
        
        sprintf(buff, "message test %d", i);
        mqtt_msg.qos = MQTT::QOS0;
        mqtt_msg.retained = false;
        mqtt_msg.dup = false;
        mqtt_msg.payload = (void*)buff;
        mqtt_msg.payloadlen = strlen(buff)+1;
                    
        // publish the message to mqtt
        postMQTTUpdate(mqtt_msg);
        i++;
        
        Thread::wait(2000);
    }

    //mqttNetwork.disconnect();

    //printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);

    return 0;
}