ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
main.cpp
- Committer: tass
- Date: 2013-06-26
- Revision: 4:6158f706297f
- Parent: 1:907f8c9fa45d
File content as of revision 4:6158f706297f:
/* PicoTCP ZeroMQ Test Publisher
* Copyright 2013 - Daniele Lacamera
*
* GPL v2
*/
#include "mbed.h"
#include "EthernetInterface.h"
/* Activity LED: toggled by main() each time a message is published. */
DigitalOut myled(LED1);
/* Connection LED: driven to 1 when a peer is accepted, 0 when the connection is dropped. */
DigitalOut conn_led(LED2);
/* Socket of the accepted subscriber connection; NULL until the first accept. */
static struct pico_socket *zmq_sock = NULL;
/* Handshake bytes still expected from the peer: reset to 64 by hs_connected()
 * and decremented by the signature and version handlers. */
int remaining_hs_bytes = 64;
/*
 * Handshake progress of the single connection this publisher serves.
 * The enumerator values are used as indices into the hs_cb[] handler table,
 * so their order must stay in step with that table.
 */
volatile enum zmq_hshake_state {
ST_LISTEN = 0, /* no peer; waiting for an incoming connection */
ST_CONNECTED, /* peer accepted; our signature/version/greeting not sent yet */
ST_SIGNATURE, /* our handshake sent; waiting for the peer's 10-byte signature */
ST_VERSION, /* signature accepted; waiting for the peer's version bytes */
ST_GREETING, /* version accepted; waiting for the rest of the peer's greeting */
ST_RDY /* handshake complete; main() publishes periodically */
} Handshake_state = ST_LISTEN;
/*
 * Header of an outgoing frame as used by zmq_send(): one flags byte, one
 * length byte, then the payload bytes.
 */
struct __attribute__((packed)) zmq_msg {
uint8_t flags; /* zmq_send() always sets this to 4 */
uint8_t len; /* payload length in bytes */
char txt[0]; /* start of the payload; zero-length array, reserves no storage */
};
/*
 * Send one frame to the peer: a two-byte header (flags = 4, payload length)
 * followed by `len` bytes copied from `txt`.
 *
 * s   - socket to write to; nothing is sent when NULL
 * txt - payload bytes (need not be NUL-terminated); nothing is sent when NULL
 * len - payload length, 0..255 (the header stores it in a single byte);
 *       out-of-range lengths are reported and the message is dropped
 *
 * The frame is assembled in a local buffer sized for the largest payload.
 * struct zmq_msg has no room for payload (its txt member is a zero-length
 * array), so copying the payload into a stack instance of it would overrun
 * the stack.
 */
void zmq_send(struct pico_socket *s, char *txt, int len)
{
    enum { ZMQ_HDR_LEN = 2, ZMQ_MAX_PAYLOAD = 255 };
    uint8_t frame[ZMQ_HDR_LEN + ZMQ_MAX_PAYLOAD];

    if (!s || !txt)
        return;
    if (len < 0 || len > ZMQ_MAX_PAYLOAD) {
        printf("zmq_send: invalid message length %d\n", len);
        return;
    }

    frame[0] = 4;                 /* flags */
    frame[1] = (uint8_t) len;     /* payload length */
    memcpy(frame + ZMQ_HDR_LEN, txt, (size_t) len);
    pico_socket_write(s, frame, len + ZMQ_HDR_LEN);
}
/*
 * ST_CONNECTED handler: send our half of the handshake to the peer
 * (10-byte signature, 2-byte version, 52-byte greeting beginning with "NULL"),
 * then start waiting for the peer's signature with a fresh 64-byte budget.
 */
static void hs_connected(struct pico_socket *s)
{
    uint8_t signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
    uint8_t version[2] = {3u, 0};
    uint8_t greeting[52] = {'N','U','L','L', 0};   /* remainder is zero-filled */

    pico_socket_write(s, signature, (int) sizeof signature);
    pico_socket_write(s, version, (int) sizeof version);
    pico_socket_write(s, greeting, (int) sizeof greeting);

    Handshake_state = ST_SIGNATURE;
    remaining_hs_bytes = 64;
    conn_led = 1;
}
/*
 * ST_SIGNATURE handler: read the peer's 10-byte signature and check that it
 * starts with 0xFF.
 *
 * - Read returns 0 (nothing available yet): keep waiting, as the version and
 *   greeting handlers do. This handler is also polled from the main loop, so
 *   an empty read is normal and must not tear the connection down.
 * - Read error, short read, or wrong first byte: close the socket, return to
 *   ST_LISTEN and switch the connection LED off.
 * - Otherwise: account for the consumed bytes and advance to ST_VERSION.
 */
static void hs_signature(struct pico_socket *s)
{
    uint8_t incoming[20];
    int ret;

    ret = pico_socket_read(s, incoming, 10);
    if (ret == 0)
        return;

    if (ret < 10 || incoming[0] != 0xFF) {
        printf("Received invalid signature\n");
        pico_socket_close(s);
        Handshake_state = ST_LISTEN;
        conn_led = 0;
        return;
    }

    printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
    remaining_hs_bytes -= ret;
    Handshake_state = ST_VERSION;
}
/*
 * ST_VERSION handler: read up to two version bytes from the peer.
 * An empty read means "try again later". Major version 3 advances the
 * handshake to ST_GREETING; a read error or any other major version closes
 * the socket and returns to ST_LISTEN.
 */
