ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2

Dependencies:   PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers main.cpp Source File

main.cpp

00001 /* PicoTCP ZeroMQ Test Publisher 
00002  * Copyright 2013 - Daniele Lacamera
00003  * 
00004  * GPL v2
00005  */
00006 
00007 #include "mbed.h"
00008 #include "EthernetInterface.h"
00009 
00010 DigitalOut myled(LED1);
00011 DigitalOut conn_led(LED2);
00012 static struct pico_socket *zmq_sock = NULL;
00013 int remaining_hs_bytes = 64;
00014 
00015 volatile enum zmq_hshake_state {
00016   ST_LISTEN = 0,
00017   ST_CONNECTED,
00018   ST_SIGNATURE,
00019   ST_VERSION,
00020   ST_GREETING,
00021   ST_RDY
00022 } Handshake_state = ST_LISTEN;
00023 
00024 struct __attribute__((packed)) zmq_msg {
00025     uint8_t flags;
00026     uint8_t len;
00027     char    txt[0];
00028 };
00029 
00030 
00031 void zmq_send(struct pico_socket *s, char *txt, int len)
00032 {
00033     struct zmq_msg msg;
00034     msg.flags = 4;
00035     msg.len = (uint8_t) len;
00036     memcpy(msg.txt, txt, len);
00037     pico_socket_write(s, &msg, len + 2);
00038 }
00039 
00040 
00041 static void hs_connected(struct pico_socket *s)
00042 {
00043   uint8_t my_ver[2] = {3u, 0};
00044   uint8_t my_signature[10] =  {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
00045   uint8_t my_greeting[52] = {'N','U','L','L', 0};
00046   pico_socket_write(s, my_signature, 10);
00047   pico_socket_write(s, my_ver, 2);
00048   pico_socket_write(s, my_greeting, 52);
00049   Handshake_state = ST_SIGNATURE;
00050   remaining_hs_bytes = 64;
00051   conn_led = 1;
00052 }
00053 
00054 static void hs_signature(struct pico_socket *s)
00055 {
00056   uint8_t incoming[20];
00057   int ret;
00058   
00059   ret = pico_socket_read(s, incoming, 10);
00060   if (ret < 10) {
00061     printf("Received invalid signature\n");
00062     pico_socket_close(s);
00063     Handshake_state = ST_LISTEN;
00064     conn_led = 0;
00065     return;
00066   }
00067   if (incoming[0] != 0xFF) {
00068     printf("Received invalid signature\n");
00069     pico_socket_close(s);
00070     Handshake_state = ST_LISTEN;
00071     conn_led = 0;
00072     return;
00073   }
00074   printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
00075   remaining_hs_bytes -= ret;
00076   Handshake_state = ST_VERSION;
00077 }
00078 
00079 static void hs_version(struct pico_socket *s)
00080 {
00081   uint8_t incoming[20];
00082   int ret;
00083   ret = pico_socket_read(s, incoming, 2);
00084   if (ret < 0) {
00085     printf("Cannot exchange valid version information. Read returned -1\n");
00086     pico_socket_close(s);
00087     Handshake_state = ST_LISTEN;
00088     conn_led = 0;
00089     return;
00090   }
00091   if (ret == 0)
00092      return;
00093     
00094   remaining_hs_bytes -= ret;
00095   if (incoming[0] != 3) {
00096     printf("Version %d.x not supported by this publisher\n", incoming[0]);
00097     pico_socket_close(s);
00098     Handshake_state = ST_LISTEN;
00099     conn_led = 0;
00100     return;
00101   }
00102   printf("Subscriber is using version 3. Good!\n");
00103   Handshake_state = ST_GREETING;
00104 }
00105 
00106 static void hs_greeting(struct pico_socket *s)
00107 {
00108   uint8_t incoming[64];
00109   int ret;
00110   ret = pico_socket_read(s, incoming, 64);
00111   printf("pico_socket_read in greeting returned %d\n", ret);    
00112   if (ret == 0)
00113    return;  
00114   if (ret < 0) {
00115     printf("Cannot retrieve valid greeting\n");
00116     pico_socket_close(s);
00117     Handshake_state = ST_LISTEN;
00118     conn_led = 0;
00119     return;
00120   }
00121   printf("Paired. Sending Ready.\n");
00122   Handshake_state = ST_RDY;
00123   zmq_send(s, "READY   ", 8);
00124   
00125 }
00126 
00127 static void hs_rdy(struct pico_socket *s)
00128 {
00129     int ret;
00130     uint8_t incoming[258];
00131     ret = pico_socket_read(s, incoming, 258);
00132     printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
00133 }
00134 
00135 static void(*hs_cb[])(struct pico_socket *) = {
00136     NULL,
00137     hs_connected,
00138     hs_signature,
00139     hs_version,
00140     hs_greeting,
00141     hs_rdy
00142 };
00143 
00144 void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
00145 {
00146   struct pico_ip4 orig;
00147   uint16_t port;
00148   char peer[30];
00149 
00150   if (ev & PICO_SOCK_EV_RD) {
00151     if (hs_cb[Handshake_state])
00152       hs_cb[Handshake_state](s);
00153   }
00154 
00155   if (ev & PICO_SOCK_EV_CONN) { 
00156     struct pico_socket *z;
00157     z = pico_socket_accept(s, &orig, &port);
00158     pico_ipv4_to_string(peer, orig.addr);
00159     printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
00160     if (Handshake_state == ST_LISTEN) {
00161         printf("tcp0mq> Accepted connection!\n");
00162         conn_led = 1;
00163         zmq_sock = z;
00164         Handshake_state = ST_CONNECTED;
00165     } else {
00166         printf("tcp0mq> Server busy, connection rejected\n");
00167         pico_socket_close(z);
00168     }
00169   }
00170 
00171   if (ev & PICO_SOCK_EV_FIN) {
00172     printf("tcp0mq> Connection closed.\n");
00173     Handshake_state = ST_LISTEN;
00174     conn_led = 0;
00175   }
00176 
00177   if (ev & PICO_SOCK_EV_ERR) {
00178     printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
00179     printf("tcp0mq> Connection closed.\n");
00180     Handshake_state = ST_LISTEN;
00181     conn_led = 0;
00182   }
00183 
00184   if (ev & PICO_SOCK_EV_CLOSE) {
00185     printf("tcp0mq> event close\n");
00186     pico_socket_close(s);
00187     Handshake_state = ST_LISTEN;
00188     conn_led = 0;
00189   }
00190 
00191   if (ev & PICO_SOCK_EV_WR) {
00192     /* TODO: manage pending data */
00193   }
00194 }
00195 
00196 
00197 
00198 int main() {
00199   int counter = 0;
00200   EthernetInterface eth;
00201   eth.init(); //Use DHCP
00202   eth.connect();
00203   pico_stack_init();
00204   
00205   struct pico_socket *s;
00206   struct pico_ip4 server_addr;
00207   uint16_t port = short_be(9000);
00208   struct pico_ip4 inaddr_any = {0};
00209 
00210   s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
00211   if (!s)
00212     while(1);;
00213   
00214   printf("tcp0mq> BIND\n");
00215   if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
00216     printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
00217     while(1);;
00218   }
00219 
00220   printf("tcp0mq> LISTEN\n");
00221   if (pico_socket_listen(s, 40) != 0)
00222      while(1);;
00223   printf("tcp0mq> listening port %u ...\n",short_be(port));
00224 
00225   while(1) {
00226     pico_stack_tick();
00227     wait(0.001);
00228     if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) {
00229       if (hs_cb[Handshake_state]) 
00230           hs_cb[Handshake_state](zmq_sock);
00231     }
00232     if((counter++ > 500) && (Handshake_state == ST_RDY)) {
00233         zmq_send(zmq_sock, "HELLO WORLD", 11);
00234         counter = 0;
00235         myled = !myled;
00236     }
00237   }
00238 }