ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
main.cpp@4:6158f706297f, 2013-06-26 (annotated)
- Committer:
- tass
- Date:
- Wed Jun 26 07:06:14 2013 +0000
- Revision:
- 4:6158f706297f
- Parent:
- 1:907f8c9fa45d
Fixed the flag that was causing an assertion failure with ZMTP 3.0.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
daniele | 0:c3b9517c3c53 | 1 | /* PicoTCP ZeroMQ Test Publisher |
daniele | 0:c3b9517c3c53 | 2 | * Copyright 2013 - Daniele Lacamera |
daniele | 0:c3b9517c3c53 | 3 | * |
daniele | 0:c3b9517c3c53 | 4 | * GPL v2 |
daniele | 0:c3b9517c3c53 | 5 | */ |
daniele | 0:c3b9517c3c53 | 6 | |
daniele | 0:c3b9517c3c53 | 7 | #include "mbed.h" |
daniele | 0:c3b9517c3c53 | 8 | #include "EthernetInterface.h" |
daniele | 0:c3b9517c3c53 | 9 | |
/* Status LEDs: myled is toggled each time main() publishes a message,
 * conn_led is set while a subscriber connection is in progress or active
 * and cleared whenever the connection is dropped. */
DigitalOut myled(LED1);
DigitalOut conn_led(LED2);
/* Socket of the accepted subscriber; NULL until the first accept. */
static struct pico_socket *zmq_sock = NULL;
/* Countdown of handshake bytes still expected from the peer. Reset to 64
 * in hs_connected() and decremented by the signature and version reads. */
int remaining_hs_bytes = 64;
daniele | 0:c3b9517c3c53 | 14 | |
/* Handshake progress of the single subscriber connection.
 * The numeric values are used as indices into the hs_cb[] handler table,
 * so the order of the enumerators must stay in sync with that table. */
enum zmq_hshake_state {
    ST_LISTEN = 0,   /* no subscriber attached */
    ST_CONNECTED,    /* connection accepted, our greeting not sent yet */
    ST_SIGNATURE,    /* waiting for the peer's signature */
    ST_VERSION,      /* waiting for the peer's version */
    ST_GREETING,     /* waiting for the rest of the peer's greeting */
    ST_RDY           /* handshake done, publishing */
};

/* Current state; written by the socket callback and read by the main loop. */
volatile enum zmq_hshake_state Handshake_state = ST_LISTEN;
daniele | 0:c3b9517c3c53 | 23 | |
/* On-the-wire layout of a short message: one flags byte, one length byte,
 * then the payload. The struct is packed so no padding is inserted.
 * txt is a zero-length array (compiler extension): it reserves NO storage,
 * so the payload bytes must live in memory provided by whoever lays this
 * header over a buffer. */
