mbed_Client_for_MQTT

/media/uploads/markfs/mbed.png

Overview

During the summer holiday 2010, i take part in a research project at CEIT. The project is called "RFID messaging system". The project aims to research RFID hardwares and algorithms to publish RFID events to varies applications, like Foursquare, Pachube, twitter or webpages. A mbed LPC1768 is chosen to run the system. You can follow this project at http://ceit.uq.edu.au/blogs/s4150392.

As part of RFID messaging project, a MQTT client is required to run on mbed platform to publish RFID check-in events to third party via MQTT protocol. There are rich MQTT clients on PCs and other embedded platform, but there is no previous implementation of such client on mbed platform. I decided to develop a fully functional MQTT client running on mbed platform. Detailed information about MQTT protocol are available at MQTT.

MQTT Client

This is a simple MQTT client developed to run on mbed. The library is still under developing, and has not be able to implement all functionality of MQTT protocol. The current version of the library is version 1.0.

v1.0 Features

  1. Connect to MQTT server via TCP connection.
  2. Publish message to MQTT server.
  3. Maintains connection with server.

v2.0 Features

  1. Connect to MQTT server via TCP connection.
  2. Publish message to MQTT server.
  3. Subscribe to MQTT Server
  4. Maintains connection with server.

Limitations

  1. Only QOS 0 messaging is supported

Library

An Ethernet and TCP libraries are required to support basic networking functions.

Import programEthernetTester

See http://mbed.org/users/no2chem/notebook/ethernet-testing/ For usage information.

Hardware

And you also need a ethernet connector, and a pair of ethernet cables. I used a expansion shied from Sparkfun, which has an Ethernet jack and two USB ports.

mbed with expansion shield

Test

I wrote a simple program to test functions of this MQTT client. The mbed and a mac are wired to same router. And the program is designed to connect the RSMB on mac using a name of “mbed”. After sucessful connection, it will first publish a message “Hello, here is mbed!” on topic “/mbed”, and then sends “Mirror, mirror where r u..” three times on same topic. And then the program will run 200s doing nothing, except sending a keep-alive signal every 10 seconds.

Code

main.cpp

#include "mbed.h"
#include "PubSub_mbed.h"

char buffer[1024];
char temp[1024];
int flag;

Ethernet ethernet;

struct netif netif_data;

DigitalOut ledStage0 (LED1);
DigitalOut ledStage1 (LED2);
DigitalOut ledStage2 (LED3);
DigitalOut ledStage3 (LED4);

volatile char stage = 0;

Ticker stage_blinker;

void stageblinker() {
    switch (stage) {
        case 0:
            ledStage0 = !ledStage0;
            ledStage1 = false;
            ledStage2 = false;
            ledStage3 = false;
            break;
        case 1:
            ledStage0 = true;
            ledStage1 = !ledStage1;
            ledStage2 = false;
            ledStage3 = false;
            break;
        case 2:
            ledStage0 = !ledStage0;
            ledStage1 = !ledStage1;
            ledStage2 = true;
            ledStage3 = false;
            break;
        case 3:
            ledStage0 = true;
            ledStage1 = false;
            ledStage2 = !ledStage2;
            ledStage3 = false;
            break;
        case 4:
            ledStage0 = true;
            ledStage1 = !ledStage1;
            ledStage2 = !ledStage2;
            ledStage3 = false;
            break;
        case 5:
            ledStage0 = true;
            ledStage1 = false;
            ledStage2 = false;
            ledStage3 = !ledStage3;
            break;
        case 6:
            ledStage0 = true;
            ledStage1 = true;
            ledStage2 = true;
            ledStage3 = true;
            stage_blinker.detach();
            break;
    }
}


err_t recv_callback(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) {

    char *data;
    
    /* Check if status is ok and data is arrived. */
    if (err == ERR_OK && p != NULL) {
        /* Inform TCP that we have taken the data. */
        tcp_recved(pcb, p->tot_len);
        data = static_cast<char *>(p->payload);
        printf("DATA: %s\r\n", data);
    } else {
        /* No data arrived */
        /* That means the client closes the connection and sent us a packet with FIN flag set to 1. */
        /* We have to cleanup and destroy out TCPConnection. */
        printf("Connection closed by client.\r\n");
        tcp_close(pcb);
    }
    pbuf_free(p);
    return ERR_OK;
}

/* Accept an incomming call on the registered port */
err_t accept_callback(void *arg, struct tcp_pcb *npcb, err_t err) {

    LWIP_UNUSED_ARG(arg);

    printf("Recieve from broker...\r\n");
    printf("\r\n");

    /* Subscribe a receive callback function */
    tcp_recv(npcb, &recv_callback);

    /* Don't panic! Everything is fine. */
    return ERR_OK;
}

