ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2

Dependencies:   PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed

main.cpp

Committer:
tass
Date:
2013-06-26
Revision:
4:6158f706297f
Parent:
1:907f8c9fa45d

File content as of revision 4:6158f706297f:

/* PicoTCP ZeroMQ Test Publisher 
 * Copyright 2013 - Daniele Lacamera
 * 
 * GPL v2
 */

#include "mbed.h"
#include "EthernetInterface.h"

/* Heartbeat LED: toggled by main() each time a message is published. */
DigitalOut myled(LED1);
/* Connection LED: driven to 1 when a subscriber is accepted / greeted and
 * back to 0 whenever the handshake fails or the connection goes away. */
DigitalOut conn_led(LED2);
/* Socket of the subscriber currently being served; assigned in cb_tcp0mq()
 * when a connection is accepted. */
static struct pico_socket *zmq_sock = NULL;
/* Handshake byte budget: reset to 64 by hs_connected() and decremented by
 * hs_signature() and hs_version() as bytes arrive. */
int remaining_hs_bytes = 64;

/* Handshake state machine of the single supported subscriber connection.
 *
 * The numeric value of each state is used as an index into hs_cb[], so the
 * order of the enumerators must stay in sync with that table.
 */
volatile enum zmq_hshake_state {
  ST_LISTEN = 0,   /* no subscriber; cb_tcp0mq() accepts connections only here */
  ST_CONNECTED,    /* connection accepted; hs_connected() sends our greeting */
  ST_SIGNATURE,    /* waiting for the peer's 10-byte signature */
  ST_VERSION,      /* waiting for the peer's version bytes */
  ST_GREETING,     /* waiting for the remainder of the peer's greeting */
  ST_RDY           /* handshake done; main() publishes periodically */
} Handshake_state = ST_LISTEN;

/* Header of a frame as written by zmq_send(): one flags byte, a one-byte
 * payload length, then the payload itself.
 *
 * The struct is packed so the two header bytes are contiguous on the wire.
 * txt is a zero-length trailing array: it reserves NO storage, so
 * sizeof(struct zmq_msg) covers only the header and the payload must live in
 * memory that the user provides right behind it.
 */
struct __attribute__((packed)) zmq_msg {
    uint8_t flags;   /* frame flags byte */
    uint8_t len;     /* payload length in bytes (single byte, so 0..255) */
    char    txt[0];  /* start of the payload; no space is allocated here */
};


void zmq_send(struct pico_socket *s, char *txt, int len)
{
    struct zmq_msg msg;
    msg.flags = 4;
    msg.len = (uint8_t) len;
    memcpy(msg.txt, txt, len);
    pico_socket_write(s, &msg, len + 2);
}


/* ST_CONNECTED handler: push our side of the greeting to the subscriber
 * (10-byte signature, 2-byte version, 52-byte greeting block beginning with
 * "NULL"), then start waiting for the peer's signature.
 */
static void hs_connected(struct pico_socket *s)
{
  uint8_t signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
  uint8_t version[2] = {3u, 0};
  uint8_t greeting[52] = {'N','U','L','L', 0};  /* remaining bytes are zero */

  /* The three pieces go out back to back, in this order. */
  pico_socket_write(s, signature, sizeof signature);
  pico_socket_write(s, version, sizeof version);
  pico_socket_write(s, greeting, sizeof greeting);

  Handshake_state = ST_SIGNATURE;
  remaining_hs_bytes = 64;
  conn_led = 1;
}

/* ST_SIGNATURE handler: read and check the peer's 10-byte signature.
 *
 * A read that returns 0 means nothing has arrived yet; the state is kept so
 * the next poll from main() (or the next read event) retries, matching what
 * hs_version() and hs_greeting() do. A read error, a short read, or a first
 * byte other than 0xFF closes the socket and returns to ST_LISTEN.
 *
 * NOTE(review): a signature split across several reads is still rejected;
 * handling that would need a reassembly buffer kept between calls.
 */
