ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
main.cpp@0:c3b9517c3c53, 2013-06-22 (annotated)
- Committer:
- daniele
- Date:
- Sat Jun 22 14:51:37 2013 +0000
- Revision:
- 0:c3b9517c3c53
- Child:
- 1:907f8c9fa45d
First version of the demo application
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
daniele | 0:c3b9517c3c53 | 1 | /* PicoTCP ZeroMQ Test Publisher |
daniele | 0:c3b9517c3c53 | 2 | * Copyright 2013 - Daniele Lacamera |
daniele | 0:c3b9517c3c53 | 3 | * |
daniele | 0:c3b9517c3c53 | 4 | * GPL v2 |
daniele | 0:c3b9517c3c53 | 5 | */ |
daniele | 0:c3b9517c3c53 | 6 | |
daniele | 0:c3b9517c3c53 | 7 | #include "mbed.h" |
daniele | 0:c3b9517c3c53 | 8 | #include "EthernetInterface.h" |
daniele | 0:c3b9517c3c53 | 9 | |
daniele | 0:c3b9517c3c53 | 10 | DigitalOut myled(LED1); |
daniele | 0:c3b9517c3c53 | 11 | static struct pico_socket *zmq_sock = NULL; |
daniele | 0:c3b9517c3c53 | 12 | |
/*
 * Handshake progress for the single subscriber connection.
 * The numeric values double as indices into the hs_cb[] handler table,
 * so the order of the enumerators must not change.
 */
enum zmq_hshake_state {
    ST_LISTEN    = 0,   /* no connection; waiting for a subscriber       */
    ST_CONNECTED = 1,   /* connection accepted, nothing sent yet         */
    ST_SIGNATURE = 2,   /* our signature sent, waiting for the peer's    */
    ST_VERSION   = 3,   /* our version sent, waiting for the peer's      */
    ST_GREETING  = 4,   /* our greeting sent, waiting for the peer's     */
    ST_RDY       = 5    /* handshake finished; messages may be published */
} Handshake_state = ST_LISTEN;
daniele | 0:c3b9517c3c53 | 21 | |
/*
 * Header of a short message as written to the socket: one flags byte,
 * one length byte, then the payload.
 *
 * txt is a zero-length trailing array (compiler extension): the struct
 * itself holds no payload storage, so an instance is only usable when it
 * overlays a buffer that has room for the payload after the header.
 */