struct __attribute__((packed)) zmq_msg {
    uint8_t flags;
    uint8_t len;
    char txt[0];
};
daniele | 0:c3b9517c3c53 | 29 | |
daniele | 0:c3b9517c3c53 | 30 | |
daniele | 0:c3b9517c3c53 | 31 | void zmq_send(struct pico_socket *s, char *txt, int len) |
daniele | 0:c3b9517c3c53 | 32 | { |
daniele | 0:c3b9517c3c53 | 33 | struct zmq_msg msg; |
tass | 4:6158f706297f | 34 | msg.flags = 4; |
daniele | 0:c3b9517c3c53 | 35 | msg.len = (uint8_t) len; |
daniele | 0:c3b9517c3c53 | 36 | memcpy(msg.txt, txt, len); |
daniele | 0:c3b9517c3c53 | 37 | pico_socket_write(s, &msg, len + 2); |
daniele | 0:c3b9517c3c53 | 38 | } |
daniele | 0:c3b9517c3c53 | 39 | |
daniele | 0:c3b9517c3c53 | 40 | |
daniele | 0:c3b9517c3c53 | 41 | static void hs_connected(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 42 | { |
daniele | 1:907f8c9fa45d | 43 | uint8_t my_ver[2] = {3u, 0}; |
daniele | 0:c3b9517c3c53 | 44 | uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f}; |
daniele | 1:907f8c9fa45d | 45 | uint8_t my_greeting[52] = {'N','U','L','L', 0}; |
daniele | 0:c3b9517c3c53 | 46 | pico_socket_write(s, my_signature, 10); |
daniele | 1:907f8c9fa45d | 47 | pico_socket_write(s, my_ver, 2); |
daniele | 1:907f8c9fa45d | 48 | pico_socket_write(s, my_greeting, 52); |
daniele | 0:c3b9517c3c53 | 49 | Handshake_state = ST_SIGNATURE; |
daniele | 1:907f8c9fa45d | 50 | remaining_hs_bytes = 64; |
daniele | 1:907f8c9fa45d | 51 | conn_led = 1; |
daniele | 0:c3b9517c3c53 | 52 | } |
daniele | 0:c3b9517c3c53 | 53 | |
daniele | 0:c3b9517c3c53 | 54 | static void hs_signature(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 55 | { |
daniele | 0:c3b9517c3c53 | 56 | uint8_t incoming[20]; |
daniele | 0:c3b9517c3c53 | 57 | int ret; |
daniele | 1:907f8c9fa45d | 58 | |
daniele | 0:c3b9517c3c53 | 59 | ret = pico_socket_read(s, incoming, 10); |
daniele | 0:c3b9517c3c53 | 60 | if (ret < 10) { |
daniele | 0:c3b9517c3c53 | 61 | printf("Received invalid signature\n"); |
daniele | 0:c3b9517c3c53 | 62 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 63 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 64 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 65 | return; |
daniele | 0:c3b9517c3c53 | 66 | } |
daniele | 0:c3b9517c3c53 | 67 | if (incoming[0] != 0xFF) { |
daniele | 0:c3b9517c3c53 | 68 | printf("Received invalid signature\n"); |
daniele | 0:c3b9517c3c53 | 69 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 70 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 71 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 72 | return; |
daniele | 0:c3b9517c3c53 | 73 | } |
daniele | 0:c3b9517c3c53 | 74 | printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); |
daniele | 1:907f8c9fa45d | 75 | remaining_hs_bytes -= ret; |
daniele | 0:c3b9517c3c53 | 76 | Handshake_state = ST_VERSION; |
daniele | 0:c3b9517c3c53 | 77 | } |
daniele | 0:c3b9517c3c53 | 78 | |
daniele | 0:c3b9517c3c53 | 79 | static void hs_version(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 80 | { |
daniele | 0:c3b9517c3c53 | 81 | uint8_t incoming[20]; |
daniele | 0:c3b9517c3c53 | 82 | int ret; |
daniele | 0:c3b9517c3c53 | 83 | ret = pico_socket_read(s, incoming, 2); |
daniele | 1:907f8c9fa45d | 84 | if (ret < 0) { |
daniele | 1:907f8c9fa45d | 85 | printf("Cannot exchange valid version information. Read returned -1\n"); |
daniele | 0:c3b9517c3c53 | 86 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 87 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 88 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 89 | return; |
daniele | 0:c3b9517c3c53 | 90 | } |
daniele | 1:907f8c9fa45d | 91 | if (ret == 0) |
daniele | 1:907f8c9fa45d | 92 | return; |
daniele | 1:907f8c9fa45d | 93 | |
daniele | 1:907f8c9fa45d | 94 | remaining_hs_bytes -= ret; |
daniele | 0:c3b9517c3c53 | 95 | if (incoming[0] != 3) { |
daniele | 0:c3b9517c3c53 | 96 | printf("Version %d.x not supported by this publisher\n", incoming[0]); |
daniele | 0:c3b9517c3c53 | 97 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 98 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 99 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 100 | return; |
daniele | 0:c3b9517c3c53 | 101 | } |
daniele | 1:907f8c9fa45d | 102 | printf("Subscriber is using version 3. Good!\n"); |
daniele | 0:c3b9517c3c53 | 103 | Handshake_state = ST_GREETING; |
daniele | 0:c3b9517c3c53 | 104 | } |
daniele | 0:c3b9517c3c53 | 105 | |
daniele | 0:c3b9517c3c53 | 106 | static void hs_greeting(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 107 | { |
daniele | 1:907f8c9fa45d | 108 | uint8_t incoming[64]; |
daniele | 0:c3b9517c3c53 | 109 | int ret; |
daniele | 1:907f8c9fa45d | 110 | ret = pico_socket_read(s, incoming, 64); |
daniele | 1:907f8c9fa45d | 111 | printf("pico_socket_read in greeting returned %d\n", ret); |
daniele | 1:907f8c9fa45d | 112 | if (ret == 0) |
daniele | 1:907f8c9fa45d | 113 | return; |
daniele | 1:907f8c9fa45d | 114 | if (ret < 0) { |
daniele | 0:c3b9517c3c53 | 115 | printf("Cannot retrieve valid greeting\n"); |
daniele | 0:c3b9517c3c53 | 116 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 117 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 118 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 119 | return; |
daniele | 0:c3b9517c3c53 | 120 | } |
daniele | 1:907f8c9fa45d | 121 | printf("Paired. Sending Ready.\n"); |
daniele | 1:907f8c9fa45d | 122 | Handshake_state = ST_RDY; |
daniele | 0:c3b9517c3c53 | 123 | zmq_send(s, "READY ", 8); |
daniele | 1:907f8c9fa45d | 124 | |
daniele | 0:c3b9517c3c53 | 125 | } |
daniele | 0:c3b9517c3c53 | 126 | |
daniele | 0:c3b9517c3c53 | 127 | static void hs_rdy(struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 128 | { |
daniele | 0:c3b9517c3c53 | 129 | int ret; |
daniele | 0:c3b9517c3c53 | 130 | uint8_t incoming[258]; |
daniele | 1:907f8c9fa45d | 131 | ret = pico_socket_read(s, incoming, 258); |
daniele | 0:c3b9517c3c53 | 132 | printf("Got %d bytes from subscriber whilst in rdy state.\n", ret); |
daniele | 0:c3b9517c3c53 | 133 | } |
daniele | 0:c3b9517c3c53 | 134 | |
/* Handshake handler table, indexed by the current Handshake_state value.
 * Entry order must match enum zmq_hshake_state; a NULL entry means there
 * is nothing to do in that state and callers check for it before calling. */
static void(*hs_cb[])(struct pico_socket *) = {
    NULL,          /* ST_LISTEN */
    hs_connected,  /* ST_CONNECTED */
    hs_signature,  /* ST_SIGNATURE */
    hs_version,    /* ST_VERSION */
    hs_greeting,   /* ST_GREETING */
    hs_rdy         /* ST_RDY */
};
daniele | 0:c3b9517c3c53 | 143 | |
daniele | 0:c3b9517c3c53 | 144 | void cb_tcp0mq(uint16_t ev, struct pico_socket *s) |
daniele | 0:c3b9517c3c53 | 145 | { |
daniele | 0:c3b9517c3c53 | 146 | struct pico_ip4 orig; |
daniele | 0:c3b9517c3c53 | 147 | uint16_t port; |
daniele | 0:c3b9517c3c53 | 148 | char peer[30]; |
daniele | 0:c3b9517c3c53 | 149 | |
daniele | 0:c3b9517c3c53 | 150 | if (ev & PICO_SOCK_EV_RD) { |
daniele | 0:c3b9517c3c53 | 151 | if (hs_cb[Handshake_state]) |
daniele | 0:c3b9517c3c53 | 152 | hs_cb[Handshake_state](s); |
daniele | 0:c3b9517c3c53 | 153 | } |
daniele | 0:c3b9517c3c53 | 154 | |
daniele | 0:c3b9517c3c53 | 155 | if (ev & PICO_SOCK_EV_CONN) { |
daniele | 1:907f8c9fa45d | 156 | struct pico_socket *z; |
daniele | 1:907f8c9fa45d | 157 | z = pico_socket_accept(s, &orig, &port); |
daniele | 0:c3b9517c3c53 | 158 | pico_ipv4_to_string(peer, orig.addr); |
daniele | 1:907f8c9fa45d | 159 | printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port)); |
daniele | 1:907f8c9fa45d | 160 | if (Handshake_state == ST_LISTEN) { |
daniele | 1:907f8c9fa45d | 161 | printf("tcp0mq> Accepted connection!\n"); |
daniele | 1:907f8c9fa45d | 162 | conn_led = 1; |
daniele | 1:907f8c9fa45d | 163 | zmq_sock = z; |
daniele | 1:907f8c9fa45d | 164 | Handshake_state = ST_CONNECTED; |
daniele | 1:907f8c9fa45d | 165 | } else { |
daniele | 1:907f8c9fa45d | 166 | printf("tcp0mq> Server busy, connection rejected\n"); |
daniele | 1:907f8c9fa45d | 167 | pico_socket_close(z); |
daniele | 1:907f8c9fa45d | 168 | } |
daniele | 0:c3b9517c3c53 | 169 | } |
daniele | 0:c3b9517c3c53 | 170 | |
daniele | 0:c3b9517c3c53 | 171 | if (ev & PICO_SOCK_EV_FIN) { |
daniele | 0:c3b9517c3c53 | 172 | printf("tcp0mq> Connection closed.\n"); |
daniele | 0:c3b9517c3c53 | 173 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 174 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 175 | } |
daniele | 0:c3b9517c3c53 | 176 | |
daniele | 0:c3b9517c3c53 | 177 | if (ev & PICO_SOCK_EV_ERR) { |
daniele | 0:c3b9517c3c53 | 178 | printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); |
daniele | 0:c3b9517c3c53 | 179 | printf("tcp0mq> Connection closed.\n"); |
daniele | 0:c3b9517c3c53 | 180 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 181 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 182 | } |
daniele | 0:c3b9517c3c53 | 183 | |
daniele | 0:c3b9517c3c53 | 184 | if (ev & PICO_SOCK_EV_CLOSE) { |
daniele | 0:c3b9517c3c53 | 185 | printf("tcp0mq> event close\n"); |
daniele | 0:c3b9517c3c53 | 186 | pico_socket_close(s); |
daniele | 0:c3b9517c3c53 | 187 | Handshake_state = ST_LISTEN; |
daniele | 1:907f8c9fa45d | 188 | conn_led = 0; |
daniele | 0:c3b9517c3c53 | 189 | } |
daniele | 0:c3b9517c3c53 | 190 | |
daniele | 0:c3b9517c3c53 | 191 | if (ev & PICO_SOCK_EV_WR) { |
daniele | 0:c3b9517c3c53 | 192 | /* TODO: manage pending data */ |
daniele | 0:c3b9517c3c53 | 193 | } |
daniele | 0:c3b9517c3c53 | 194 | } |
daniele | 0:c3b9517c3c53 | 195 | |
daniele | 0:c3b9517c3c53 | 196 | |
daniele | 0:c3b9517c3c53 | 197 | |
daniele | 0:c3b9517c3c53 | 198 | int main() { |
daniele | 0:c3b9517c3c53 | 199 | int counter = 0; |
daniele | 0:c3b9517c3c53 | 200 | EthernetInterface eth; |
daniele | 0:c3b9517c3c53 | 201 | eth.init(); //Use DHCP |
daniele | 0:c3b9517c3c53 | 202 | eth.connect(); |
daniele | 0:c3b9517c3c53 | 203 | pico_stack_init(); |
daniele | 0:c3b9517c3c53 | 204 | |
daniele | 0:c3b9517c3c53 | 205 | struct pico_socket *s; |
daniele | 0:c3b9517c3c53 | 206 | struct pico_ip4 server_addr; |
daniele | 0:c3b9517c3c53 | 207 | uint16_t port = short_be(9000); |
daniele | 0:c3b9517c3c53 | 208 | struct pico_ip4 inaddr_any = {0}; |
daniele | 0:c3b9517c3c53 | 209 | |
daniele | 0:c3b9517c3c53 | 210 | s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); |
daniele | 0:c3b9517c3c53 | 211 | if (!s) |
daniele | 0:c3b9517c3c53 | 212 | while(1);; |
daniele | 0:c3b9517c3c53 | 213 | |
daniele | 0:c3b9517c3c53 | 214 | printf("tcp0mq> BIND\n"); |
daniele | 0:c3b9517c3c53 | 215 | if (pico_socket_bind(s, &inaddr_any, &port)!= 0) { |
daniele | 0:c3b9517c3c53 | 216 | printf("tcp0mq> BIND failed because %s\n", strerror(pico_err)); |
daniele | 0:c3b9517c3c53 | 217 | while(1);; |
daniele | 0:c3b9517c3c53 | 218 | } |
daniele | 0:c3b9517c3c53 | 219 | |
daniele | 0:c3b9517c3c53 | 220 | printf("tcp0mq> LISTEN\n"); |
daniele | 0:c3b9517c3c53 | 221 | if (pico_socket_listen(s, 40) != 0) |
daniele | 0:c3b9517c3c53 | 222 | while(1);; |
daniele | 0:c3b9517c3c53 | 223 | printf("tcp0mq> listening port %u ...\n",short_be(port)); |
daniele | 0:c3b9517c3c53 | 224 | |
daniele | 0:c3b9517c3c53 | 225 | while(1) { |
daniele | 0:c3b9517c3c53 | 226 | pico_stack_tick(); |
daniele | 0:c3b9517c3c53 | 227 | wait(0.001); |
daniele | 1:907f8c9fa45d | 228 | if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) { |
daniele | 1:907f8c9fa45d | 229 | if (hs_cb[Handshake_state]) |
daniele | 1:907f8c9fa45d | 230 | hs_cb[Handshake_state](zmq_sock); |
daniele | 1:907f8c9fa45d | 231 | } |
daniele | 0:c3b9517c3c53 | 232 | if((counter++ > 500) && (Handshake_state == ST_RDY)) { |
daniele | 1:907f8c9fa45d | 233 | zmq_send(zmq_sock, "HELLO WORLD", 11); |
daniele | 0:c3b9517c3c53 | 234 | counter = 0; |
daniele | 0:c3b9517c3c53 | 235 | myled = !myled; |
daniele | 0:c3b9517c3c53 | 236 | } |
daniele | 0:c3b9517c3c53 | 237 | } |
daniele | 0:c3b9517c3c53 | 238 | } |