static void hs_version(struct pico_socket *s)
{
    uint8_t ver[20];
    int n = pico_socket_read(s, ver, 2);

    if (n == 0)
        return;                     /* nothing received yet */

    if (n > 0) {
        remaining_hs_bytes -= n;
        if (ver[0] == 3) {
            printf("Subscriber is using version 3. Good!\n");
            Handshake_state = ST_GREETING;
            return;
        }
        printf("Version %d.x not supported by this publisher\n", ver[0]);
    } else {
        printf("Cannot exchange valid version information. Read returned -1\n");
    }

    /* Common teardown for both failure cases. */
    pico_socket_close(s);
    Handshake_state = ST_LISTEN;
    conn_led = 0;
}
static void hs_greeting(struct pico_socket *s)
{
uint8_t incoming[64];
int ret;
ret = pico_socket_read(s, incoming, 64);
printf("pico_socket_read in greeting returned %d\n", ret);
if (ret == 0)
return;
if (ret < 0) {
printf("Cannot retrieve valid greeting\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
conn_led = 0;
return;
}
printf("Paired. Sending Ready.\n");
Handshake_state = ST_RDY;
zmq_send(s, "READY ", 8);
}
/*
 * ST_RDY handler: drain whatever the subscriber sent (up to 258 bytes per
 * call) and log how much was read. The data itself is discarded.
 */
static void hs_rdy(struct pico_socket *s)
{
    uint8_t scratch[258];
    int n = pico_socket_read(s, scratch, (int) sizeof scratch);

    printf("Got %d bytes from subscriber whilst in rdy state.\n", n);
}
/*
 * Handshake handlers indexed by Handshake_state (enum zmq_hshake_state).
 * ST_LISTEN has no handler, hence the NULL first entry; callers test the
 * entry before calling through it.
 */
static void(*hs_cb[])(struct pico_socket *) = {
NULL, /* ST_LISTEN */
hs_connected, /* ST_CONNECTED */
hs_signature, /* ST_SIGNATURE */
hs_version, /* ST_VERSION */
hs_greeting, /* ST_GREETING */
hs_rdy /* ST_RDY */
};
void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
{
struct pico_ip4 orig;
uint16_t port;
char peer[30];
if (ev & PICO_SOCK_EV_RD) {
if (hs_cb[Handshake_state])
hs_cb[Handshake_state](s);
}
if (ev & PICO_SOCK_EV_CONN) {
struct pico_socket *z;
z = pico_socket_accept(s, &orig, &port);
pico_ipv4_to_string(peer, orig.addr);
printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
if (Handshake_state == ST_LISTEN) {
printf("tcp0mq> Accepted connection!\n");
conn_led = 1;
zmq_sock = z;
Handshake_state = ST_CONNECTED;
} else {
printf("tcp0mq> Server busy, connection rejected\n");
pico_socket_close(z);
}
}
if (ev & PICO_SOCK_EV_FIN) {
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
conn_led = 0;
}
if (ev & PICO_SOCK_EV_ERR) {
printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
printf("tcp0mq> Connection closed.\n");
Handshake_state = ST_LISTEN;
conn_led = 0;
}
if (ev & PICO_SOCK_EV_CLOSE) {
printf("tcp0mq> event close\n");
pico_socket_close(s);
Handshake_state = ST_LISTEN;
conn_led = 0;
}
if (ev & PICO_SOCK_EV_WR) {
/* TODO: manage pending data */
}
}
/*
 * Entry point: bring up Ethernet and the PicoTCP stack, listen on TCP port
 * 9000, then loop forever ticking the stack about once per millisecond.
 *
 * While a handshake is in progress its current handler is polled from the
 * loop. Once the handshake has reached ST_RDY, "HELLO WORLD" is published and
 * the activity LED toggled each time the tick counter has passed
 * PUBLISH_INTERVAL_TICKS.
 *
 * Any failure to open, bind or listen is reported and then halts here.
 */
int main() {
    enum { PUBLISH_INTERVAL_TICKS = 500 };
    int counter = 0;
    EthernetInterface eth;
    eth.init(); //Use DHCP
    eth.connect();
    pico_stack_init();

    struct pico_socket *s;
    uint16_t port = short_be(9000);
    struct pico_ip4 inaddr_any = {0};

    s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
    if (!s) {
        printf("tcp0mq> socket open failed because %s\n", strerror(pico_err));
        while(1);;
    }

    printf("tcp0mq> BIND\n");
    if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
        printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
        while(1);;
    }

    printf("tcp0mq> LISTEN\n");
    if (pico_socket_listen(s, 40) != 0) {
        printf("tcp0mq> LISTEN failed because %s\n", strerror(pico_err));
        while(1);;
    }

    printf("tcp0mq> listening port %u ...\n",short_be(port));

    while(1) {
        pico_stack_tick();
        wait(0.001);

        if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
            if (hs_cb[Handshake_state])
                hs_cb[Handshake_state](zmq_sock);
        }

        /*
         * The counter stops just past the interval instead of growing without
         * bound while no subscriber is ready (which would eventually overflow
         * a signed int). A message is still sent as soon as ST_RDY is reached
         * with the interval already elapsed.
         */
        if (counter > PUBLISH_INTERVAL_TICKS) {
            if (Handshake_state == ST_RDY) {
                zmq_send(zmq_sock, "HELLO WORLD", 11);
                counter = 0;
                myled = !myled;
            }
        } else {
            counter++;
        }
    }
}

