Free (GPLv2) TCP/IP stack developed by TASS Belgium

Dependents:   lpc1768-picotcp-demo ZeroMQ_PicoTCP_Publisher_demo TCPSocket_HelloWorld_PicoTCP Pico_TCP_UDP_Test ... more

PicoTCP. Copyright (c) 2013 TASS Belgium NV.

Released under the GNU General Public License, version 2.

Different licensing models may exist, at the sole discretion of the Copyright holders.

Official homepage: http://www.picotcp.com

Bug tracker: https://github.com/tass-belgium/picotcp/issues

Development steps:

  • initial integration with mbed RTOS
  • generic mbed Ethernet driver
  • high performance NXP LPC1768 specific Ethernet driver
  • Multi-threading support for mbed RTOS
  • Berkeley sockets and integration with the New Socket API
  • Fork of the apps running on top of the New Socket API
  • Scheduling optimizations
  • Debugging/benchmarking/testing

Demo application (measuring TCP sender performance):

Import program: lpc1768-picotcp-demo

A PicoTCP demo app testing the ethernet throughput on the lpc1768 mbed board.

Committer:
tass
Date:
Mon Sep 02 08:02:21 2013 +0000
Revision:
51:ab4529a384a6
Child:
63:97f481e33cb2
Updated from master branch

Who changed what in which revision?

User | Revision | Line number | New contents of line
tass 51:ab4529a384a6 1 /*********************************************************************
tass 51:ab4529a384a6 2 PicoTCP. Copyright (c) 2012 TASS Belgium NV. Some rights reserved.
tass 51:ab4529a384a6 3 See LICENSE and COPYING for usage.
tass 51:ab4529a384a6 4
tass 51:ab4529a384a6 5 Authors: Daniele Lacamera
tass 51:ab4529a384a6 6 *********************************************************************/
tass 51:ab4529a384a6 7
tass 51:ab4529a384a6 8 #include "pico_stack.h"
tass 51:ab4529a384a6 9 #include "pico_config.h"
tass 51:ab4529a384a6 10 #include "pico_ipv4.h"
tass 51:ab4529a384a6 11 #include "pico_socket.h"
tass 51:ab4529a384a6 12 #include "pico_zmq.h"
tass 51:ab4529a384a6 13
tass 51:ab4529a384a6 14 #define MY_VERSION 1u
tass 51:ab4529a384a6 15
tass 51:ab4529a384a6 16
/* State of a ZMQ socket or of one of its connectors.
 * The numeric value is used as an index into the zmq_hs_cb[] handler table,
 * so the values must stay dense and in this order. */
enum zmq_state {
    ST_OPEN      = 0,
    ST_CONNECTED = 1,
    ST_SIGNATURE = 2,
    ST_VERSION   = 3,
    ST_GREETING  = 4,
    ST_RDY       = 5,
    ST_BUSY      = 6
};
tass 51:ab4529a384a6 26
/* Role of a ZMQ socket or connector. ROLE_NONE is the value of zeroed memory. */
enum zmq_role {
    ROLE_NONE       = 0,
    ROLE_PUBLISHER  = 1,
    ROLE_SUBSCRIBER = 2
};
tass 51:ab4529a384a6 32
/* Wire layout of one message frame: a flags byte, a one-byte payload length,
 * then the payload itself. The payload is a C99 flexible array member (instead
 * of the non-standard zero-length array), so sizeof(struct zmq_msg) is still
 * just the 2-byte header. */