struct zmq_msg {
    uint8_t flags;  /* frame flags */
    uint8_t len;    /* number of payload bytes that follow */
    char txt[0];    /* payload starts here */
};
daniele | 0:c3b9517c3c53 | 27 | |
daniele | 0:c3b9517c3c53 | 28 | |
daniele | 0:c3b9517c3c53 | 29 | void zmq_send(struct pico_socket *s, char *txt, int len) |
daniele | 0:c3b9517c3c53 | 30 | { |
daniele | 0:c3b9517c3c53 | 31 | struct zmq_msg msg; |
daniele | 0:c3b9517c3c53 | 32 | msg.flags = 0; |
daniele | 0:c3b9517c3c53 | 33 | msg.len = (uint8_t) len; |
daniele | 0:c3b9517c3c53 | 34 | memcpy(msg.txt, txt, len); |
daniele | 0:c3b9517c3c53 | 35 | pico_socket_write(s, &msg, len + 2); |
daniele | 0:c3b9517c3c53 | 36 | } |
daniele | 0:c3b9517c3c53 | 37 | |
daniele | 0:c3b9517c3c53 | 38 | |
daniele | 0:c3b9517c3c53 | 39 | static void hs_connected(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 40 | { |
daniele | 0:c3b9517c3c53 | 41 | uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f}; |
daniele | 0:c3b9517c3c53 | 42 | pico_socket_write(s, my_signature, 10); |
daniele | 0:c3b9517c3c53 | 43 | Handshake_state = ST_SIGNATURE; |
daniele | 0:c3b9517c3c53 | 44 | } |
daniele | 0:c3b9517c3c53 | 45 | |
daniele | 0:c3b9517c3c53 | 46 | static void hs_signature(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 47 | { |
daniele | 0:c3b9517c3c53 | 48 | uint8_t incoming[20]; |
daniele | 0:c3b9517c3c53 | 49 | int ret; |
daniele | 0:c3b9517c3c53 | 50 | uint8_t my_ver[2] = {3u, 0}; |
daniele | 0:c3b9517c3c53 | 51 | ret = pico_socket_read(s, incoming, 10); |
daniele | 0:c3b9517c3c53 | 52 | if (ret < 10) { |
daniele | 0:c3b9517c3c53 | 53 | printf("Received invalid signature\n"); |
daniele | 0:c3b9517c3c53 | 54 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 55 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 56 | return; |
daniele | 0:c3b9517c3c53 | 57 | } |
daniele | 0:c3b9517c3c53 | 58 | if (incoming[0] != 0xFF) { |
daniele | 0:c3b9517c3c53 | 59 | printf("Received invalid signature\n"); |
daniele | 0:c3b9517c3c53 | 60 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 61 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 62 | return; |
daniele | 0:c3b9517c3c53 | 63 | } |
daniele | 0:c3b9517c3c53 | 64 | printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); |
daniele | 0:c3b9517c3c53 | 65 | pico_socket_write(s, my_ver, 2); |
daniele | 0:c3b9517c3c53 | 66 | Handshake_state = ST_VERSION; |
daniele | 0:c3b9517c3c53 | 67 | } |
daniele | 0:c3b9517c3c53 | 68 | |
daniele | 0:c3b9517c3c53 | 69 | static void hs_version(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 70 | { |
daniele | 0:c3b9517c3c53 | 71 | uint8_t incoming[20]; |
daniele | 0:c3b9517c3c53 | 72 | int ret; |
daniele | 0:c3b9517c3c53 | 73 | uint8_t my_greeting[53] = {'N','U','L','L', 0}; |
daniele | 0:c3b9517c3c53 | 74 | ret = pico_socket_read(s, incoming, 2); |
daniele | 0:c3b9517c3c53 | 75 | if (ret < 1) { |
daniele | 0:c3b9517c3c53 | 76 | printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret); |
daniele | 0:c3b9517c3c53 | 77 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 78 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 79 | return; |
daniele | 0:c3b9517c3c53 | 80 | } |
daniele | 0:c3b9517c3c53 | 81 | if (incoming[0] != 3) { |
daniele | 0:c3b9517c3c53 | 82 | printf("Version %d.x not supported by this publisher\n", incoming[0]); |
daniele | 0:c3b9517c3c53 | 83 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 84 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 85 | return; |
daniele | 0:c3b9517c3c53 | 86 | } |
daniele | 0:c3b9517c3c53 | 87 | pico_socket_write(s, my_greeting, 53); |
daniele | 0:c3b9517c3c53 | 88 | Handshake_state = ST_GREETING; |
daniele | 0:c3b9517c3c53 | 89 | } |
daniele | 0:c3b9517c3c53 | 90 | |
daniele | 0:c3b9517c3c53 | 91 | static void hs_greeting(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 92 | { |
daniele | 0:c3b9517c3c53 | 93 | uint8_t incoming[53]; |
daniele | 0:c3b9517c3c53 | 94 | int ret; |
daniele | 0:c3b9517c3c53 | 95 | uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '}; |
daniele | 0:c3b9517c3c53 | 96 | ret = pico_socket_read(s, incoming, 53); |
daniele | 0:c3b9517c3c53 | 97 | if (ret < 53) { |
daniele | 0:c3b9517c3c53 | 98 | printf("Cannot retrieve valid greeting\n"); |
daniele | 0:c3b9517c3c53 | 99 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 100 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 101 | return; |
daniele | 0:c3b9517c3c53 | 102 | } |
daniele | 0:c3b9517c3c53 | 103 | zmq_send(s, "READY ", 8); |
daniele | 0:c3b9517c3c53 | 104 | Handshake_state = ST_RDY; |
daniele | 0:c3b9517c3c53 | 105 | } |
daniele | 0:c3b9517c3c53 | 106 | |
daniele | 0:c3b9517c3c53 | 107 | static void hs_rdy(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 108 | { |
daniele | 0:c3b9517c3c53 | 109 | int ret; |
daniele | 0:c3b9517c3c53 | 110 | uint8_t incoming[258]; |
daniele | 0:c3b9517c3c53 | 111 | pico_socket_read(s, incoming, 258); |
daniele | 0:c3b9517c3c53 | 112 | printf("Got %d bytes from subscriber whilst in rdy state.\n", ret); |
daniele | 0:c3b9517c3c53 | 113 | } |
daniele | 0:c3b9517c3c53 | 114 | |
daniele | 0:c3b9517c3c53 | 115 | static void(*hs_cb[])(struct pico_socket *) = { |
daniele | 0:c3b9517c3c53 | 116 | NULL, |
daniele | 0:c3b9517c3c53 | 117 | hs_connected, |
daniele | 0:c3b9517c3c53 | 118 | hs_signature, |
daniele | 0:c3b9517c3c53 | 119 | hs_version, |
daniele | 0:c3b9517c3c53 | 120 | hs_greeting, |
daniele | 0:c3b9517c3c53 | 121 | hs_rdy |
daniele | 0:c3b9517c3c53 | 122 | }; |
daniele | 0:c3b9517c3c53 | 123 | |
daniele | 0:c3b9517c3c53 | 124 | void cb_tcp0mq(uint16_t ev, struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 125 | { |
daniele | 0:c3b9517c3c53 | 126 | struct pico_ip4 orig; |
daniele | 0:c3b9517c3c53 | 127 | uint16_t port; |
daniele | 0:c3b9517c3c53 | 128 | char peer[30]; |
daniele | 0:c3b9517c3c53 | 129 | |
daniele | 0:c3b9517c3c53 | 130 | if (ev & PICO_SOCK_EV_RD) { |
daniele | 0:c3b9517c3c53 | 131 | if (hs_cb[Handshake_state]) |
daniele | 0:c3b9517c3c53 | 132 | hs_cb[Handshake_state](s); |
daniele | 0:c3b9517c3c53 | 133 | } |
daniele | 0:c3b9517c3c53 | 134 | |
daniele | 0:c3b9517c3c53 | 135 | if (ev & PICO_SOCK_EV_CONN) { |
daniele | 0:c3b9517c3c53 | 136 | zmq_sock = pico_socket_accept(s, &orig, &port); |
daniele | 0:c3b9517c3c53 | 137 | pico_ipv4_to_string(peer, orig.addr); |
daniele | 0:c3b9517c3c53 | 138 | printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port)); |
daniele | 0:c3b9517c3c53 | 139 | Handshake_state = ST_CONNECTED; |
daniele | 0:c3b9517c3c53 | 140 | } |
daniele | 0:c3b9517c3c53 | 141 | |
daniele | 0:c3b9517c3c53 | 142 | if (ev & PICO_SOCK_EV_FIN) { |
daniele | 0:c3b9517c3c53 | 143 | printf("tcp0mq> Connection closed.\n"); |
daniele | 0:c3b9517c3c53 | 144 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 145 | } |
daniele | 0:c3b9517c3c53 | 146 | |
daniele | 0:c3b9517c3c53 | 147 | if (ev & PICO_SOCK_EV_ERR) { |
daniele | 0:c3b9517c3c53 | 148 | printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); |
daniele | 0:c3b9517c3c53 | 149 | printf("tcp0mq> Connection closed.\n"); |
daniele | 0:c3b9517c3c53 | 150 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 151 | } |
daniele | 0:c3b9517c3c53 | 152 | |
daniele | 0:c3b9517c3c53 | 153 | if (ev & PICO_SOCK_EV_CLOSE) { |
daniele | 0:c3b9517c3c53 | 154 | printf("tcp0mq> event close\n"); |
daniele | 0:c3b9517c3c53 | 155 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 156 | Handshake_state = ST_LISTEN; |
daniele | 0:c3b9517c3c53 | 157 | } |
daniele | 0:c3b9517c3c53 | 158 | |
daniele | 0:c3b9517c3c53 | 159 | if (ev & PICO_SOCK_EV_WR) { |
daniele | 0:c3b9517c3c53 | 160 | /* TODO: manage pending data */ |
daniele | 0:c3b9517c3c53 | 161 | } |
daniele | 0:c3b9517c3c53 | 162 | } |
daniele | 0:c3b9517c3c53 | 163 | |
daniele | 0:c3b9517c3c53 | 164 | |
daniele | 0:c3b9517c3c53 | 165 | |
daniele | 0:c3b9517c3c53 | 166 | int main() { |
daniele | 0:c3b9517c3c53 | 167 | int counter = 0; |
daniele | 0:c3b9517c3c53 | 168 | EthernetInterface eth; |
daniele | 0:c3b9517c3c53 | 169 | eth.init(); //Use DHCP |
daniele | 0:c3b9517c3c53 | 170 | eth.connect(); |
daniele | 0:c3b9517c3c53 | 171 | pico_stack_init(); |
daniele | 0:c3b9517c3c53 | 172 | |
daniele | 0:c3b9517c3c53 | 173 | struct pico_socket *s; |
daniele | 0:c3b9517c3c53 | 174 | struct pico_ip4 server_addr; |
daniele | 0:c3b9517c3c53 | 175 | uint16_t port = short_be(9000); |
daniele | 0:c3b9517c3c53 | 176 | struct pico_ip4 inaddr_any = {0}; |
daniele | 0:c3b9517c3c53 | 177 | |
daniele | 0:c3b9517c3c53 | 178 | s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); |
daniele | 0:c3b9517c3c53 | 179 | if (!s) |
daniele | 0:c3b9517c3c53 | 180 | while(1);; |
daniele | 0:c3b9517c3c53 | 181 | |
daniele | 0:c3b9517c3c53 | 182 | printf("tcp0mq> BIND\n"); |
daniele | 0:c3b9517c3c53 | 183 | if (pico_socket_bind(s, &inaddr_any, &port)!= 0) { |
daniele | 0:c3b9517c3c53 | 184 | printf("tcp0mq> BIND failed because %s\n", strerror(pico_err)); |
daniele | 0:c3b9517c3c53 | 185 | while(1);; |
daniele | 0:c3b9517c3c53 | 186 | } |
daniele | 0:c3b9517c3c53 | 187 | |
daniele | 0:c3b9517c3c53 | 188 | printf("tcp0mq> LISTEN\n"); |
daniele | 0:c3b9517c3c53 | 189 | if (pico_socket_listen(s, 40) != 0) |
daniele | 0:c3b9517c3c53 | 190 | while(1);; |
daniele | 0:c3b9517c3c53 | 191 | printf("tcp0mq> listening port %u ...\n",short_be(port)); |
daniele | 0:c3b9517c3c53 | 192 | |
daniele | 0:c3b9517c3c53 | 193 | while(1) { |
daniele | 0:c3b9517c3c53 | 194 | pico_stack_tick(); |
daniele | 0:c3b9517c3c53 | 195 | wait(0.001); |
daniele | 0:c3b9517c3c53 | 196 | if((counter++ > 500) && (Handshake_state == ST_RDY)) { |
daniele | 0:c3b9517c3c53 | 197 | zmq_send(zmq_sock, "HELLO WORLD", 10); |
daniele | 0:c3b9517c3c53 | 198 | counter = 0; |
daniele | 0:c3b9517c3c53 | 199 | myled = !myled; |
daniele | 0:c3b9517c3c53 | 200 | } |
daniele | 0:c3b9517c3c53 | 201 | } |
daniele | 0:c3b9517c3c53 | 202 | } |