This example program uses the HiveMQ broker (http://www.mqtt-dashboard.com/index.html) to both publish and subscribe to topics.

Dependencies:   WNCInterface mbed-rtos mbed

See the README for details on this example program. NOTE: When started, the program can take up to 40 seconds before it will respond. This delay is the time required for the WNC Data Module to connect with the network.

main.cpp

Committer:
root@developer-sjc-cyan-compiler.local.mbed.org
Date:
2016-11-17
Revision:
4:4e31afefdf81
Parent:
3:9413314f8017

File content as of revision 4:4e31afefdf81:

//#define MQTT_DEBUG

#include "mbed.h"
#include "MQTTClient.h"
#include "MQTTFormat.h"

//#include "MQTTEthernet.h"
#include "MQTTWNCInterface.h"
#include "rtos.h"
#include "k64f.h"


// connect options for MQTT broker
// NOTE(review): in the main() visible in this file the MQTT client ID is taken
// from eth.getMACAddress() and the username/password assignments are commented
// out, so CLIENTID, USERNAME and PASSWORD appear to be unused here.
#define BROKER "broker.hivemq.com"   // MQTT broker URL
#define PORT 1883                           // MQTT broker port number
#define CLIENTID "96430312d8f7"                         // use K64F MAC address without colons
#define USERNAME ""                         // not required for MQTT Dashboard public broker 
#define PASSWORD ""                         // not required for MQTT Dashboard public broker
#define TOPIC "jmf/Test1"                            // MQTT topic

// Queue of switch events: the SW2/SW3 interrupt handlers put() a code (22 or 33)
// here and the main loop get()s it to decide which message to publish.
Queue<uint32_t, 6> messageQ;
// Serial console used for all diagnostic output in this file.
// (256,256 are presumably the TX/RX buffer sizes - confirm against MODSERIAL.)
MODSERIAL pc(USBTX,USBRX,256,256);
    
// One row of the status-code lookup table: a numeric code and the
// human-readable text that describes it.
struct rcvd_errs{
    int err;    // numeric status code
    char *er;   // description text for that code
    };

// Descriptions for every status code response_str() can translate.
rcvd_errs response[] = {
    {200, "200 OK - Request has succeeded."},
    {201, "201 Created - Request has been fulfilled and a new resource created."},
    {202, "202 Accepted - The request has been accepted for processing, but the processing will be completed asynchronously"},
    {204, "204 No Content - The server has fulfilled the request but does not need to return an entity-body."},
    {400, "400 Bad Request - Bad request (e.g. sending an array when the API is expecting a hash.)"},
    {401, "401 Unauthorized - No valid API key provided."},
    {403, "403 Forbidden - You tried to access a disabled device, or your API key is not allowed to access that resource, etc."},
    {404, "404 Not Found - The requested item could not be found."},
    {405, "405 Method Not Allowed - The HTTP method specified is not allowed."},
    {415, "415 Unsupported Media Type - The requested media type is currently not supported."},
    {422, "422 Unprocessable Entity - Can result from sending invalid fields."},
    {429, "429 Too Many Requests - The user has sent too many requests in a given period of time."},
    {500, "500 Server errors - Something went wrong in the M2X server"},
    {502, "502 Server errors - Something went wrong in the M2X server"},
    {503, "503 Server errors - Something went wrong in the M2X server"},
    {504, "504 Server errors - Something went wrong in the M2X server"},
    };
// Number of rows in response[]. Parenthesized so the macro stays a single
// value when it is used inside a larger expression (e.g. `x / RCMAX`).
#define RCMAX   (sizeof(response)/sizeof(response[0]))

// Translate a numeric status code into its description.
//
// rc      : status code to look up.
// returns : the matching text from response[], or a generic
//           "Unknown error code..." string when rc is not in the table.
//           The returned pointer refers to static storage and must not be
//           modified or freed by the caller.
char * response_str(int rc) {
    static char *unknown = "Unknown error code...";
    // Walk the table by pointer and stop at the end *before* dereferencing,
    // so an unknown code never reads past the last row.
    for (rcvd_errs *entry = response; entry != response + RCMAX; ++entry) {
        if (entry->err == rc)
            return entry->er;
    }
    return unknown;
}

// Set the RGB LED to the requested colour.
//
// led_color : one of red, green, blue or off; any other value leaves the
//             LED outputs untouched.
//
// The `off` case writes 1 to all three channels, so 1 is the "channel off"
// level here (presumably the LEDs are active-low - confirm against k64f.h).
// The selected channel is written with 0.7 and the other two with 1.
void controlLED(color_t led_color) {
    if (led_color == red) {
        greenLED = blueLED = 1;
        redLED = 0.7;
    }
    else if (led_color == green) {
        redLED = blueLED = 1;
        greenLED = 0.7;
    }
    else if (led_color == blue) {
        redLED = greenLED = 1;
        blueLED = 0.7;
    }
    else if (led_color == off) {
        redLED = greenLED = blueLED = 1;
    }
}
    
// Interrupt handler for switch 2: posts the code 22 to messageQ.
// The queue carries the code itself in the pointer-sized slot; nothing is
// ever dereferenced, the main loop reads the value back as an integer.
void sw2_ISR(void) {
    uint32_t* const sw2Code = reinterpret_cast<uint32_t*>(22);
    messageQ.put(sw2Code);
}

// Interrupt handler for switch 3: posts the code 33 to messageQ.
// The queue carries the code itself in the pointer-sized slot; nothing is
// ever dereferenced, the main loop reads the value back as an integer.
void sw3_ISR(void) {
    uint32_t* const sw3Code = reinterpret_cast<uint32_t*>(33);
    messageQ.put(sw3Code);
}
 
// Callback invoked for each MQTT message received on the subscribed topic.
//
// md : wrapper holding the received message (md.message).
//
// The payload is echoed to the serial console. A payload of exactly three
// bytes equal to "red", "grn", "blu" or "off" additionally switches the LED
// to that colour; every other payload is only printed.
void messageArrived(MQTT::MessageData& md) {
    MQTT::Message &msg = md.message;
    char *text = (char*)msg.payload;
    pc.printf("Receiving MQTT message:  %.*s\r\n", msg.payloadlen, text);

    // Colour commands are always exactly three bytes long.
    if (msg.payloadlen != 3)
        return;

    // Command word -> LED colour, checked in this order; first match wins.
    static const struct { const char *word; color_t color; } commands[] = {
        { "red", red   },
        { "grn", green },
        { "blu", blue  },
        { "off", off   },
    };

    for (unsigned k = 0; k < sizeof(commands)/sizeof(commands[0]); ++k) {
        if (strncmp(text, commands[k].word, 3) == 0) {
            controlLED(commands[k].color);
            return;
        }
    }
}

int main() {
    int rc, good = 0;
    Timer tmr;
    char* topic = TOPIC;
    // turn off LED  
    controlLED(blue);

    pc.baud(115200);
    pc.printf("\r\n\r\nWelcome to the K64F MQTT Demo!\r\n");
        
    // set SW2 and SW3 to generate interrupt on falling edge 
    switch2.fall(&sw2_ISR);
    switch3.fall(&sw3_ISR);

    // initialize ethernet interface
    MQTTwnc ipstack = MQTTwnc();
    
    // get and display client network info
    WNCInterface& eth = ipstack.getEth();
    
    // construct the MQTT client
    MQTT::Client<MQTTwnc, Countdown> client = MQTT::Client<MQTTwnc, Countdown>(ipstack);
    
    char* hostname = BROKER;
    int port = PORT;

    pc.printf("\r\nConnected to local network...\r\n");    
    pc.printf("IP address is %s\r\n", eth.getIPAddress());
    pc.printf("MAC address is %s\r\n", eth.getMACAddress());
    pc.printf("Gateway address is %s\r\n", eth.getGateway());

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
    int tries;
    
    while( !good ) {    
        tries=0;
        // connect to TCP socket and check return code
        tmr.start();
        rc = 1;
        while( rc && tries < 3) {
            pc.printf("\r\n\r\nAttempting TCP connect to %s:%d:  ", hostname, port);
            rc = ipstack.connect(hostname, port);
            if( rc ) {
                pc.printf("Failed!!\r\n");
                while( tmr.read_ms() < 5000 ) ;
                tries++;
                tmr.reset();
            }
            else {
                pc.printf("Success!\r\n");
                rc = 0;
            }
        }
    
        data.MQTTVersion = 3;
        data.clientID.cstring = eth.getMACAddress();
    //    data.username.cstring = USERNAME;
    //    data.password.cstring = PASSWORD;
        
        // send MQTT connect packet and check return code
        rc = 1;
        tmr.reset(); 
        while( !client.isConnected() && rc && tries < 3) {
            pc.printf("Attempting (%d) MQTT connect to %s:%d: ", tries, hostname, port);
            rc = client.connect(data);
            if( rc ) {
                pc.printf("Connection Failed!\r\n");
                while( tmr.read_ms() < 5000 );
                tmr.reset();
                tries++;
            }
            else
                pc.printf("Connected!\r\n");
        }

        // subscribe to MQTT topic
        tmr.reset();
        rc = 1;
        while( rc && client.isConnected() && tries < 3) {
            pc.printf("We are %s, Subscribing to MQTT topic %s (%d): ", client.isConnected()?"connected":"NOT CONNECTED",topic,tries);
            rc = client.subscribe(topic, MQTT::QOS0, messageArrived);
            if( rc ) {
                pc.printf("Subscribe request failed!\r\n");
                while( tmr.read_ms() < 5000 );
                tries++;
                tmr.reset();
            }
            else {
                good=1;
                pc.printf("Subscribe successful!\r\n");
            }
        }
    }        
    
    MQTT::Message message;
    char buf[100];
    message.qos = MQTT::QOS0;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)buf;
    message.payloadlen = strlen(buf)+1;
    
    while(true) {
        osEvent switchEvent = messageQ.get(100);
        
        if (switchEvent.value.v == 22 || switchEvent.value.v == 33) {
            switch(switchEvent.value.v) {
                case 22 :
                    sprintf(buf, "sw2");
                    break;
                case 33 :
                    sprintf(buf, "sw3");
                    break;
            }
            pc.printf("Publishing MQTT message: %s (%d)\r\n", (char*)message.payload,
                       message.payloadlen);
            rc = client.publish(topic, message);
            if( rc ) {
                pc.printf("Publish request failed! (%d)\r\n",rc);
            }
            else {
                pc.printf("Publish successful!\r\n");            
            client.yield(100);
            }
        }
        else {
            client.yield(100);
        }           
    }
}