struct __attribute__((packed)) zmq_msg {
    uint8_t flags;
    uint8_t len;
    char txt[];
};
tass 51:ab4529a384a6 38
tass 51:ab4529a384a6 39 struct zmq_socket;
tass 51:ab4529a384a6 40
tass 51:ab4529a384a6 41 struct zmq_connector {
tass 51:ab4529a384a6 42 struct pico_socket *sock;
tass 51:ab4529a384a6 43 enum zmq_state state;
tass 51:ab4529a384a6 44 ZMQ parent;
tass 51:ab4529a384a6 45 enum zmq_role role;
tass 51:ab4529a384a6 46 uint8_t bytes_received;
tass 51:ab4529a384a6 47 struct zmq_connector *next;
tass 51:ab4529a384a6 48 };
tass 51:ab4529a384a6 49
tass 51:ab4529a384a6 50 struct zmq_socket {
tass 51:ab4529a384a6 51 struct pico_socket *sock;
tass 51:ab4529a384a6 52 void (*ready)(ZMQ z);
tass 51:ab4529a384a6 53 enum zmq_state state;
tass 51:ab4529a384a6 54 struct zmq_connector *subs;
tass 51:ab4529a384a6 55 enum zmq_role role;
tass 51:ab4529a384a6 56 };
tass 51:ab4529a384a6 57
tass 51:ab4529a384a6 58 static int zmq_socket_cmp(void *ka, void *kb)
tass 51:ab4529a384a6 59 {
tass 51:ab4529a384a6 60 ZMQ a = ka;
tass 51:ab4529a384a6 61 ZMQ b = kb;
tass 51:ab4529a384a6 62 if (a->sock < b->sock)
tass 51:ab4529a384a6 63 return -1;
tass 51:ab4529a384a6 64 if (b->sock < a->sock)
tass 51:ab4529a384a6 65 return 1;
tass 51:ab4529a384a6 66 return 0;
tass 51:ab4529a384a6 67 }
tass 51:ab4529a384a6 68 PICO_TREE_DECLARE(zmq_sockets, zmq_socket_cmp);
tass 51:ab4529a384a6 69
tass 51:ab4529a384a6 70 static inline ZMQ ZMTP(struct pico_socket *s)
tass 51:ab4529a384a6 71 {
tass 51:ab4529a384a6 72 struct zmq_socket tst = { .sock = s };
tass 51:ab4529a384a6 73 return (pico_tree_findKey(&zmq_sockets, &tst));
tass 51:ab4529a384a6 74 }
tass 51:ab4529a384a6 75
tass 51:ab4529a384a6 76 static inline struct zmq_connector *find_subscriber(struct pico_socket *s)
tass 51:ab4529a384a6 77 {
tass 51:ab4529a384a6 78 ZMQ search;
tass 51:ab4529a384a6 79 struct pico_tree_node *idx;
tass 51:ab4529a384a6 80 struct zmq_connector *el;
tass 51:ab4529a384a6 81 pico_tree_foreach(idx, &zmq_sockets) {
tass 51:ab4529a384a6 82 search = idx->keyValue;
tass 51:ab4529a384a6 83 el = search->subs;
tass 51:ab4529a384a6 84 while(el) {
tass 51:ab4529a384a6 85 if (el->sock == s)
tass 51:ab4529a384a6 86 return el;
tass 51:ab4529a384a6 87 el = el->next;
tass 51:ab4529a384a6 88 }
tass 51:ab4529a384a6 89 }
tass 51:ab4529a384a6 90 return NULL;
tass 51:ab4529a384a6 91 }
tass 51:ab4529a384a6 92
tass 51:ab4529a384a6 93
tass 51:ab4529a384a6 94 static void zmq_connector_add(ZMQ z, struct zmq_connector *zc)
tass 51:ab4529a384a6 95 {
tass 51:ab4529a384a6 96 zc->next = z->subs;
tass 51:ab4529a384a6 97 z->subs = zc;
tass 51:ab4529a384a6 98 zc->parent = z;
tass 51:ab4529a384a6 99 dbg("Added connector %p, sock is %p\n", zc, zc->sock);
tass 51:ab4529a384a6 100 }
tass 51:ab4529a384a6 101
tass 51:ab4529a384a6 102 static void zmq_connector_del(struct zmq_connector *zc)
tass 51:ab4529a384a6 103 {
tass 51:ab4529a384a6 104 ZMQ z = zc->parent;
tass 51:ab4529a384a6 105 if(z) {
tass 51:ab4529a384a6 106 struct zmq_connector *el = z->subs, *prev = NULL; /* el = pointer to linked list */
tass 51:ab4529a384a6 107 while(el) {
tass 51:ab4529a384a6 108 if (el == zc) { /* did we find the connector that we want to delete? */
tass 51:ab4529a384a6 109 if (prev) /* was there a previous list item? */
tass 51:ab4529a384a6 110 prev->next = zc->next; /* link the linked list again */
tass 51:ab4529a384a6 111 else
tass 51:ab4529a384a6 112 z->subs = zc->next; /* we were at the beginning of the list */
tass 51:ab4529a384a6 113 break;
tass 51:ab4529a384a6 114 }
tass 51:ab4529a384a6 115 prev = el;
tass 51:ab4529a384a6 116 el = el->next;
tass 51:ab4529a384a6 117 }
tass 51:ab4529a384a6 118 }
tass 51:ab4529a384a6 119 pico_socket_close(zc->sock);
tass 51:ab4529a384a6 120 pico_free(zc);
tass 51:ab4529a384a6 121 }
tass 51:ab4529a384a6 122
tass 51:ab4529a384a6 123 static void zmq_check_state(ZMQ z)
tass 51:ab4529a384a6 124 {
tass 51:ab4529a384a6 125 struct zmq_connector *c = z->subs;
tass 51:ab4529a384a6 126 enum zmq_state default_state, option_state;
tass 51:ab4529a384a6 127 if ((z->state != ST_RDY) && (z->state != ST_BUSY))
tass 51:ab4529a384a6 128 return;
tass 51:ab4529a384a6 129 if (z->role == ROLE_SUBSCRIBER) {
tass 51:ab4529a384a6 130 default_state = ST_RDY;
tass 51:ab4529a384a6 131 option_state = ST_BUSY;
tass 51:ab4529a384a6 132 } else {
tass 51:ab4529a384a6 133 default_state = ST_BUSY;
tass 51:ab4529a384a6 134 option_state = ST_RDY;
tass 51:ab4529a384a6 135 }
tass 51:ab4529a384a6 136 z->state = default_state;
tass 51:ab4529a384a6 137 while(c) {
tass 51:ab4529a384a6 138 if (c->state == option_state) {
tass 51:ab4529a384a6 139 z->state = option_state;
tass 51:ab4529a384a6 140 return;
tass 51:ab4529a384a6 141 }
tass 51:ab4529a384a6 142 c = c->next;
tass 51:ab4529a384a6 143 }
tass 51:ab4529a384a6 144 }
tass 51:ab4529a384a6 145
tass 51:ab4529a384a6 146
tass 51:ab4529a384a6 147 static void zmq_hs_connected(struct zmq_connector *z)
tass 51:ab4529a384a6 148 {
tass 51:ab4529a384a6 149 /* v2 signature */
tass 51:ab4529a384a6 150 uint8_t my_signature[14] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f, 1, 1, 0, 0};
tass 51:ab4529a384a6 151
tass 51:ab4529a384a6 152 // uint8_t my_ver[2] = {MY_VERSION, 0};
tass 51:ab4529a384a6 153 // uint8_t my_greeting[52] = {'N','U','L','L', 0};
tass 51:ab4529a384a6 154
tass 51:ab4529a384a6 155 pico_socket_write(z->sock, my_signature, 14);
tass 51:ab4529a384a6 156 // pico_socket_write(z->sock, my_ver, 2);
tass 51:ab4529a384a6 157
tass 51:ab4529a384a6 158 // if (MY_VERSION > 2)
tass 51:ab4529a384a6 159 // pico_socket_write(z->sock, my_greeting, 52);
tass 51:ab4529a384a6 160
tass 51:ab4529a384a6 161 z->state = ST_SIGNATURE;
tass 51:ab4529a384a6 162 // z->state = ST_RDY;
tass 51:ab4529a384a6 163 }
tass 51:ab4529a384a6 164
tass 51:ab4529a384a6 165 static void zmq_hs_signature(struct zmq_connector *zc)
tass 51:ab4529a384a6 166 {
tass 51:ab4529a384a6 167 uint8_t incoming[20];
tass 51:ab4529a384a6 168 int ret;
tass 51:ab4529a384a6 169
tass 51:ab4529a384a6 170 ret = pico_socket_read(zc->sock, incoming, 14);
tass 51:ab4529a384a6 171 if (zc->bytes_received == 0 && ret > 0 && incoming[0] != 0xFF) {
tass 51:ab4529a384a6 172 //dbg("Received invalid signature: [0]!=0xFF\n");
tass 51:ab4529a384a6 173 zmq_connector_del(zc);
tass 51:ab4529a384a6 174 }
tass 51:ab4529a384a6 175 zc->bytes_received += ret;
tass 51:ab4529a384a6 176 if (zc->bytes_received < 14) {
tass 51:ab4529a384a6 177 //dbg("Waiting for the rest of the sig - got %u bytes\n",zc->bytes_received);
tass 51:ab4529a384a6 178 return;
tass 51:ab4529a384a6 179 }
tass 51:ab4529a384a6 180
tass 51:ab4529a384a6 181 //dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
tass 51:ab4529a384a6 182 zc->state = ST_RDY;
tass 51:ab4529a384a6 183 }
tass 51:ab4529a384a6 184
tass 51:ab4529a384a6 185 static void zmq_hs_version(struct zmq_connector *zc)
tass 51:ab4529a384a6 186 {
tass 51:ab4529a384a6 187 uint8_t incoming[20];
tass 51:ab4529a384a6 188 int ret;
tass 51:ab4529a384a6 189 ret = pico_socket_read(zc->sock, incoming, 2);
tass 51:ab4529a384a6 190 if (ret < 0) {
tass 51:ab4529a384a6 191 dbg("Cannot exchange valid version information. Read returned -1\n");
tass 51:ab4529a384a6 192 zmq_connector_del(zc);
tass 51:ab4529a384a6 193 return;
tass 51:ab4529a384a6 194 }
tass 51:ab4529a384a6 195 if (ret == 0)
tass 51:ab4529a384a6 196 return;
tass 51:ab4529a384a6 197 /* Version check?
tass 51:ab4529a384a6 198 if (incoming[0] != 3) {
tass 51:ab4529a384a6 199 dbg("Version %d.x not supported by this publisher\n", incoming[0]);
tass 51:ab4529a384a6 200 zmq_connector_del(zc);
tass 51:ab4529a384a6 201 return;
tass 51:ab4529a384a6 202 }
tass 51:ab4529a384a6 203 dbg("Subscriber is using version 3. Good!\n");
tass 51:ab4529a384a6 204 */
tass 51:ab4529a384a6 205 dbg("Subscriber is using version %d. Good!\n", incoming[0]);
tass 51:ab4529a384a6 206 if (incoming[0] == 3)
tass 51:ab4529a384a6 207 zc->state = ST_GREETING;
tass 51:ab4529a384a6 208 else
tass 51:ab4529a384a6 209 zc->state = ST_RDY;
tass 51:ab4529a384a6 210 }
tass 51:ab4529a384a6 211
tass 51:ab4529a384a6 212 static void zmq_hs_greeting(struct zmq_connector *zc)
tass 51:ab4529a384a6 213 {
tass 51:ab4529a384a6 214 uint8_t incoming[64];
tass 51:ab4529a384a6 215 int ret;
tass 51:ab4529a384a6 216 ret = pico_socket_read(zc->sock, incoming, 64);
tass 51:ab4529a384a6 217 dbg("zmq_socket_read in greeting returned %d\n", ret);
tass 51:ab4529a384a6 218 if (ret == 0)
tass 51:ab4529a384a6 219 return;
tass 51:ab4529a384a6 220 if (ret < 0) {
tass 51:ab4529a384a6 221 dbg("Cannot retrieve valid greeting\n");
tass 51:ab4529a384a6 222 zmq_connector_del(zc);
tass 51:ab4529a384a6 223 return;
tass 51:ab4529a384a6 224 }
tass 51:ab4529a384a6 225 zc->state = ST_RDY;
tass 51:ab4529a384a6 226 zmq_check_state(zc->parent);
tass 51:ab4529a384a6 227 dbg("Paired. Sending Ready.\n");
tass 51:ab4529a384a6 228 pico_socket_write(zc->sock, "READY ",8);
tass 51:ab4529a384a6 229 }
tass 51:ab4529a384a6 230
tass 51:ab4529a384a6 231 static void zmq_hs_rdy(struct zmq_connector *zc)
tass 51:ab4529a384a6 232 {
tass 51:ab4529a384a6 233 int ret;
tass 51:ab4529a384a6 234 uint8_t incoming[258];
tass 51:ab4529a384a6 235 if (zc->role == ROLE_SUBSCRIBER)
tass 51:ab4529a384a6 236 return;
tass 51:ab4529a384a6 237 ret = pico_socket_read(zc->sock, incoming, 258);
tass 51:ab4529a384a6 238 dbg("Got %d bytes from subscriber whilst in rdy state.\n", ret);
tass 51:ab4529a384a6 239 }
tass 51:ab4529a384a6 240
tass 51:ab4529a384a6 241 static void zmq_hs_busy(struct zmq_connector *zc)
tass 51:ab4529a384a6 242 {
tass 51:ab4529a384a6 243 int was_busy = 0;
tass 51:ab4529a384a6 244 if (zc->parent->state == ST_BUSY)
tass 51:ab4529a384a6 245 was_busy = 1;
tass 51:ab4529a384a6 246 zmq_check_state(zc->parent);
tass 51:ab4529a384a6 247 if (was_busy && (zc->parent->state == ST_RDY) && zc->parent->ready)
tass 51:ab4529a384a6 248 zc->parent->ready(zc->parent);
tass 51:ab4529a384a6 249 }
tass 51:ab4529a384a6 250
tass 51:ab4529a384a6 251 static void(*zmq_hs_cb[])(struct zmq_connector *) = {
tass 51:ab4529a384a6 252 NULL,
tass 51:ab4529a384a6 253 zmq_hs_connected,
tass 51:ab4529a384a6 254 zmq_hs_signature,
tass 51:ab4529a384a6 255 zmq_hs_version,
tass 51:ab4529a384a6 256 zmq_hs_greeting,
tass 51:ab4529a384a6 257 zmq_hs_rdy,
tass 51:ab4529a384a6 258 zmq_hs_busy
tass 51:ab4529a384a6 259 };
tass 51:ab4529a384a6 260
tass 51:ab4529a384a6 261
tass 51:ab4529a384a6 262 static void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
tass 51:ab4529a384a6 263 {
tass 51:ab4529a384a6 264 struct pico_ip4 orig;
tass 51:ab4529a384a6 265 uint16_t port;
tass 51:ab4529a384a6 266 char peer[30];
tass 51:ab4529a384a6 267 struct zmq_connector *z_a, *zc;
tass 51:ab4529a384a6 268 ZMQ z = ZMTP(s);
tass 51:ab4529a384a6 269
tass 51:ab4529a384a6 270 /* Publisher. Accepting new subscribers */
tass 51:ab4529a384a6 271 if (z) {
tass 51:ab4529a384a6 272 if (ev & PICO_SOCK_EV_CONN) {
tass 51:ab4529a384a6 273 z_a = pico_zalloc(sizeof(struct zmq_socket));
tass 51:ab4529a384a6 274 if (z_a == NULL)
tass 51:ab4529a384a6 275 return;
tass 51:ab4529a384a6 276
tass 51:ab4529a384a6 277 z_a->sock = pico_socket_accept(s, &orig, &port);
tass 51:ab4529a384a6 278 pico_ipv4_to_string(peer, orig.addr);
tass 51:ab4529a384a6 279 dbg("tcp0mq> Connection requested by %s:%u.\n", peer, short_be(port));
tass 51:ab4529a384a6 280 if (z->state == ST_OPEN) {
tass 51:ab4529a384a6 281 dbg("tcp0mq> Accepted connection! New subscriber on sock %p.\n",z_a->sock);
tass 51:ab4529a384a6 282 zmq_connector_add(z, z_a);
tass 51:ab4529a384a6 283 z_a->role = ROLE_PUBLISHER;
tass 51:ab4529a384a6 284 z_a->state = ST_CONNECTED;
tass 51:ab4529a384a6 285 zmq_hs_connected(z_a);
tass 51:ab4529a384a6 286 } else {
tass 51:ab4529a384a6 287 dbg("tcp0mq> Server busy, connection rejected\n");
tass 51:ab4529a384a6 288 pico_socket_close(z_a->sock);
tass 51:ab4529a384a6 289 }
tass 51:ab4529a384a6 290 }
tass 51:ab4529a384a6 291 return;
tass 51:ab4529a384a6 292 }
tass 51:ab4529a384a6 293
tass 51:ab4529a384a6 294 zc = find_subscriber(s);
tass 51:ab4529a384a6 295 if (!zc) {
tass 51:ab4529a384a6 296 dbg("Cannot find subscriber with socket %p, ev = %d!\n", s, ev);
tass 51:ab4529a384a6 297 // pico_socket_close(s);
tass 51:ab4529a384a6 298 return;
tass 51:ab4529a384a6 299 }
tass 51:ab4529a384a6 300
tass 51:ab4529a384a6 301 if ((ev & PICO_SOCK_EV_CONN) && zc->role == ROLE_SUBSCRIBER && zc->state == ST_OPEN)
tass 51:ab4529a384a6 302 {
tass 51:ab4529a384a6 303 zc->state = ST_CONNECTED;
tass 51:ab4529a384a6 304 zmq_hs_connected(zc);
tass 51:ab4529a384a6 305 }
tass 51:ab4529a384a6 306
tass 51:ab4529a384a6 307
tass 51:ab4529a384a6 308 if (ev & PICO_SOCK_EV_RD) {
tass 51:ab4529a384a6 309 if (zmq_hs_cb[zc->state])
tass 51:ab4529a384a6 310 zmq_hs_cb[zc->state](zc);
tass 51:ab4529a384a6 311 }
tass 51:ab4529a384a6 312
tass 51:ab4529a384a6 313 if ((ev & PICO_SOCK_EV_WR) && zc->parent && (zc->parent->role == ROLE_PUBLISHER) && (zc->state == ST_BUSY)) {
tass 51:ab4529a384a6 314 if (zmq_hs_cb[zc->state])
tass 51:ab4529a384a6 315 zmq_hs_cb[zc->state](zc);
tass 51:ab4529a384a6 316 }
tass 51:ab4529a384a6 317
tass 51:ab4529a384a6 318
tass 51:ab4529a384a6 319 if (ev & PICO_SOCK_EV_FIN) {
tass 51:ab4529a384a6 320 dbg("tcp0mq> Connection closed.\n");
tass 51:ab4529a384a6 321 zmq_connector_del(zc);
tass 51:ab4529a384a6 322 }
tass 51:ab4529a384a6 323
tass 51:ab4529a384a6 324 if (ev & PICO_SOCK_EV_ERR) {
tass 51:ab4529a384a6 325 dbg("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
tass 51:ab4529a384a6 326 zmq_connector_del(zc);
tass 51:ab4529a384a6 327 }
tass 51:ab4529a384a6 328
tass 51:ab4529a384a6 329 if (ev & PICO_SOCK_EV_CLOSE) {
tass 51:ab4529a384a6 330 dbg("tcp0mq> event close\n");
tass 51:ab4529a384a6 331 zmq_connector_del(zc);
tass 51:ab4529a384a6 332 }
tass 51:ab4529a384a6 333
tass 51:ab4529a384a6 334 }
tass 51:ab4529a384a6 335
tass 51:ab4529a384a6 336 ZMQ zmq_subscriber(void (*cb)(ZMQ z))
tass 51:ab4529a384a6 337 {
tass 51:ab4529a384a6 338 ZMQ z = pico_zalloc(sizeof(struct zmq_socket));
tass 51:ab4529a384a6 339 if (!z) {
tass 51:ab4529a384a6 340 pico_err = PICO_ERR_ENOMEM;
tass 51:ab4529a384a6 341 return NULL;
tass 51:ab4529a384a6 342 }
tass 51:ab4529a384a6 343 z->state = ST_BUSY;
tass 51:ab4529a384a6 344 z->ready = cb;
tass 51:ab4529a384a6 345 z->role = ROLE_SUBSCRIBER;
tass 51:ab4529a384a6 346 pico_tree_insert(&zmq_sockets, z);
tass 51:ab4529a384a6 347 return z;
tass 51:ab4529a384a6 348 }
tass 51:ab4529a384a6 349
tass 51:ab4529a384a6 350 int zmq_connect(ZMQ z, char *address, uint16_t port)
tass 51:ab4529a384a6 351 {
tass 51:ab4529a384a6 352 struct pico_ip4 ip;
tass 51:ab4529a384a6 353 struct zmq_connector *z_c;
tass 51:ab4529a384a6 354 if (pico_string_to_ipv4(address, &ip.addr) < 0) {
tass 51:ab4529a384a6 355 dbg("FIXME!! I need to synchronize with the dns client to get to my publisher :(\n");
tass 51:ab4529a384a6 356 return -1;
tass 51:ab4529a384a6 357 }
tass 51:ab4529a384a6 358
tass 51:ab4529a384a6 359 z_c = pico_zalloc(sizeof(struct zmq_connector));
tass 51:ab4529a384a6 360 if (!z_c)
tass 51:ab4529a384a6 361 return -1;
tass 51:ab4529a384a6 362 z_c->role = ROLE_SUBSCRIBER;
tass 51:ab4529a384a6 363 z_c->state = ST_OPEN;
tass 51:ab4529a384a6 364 z_c->sock = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
tass 51:ab4529a384a6 365 if (!z_c->sock) {
tass 51:ab4529a384a6 366 pico_free(z_c);
tass 51:ab4529a384a6 367 return -1;
tass 51:ab4529a384a6 368 }
tass 51:ab4529a384a6 369 if (pico_socket_connect(z_c->sock, &ip, short_be(port)) < 0)
tass 51:ab4529a384a6 370 return -1;
tass 51:ab4529a384a6 371 zmq_connector_add(z, z_c);
tass 51:ab4529a384a6 372 return 0;
tass 51:ab4529a384a6 373 }
tass 51:ab4529a384a6 374
tass 51:ab4529a384a6 375 ZMQ zmq_publisher(uint16_t _port, void (*cb)(ZMQ z))
tass 51:ab4529a384a6 376 {
tass 51:ab4529a384a6 377 struct pico_socket *s;
tass 51:ab4529a384a6 378 struct pico_ip4 inaddr_any = {0};
tass 51:ab4529a384a6 379 uint16_t port = short_be(_port);
tass 51:ab4529a384a6 380 ZMQ z = NULL;
tass 51:ab4529a384a6 381 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
tass 51:ab4529a384a6 382 if (!s)
tass 51:ab4529a384a6 383 return NULL;
tass 51:ab4529a384a6 384
tass 51:ab4529a384a6 385 dbg("zmq_publisher: BIND\n");
tass 51:ab4529a384a6 386 if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
tass 51:ab4529a384a6 387 dbg("zmq publisher: BIND failed\n");
tass 51:ab4529a384a6 388 return NULL;
tass 51:ab4529a384a6 389 }
tass 51:ab4529a384a6 390 if (pico_socket_listen(s, 2) != 0) {
tass 51:ab4529a384a6 391 dbg("zmq publisher: LISTEN failed\n");
tass 51:ab4529a384a6 392 return NULL;
tass 51:ab4529a384a6 393 }
tass 51:ab4529a384a6 394 dbg("zmq_publisher: Active and bound to local port %d\n", short_be(port));
tass 51:ab4529a384a6 395
tass 51:ab4529a384a6 396 z = pico_zalloc(sizeof(struct zmq_socket));
tass 51:ab4529a384a6 397 if (!z) {
tass 51:ab4529a384a6 398 pico_socket_close(s);
tass 51:ab4529a384a6 399 pico_err = PICO_ERR_ENOMEM;
tass 51:ab4529a384a6 400 return NULL;
tass 51:ab4529a384a6 401 }
tass 51:ab4529a384a6 402 z->sock = s;
tass 51:ab4529a384a6 403 z->state = ST_OPEN;
tass 51:ab4529a384a6 404 z->ready = cb;
tass 51:ab4529a384a6 405 z->role = ROLE_PUBLISHER;
tass 51:ab4529a384a6 406 z->subs = NULL;
tass 51:ab4529a384a6 407 pico_tree_insert(&zmq_sockets, z);
tass 51:ab4529a384a6 408 dbg("zmq publisher created.\n");
tass 51:ab4529a384a6 409 return z;
tass 51:ab4529a384a6 410 }
tass 51:ab4529a384a6 411
tass 51:ab4529a384a6 412 int zmq_send(ZMQ z, char *txt, int len)
tass 51:ab4529a384a6 413 {
tass 51:ab4529a384a6 414 struct zmq_msg *msg;
tass 51:ab4529a384a6 415 struct zmq_connector *c = z->subs;
tass 51:ab4529a384a6 416 int ret = 0;
tass 51:ab4529a384a6 417
tass 51:ab4529a384a6 418 if (!c)
tass 51:ab4529a384a6 419 {
tass 51:ab4529a384a6 420 dbg("no subscribers, bailing out\n");
tass 51:ab4529a384a6 421 return 0; /* Need at least one subscriber */
tass 51:ab4529a384a6 422 }
tass 51:ab4529a384a6 423 msg = pico_zalloc(len + 2);
tass 51:ab4529a384a6 424 msg->flags = 4;
tass 51:ab4529a384a6 425 msg->len = (uint8_t) len;
tass 51:ab4529a384a6 426 memcpy(msg->txt, txt, len);
tass 51:ab4529a384a6 427
tass 51:ab4529a384a6 428 while (c) {
tass 51:ab4529a384a6 429 dbg("write to %u\n",c->state);
tass 51:ab4529a384a6 430 if ((ST_RDY == c->state) && (pico_socket_write(c->sock, msg, len + 2) > 0))
tass 51:ab4529a384a6 431 ret++;
tass 51:ab4529a384a6 432 c = c->next;
tass 51:ab4529a384a6 433 }
tass 51:ab4529a384a6 434 pico_free(msg);
tass 51:ab4529a384a6 435 return ret;
tass 51:ab4529a384a6 436 }
tass 51:ab4529a384a6 437
tass 51:ab4529a384a6 438 int zmq_recv(ZMQ z, char *txt)
tass 51:ab4529a384a6 439 {
tass 51:ab4529a384a6 440 int ret;
tass 51:ab4529a384a6 441 struct zmq_msg msg;
tass 51:ab4529a384a6 442 struct zmq_connector *nxt, *c = z->subs;
tass 51:ab4529a384a6 443 if (z->state != ST_RDY)
tass 51:ab4529a384a6 444 return 0;
tass 51:ab4529a384a6 445 while (c) {
tass 51:ab4529a384a6 446 nxt = c->next;
tass 51:ab4529a384a6 447 ret = pico_socket_read(c->sock, &msg, 2);
tass 51:ab4529a384a6 448 if (ret < 0) {
tass 51:ab4529a384a6 449 dbg("Error reading!\n");
tass 51:ab4529a384a6 450 zmq_connector_del(c);
tass 51:ab4529a384a6 451 } else if (ret < 2) {
tass 51:ab4529a384a6 452 c->state = ST_BUSY;
tass 51:ab4529a384a6 453 } else {
tass 51:ab4529a384a6 454 return pico_socket_read(c->sock, txt, msg.len);
tass 51:ab4529a384a6 455 }
tass 51:ab4529a384a6 456 c = nxt;
tass 51:ab4529a384a6 457 }
tass 51:ab4529a384a6 458 zmq_check_state(z);
tass 51:ab4529a384a6 459 return 0;
tass 51:ab4529a384a6 460 }
tass 51:ab4529a384a6 461
tass 51:ab4529a384a6 462 void zmq_close(ZMQ z)
tass 51:ab4529a384a6 463 {
tass 51:ab4529a384a6 464 struct zmq_connector *nxt, *c = z->subs;
tass 51:ab4529a384a6 465 while(c) {
tass 51:ab4529a384a6 466 nxt = c->next;
tass 51:ab4529a384a6 467 zmq_connector_del(c);
tass 51:ab4529a384a6 468 c = nxt;
tass 51:ab4529a384a6 469 }
tass 51:ab4529a384a6 470 pico_socket_close(z->sock);
tass 51:ab4529a384a6 471 pico_free(z);
tass 51:ab4529a384a6 472 }