static void hs_signature(struct pico_socket *s)
{
  uint8_t incoming[10];
  int ret;

  ret = pico_socket_read(s, incoming, sizeof incoming);
  if (ret == 0)
    return;  /* no data yet: stay in ST_SIGNATURE and try again later */

  /* incoming[0] is only inspected once a full signature has been read. */
  if (ret < (int) sizeof incoming || incoming[0] != 0xFF) {
    printf("Received invalid signature\n");
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
    conn_led = 0;
    return;
  }

  printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
  remaining_hs_bytes -= ret;
  Handshake_state = ST_VERSION;
}

/* ST_VERSION handler: read the peer's version bytes and require major
 * version 3.
 *
 * No data yet (read returns 0) leaves the state unchanged. A read error or
 * an unsupported major version closes the socket and returns to ST_LISTEN.
 * On success the state advances to ST_GREETING.
 */
static void hs_version(struct pico_socket *s)
{
  uint8_t ver[20];
  const int nread = pico_socket_read(s, ver, 2);

  if (nread == 0)
    return;

  if (nread < 0) {
    printf("Cannot exchange valid version information. Read returned -1\n");
  } else {
    remaining_hs_bytes -= nread;
    if (ver[0] == 3) {
      printf("Subscriber is using version 3. Good!\n");
      Handshake_state = ST_GREETING;
      return;
    }
    printf("Version %d.x not supported by this publisher\n", ver[0]);
  }

  /* Shared failure path: drop the connection and listen again. */
  pico_socket_close(s);
  Handshake_state = ST_LISTEN;
  conn_led = 0;
}

/* ST_GREETING handler: consume whatever is left of the peer's greeting (up
 * to 64 bytes in one read) and, once anything has arrived, declare the pair
 * ready and send the "READY   " frame.
 *
 * A read returning 0 keeps the state; a negative result closes the socket
 * and returns to ST_LISTEN.
 */
static void hs_greeting(struct pico_socket *s)
{
  uint8_t rest[64];
  const int nread = pico_socket_read(s, rest, sizeof rest);

  printf("pico_socket_read in greeting returned %d\n", nread);

  if (nread > 0) {
    printf("Paired. Sending Ready.\n");
    Handshake_state = ST_RDY;
    zmq_send(s, "READY   ", 8);
  } else if (nread < 0) {
    printf("Cannot retrieve valid greeting\n");
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
    conn_led = 0;
  }
  /* nread == 0: nothing to do until more data shows up. */
}

/* ST_RDY handler: read up to 258 bytes sent by the subscriber and log how
 * many were received; the data itself is discarded.
 */
static void hs_rdy(struct pico_socket *s)
{
    uint8_t scratch[258];
    const int nread = pico_socket_read(s, scratch, sizeof scratch);

    printf("Got %d bytes from subscriber whilst in rdy state.\n", nread);
}

/* Handshake dispatch table, indexed by the current Handshake_state value.
 * The entries must stay in the same order as enum zmq_hshake_state; a NULL
 * entry means the state has no handler and callers skip it.
 */
static void(*hs_cb[])(struct pico_socket *) = {
    NULL,           /* ST_LISTEN: nothing to do until a connection arrives */
    hs_connected,   /* ST_CONNECTED */
    hs_signature,   /* ST_SIGNATURE */
    hs_version,     /* ST_VERSION */
    hs_greeting,    /* ST_GREETING */
    hs_rdy          /* ST_RDY */
};

