ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
Diff: main.cpp
- Revision:
- 1:907f8c9fa45d
- Parent:
- 0:c3b9517c3c53
- Child:
- 4:6158f706297f
diff -r c3b9517c3c53 -r 907f8c9fa45d main.cpp --- a/main.cpp Sat Jun 22 14:51:37 2013 +0000 +++ b/main.cpp Mon Jun 24 09:27:50 2013 +0000 @@ -8,9 +8,11 @@ #include "EthernetInterface.h" DigitalOut myled(LED1); +DigitalOut conn_led(LED2); static struct pico_socket *zmq_sock = NULL; +int remaining_hs_bytes = 64; -enum zmq_hshake_state { +volatile enum zmq_hshake_state { ST_LISTEN = 0, ST_CONNECTED, ST_SIGNATURE, @@ -19,7 +21,7 @@ ST_RDY } Handshake_state = ST_LISTEN; -struct zmq_msg { +struct __attribute__((packed)) zmq_msg { uint8_t flags; uint8_t len; char txt[0]; @@ -38,31 +40,39 @@ static void hs_connected(struct pico_socket *s) { + uint8_t my_ver[2] = {3u, 0}; uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f}; + uint8_t my_greeting[52] = {'N','U','L','L', 0}; pico_socket_write(s, my_signature, 10); + pico_socket_write(s, my_ver, 2); + pico_socket_write(s, my_greeting, 52); Handshake_state = ST_SIGNATURE; + remaining_hs_bytes = 64; + conn_led = 1; } static void hs_signature(struct pico_socket *s) { uint8_t incoming[20]; int ret; - uint8_t my_ver[2] = {3u, 0}; + ret = pico_socket_read(s, incoming, 10); if (ret < 10) { printf("Received invalid signature\n"); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; return; } if (incoming[0] != 0xFF) { printf("Received invalid signature\n"); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; return; } printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); - pico_socket_write(s, my_ver, 2); + remaining_hs_bytes -= ret; Handshake_state = ST_VERSION; } @@ -70,45 +80,55 @@ { uint8_t incoming[20]; int ret; - uint8_t my_greeting[53] = {'N','U','L','L', 0}; ret = pico_socket_read(s, incoming, 2); - if (ret < 1) { - printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret); + if (ret < 0) { + printf("Cannot exchange valid version information. Read returned -1\n"); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; return; } + if (ret == 0) + return; + + remaining_hs_bytes -= ret; if (incoming[0] != 3) { printf("Version %d.x not supported by this publisher\n", incoming[0]); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; return; } - pico_socket_write(s, my_greeting, 53); + printf("Subscriber is using version 3. Good!\n"); Handshake_state = ST_GREETING; } static void hs_greeting(struct pico_socket *s) { - uint8_t incoming[53]; + uint8_t incoming[64]; int ret; - uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '}; - ret = pico_socket_read(s, incoming, 53); - if (ret < 53) { + ret = pico_socket_read(s, incoming, 64); + printf("pico_socket_read in greeting returned %d\n", ret); + if (ret == 0) + return; + if (ret < 0) { printf("Cannot retrieve valid greeting\n"); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; return; } + printf("Paired. Sending Ready.\n"); + Handshake_state = ST_RDY; zmq_send(s, "READY ", 8); - Handshake_state = ST_RDY; + } static void hs_rdy(struct pico_socket *s) { int ret; uint8_t incoming[258]; - pico_socket_read(s, incoming, 258); + ret = pico_socket_read(s, incoming, 258); printf("Got %d bytes from subscriber whilst in rdy state.\n", ret); } @@ -133,27 +153,39 @@ } if (ev & PICO_SOCK_EV_CONN) { - zmq_sock = pico_socket_accept(s, &orig, &port); + struct pico_socket *z; + z = pico_socket_accept(s, &orig, &port); pico_ipv4_to_string(peer, orig.addr); - printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port)); - Handshake_state = ST_CONNECTED; + printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port)); + if (Handshake_state == ST_LISTEN) { + printf("tcp0mq> Accepted connection!\n"); + conn_led = 1; + zmq_sock = z; + Handshake_state = ST_CONNECTED; + } else { + printf("tcp0mq> Server busy, connection rejected\n"); + pico_socket_close(z); + } } if (ev & PICO_SOCK_EV_FIN) { printf("tcp0mq> Connection closed.\n"); Handshake_state = ST_LISTEN; + conn_led = 0; } if (ev & PICO_SOCK_EV_ERR) { printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); printf("tcp0mq> Connection closed.\n"); Handshake_state = ST_LISTEN; + conn_led = 0; } if (ev & PICO_SOCK_EV_CLOSE) { printf("tcp0mq> event close\n"); pico_socket_close(s); Handshake_state = ST_LISTEN; + conn_led = 0; } if (ev & PICO_SOCK_EV_WR) { @@ -193,8 +225,12 @@ while(1) { pico_stack_tick(); wait(0.001); + if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) { + if (hs_cb[Handshake_state]) + hs_cb[Handshake_state](zmq_sock); + } if((counter++ > 500) && (Handshake_state == ST_RDY)) { - zmq_send(zmq_sock, "HELLO WORLD", 10); + zmq_send(zmq_sock, "HELLO WORLD", 11); counter = 0; myled = !myled; }