ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
main.cpp
- Committer:
- daniele
- Date:
- 2013-06-24
- Revision:
- 1:907f8c9fa45d
- Parent:
- 0:c3b9517c3c53
- Child:
- 4:6158f706297f
File content as of revision 1:907f8c9fa45d:
/* PicoTCP ZeroMQ Test Publisher
 * Copyright 2013 - Daniele Lacamera
 *
 * GPL v2
 *
 * Listens on TCP port 9000, accepts a single subscriber at a time, walks it
 * through a signature / version / greeting handshake and, once the handshake
 * is complete, periodically publishes a "HELLO WORLD" message.
 */
#include "mbed.h"
#include "EthernetInterface.h"

DigitalOut myled(LED1);     /* toggled every time a message is published */
DigitalOut conn_led(LED2);  /* lit while a subscriber is connected */

/* The one subscriber connection currently being served (NULL when none). */
static struct pico_socket *zmq_sock = NULL;

/* Handshake bytes still expected from the peer; decremented as they arrive. */
int remaining_hs_bytes = 64;

/* Handshake progress; also used as the index into hs_cb[]. */
volatile enum zmq_hshake_state {
    ST_LISTEN = 0,
    ST_CONNECTED,
    ST_SIGNATURE,
    ST_VERSION,
    ST_GREETING,
    ST_RDY
} Handshake_state = ST_LISTEN;

/* Wire layout of a short message: flags byte, length byte, then payload. */
struct __attribute__((packed)) zmq_msg {
    uint8_t flags;
    uint8_t len;
    char txt[0];
};

enum {
    ZMQ_HDR_LEN     = 2,    /* flags + len */
    ZMQ_MAX_PAYLOAD = 255   /* largest value the one-byte len field can hold */
};

/*
 * Send one short message frame on socket s.
 *
 * txt: payload bytes (need not be NUL-terminated); exactly len bytes are read.
 * len: payload length, 0..ZMQ_MAX_PAYLOAD.
 *
 * The frame is assembled in a local buffer large enough for the biggest
 * payload. Requests that cannot be encoded (NULL arguments, negative or
 * oversized length) are dropped without writing anything.
 */
void zmq_send(struct pico_socket *s, const char *txt, int len)
{
    uint8_t frame[ZMQ_HDR_LEN + ZMQ_MAX_PAYLOAD];

    if (!s || !txt || len < 0 || len > ZMQ_MAX_PAYLOAD)
        return;

    frame[0] = 0;               /* flags */
    frame[1] = (uint8_t) len;   /* payload length */
    memcpy(frame + ZMQ_HDR_LEN, txt, len);
    pico_socket_write(s, frame, len + ZMQ_HDR_LEN);
}

/*
 * Put the state machine back into ST_LISTEN and switch the connection LED
 * off. If s is the active subscriber socket, forget it so that nothing keeps
 * using a stale pointer.
 */
static void reset_to_listen(struct pico_socket *s)
{
    if (s == zmq_sock)
        zmq_sock = NULL;
    Handshake_state = ST_LISTEN;
    conn_led = 0;
}

/* Close s because the handshake failed, then go back to listening. */
static void hs_abort(struct pico_socket *s)
{
    pico_socket_close(s);
    reset_to_listen(s);
}

/*
 * ST_CONNECTED: send our side of the handshake (10-byte signature, 2-byte
 * version, 52-byte greeting = 64 bytes) and start waiting for the peer's
 * signature.
 */
static void hs_connected(struct pico_socket *s)
{
    uint8_t my_ver[2] = {3u, 0};
    uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
    uint8_t my_greeting[52] = {'N', 'U', 'L', 'L', 0};

    pico_socket_write(s, my_signature, 10);
    pico_socket_write(s, my_ver, 2);
    pico_socket_write(s, my_greeting, 52);

    Handshake_state = ST_SIGNATURE;
    remaining_hs_bytes = 64;
    conn_led = 1;
}

/*
 * ST_SIGNATURE: read the peer's 10-byte signature and check its first byte.
 *
 * This handler is polled from the main loop, so a read of 0 bytes only means
 * the signature has not arrived yet: keep waiting, exactly as hs_version()
 * and hs_greeting() do. A read error, a short read or a wrong first byte
 * drops the connection.
 */
static void hs_signature(struct pico_socket *s)
{
    uint8_t incoming[20];
    int ret;

    ret = pico_socket_read(s, incoming, 10);
    if (ret == 0)
        return;

    if (ret < 10 || incoming[0] != 0xFF) {
        printf("Received invalid signature\n");
        hs_abort(s);
        return;
    }

    printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
    remaining_hs_bytes -= ret;
    Handshake_state = ST_VERSION;
}

/*
 * ST_VERSION: read the peer's version bytes and require major version 3.
 * A read of 0 bytes means "nothing yet"; a negative result or any other
 * major version drops the connection.
 */
static void hs_version(struct pico_socket *s)
{
    uint8_t incoming[20];
    int ret;

    ret = pico_socket_read(s, incoming, 2);
    if (ret < 0) {
        printf("Cannot exchange valid version information. Read returned -1\n");
        hs_abort(s);
        return;
    }
    if (ret == 0)
        return;

    remaining_hs_bytes -= ret;
    if (incoming[0] != 3) {
        printf("Version %d.x not supported by this publisher\n", incoming[0]);
        hs_abort(s);
        return;
    }

    printf("Subscriber is using version 3. Good!\n");
    Handshake_state = ST_GREETING;
}