/* Socket event callback registered on the listening socket in main().
 *
 * ev  bitmask of PICO_SOCK_EV_* events; several may be set at once, so every
 *     bit is tested independently
 * s   socket the events refer to
 *
 * - RD:    run the handshake handler of the current state on s.
 * - CONN:  accept the incoming connection. Only one subscriber is served at
 *          a time: it is adopted when the state is ST_LISTEN and closed
 *          otherwise. A failed accept is reported and ignored.
 * - FIN / ERR / CLOSE: fall back to ST_LISTEN and switch the connection LED
 *          off (CLOSE additionally closes s).
 *
 * NOTE(review): FIN/ERR/CLOSE reset the state no matter which socket they
 * are reported for; confirm whether events of a rejected connection can end
 * up tearing down the active subscriber.
 */
void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
{
  if (ev & PICO_SOCK_EV_RD) {
    if (hs_cb[Handshake_state])
      hs_cb[Handshake_state](s);
  }

  if (ev & PICO_SOCK_EV_CONN) {
    struct pico_ip4 orig;
    uint16_t port;
    char peer[30];
    struct pico_socket *z;

    z = pico_socket_accept(s, &orig, &port);
    if (!z) {
      /* orig and port are not valid here, and there is no socket to adopt
       * or close. */
      printf("tcp0mq> Accept failed: %s.\n", strerror(pico_err));
    } else {
      pico_ipv4_to_string(peer, orig.addr);
      printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
      if (Handshake_state == ST_LISTEN) {
          printf("tcp0mq> Accepted connection!\n");
          conn_led = 1;
          zmq_sock = z;
          Handshake_state = ST_CONNECTED;
      } else {
          printf("tcp0mq> Server busy, connection rejected\n");
          pico_socket_close(z);
      }
    }
  }

  if (ev & PICO_SOCK_EV_FIN) {
    printf("tcp0mq> Connection closed.\n");
    Handshake_state = ST_LISTEN;
    conn_led = 0;
  }

  if (ev & PICO_SOCK_EV_ERR) {
    printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
    printf("tcp0mq> Connection closed.\n");
    Handshake_state = ST_LISTEN;
    conn_led = 0;
  }

  if (ev & PICO_SOCK_EV_CLOSE) {
    printf("tcp0mq> event close\n");
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
    conn_led = 0;
  }

  if (ev & PICO_SOCK_EV_WR) {
    /* TODO: manage pending data */
  }
}



/* Entry point: bring up Ethernet and the PicoTCP stack, open a TCP listening
 * socket on port 9000 with cb_tcp0mq() as its event callback, then run the
 * polling loop forever.
 *
 * Each loop iteration ticks the stack, waits 1 ms, drives the handshake
 * state machine while a handshake is in progress, and publishes
 * "HELLO WORLD" once more than 500 iterations have elapsed since the last
 * message and the subscriber is in ST_RDY.
 *
 * Any failure while setting up the socket parks the program in an endless
 * loop.
 */
int main() {
  int counter = 0;
  EthernetInterface eth;
  eth.init(); //Use DHCP
  eth.connect();
  pico_stack_init();
  
  struct pico_socket *s;
  uint16_t port = short_be(9000);
  struct pico_ip4 inaddr_any = {0};

  s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
  if (!s)
    while(1);;
  
  printf("tcp0mq> BIND\n");
  if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
    printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
    while(1);;
  }

  printf("tcp0mq> LISTEN\n");
  if (pico_socket_listen(s, 40) != 0)
     while(1);;
  printf("tcp0mq> listening port %u ...\n",short_be(port));

  while(1) {
    pico_stack_tick();
    wait(0.001);
    /* Poll the handshake handlers too, so progress does not depend solely
     * on read events delivered through cb_tcp0mq(). */
    if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
      if (hs_cb[Handshake_state]) 
          hs_cb[Handshake_state](zmq_sock);
    }
    /* counter counts iterations since the last publish and saturates once
     * the interval has elapsed. Letting it grow without bound while no
     * subscriber is ready would eventually overflow the signed int. */
    if (counter > 500) {
        if (Handshake_state == ST_RDY) {
            zmq_send(zmq_sock, "HELLO WORLD", 11);
            counter = 0;
            myled = !myled;
        }
    } else {
        counter++;
    }
  }
}