int main() {

    printf("\r\n");
    printf("############ mBed MQTT client Tester ############\r\n");
    printf("\r\n");
    flag = 0;
    stage = 0;

    //Attach stage blinker
    Ticker tickFast, tickSlow, tickARP, eth_tick, dns_tick, dhcp_coarse, dhcp_fine;
    stage_blinker.attach_us(&stageblinker, 1000*500);

    struct netif   *netif = &netif_data;
    struct ip_addr  ipaddr;
    struct ip_addr  netmask;
    struct ip_addr  gateway;
    
    struct ip_addr serverIp;
    
    /*mbed host name on network*/
    char *hostname = "my_mbed";

    printf("Configuring device for DHCP...\r\n");
    
    /* Start Network with DHCP */
    IP4_ADDR(&netmask, 255,255,255,255);
    IP4_ADDR(&gateway, 0,0,0,0);
    IP4_ADDR(&ipaddr, 0,0,0,0);
    IP4_ADDR(&serverIp, 10,1,1,5);

    /* Initialise after configuration */
    lwip_init();
    netif->hwaddr_len = ETHARP_HWADDR_LEN;
    device_address((char *)netif->hwaddr);

    /*debug MAC address*/
    printf("mbed HW address: %02x:%02x:%02x:%02x:%02x:%02x\r\n",
           (char*) netif->hwaddr[0],
           (char*) netif->hwaddr[1],
           (char*) netif->hwaddr[2],
           (char*) netif->hwaddr[3],
           (char*) netif->hwaddr[4],
           (char*) netif->hwaddr[5]);

    netif = netif_add(netif, &ipaddr, &netmask, &gateway, NULL, device_init, ip_input);
    netif->hostname = hostname;
    netif_set_default(netif);
    dhcp_start(netif); // <-- Use DHCP

    /* Initialise all needed timers */
    tickARP.attach_us( &etharp_tmr,  ARP_TMR_INTERVAL  * 1000);
    tickFast.attach_us(&tcp_fasttmr, TCP_FAST_INTERVAL * 1000);
    tickSlow.attach_us(&tcp_slowtmr, TCP_SLOW_INTERVAL * 1000);
    dns_tick.attach_us(&dns_tmr, DNS_TMR_INTERVAL * 1000);
    dhcp_coarse.attach_us(&dhcp_coarse_tmr, DHCP_COARSE_TIMER_MSECS * 1000);
    dhcp_fine.attach_us(&dhcp_fine_tmr, DHCP_FINE_TIMER_MSECS * 1000);

    stage = 1;

    while (!netif_is_up(netif)) {
        device_poll();
    }

    printf("Interface is up, local IP is %s\r\n", inet_ntoa(*(struct in_addr*)&(netif->ip_addr)));
    printf("\r\n");
    
    printf("Start connecting to MQTT broker...\r\n");
    printf("\r\n");

    stage = 2;
    

    PubSub_mbed client(serverIp, 1883, &accept_callback);

    
    /*Client nanme show for MQTT server*/
    char clientID[] = "mbed";
    /*Subscribe to topic : "/mbed_r" */
    char topic[] = "/mbed_r";
   
    
    flag = client.connect(clientID);

    if (flag) {
        printf("Connect to broker sucessed....\r\n");
    } else {
        printf("Failed...\r\n");
    }
    //printf("Flag(Connect) is : %d\r\n", flag);
    printf("\r\n");
    wait(3);

    stage = 3;

    flag = client.publish("/mbed", "Hello, here is mbed!");
    
    //printf("Flag(Publish) is : %d\r\n", flag);
    //printf("\r\n");
    wait(3);

    stage = 4;

    client.subscribe(topic);

    wait(5);
    
    stage = 5;


    int i;
    for (i = 0; i < 3; i++) {
        flag = client.publish("/mbed", "Mirror, Mirror, where r u..");
        device_poll();
        wait(1);
    }
    
    flag = client.publish("/mbed", "Keep alive");
    
    i = 0;
    while (i <= 20) {
        client.live();
        device_poll();
        wait(10);
        i++;
    }

    
    //Close connection
    client.disconnect();

    printf("\r\n");
    printf("############  Test finish  #############\r\n");
    printf("\r\n");

}


Results

This is a shot from the RSMB, when the broker accepted the connection from mbed.

/media/uploads/markfs/rsmb.png

This is the first message from mbed shown on a iA92 client, running on a mac.

/media/uploads/markfs/hello2.png

Here is the second message “Mirror, Mirror, where r u..” send from mbed.

/media/uploads/markfs/mirror2.png

This serial port window shows progresses of full test session.

/media/uploads/markfs/tester.png

The library has been tested on mbed NXP LPC1768, and IBM’s RSMB is chosen as MQTT server and iA92 java as a client running on PC and mac.

Downloads

Primary post of this page, downloads of library codes, more information and further updates can be found at following link:


All wikipages