ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2
Dependencies: PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed
main.cpp
00001 /* PicoTCP ZeroMQ Test Publisher 00002 * Copyright 2013 - Daniele Lacamera 00003 * 00004 * GPL v2 00005 */ 00006 00007 #include "mbed.h" 00008 #include "EthernetInterface.h" 00009 00010 DigitalOut myled(LED1); 00011 DigitalOut conn_led(LED2); 00012 static struct pico_socket *zmq_sock = NULL; 00013 int remaining_hs_bytes = 64; 00014 00015 volatile enum zmq_hshake_state { 00016 ST_LISTEN = 0, 00017 ST_CONNECTED, 00018 ST_SIGNATURE, 00019 ST_VERSION, 00020 ST_GREETING, 00021 ST_RDY 00022 } Handshake_state = ST_LISTEN; 00023 00024 struct __attribute__((packed)) zmq_msg { 00025 uint8_t flags; 00026 uint8_t len; 00027 char txt[0]; 00028 }; 00029 00030 00031 void zmq_send(struct pico_socket *s, char *txt, int len) 00032 { 00033 struct zmq_msg msg; 00034 msg.flags = 4; 00035 msg.len = (uint8_t) len; 00036 memcpy(msg.txt, txt, len); 00037 pico_socket_write(s, &msg, len + 2); 00038 } 00039 00040 00041 static void hs_connected(struct pico_socket *s) 00042 { 00043 uint8_t my_ver[2] = {3u, 0}; 00044 uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f}; 00045 uint8_t my_greeting[52] = {'N','U','L','L', 0}; 00046 pico_socket_write(s, my_signature, 10); 00047 pico_socket_write(s, my_ver, 2); 00048 pico_socket_write(s, my_greeting, 52); 00049 Handshake_state = ST_SIGNATURE; 00050 remaining_hs_bytes = 64; 00051 conn_led = 1; 00052 } 00053 00054 static void hs_signature(struct pico_socket *s) 00055 { 00056 uint8_t incoming[20]; 00057 int ret; 00058 00059 ret = pico_socket_read(s, incoming, 10); 00060 if (ret < 10) { 00061 printf("Received invalid signature\n"); 00062 pico_socket_close(s); 00063 Handshake_state = ST_LISTEN; 00064 conn_led = 0; 00065 return; 00066 } 00067 if (incoming[0] != 0xFF) { 00068 printf("Received invalid signature\n"); 00069 pico_socket_close(s); 00070 Handshake_state = ST_LISTEN; 00071 conn_led = 0; 00072 return; 00073 } 00074 printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); 00075 remaining_hs_bytes -= ret; 00076 Handshake_state = ST_VERSION; 00077 } 00078 00079 static void hs_version(struct pico_socket *s) 00080 { 00081 uint8_t incoming[20]; 00082 int ret; 00083 ret = pico_socket_read(s, incoming, 2); 00084 if (ret < 0) { 00085 printf("Cannot exchange valid version information. Read returned -1\n"); 00086 pico_socket_close(s); 00087 Handshake_state = ST_LISTEN; 00088 conn_led = 0; 00089 return; 00090 } 00091 if (ret == 0) 00092 return; 00093 00094 remaining_hs_bytes -= ret; 00095 if (incoming[0] != 3) { 00096 printf("Version %d.x not supported by this publisher\n", incoming[0]); 00097 pico_socket_close(s); 00098 Handshake_state = ST_LISTEN; 00099 conn_led = 0; 00100 return; 00101 } 00102 printf("Subscriber is using version 3. Good!\n"); 00103 Handshake_state = ST_GREETING; 00104 } 00105 00106 static void hs_greeting(struct pico_socket *s) 00107 { 00108 uint8_t incoming[64]; 00109 int ret; 00110 ret = pico_socket_read(s, incoming, 64); 00111 printf("pico_socket_read in greeting returned %d\n", ret); 00112 if (ret == 0) 00113 return; 00114 if (ret < 0) { 00115 printf("Cannot retrieve valid greeting\n"); 00116 pico_socket_close(s); 00117 Handshake_state = ST_LISTEN; 00118 conn_led = 0; 00119 return; 00120 } 00121 printf("Paired. Sending Ready.\n"); 00122 Handshake_state = ST_RDY; 00123 zmq_send(s, "READY ", 8); 00124 00125 } 00126 00127 static void hs_rdy(struct pico_socket *s) 00128 { 00129 int ret; 00130 uint8_t incoming[258]; 00131 ret = pico_socket_read(s, incoming, 258); 00132 printf("Got %d bytes from subscriber whilst in rdy state.\n", ret); 00133 } 00134 00135 static void(*hs_cb[])(struct pico_socket *) = { 00136 NULL, 00137 hs_connected, 00138 hs_signature, 00139 hs_version, 00140 hs_greeting, 00141 hs_rdy 00142 }; 00143 00144 void cb_tcp0mq(uint16_t ev, struct pico_socket *s) 00145 { 00146 struct pico_ip4 orig; 00147 uint16_t port; 00148 char peer[30]; 00149 00150 if (ev & PICO_SOCK_EV_RD) { 00151 if (hs_cb[Handshake_state]) 00152 hs_cb[Handshake_state](s); 00153 } 00154 00155 if (ev & PICO_SOCK_EV_CONN) { 00156 struct pico_socket *z; 00157 z = pico_socket_accept(s, &orig, &port); 00158 pico_ipv4_to_string(peer, orig.addr); 00159 printf("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port)); 00160 if (Handshake_state == ST_LISTEN) { 00161 printf("tcp0mq> Accepted connection!\n"); 00162 conn_led = 1; 00163 zmq_sock = z; 00164 Handshake_state = ST_CONNECTED; 00165 } else { 00166 printf("tcp0mq> Server busy, connection rejected\n"); 00167 pico_socket_close(z); 00168 } 00169 } 00170 00171 if (ev & PICO_SOCK_EV_FIN) { 00172 printf("tcp0mq> Connection closed.\n"); 00173 Handshake_state = ST_LISTEN; 00174 conn_led = 0; 00175 } 00176 00177 if (ev & PICO_SOCK_EV_ERR) { 00178 printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); 00179 printf("tcp0mq> Connection closed.\n"); 00180 Handshake_state = ST_LISTEN; 00181 conn_led = 0; 00182 } 00183 00184 if (ev & PICO_SOCK_EV_CLOSE) { 00185 printf("tcp0mq> event close\n"); 00186 pico_socket_close(s); 00187 Handshake_state = ST_LISTEN; 00188 conn_led = 0; 00189 } 00190 00191 if (ev & PICO_SOCK_EV_WR) { 00192 /* TODO: manage pending data */ 00193 } 00194 } 00195 00196 00197 00198 int main() { 00199 int counter = 0; 00200 EthernetInterface eth; 00201 eth.init(); //Use DHCP 00202 eth.connect(); 00203 pico_stack_init(); 00204 00205 struct pico_socket *s; 00206 struct pico_ip4 server_addr; 00207 uint16_t port = short_be(9000); 00208 struct pico_ip4 inaddr_any = {0}; 00209 00210 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); 00211 if (!s) 00212 while(1);; 00213 00214 printf("tcp0mq> BIND\n"); 00215 if (pico_socket_bind(s, &inaddr_any, &port)!= 0) { 00216 printf("tcp0mq> BIND failed because %s\n", strerror(pico_err)); 00217 while(1);; 00218 } 00219 00220 printf("tcp0mq> LISTEN\n"); 00221 if (pico_socket_listen(s, 40) != 0) 00222 while(1);; 00223 printf("tcp0mq> listening port %u ...\n",short_be(port)); 00224 00225 while(1) { 00226 pico_stack_tick(); 00227 wait(0.001); 00228 if (zmq_sock && Handshake_state > ST_LISTEN && Handshake_state < ST_RDY) { 00229 if (hs_cb[Handshake_state]) 00230 hs_cb[Handshake_state](zmq_sock); 00231 } 00232 if((counter++ > 500) && (Handshake_state == ST_RDY)) { 00233 zmq_send(zmq_sock, "HELLO WORLD", 11); 00234 counter = 0; 00235 myled = !myled; 00236 } 00237 } 00238 }
Generated on Thu Jul 14 2022 22:13:50 by 1.7.2