ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
Diff: main.cpp
- Revision:
- 0:c3b9517c3c53
- Child:
- 1:907f8c9fa45d
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/main.cpp Sat Jun 22 14:51:37 2013 +0000 @@ -0,0 +1,202 @@ +/* PicoTCP ZeroMQ Test Publisher + * Copyright 2013 - Daniele Lacamera + * + * GPL v2 + */ + +#include "mbed.h" +#include "EthernetInterface.h" + +DigitalOut myled(LED1); +static struct pico_socket *zmq_sock = NULL; + +enum zmq_hshake_state { + ST_LISTEN = 0, + ST_CONNECTED, + ST_SIGNATURE, + ST_VERSION, + ST_GREETING, + ST_RDY +} Handshake_state = ST_LISTEN; + +struct zmq_msg { + uint8_t flags; + uint8_t len; + char txt[0]; +}; + + +void zmq_send(struct pico_socket *s, char *txt, int len) +{ + struct zmq_msg msg; + msg.flags = 0; + msg.len = (uint8_t) len; + memcpy(msg.txt, txt, len); + pico_socket_write(s, &msg, len + 2); +} + + +static void hs_connected(struct pico_socket *s) +{ + uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f}; + pico_socket_write(s, my_signature, 10); + Handshake_state = ST_SIGNATURE; +} + +static void hs_signature(struct pico_socket *s) +{ + uint8_t incoming[20]; + int ret; + uint8_t my_ver[2] = {3u, 0}; + ret = pico_socket_read(s, incoming, 10); + if (ret < 10) { + printf("Received invalid signature\n"); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + return; + } + if (incoming[0] != 0xFF) { + printf("Received invalid signature\n"); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + return; + } + printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); + pico_socket_write(s, my_ver, 2); + Handshake_state = ST_VERSION; +} + +static void hs_version(struct pico_socket *s) +{ + uint8_t incoming[20]; + int ret; + uint8_t my_greeting[53] = {'N','U','L','L', 0}; + ret = pico_socket_read(s, incoming, 2); + if (ret < 1) { + printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + return; + } + if (incoming[0] != 3) { + printf("Version %d.x not supported by this publisher\n", incoming[0]); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + return; + } + pico_socket_write(s, my_greeting, 53); + Handshake_state = ST_GREETING; +} + +static void hs_greeting(struct pico_socket *s) +{ + uint8_t incoming[53]; + int ret; + uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '}; + ret = pico_socket_read(s, incoming, 53); + if (ret < 53) { + printf("Cannot retrieve valid greeting\n"); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + return; + } + zmq_send(s, "READY ", 8); + Handshake_state = ST_RDY; +} + +static void hs_rdy(struct pico_socket *s) +{ + int ret; + uint8_t incoming[258]; + pico_socket_read(s, incoming, 258); + printf("Got %d bytes from subscriber whilst in rdy state.\n", ret); +} + +static void(*hs_cb[])(struct pico_socket *) = { + NULL, + hs_connected, + hs_signature, + hs_version, + hs_greeting, + hs_rdy +}; + +void cb_tcp0mq(uint16_t ev, struct pico_socket *s) +{ + struct pico_ip4 orig; + uint16_t port; + char peer[30]; + + if (ev & PICO_SOCK_EV_RD) { + if (hs_cb[Handshake_state]) + hs_cb[Handshake_state](s); + } + + if (ev & PICO_SOCK_EV_CONN) { + zmq_sock = pico_socket_accept(s, &orig, &port); + pico_ipv4_to_string(peer, orig.addr); + printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port)); + Handshake_state = ST_CONNECTED; + } + + if (ev & PICO_SOCK_EV_FIN) { + printf("tcp0mq> Connection closed.\n"); + Handshake_state = ST_LISTEN; + } + + if (ev & PICO_SOCK_EV_ERR) { + printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); + printf("tcp0mq> Connection closed.\n"); + Handshake_state = ST_LISTEN; + } + + if (ev & PICO_SOCK_EV_CLOSE) { + printf("tcp0mq> event close\n"); + pico_socket_close(s); + Handshake_state = ST_LISTEN; + } + + if (ev & PICO_SOCK_EV_WR) { + /* TODO: manage pending data */ + } +} + + + +int main() { + int counter = 0; + EthernetInterface eth; + eth.init(); //Use DHCP + eth.connect(); + pico_stack_init(); + + struct pico_socket *s; + struct pico_ip4 server_addr; + uint16_t port = short_be(9000); + struct pico_ip4 inaddr_any = {0}; + + s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); + if (!s) + while(1);; + + printf("tcp0mq> BIND\n"); + if (pico_socket_bind(s, &inaddr_any, &port)!= 0) { + printf("tcp0mq> BIND failed because %s\n", strerror(pico_err)); + while(1);; + } + + printf("tcp0mq> LISTEN\n"); + if (pico_socket_listen(s, 40) != 0) + while(1);; + printf("tcp0mq> listening port %u ...\n",short_be(port)); + + while(1) { + pico_stack_tick(); + wait(0.001); + if((counter++ > 500) && (Handshake_state == ST_RDY)) { + zmq_send(zmq_sock, "HELLO WORLD", 10); + counter = 0; + myled = !myled; + } + } +}