ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
Revision 1:907f8c9fa45d, committed 2013-06-24
- Comitter:
- daniele
- Date:
- Mon Jun 24 09:27:50 2013 +0000
- Parent:
- 0:c3b9517c3c53
- Child:
- 2:2c09e6ff8860
- Commit message:
- Latest version, using polling device driver;
Changed in this revision
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lpc1768-picotcp-eth-polling.lib Mon Jun 24 09:27:50 2013 +0000 @@ -0,0 +1,1 @@ +https://mbed.org/users/daniele/code/lpc1768-picotcp-eth-polling/#d6222c305d94
--- a/lpc1768-picotcp-eth.lib Sat Jun 22 14:51:37 2013 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -http://mbed.org/users/daniele/code/lpc1768-picotcp-eth-polling/#260ff19dafef
--- a/main.cpp Sat Jun 22 14:51:37 2013 +0000
+++ b/main.cpp Mon Jun 24 09:27:50 2013 +0000
@@ -8,9 +8,11 @@
#include "EthernetInterface.h"
DigitalOut myled(LED1);
+DigitalOut conn_led(LED2);
static struct pico_socket *zmq_sock = NULL;
+int remaining_hs_bytes = 64;
-enum zmq_hshake_state {
+volatile enum zmq_hshake_state {
ST_LISTEN = 0,
ST_CONNECTED,
ST_SIGNATURE,
@@ -19,7 +21,7 @@
ST_RDY
} Handshake_state = ST_LISTEN;
-struct zmq_msg {
+struct __attribute__((packed)) zmq_msg {
uint8_t flags;
uint8_t len;
char txt[0];
@@ -38,31 +40,39 @@
static void hs_connected(struct pico_socket *s)
{
+ uint8_t my_ver[2] = {3u, 0};
uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
+ uint8_t my_greeting[52] = {'N','U','L','L', 0};
pico_socket_write(s, my_signature, 10);
+ pico_socket_write(s, my_ver, 2);
+ pico_socket_write(s, my_greeting, 52);
Handshake_state = ST_SIGNATURE;
+ remaining_hs_bytes = 64;
+ conn_led = 1;
}
static void hs_signature(struct pico_socket *s)
{
uint8_t incoming[20];
int ret;
- uint8_t my_ver[2] = {3u, 0};
+
ret = pico_socket_read(s, incoming, 10);
if (ret < 10) {
printf("Received invalid signature\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
return;
}
if (incoming[0] != 0xFF) {
printf("Received invalid signature\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
return;
}
printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
- pico_socket_write(s, my_ver, 2);
+ remaining_hs_bytes -= ret;
Handshake_state = ST_VERSION;
}
@@ -70,45 +80,55 @@
{
uint8_t incoming[20];
int ret;
- uint8_t my_greeting[53] = {'N','U','L','L', 0};
ret = pico_socket_read(s, incoming, 2);
- if (ret < 1) {
- printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret);
+ if (ret < 0) {
+ printf("Cannot exchange valid version information. Read returned -1\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
return;
}
+ if (ret == 0)
+ return;
+
+ remaining_hs_bytes -= ret;
if (incoming[0] != 3) {
printf("Version %d.x not supported by this publisher\n", incoming[0]);
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
return;
}
- pico_socket_write(s, my_greeting, 53);
+ printf("Subscriber is using version 3. Good!\n");
Handshake_state = ST_GREETING;
}
static void hs_greeting(struct pico_socket *s)
{
- uint8_t incoming[53];
+ uint8_t incoming[64];
int ret;
- uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '};
- ret = pico_socket_read(s, incoming, 53);
- if (ret < 53) {
+ ret = pico_socket_read(s, incoming, 64);
+ printf("pico_socket_read in greeting returned %d\n", ret);
+ if (ret == 0)
+ return;
+ if (ret < 0) {
printf("Cannot retrieve valid greeting\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
return;
}
+ printf("Paired. Sending Ready.\n");
+ Handshake_state = ST_RDY;
zmq_send(s, "READY ", 8);
- Handshake_state = ST_RDY;
+
}
static void hs_rdy(struct pico_socket *s)
{
int ret;
uint8_t incoming[258];
- pico_socket_read(s, incoming, 258);
+ ret = pico_socket_read(s, incoming, 258);
printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
}
@@ -133,27 +153,39 @@
}
if (ev & PICO_SOCK_EV_CONN) {
- zmq_sock = pico_socket_accept(s, &orig, &port);
+ struct pico_socket *z;
+ z = pico_socket_accept(s, &orig, &port);
pico_ipv4_to_string(peer, orig.addr);
- printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port));
- Handshake_state = ST_CONNECTED;
+ printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
+ if (Handshake_state == ST_LISTEN) {
+ printf("tcp0mq> Accepted connection!\n");
+ conn_led = 1;
+ zmq_sock = z;
+ Handshake_state = ST_CONNECTED;
+ } else {
+ printf("tcp0mq> Server busy, connection rejected\n");
+ pico_socket_close(z);
+ }
}
if (ev & PICO_SOCK_EV_FIN) {
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
+ conn_led = 0;
}
if (ev & PICO_SOCK_EV_ERR) {
printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
+ conn_led = 0;
}
if (ev & PICO_SOCK_EV_CLOSE) {
printf("tcp0mq> event close\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
+ conn_led = 0;
}
if (ev & PICO_SOCK_EV_WR) {
@@ -193,8 +225,12 @@
while(1) {
pico_stack_tick();
wait(0.001);
+ if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
+ if (hs_cb[Handshake_state])
+ hs_cb[Handshake_state](zmq_sock);
+ }
if((counter++ > 500) && (Handshake_state == ST_RDY)) {
- zmq_send(zmq_sock, "HELLO WORLD", 10);
+ zmq_send(zmq_sock, "HELLO WORLD", 11);
counter = 0;
myled = !myled;
}