/*
 * ST_GREETING: consume whatever greeting bytes the peer sent (content is not
 * inspected), then announce READY and enter ST_RDY.
 */
static void hs_greeting(struct pico_socket *s)
{
    uint8_t incoming[64];
    int ret;

    ret = pico_socket_read(s, incoming, 64);
    printf("pico_socket_read in greeting returned %d\n", ret);
    if (ret == 0)
        return;
    if (ret < 0) {
        printf("Cannot retrieve valid greeting\n");
        hs_abort(s);
        return;
    }

    printf("Paired. Sending Ready.\n");
    Handshake_state = ST_RDY;
    /* Exactly 8 bytes are sent, so the literal is space-padded to 8
     * characters; a shorter literal would make zmq_send() read past its end.
     * NOTE(review): assumes the peer expects a space-padded 8-byte READY
     * name -- confirm against the protocol revision in use. */
    zmq_send(s, "READY   ", 8);
}

/* ST_RDY: drain and report anything the subscriber sends; data is discarded. */
static void hs_rdy(struct pico_socket *s)
{
    int ret;
    uint8_t incoming[258];

    ret = pico_socket_read(s, incoming, 258);
    printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
}

/* Handler for each handshake state, indexed by enum zmq_hshake_state. */
static void (*hs_cb[])(struct pico_socket *) = {
    NULL,
    hs_connected,
    hs_signature,
    hs_version,
    hs_greeting,
    hs_rdy
};

/*
 * Accept a pending connection on listening socket s. The connection becomes
 * the active subscriber if none is being served; otherwise it is closed
 * straight away.
 */
static void accept_subscriber(struct pico_socket *s)
{
    struct pico_ip4 orig = {0};
    uint16_t port = 0;
    char peer[30];
    struct pico_socket *z;

    z = pico_socket_accept(s, &orig, &port);
    if (!z) {
        /* Nothing was accepted: orig/port carry no peer information. */
        printf("tcp0mq> Accept failed: %s\n", strerror(pico_err));
        return;
    }

    pico_ipv4_to_string(peer, orig.addr);
    printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));

    if (Handshake_state == ST_LISTEN) {
        printf("tcp0mq> Accepted connection!\n");
        conn_led = 1;
        zmq_sock = z;
        Handshake_state = ST_CONNECTED;
    } else {
        printf("tcp0mq> Server busy, connection rejected\n");
        pico_socket_close(z);
    }
}

/*
 * Socket event callback registered with pico_socket_open().
 *
 * ev is a bit mask, so several events may be delivered in one call; each bit
 * is handled independently and in a fixed order.
 */
void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
{
    if (ev & PICO_SOCK_EV_RD) {
        if (hs_cb[Handshake_state])
            hs_cb[Handshake_state](s);
    }

    if (ev & PICO_SOCK_EV_CONN)
        accept_subscriber(s);

    if (ev & PICO_SOCK_EV_FIN) {
        printf("tcp0mq> Connection closed.\n");
        reset_to_listen(s);
    }

    if (ev & PICO_SOCK_EV_ERR) {
        printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
        printf("tcp0mq> Connection closed.\n");
        reset_to_listen(s);
    }

    if (ev & PICO_SOCK_EV_CLOSE) {
        printf("tcp0mq> event close\n");
        pico_socket_close(s);
        reset_to_listen(s);
    }

    if (ev & PICO_SOCK_EV_WR) {
        /* TODO: manage pending data */
    }
}

/*
 * Bring up Ethernet and the stack, open a listening TCP socket on port 9000,
 * then loop forever: tick the stack, drive the handshake for a connected
 * subscriber and publish "HELLO WORLD" once more than 500 loop iterations
 * have passed while in ST_RDY. Any setup failure halts in an endless loop.
 */
int main()
{
    int counter = 0;
    EthernetInterface eth;
    eth.init(); // Use DHCP
    eth.connect();
    pico_stack_init();

    struct pico_socket *s;
    uint16_t port = short_be(9000);
    struct pico_ip4 inaddr_any = {0};

    s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
    if (!s) {
        for (;;) {
        }
    }

    printf("tcp0mq> BIND\n");
    if (pico_socket_bind(s, &inaddr_any, &port) != 0) {
        printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
        for (;;) {
        }
    }

    printf("tcp0mq> LISTEN\n");
    if (pico_socket_listen(s, 40) != 0) {
        for (;;) {
        }
    }

    printf("tcp0mq> listening port %u ...\n", (unsigned) short_be(port));

    while (1) {
        pico_stack_tick();
        wait(0.001);

        /* Poll the handshake: the handlers return quietly when no data has
         * arrived yet. */
        if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
            if (hs_cb[Handshake_state])
                hs_cb[Handshake_state](zmq_sock);
        }

        if ((counter++ > 500) && (Handshake_state == ST_RDY)) {
            zmq_send(zmq_sock, "HELLO WORLD", 11);
            counter = 0;
            myled = !myled;
        }
    }
}