ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2

Dependencies:   PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed

Committer:
daniele
Date:
Mon Jun 24 09:27:50 2013 +0000
Revision:
1:907f8c9fa45d
Parent:
0:c3b9517c3c53
Child:
4:6158f706297f
Latest version, using polling device driver;

Who changed what in which revision?

UserRevisionLine numberNew contents of line
daniele 0:c3b9517c3c53 1 /* PicoTCP ZeroMQ Test Publisher
daniele 0:c3b9517c3c53 2 * Copyright 2013 - Daniele Lacamera
daniele 0:c3b9517c3c53 3 *
daniele 0:c3b9517c3c53 4 * GPL v2
daniele 0:c3b9517c3c53 5 */
daniele 0:c3b9517c3c53 6
daniele 0:c3b9517c3c53 7 #include "mbed.h"
daniele 0:c3b9517c3c53 8 #include "EthernetInterface.h"
daniele 0:c3b9517c3c53 9
daniele 0:c3b9517c3c53 10 DigitalOut myled(LED1);
daniele 1:907f8c9fa45d 11 DigitalOut conn_led(LED2);
daniele 0:c3b9517c3c53 12 static struct pico_socket *zmq_sock = NULL;
daniele 1:907f8c9fa45d 13 int remaining_hs_bytes = 64;
daniele 0:c3b9517c3c53 14
daniele 1:907f8c9fa45d 15 volatile enum zmq_hshake_state {
daniele 0:c3b9517c3c53 16 ST_LISTEN = 0,
daniele 0:c3b9517c3c53 17 ST_CONNECTED,
daniele 0:c3b9517c3c53 18 ST_SIGNATURE,
daniele 0:c3b9517c3c53 19 ST_VERSION,
daniele 0:c3b9517c3c53 20 ST_GREETING,
daniele 0:c3b9517c3c53 21 ST_RDY
daniele 0:c3b9517c3c53 22 } Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 23
daniele 1:907f8c9fa45d 24 struct __attribute__((packed)) zmq_msg {
daniele 0:c3b9517c3c53 25 uint8_t flags;
daniele 0:c3b9517c3c53 26 uint8_t len;
daniele 0:c3b9517c3c53 27 char txt[0];
daniele 0:c3b9517c3c53 28 };
daniele 0:c3b9517c3c53 29
daniele 0:c3b9517c3c53 30
daniele 0:c3b9517c3c53 31 void zmq_send(struct pico_socket *s, char *txt, int len)
daniele 0:c3b9517c3c53 32 {
daniele 0:c3b9517c3c53 33 struct zmq_msg msg;
daniele 0:c3b9517c3c53 34 msg.flags = 0;
daniele 0:c3b9517c3c53 35 msg.len = (uint8_t) len;
daniele 0:c3b9517c3c53 36 memcpy(msg.txt, txt, len);
daniele 0:c3b9517c3c53 37 pico_socket_write(s, &msg, len + 2);
daniele 0:c3b9517c3c53 38 }
daniele 0:c3b9517c3c53 39
daniele 0:c3b9517c3c53 40
daniele 0:c3b9517c3c53 41 static void hs_connected(struct pico_socket *s)
daniele 0:c3b9517c3c53 42 {
daniele 1:907f8c9fa45d 43 uint8_t my_ver[2] = {3u, 0};
daniele 0:c3b9517c3c53 44 uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
daniele 1:907f8c9fa45d 45 uint8_t my_greeting[52] = {'N','U','L','L', 0};
daniele 0:c3b9517c3c53 46 pico_socket_write(s, my_signature, 10);
daniele 1:907f8c9fa45d 47 pico_socket_write(s, my_ver, 2);
daniele 1:907f8c9fa45d 48 pico_socket_write(s, my_greeting, 52);
daniele 0:c3b9517c3c53 49 Handshake_state = ST_SIGNATURE;
daniele 1:907f8c9fa45d 50 remaining_hs_bytes = 64;
daniele 1:907f8c9fa45d 51 conn_led = 1;
daniele 0:c3b9517c3c53 52 }
daniele 0:c3b9517c3c53 53
daniele 0:c3b9517c3c53 54 static void hs_signature(struct pico_socket *s)
daniele 0:c3b9517c3c53 55 {
daniele 0:c3b9517c3c53 56 uint8_t incoming[20];
daniele 0:c3b9517c3c53 57 int ret;
daniele 1:907f8c9fa45d 58
daniele 0:c3b9517c3c53 59 ret = pico_socket_read(s, incoming, 10);
daniele 0:c3b9517c3c53 60 if (ret < 10) {
daniele 0:c3b9517c3c53 61 printf("Received invalid signature\n");
daniele 0:c3b9517c3c53 62 pico_socket_close(s);
daniele 0:c3b9517c3c53 63 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 64 conn_led = 0;
daniele 0:c3b9517c3c53 65 return;
daniele 0:c3b9517c3c53 66 }
daniele 0:c3b9517c3c53 67 if (incoming[0] != 0xFF) {
daniele 0:c3b9517c3c53 68 printf("Received invalid signature\n");
daniele 0:c3b9517c3c53 69 pico_socket_close(s);
daniele 0:c3b9517c3c53 70 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 71 conn_led = 0;
daniele 0:c3b9517c3c53 72 return;
daniele 0:c3b9517c3c53 73 }
daniele 0:c3b9517c3c53 74 printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
daniele 1:907f8c9fa45d 75 remaining_hs_bytes -= ret;
daniele 0:c3b9517c3c53 76 Handshake_state = ST_VERSION;
daniele 0:c3b9517c3c53 77 }
daniele 0:c3b9517c3c53 78
daniele 0:c3b9517c3c53 79 static void hs_version(struct pico_socket *s)
daniele 0:c3b9517c3c53 80 {
daniele 0:c3b9517c3c53 81 uint8_t incoming[20];
daniele 0:c3b9517c3c53 82 int ret;
daniele 0:c3b9517c3c53 83 ret = pico_socket_read(s, incoming, 2);
daniele 1:907f8c9fa45d 84 if (ret < 0) {
daniele 1:907f8c9fa45d 85 printf("Cannot exchange valid version information. Read returned -1\n");
daniele 0:c3b9517c3c53 86 pico_socket_close(s);
daniele 0:c3b9517c3c53 87 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 88 conn_led = 0;
daniele 0:c3b9517c3c53 89 return;
daniele 0:c3b9517c3c53 90 }
daniele 1:907f8c9fa45d 91 if (ret == 0)
daniele 1:907f8c9fa45d 92 return;
daniele 1:907f8c9fa45d 93
daniele 1:907f8c9fa45d 94 remaining_hs_bytes -= ret;
daniele 0:c3b9517c3c53 95 if (incoming[0] != 3) {
daniele 0:c3b9517c3c53 96 printf("Version %d.x not supported by this publisher\n", incoming[0]);
daniele 0:c3b9517c3c53 97 pico_socket_close(s);
daniele 0:c3b9517c3c53 98 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 99 conn_led = 0;
daniele 0:c3b9517c3c53 100 return;
daniele 0:c3b9517c3c53 101 }
daniele 1:907f8c9fa45d 102 printf("Subscriber is using version 3. Good!\n");
daniele 0:c3b9517c3c53 103 Handshake_state = ST_GREETING;
daniele 0:c3b9517c3c53 104 }
daniele 0:c3b9517c3c53 105
daniele 0:c3b9517c3c53 106 static void hs_greeting(struct pico_socket *s)
daniele 0:c3b9517c3c53 107 {
daniele 1:907f8c9fa45d 108 uint8_t incoming[64];
daniele 0:c3b9517c3c53 109 int ret;
daniele 1:907f8c9fa45d 110 ret = pico_socket_read(s, incoming, 64);
daniele 1:907f8c9fa45d 111 printf("pico_socket_read in greeting returned %d\n", ret);
daniele 1:907f8c9fa45d 112 if (ret == 0)
daniele 1:907f8c9fa45d 113 return;
daniele 1:907f8c9fa45d 114 if (ret < 0) {
daniele 0:c3b9517c3c53 115 printf("Cannot retrieve valid greeting\n");
daniele 0:c3b9517c3c53 116 pico_socket_close(s);
daniele 0:c3b9517c3c53 117 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 118 conn_led = 0;
daniele 0:c3b9517c3c53 119 return;
daniele 0:c3b9517c3c53 120 }
daniele 1:907f8c9fa45d 121 printf("Paired. Sending Ready.\n");
daniele 1:907f8c9fa45d 122 Handshake_state = ST_RDY;
daniele 0:c3b9517c3c53 123 zmq_send(s, "READY ", 8);
daniele 1:907f8c9fa45d 124
daniele 0:c3b9517c3c53 125 }
daniele 0:c3b9517c3c53 126
daniele 0:c3b9517c3c53 127 static void hs_rdy(struct pico_socket *s)
daniele 0:c3b9517c3c53 128 {
daniele 0:c3b9517c3c53 129 int ret;
daniele 0:c3b9517c3c53 130 uint8_t incoming[258];
daniele 1:907f8c9fa45d 131 ret = pico_socket_read(s, incoming, 258);
daniele 0:c3b9517c3c53 132 printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
daniele 0:c3b9517c3c53 133 }
daniele 0:c3b9517c3c53 134
daniele 0:c3b9517c3c53 135 static void(*hs_cb[])(struct pico_socket *) = {
daniele 0:c3b9517c3c53 136 NULL,
daniele 0:c3b9517c3c53 137 hs_connected,
daniele 0:c3b9517c3c53 138 hs_signature,
daniele 0:c3b9517c3c53 139 hs_version,
daniele 0:c3b9517c3c53 140 hs_greeting,
daniele 0:c3b9517c3c53 141 hs_rdy
daniele 0:c3b9517c3c53 142 };
daniele 0:c3b9517c3c53 143
daniele 0:c3b9517c3c53 144 void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
daniele 0:c3b9517c3c53 145 {
daniele 0:c3b9517c3c53 146 struct pico_ip4 orig;
daniele 0:c3b9517c3c53 147 uint16_t port;
daniele 0:c3b9517c3c53 148 char peer[30];
daniele 0:c3b9517c3c53 149
daniele 0:c3b9517c3c53 150 if (ev & PICO_SOCK_EV_RD) {
daniele 0:c3b9517c3c53 151 if (hs_cb[Handshake_state])
daniele 0:c3b9517c3c53 152 hs_cb[Handshake_state](s);
daniele 0:c3b9517c3c53 153 }
daniele 0:c3b9517c3c53 154
daniele 0:c3b9517c3c53 155 if (ev & PICO_SOCK_EV_CONN) {
daniele 1:907f8c9fa45d 156 struct pico_socket *z;
daniele 1:907f8c9fa45d 157 z = pico_socket_accept(s, &orig, &port);
daniele 0:c3b9517c3c53 158 pico_ipv4_to_string(peer, orig.addr);
daniele 1:907f8c9fa45d 159 printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
daniele 1:907f8c9fa45d 160 if (Handshake_state == ST_LISTEN) {
daniele 1:907f8c9fa45d 161 printf("tcp0mq> Accepted connection!\n");
daniele 1:907f8c9fa45d 162 conn_led = 1;
daniele 1:907f8c9fa45d 163 zmq_sock = z;
daniele 1:907f8c9fa45d 164 Handshake_state = ST_CONNECTED;
daniele 1:907f8c9fa45d 165 } else {
daniele 1:907f8c9fa45d 166 printf("tcp0mq> Server busy, connection rejected\n");
daniele 1:907f8c9fa45d 167 pico_socket_close(z);
daniele 1:907f8c9fa45d 168 }
daniele 0:c3b9517c3c53 169 }
daniele 0:c3b9517c3c53 170
daniele 0:c3b9517c3c53 171 if (ev & PICO_SOCK_EV_FIN) {
daniele 0:c3b9517c3c53 172 printf("tcp0mq> Connection closed.\n");
daniele 0:c3b9517c3c53 173 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 174 conn_led = 0;
daniele 0:c3b9517c3c53 175 }
daniele 0:c3b9517c3c53 176
daniele 0:c3b9517c3c53 177 if (ev & PICO_SOCK_EV_ERR) {
daniele 0:c3b9517c3c53 178 printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
daniele 0:c3b9517c3c53 179 printf("tcp0mq> Connection closed.\n");
daniele 0:c3b9517c3c53 180 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 181 conn_led = 0;
daniele 0:c3b9517c3c53 182 }
daniele 0:c3b9517c3c53 183
daniele 0:c3b9517c3c53 184 if (ev & PICO_SOCK_EV_CLOSE) {
daniele 0:c3b9517c3c53 185 printf("tcp0mq> event close\n");
daniele 0:c3b9517c3c53 186 pico_socket_close(s);
daniele 0:c3b9517c3c53 187 Handshake_state = ST_LISTEN;
daniele 1:907f8c9fa45d 188 conn_led = 0;
daniele 0:c3b9517c3c53 189 }
daniele 0:c3b9517c3c53 190
daniele 0:c3b9517c3c53 191 if (ev & PICO_SOCK_EV_WR) {
daniele 0:c3b9517c3c53 192 /* TODO: manage pending data */
daniele 0:c3b9517c3c53 193 }
daniele 0:c3b9517c3c53 194 }
daniele 0:c3b9517c3c53 195
daniele 0:c3b9517c3c53 196
daniele 0:c3b9517c3c53 197
daniele 0:c3b9517c3c53 198 int main() {
daniele 0:c3b9517c3c53 199 int counter = 0;
daniele 0:c3b9517c3c53 200 EthernetInterface eth;
daniele 0:c3b9517c3c53 201 eth.init(); //Use DHCP
daniele 0:c3b9517c3c53 202 eth.connect();
daniele 0:c3b9517c3c53 203 pico_stack_init();
daniele 0:c3b9517c3c53 204
daniele 0:c3b9517c3c53 205 struct pico_socket *s;
daniele 0:c3b9517c3c53 206 struct pico_ip4 server_addr;
daniele 0:c3b9517c3c53 207 uint16_t port = short_be(9000);
daniele 0:c3b9517c3c53 208 struct pico_ip4 inaddr_any = {0};
daniele 0:c3b9517c3c53 209
daniele 0:c3b9517c3c53 210 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
daniele 0:c3b9517c3c53 211 if (!s)
daniele 0:c3b9517c3c53 212 while(1);;
daniele 0:c3b9517c3c53 213
daniele 0:c3b9517c3c53 214 printf("tcp0mq> BIND\n");
daniele 0:c3b9517c3c53 215 if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
daniele 0:c3b9517c3c53 216 printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
daniele 0:c3b9517c3c53 217 while(1);;
daniele 0:c3b9517c3c53 218 }
daniele 0:c3b9517c3c53 219
daniele 0:c3b9517c3c53 220 printf("tcp0mq> LISTEN\n");
daniele 0:c3b9517c3c53 221 if (pico_socket_listen(s, 40) != 0)
daniele 0:c3b9517c3c53 222 while(1);;
daniele 0:c3b9517c3c53 223 printf("tcp0mq> listening port %u ...\n",short_be(port));
daniele 0:c3b9517c3c53 224
daniele 0:c3b9517c3c53 225 while(1) {
daniele 0:c3b9517c3c53 226 pico_stack_tick();
daniele 0:c3b9517c3c53 227 wait(0.001);
daniele 1:907f8c9fa45d 228 if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
daniele 1:907f8c9fa45d 229 if (hs_cb[Handshake_state])
daniele 1:907f8c9fa45d 230 hs_cb[Handshake_state](zmq_sock);
daniele 1:907f8c9fa45d 231 }
daniele 0:c3b9517c3c53 232 if((counter++ > 500) && (Handshake_state == ST_RDY)) {
daniele 1:907f8c9fa45d 233 zmq_send(zmq_sock, "HELLO WORLD", 11);
daniele 0:c3b9517c3c53 234 counter = 0;
daniele 0:c3b9517c3c53 235 myled = !myled;
daniele 0:c3b9517c3c53 236 }
daniele 0:c3b9517c3c53 237 }
daniele 0:c3b9517c3c53 238 }