Free (GPLv2) TCP/IP stack developed by TASS Belgium

Dependents:   lpc1768-picotcp-demo ZeroMQ_PicoTCP_Publisher_demo TCPSocket_HelloWorld_PicoTCP Pico_TCP_UDP_Test ... more

PicoTCP. Copyright (c) 2013 TASS Belgium NV.

Released under the GNU General Public License, version 2.

Different licensing models may exist, at the sole discretion of the Copyright holders.

Official homepage: http://www.picotcp.com

Bug tracker: https://github.com/tass-belgium/picotcp/issues

Development steps:

  • initial integration with mbed RTOS
  • generic mbed Ethernet driver
  • high performance NXP LPC1768 specific Ethernet driver
  • Multi-threading support for mbed RTOS
  • Berkeley sockets and integration with the New Socket API
  • Fork of the apps running on top of the New Socket API
  • Scheduling optimizations
  • Debugging/benchmarking/testing

Demo application (measuring TCP sender performance):

Import program: lpc1768-picotcp-demo

A PicoTCP demo app testing the ethernet throughput on the lpc1768 mbed board.

Committer:
tass
Date:
Fri Oct 18 09:41:50 2013 +0000
Revision:
101:37763e3777a7
Parent:
73:dfb737147f6e
Child:
122:5b1e9de8bf7f
merge with mainline Issue #39

Who changed what in which revision?

User | Revision | Line number | New contents of line
tass 68:0847e35d08a6 1 /*********************************************************************
tass 68:0847e35d08a6 2 PicoTCP. Copyright (c) 2012 TASS Belgium NV. Some rights reserved.
tass 68:0847e35d08a6 3 See LICENSE and COPYING for usage.
tass 68:0847e35d08a6 4
tass 68:0847e35d08a6 5 Authors: Daniele Lacamera
tass 68:0847e35d08a6 6 *********************************************************************/
tass 68:0847e35d08a6 7
tass 68:0847e35d08a6 8 #include "pico_stack.h"
tass 68:0847e35d08a6 9 #include "pico_config.h"
tass 68:0847e35d08a6 10 #include "pico_ipv4.h"
tass 68:0847e35d08a6 11 #include "pico_socket.h"
tass 68:0847e35d08a6 12 #include "pico_zmq.h"
tass 68:0847e35d08a6 13
tass 68:0847e35d08a6 14 #define MY_VERSION 1u
tass 68:0847e35d08a6 15
tass 68:0847e35d08a6 16
/*
 * Connection/handshake state of a ZMQ socket or of one of its connectors.
 * The numeric value of a connector's state is used as an index into the
 * zmq_hs_cb[] handler table, so the values below must stay in step with
 * the order of that table.
 */
enum zmq_state {
    ST_OPEN      = 0,
    ST_CONNECTED = 1,
    ST_SIGNATURE = 2,
    ST_VERSION   = 3,
    ST_GREETING  = 4,
    ST_RDY       = 5,
    ST_BUSY      = 6
};
tass 68:0847e35d08a6 26
/*
 * Role played by a ZMQ socket or connector. ROLE_NONE is the value a
 * zero-initialised object starts with.
 */
enum zmq_role {
    ROLE_NONE       = 0,
    ROLE_PUBLISHER  = 1,
    ROLE_SUBSCRIBER = 2
};
tass 68:0847e35d08a6 32
/*
 * On-the-wire message frame: a two byte header (flags, payload length)
 * immediately followed by the payload bytes.
 *
 * The payload is a C99 flexible array member rather than a zero-length
 * array (a GNU extension), so the header size stays at exactly two bytes
 * and the struct must be allocated with room for the payload appended.
 */
struct __attribute__((packed)) zmq_msg {
    uint8_t flags;  /* frame flags byte */
    uint8_t len;    /* number of payload bytes that follow (0..255) */
    char txt[];     /* payload, `len` bytes */
};
tass 68:0847e35d08a6 38
tass 68:0847e35d08a6 39 struct zmq_socket;
tass 68:0847e35d08a6 40
tass 68:0847e35d08a6 41 struct zmq_connector {
tass 68:0847e35d08a6 42 struct pico_socket *sock;
tass 68:0847e35d08a6 43 enum zmq_state state;
tass 68:0847e35d08a6 44 ZMQ parent;
tass 68:0847e35d08a6 45 enum zmq_role role;
tass 68:0847e35d08a6 46 uint8_t bytes_received;
tass 68:0847e35d08a6 47 struct zmq_connector *next;
tass 68:0847e35d08a6 48 };
tass 68:0847e35d08a6 49
tass 68:0847e35d08a6 50 struct zmq_socket {
tass 68:0847e35d08a6 51 struct pico_socket *sock;
tass 68:0847e35d08a6 52 void (*ready)(ZMQ z);
tass 68:0847e35d08a6 53 enum zmq_state state;
tass 68:0847e35d08a6 54 struct zmq_connector *subs;
tass 68:0847e35d08a6 55 enum zmq_role role;
tass 68:0847e35d08a6 56 };
tass 68:0847e35d08a6 57
tass 68:0847e35d08a6 58 static int zmq_socket_cmp(void *ka, void *kb)
tass 68:0847e35d08a6 59 {
tass 68:0847e35d08a6 60 ZMQ a = ka;
tass 68:0847e35d08a6 61 ZMQ b = kb;
tass 68:0847e35d08a6 62 if (a->sock < b->sock)
tass 68:0847e35d08a6 63 return -1;
tass 68:0847e35d08a6 64 if (b->sock < a->sock)
tass 68:0847e35d08a6 65 return 1;
tass 68:0847e35d08a6 66 return 0;
tass 68:0847e35d08a6 67 }
tass 68:0847e35d08a6 68 PICO_TREE_DECLARE(zmq_sockets, zmq_socket_cmp);
tass 68:0847e35d08a6 69
tass 68:0847e35d08a6 70 static inline ZMQ ZMTP(struct pico_socket *s)
tass 68:0847e35d08a6 71 {
tass 68:0847e35d08a6 72 struct zmq_socket tst = { .sock = s };
tass 68:0847e35d08a6 73 return (pico_tree_findKey(&zmq_sockets, &tst));
tass 68:0847e35d08a6 74 }
tass 68:0847e35d08a6 75
tass 68:0847e35d08a6 76 static inline struct zmq_connector *find_subscriber(struct pico_socket *s)
tass 68:0847e35d08a6 77 {
tass 68:0847e35d08a6 78 ZMQ search;
tass 68:0847e35d08a6 79 struct pico_tree_node *idx;
tass 68:0847e35d08a6 80 struct zmq_connector *el;
tass 68:0847e35d08a6 81 pico_tree_foreach(idx, &zmq_sockets) {
tass 68:0847e35d08a6 82 search = idx->keyValue;
tass 68:0847e35d08a6 83 el = search->subs;
tass 68:0847e35d08a6 84 while(el) {
tass 68:0847e35d08a6 85 if (el->sock == s)
tass 68:0847e35d08a6 86 return el;
tass 68:0847e35d08a6 87 el = el->next;
tass 68:0847e35d08a6 88 }
tass 68:0847e35d08a6 89 }
tass 68:0847e35d08a6 90 return NULL;
tass 68:0847e35d08a6 91 }
tass 68:0847e35d08a6 92
tass 68:0847e35d08a6 93
tass 68:0847e35d08a6 94 static void zmq_connector_add(ZMQ z, struct zmq_connector *zc)
tass 68:0847e35d08a6 95 {
tass 68:0847e35d08a6 96 zc->next = z->subs;
tass 68:0847e35d08a6 97 z->subs = zc;
tass 68:0847e35d08a6 98 zc->parent = z;
tass 68:0847e35d08a6 99 dbg("Added connector %p, sock is %p\n", zc, zc->sock);
tass 68:0847e35d08a6 100 }
tass 68:0847e35d08a6 101
tass 68:0847e35d08a6 102 static void zmq_connector_del(struct zmq_connector *zc)
tass 68:0847e35d08a6 103 {
tass 68:0847e35d08a6 104 ZMQ z = zc->parent;
tass 68:0847e35d08a6 105 if(z) {
tass 68:0847e35d08a6 106 struct zmq_connector *el = z->subs, *prev = NULL; /* el = pointer to linked list */
tass 68:0847e35d08a6 107 while(el) {
tass 68:0847e35d08a6 108 if (el == zc) { /* did we find the connector that we want to delete? */
tass 68:0847e35d08a6 109 if (prev) /* was there a previous list item? */
tass 68:0847e35d08a6 110 prev->next = zc->next; /* link the linked list again */
tass 68:0847e35d08a6 111 else
tass 68:0847e35d08a6 112 z->subs = zc->next; /* we were at the beginning of the list */
tass 68:0847e35d08a6 113 break;
tass 68:0847e35d08a6 114 }
tass 68:0847e35d08a6 115 prev = el;
tass 68:0847e35d08a6 116 el = el->next;
tass 68:0847e35d08a6 117 }
tass 68:0847e35d08a6 118 }
tass 68:0847e35d08a6 119 pico_socket_close(zc->sock);
tass 68:0847e35d08a6 120 pico_free(zc);
tass 68:0847e35d08a6 121 }
tass 68:0847e35d08a6 122
tass 68:0847e35d08a6 123 static void zmq_check_state(ZMQ z)
tass 68:0847e35d08a6 124 {
tass 68:0847e35d08a6 125 struct zmq_connector *c = z->subs;
tass 68:0847e35d08a6 126 enum zmq_state default_state, option_state;
tass 68:0847e35d08a6 127 if ((z->state != ST_RDY) && (z->state != ST_BUSY))
tass 68:0847e35d08a6 128 return;
tass 68:0847e35d08a6 129 if (z->role == ROLE_SUBSCRIBER) {
tass 68:0847e35d08a6 130 default_state = ST_RDY;
tass 68:0847e35d08a6 131 option_state = ST_BUSY;
tass 68:0847e35d08a6 132 } else {
tass 68:0847e35d08a6 133 default_state = ST_BUSY;
tass 68:0847e35d08a6 134 option_state = ST_RDY;
tass 68:0847e35d08a6 135 }
tass 68:0847e35d08a6 136 z->state = default_state;
tass 68:0847e35d08a6 137 while(c) {
tass 68:0847e35d08a6 138 if (c->state == option_state) {
tass 68:0847e35d08a6 139 z->state = option_state;
tass 68:0847e35d08a6 140 return;
tass 68:0847e35d08a6 141 }
tass 68:0847e35d08a6 142 c = c->next;
tass 68:0847e35d08a6 143 }
tass 68:0847e35d08a6 144 }
tass 68:0847e35d08a6 145
tass 68:0847e35d08a6 146
tass 68:0847e35d08a6 147 static void zmq_hs_connected(struct zmq_connector *z)
tass 68:0847e35d08a6 148 {
tass 68:0847e35d08a6 149 /* v2 signature */
tass 68:0847e35d08a6 150 uint8_t my_signature[14] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f, 1, 1, 0, 0};
tass 68:0847e35d08a6 151
tass 68:0847e35d08a6 152 // uint8_t my_ver[2] = {MY_VERSION, 0};
tass 68:0847e35d08a6 153 // uint8_t my_greeting[52] = {'N','U','L','L', 0};
tass 68:0847e35d08a6 154
tass 68:0847e35d08a6 155 pico_socket_write(z->sock, my_signature, 14);
tass 68:0847e35d08a6 156 // pico_socket_write(z->sock, my_ver, 2);
tass 68:0847e35d08a6 157
tass 68:0847e35d08a6 158 // if (MY_VERSION > 2)
tass 68:0847e35d08a6 159 // pico_socket_write(z->sock, my_greeting, 52);
tass 68:0847e35d08a6 160
tass 68:0847e35d08a6 161 z->state = ST_SIGNATURE;
tass 68:0847e35d08a6 162 // z->state = ST_RDY;
tass 68:0847e35d08a6 163 }
tass 68:0847e35d08a6 164
tass 68:0847e35d08a6 165 static void zmq_hs_signature(struct zmq_connector *zc)
tass 68:0847e35d08a6 166 {
tass 68:0847e35d08a6 167 uint8_t incoming[20];
tass 68:0847e35d08a6 168 int ret;
tass 68:0847e35d08a6 169
tass 68:0847e35d08a6 170 ret = pico_socket_read(zc->sock, incoming, 14);
tass 68:0847e35d08a6 171 if (zc->bytes_received == 0 && ret > 0 && incoming[0] != 0xFF) {
tass 68:0847e35d08a6 172 //dbg("Received invalid signature: [0]!=0xFF\n");
tass 68:0847e35d08a6 173 zmq_connector_del(zc);
tass 68:0847e35d08a6 174 }
tass 73:dfb737147f6e 175 zc->bytes_received = (uint8_t)(zc->bytes_received + ret);
tass 68:0847e35d08a6 176 if (zc->bytes_received < 14) {
tass 68:0847e35d08a6 177 //dbg("Waiting for the rest of the sig - got %u bytes\n",zc->bytes_received);
tass 68:0847e35d08a6 178 return;
tass 68:0847e35d08a6 179 }
tass 68:0847e35d08a6 180
tass 68:0847e35d08a6 181 //dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
tass 68:0847e35d08a6 182 zc->state = ST_RDY;
tass 68:0847e35d08a6 183 }
tass 68:0847e35d08a6 184
tass 68:0847e35d08a6 185 static void zmq_hs_version(struct zmq_connector *zc)
tass 68:0847e35d08a6 186 {
tass 68:0847e35d08a6 187 uint8_t incoming[20];
tass 68:0847e35d08a6 188 int ret;
tass 68:0847e35d08a6 189 ret = pico_socket_read(zc->sock, incoming, 2);
tass 68:0847e35d08a6 190 if (ret < 0) {
tass 68:0847e35d08a6 191 dbg("Cannot exchange valid version information. Read returned -1\n");
tass 68:0847e35d08a6 192 zmq_connector_del(zc);
tass 68:0847e35d08a6 193 return;
tass 68:0847e35d08a6 194 }
tass 68:0847e35d08a6 195 if (ret == 0)
tass 68:0847e35d08a6 196 return;
tass 68:0847e35d08a6 197 /* Version check?
tass 68:0847e35d08a6 198 if (incoming[0] != 3) {
tass 68:0847e35d08a6 199 dbg("Version %d.x not supported by this publisher\n", incoming[0]);
tass 68:0847e35d08a6 200 zmq_connector_del(zc);
tass 68:0847e35d08a6 201 return;
tass 68:0847e35d08a6 202 }
tass 68:0847e35d08a6 203 dbg("Subscriber is using version 3. Good!\n");
tass 68:0847e35d08a6 204 */
tass 68:0847e35d08a6 205 dbg("Subscriber is using version %d. Good!\n", incoming[0]);
tass 68:0847e35d08a6 206 if (incoming[0] == 3)
tass 68:0847e35d08a6 207 zc->state = ST_GREETING;
tass 68:0847e35d08a6 208 else
tass 68:0847e35d08a6 209 zc->state = ST_RDY;
tass 68:0847e35d08a6 210 }
tass 68:0847e35d08a6 211
tass 68:0847e35d08a6 212 static void zmq_hs_greeting(struct zmq_connector *zc)
tass 68:0847e35d08a6 213 {
tass 68:0847e35d08a6 214 uint8_t incoming[64];
tass 68:0847e35d08a6 215 int ret;
tass 68:0847e35d08a6 216 ret = pico_socket_read(zc->sock, incoming, 64);
tass 68:0847e35d08a6 217 dbg("zmq_socket_read in greeting returned %d\n", ret);
tass 68:0847e35d08a6 218 if (ret == 0)
tass 68:0847e35d08a6 219 return;
tass 68:0847e35d08a6 220 if (ret < 0) {
tass 68:0847e35d08a6 221 dbg("Cannot retrieve valid greeting\n");
tass 68:0847e35d08a6 222 zmq_connector_del(zc);
tass 68:0847e35d08a6 223 return;
tass 68:0847e35d08a6 224 }
tass 68:0847e35d08a6 225 zc->state = ST_RDY;
tass 68:0847e35d08a6 226 zmq_check_state(zc->parent);
tass 68:0847e35d08a6 227 dbg("Paired. Sending Ready.\n");
tass 68:0847e35d08a6 228 pico_socket_write(zc->sock, "READY ",8);
tass 68:0847e35d08a6 229 }
tass 68:0847e35d08a6 230
tass 68:0847e35d08a6 231 static void zmq_hs_rdy(struct zmq_connector *zc)
tass 68:0847e35d08a6 232 {
tass 68:0847e35d08a6 233 int ret;
tass 68:0847e35d08a6 234 uint8_t incoming[258];
tass 68:0847e35d08a6 235 if (zc->role == ROLE_SUBSCRIBER)
tass 68:0847e35d08a6 236 return;
tass 68:0847e35d08a6 237 ret = pico_socket_read(zc->sock, incoming, 258);
tass 68:0847e35d08a6 238 dbg("Got %d bytes from subscriber whilst in rdy state.\n", ret);
tass 68:0847e35d08a6 239 }
tass 68:0847e35d08a6 240
tass 68:0847e35d08a6 241 static void zmq_hs_busy(struct zmq_connector *zc)
tass 68:0847e35d08a6 242 {
tass 68:0847e35d08a6 243 int was_busy = 0;
tass 68:0847e35d08a6 244 if (zc->parent->state == ST_BUSY)
tass 68:0847e35d08a6 245 was_busy = 1;
tass 68:0847e35d08a6 246 zmq_check_state(zc->parent);
tass 68:0847e35d08a6 247 if (was_busy && (zc->parent->state == ST_RDY) && zc->parent->ready)
tass 68:0847e35d08a6 248 zc->parent->ready(zc->parent);
tass 68:0847e35d08a6 249 }
tass 68:0847e35d08a6 250
tass 68:0847e35d08a6 251 static void(*zmq_hs_cb[])(struct zmq_connector *) = {
tass 68:0847e35d08a6 252 NULL,
tass 68:0847e35d08a6 253 zmq_hs_connected,
tass 68:0847e35d08a6 254 zmq_hs_signature,
tass 68:0847e35d08a6 255 zmq_hs_version,
tass 68:0847e35d08a6 256 zmq_hs_greeting,
tass 68:0847e35d08a6 257 zmq_hs_rdy,
tass 68:0847e35d08a6 258 zmq_hs_busy
tass 68:0847e35d08a6 259 };
tass 68:0847e35d08a6 260
tass 68:0847e35d08a6 261
tass 68:0847e35d08a6 262 static void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
tass 68:0847e35d08a6 263 {
tass 68:0847e35d08a6 264 struct pico_ip4 orig;
tass 68:0847e35d08a6 265 uint16_t port;
tass 68:0847e35d08a6 266 char peer[30];
tass 68:0847e35d08a6 267 struct zmq_connector *z_a, *zc;
tass 68:0847e35d08a6 268 ZMQ z = ZMTP(s);
tass 68:0847e35d08a6 269
tass 68:0847e35d08a6 270 /* Publisher. Accepting new subscribers */
tass 68:0847e35d08a6 271 if (z) {
tass 68:0847e35d08a6 272 if (ev & PICO_SOCK_EV_CONN) {
tass 68:0847e35d08a6 273 z_a = pico_zalloc(sizeof(struct zmq_socket));
tass 68:0847e35d08a6 274 if (z_a == NULL)
tass 68:0847e35d08a6 275 return;
tass 68:0847e35d08a6 276
tass 68:0847e35d08a6 277 z_a->sock = pico_socket_accept(s, &orig, &port);
tass 68:0847e35d08a6 278 pico_ipv4_to_string(peer, orig.addr);
tass 68:0847e35d08a6 279 dbg("tcp0mq> Connection requested by %s:%u.\n", peer, short_be(port));
tass 68:0847e35d08a6 280 if (z->state == ST_OPEN) {
tass 68:0847e35d08a6 281 dbg("tcp0mq> Accepted connection! New subscriber on sock %p.\n",z_a->sock);
tass 68:0847e35d08a6 282 zmq_connector_add(z, z_a);
tass 68:0847e35d08a6 283 z_a->role = ROLE_PUBLISHER;
tass 68:0847e35d08a6 284 z_a->state = ST_CONNECTED;
tass 68:0847e35d08a6 285 zmq_hs_connected(z_a);
tass 68:0847e35d08a6 286 } else {
tass 68:0847e35d08a6 287 dbg("tcp0mq> Server busy, connection rejected\n");
tass 68:0847e35d08a6 288 pico_socket_close(z_a->sock);
tass 68:0847e35d08a6 289 }
tass 68:0847e35d08a6 290 }
tass 68:0847e35d08a6 291 return;
tass 68:0847e35d08a6 292 }
tass 68:0847e35d08a6 293
tass 68:0847e35d08a6 294 zc = find_subscriber(s);
tass 68:0847e35d08a6 295 if (!zc) {
tass 68:0847e35d08a6 296 dbg("Cannot find subscriber with socket %p, ev = %d!\n", s, ev);
tass 68:0847e35d08a6 297 // pico_socket_close(s);
tass 68:0847e35d08a6 298 return;
tass 68:0847e35d08a6 299 }
tass 68:0847e35d08a6 300
tass 68:0847e35d08a6 301 if ((ev & PICO_SOCK_EV_CONN) && zc->role == ROLE_SUBSCRIBER && zc->state == ST_OPEN)
tass 68:0847e35d08a6 302 {
tass 68:0847e35d08a6 303 zc->state = ST_CONNECTED;
tass 68:0847e35d08a6 304 zmq_hs_connected(zc);
tass 68:0847e35d08a6 305 }
tass 68:0847e35d08a6 306
tass 68:0847e35d08a6 307
tass 68:0847e35d08a6 308 if (ev & PICO_SOCK_EV_RD) {
tass 68:0847e35d08a6 309 if (zmq_hs_cb[zc->state])
tass 68:0847e35d08a6 310 zmq_hs_cb[zc->state](zc);
tass 68:0847e35d08a6 311 }
tass 68:0847e35d08a6 312
tass 68:0847e35d08a6 313 if ((ev & PICO_SOCK_EV_WR) && zc->parent && (zc->parent->role == ROLE_PUBLISHER) && (zc->state == ST_BUSY)) {
tass 68:0847e35d08a6 314 if (zmq_hs_cb[zc->state])
tass 68:0847e35d08a6 315 zmq_hs_cb[zc->state](zc);
tass 68:0847e35d08a6 316 }
tass 68:0847e35d08a6 317
tass 68:0847e35d08a6 318
tass 68:0847e35d08a6 319 if (ev & PICO_SOCK_EV_FIN) {
tass 68:0847e35d08a6 320 dbg("tcp0mq> Connection closed.\n");
tass 68:0847e35d08a6 321 zmq_connector_del(zc);
tass 68:0847e35d08a6 322 }
tass 68:0847e35d08a6 323
tass 68:0847e35d08a6 324 if (ev & PICO_SOCK_EV_ERR) {
tass 68:0847e35d08a6 325 dbg("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
tass 68:0847e35d08a6 326 zmq_connector_del(zc);
tass 68:0847e35d08a6 327 }
tass 68:0847e35d08a6 328
tass 68:0847e35d08a6 329 if (ev & PICO_SOCK_EV_CLOSE) {
tass 68:0847e35d08a6 330 dbg("tcp0mq> event close\n");
tass 68:0847e35d08a6 331 zmq_connector_del(zc);
tass 68:0847e35d08a6 332 }
tass 68:0847e35d08a6 333
tass 68:0847e35d08a6 334 }
tass 68:0847e35d08a6 335
tass 68:0847e35d08a6 336 ZMQ zmq_subscriber(void (*cb)(ZMQ z))
tass 68:0847e35d08a6 337 {
tass 68:0847e35d08a6 338 ZMQ z = pico_zalloc(sizeof(struct zmq_socket));
tass 68:0847e35d08a6 339 if (!z) {
tass 68:0847e35d08a6 340 pico_err = PICO_ERR_ENOMEM;
tass 68:0847e35d08a6 341 return NULL;
tass 68:0847e35d08a6 342 }
tass 68:0847e35d08a6 343 z->state = ST_BUSY;
tass 68:0847e35d08a6 344 z->ready = cb;
tass 68:0847e35d08a6 345 z->role = ROLE_SUBSCRIBER;
tass 68:0847e35d08a6 346 pico_tree_insert(&zmq_sockets, z);
tass 68:0847e35d08a6 347 return z;
tass 68:0847e35d08a6 348 }
tass 68:0847e35d08a6 349
tass 68:0847e35d08a6 350 int zmq_connect(ZMQ z, char *address, uint16_t port)
tass 68:0847e35d08a6 351 {
tass 68:0847e35d08a6 352 struct pico_ip4 ip = {0};
tass 68:0847e35d08a6 353 struct zmq_connector *z_c;
tass 68:0847e35d08a6 354 if (pico_string_to_ipv4(address, &ip.addr) < 0) {
tass 68:0847e35d08a6 355 dbg("FIXME!! I need to synchronize with the dns client to get to my publisher :(\n");
tass 68:0847e35d08a6 356 return -1;
tass 68:0847e35d08a6 357 }
tass 68:0847e35d08a6 358
tass 68:0847e35d08a6 359 z_c = pico_zalloc(sizeof(struct zmq_connector));
tass 68:0847e35d08a6 360 if (!z_c)
tass 68:0847e35d08a6 361 return -1;
tass 68:0847e35d08a6 362 z_c->role = ROLE_SUBSCRIBER;
tass 68:0847e35d08a6 363 z_c->state = ST_OPEN;
tass 68:0847e35d08a6 364 z_c->sock = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
tass 68:0847e35d08a6 365 if (!z_c->sock) {
tass 68:0847e35d08a6 366 pico_free(z_c);
tass 68:0847e35d08a6 367 return -1;
tass 68:0847e35d08a6 368 }
tass 68:0847e35d08a6 369 if (pico_socket_connect(z_c->sock, &ip, short_be(port)) < 0)
tass 68:0847e35d08a6 370 return -1;
tass 68:0847e35d08a6 371 zmq_connector_add(z, z_c);
tass 68:0847e35d08a6 372 return 0;
tass 68:0847e35d08a6 373 }
tass 68:0847e35d08a6 374
tass 68:0847e35d08a6 375 ZMQ zmq_publisher(uint16_t _port, void (*cb)(ZMQ z))
tass 68:0847e35d08a6 376 {
tass 68:0847e35d08a6 377 struct pico_socket *s;
tass 68:0847e35d08a6 378 struct pico_ip4 inaddr_any = {0};
tass 68:0847e35d08a6 379 uint16_t port = short_be(_port);
tass 68:0847e35d08a6 380 ZMQ z = NULL;
tass 68:0847e35d08a6 381 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
tass 68:0847e35d08a6 382 if (!s)
tass 68:0847e35d08a6 383 return NULL;
tass 68:0847e35d08a6 384
tass 68:0847e35d08a6 385 dbg("zmq_publisher: BIND\n");
tass 68:0847e35d08a6 386 if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
tass 68:0847e35d08a6 387 dbg("zmq publisher: BIND failed\n");
tass 68:0847e35d08a6 388 return NULL;
tass 68:0847e35d08a6 389 }
tass 68:0847e35d08a6 390 if (pico_socket_listen(s, 2) != 0) {
tass 68:0847e35d08a6 391 dbg("zmq publisher: LISTEN failed\n");
tass 68:0847e35d08a6 392 return NULL;
tass 68:0847e35d08a6 393 }
tass 68:0847e35d08a6 394 dbg("zmq_publisher: Active and bound to local port %d\n", short_be(port));
tass 68:0847e35d08a6 395
tass 68:0847e35d08a6 396 z = pico_zalloc(sizeof(struct zmq_socket));
tass 68:0847e35d08a6 397 if (!z) {
tass 68:0847e35d08a6 398 pico_socket_close(s);
tass 68:0847e35d08a6 399 pico_err = PICO_ERR_ENOMEM;
tass 68:0847e35d08a6 400 return NULL;
tass 68:0847e35d08a6 401 }
tass 68:0847e35d08a6 402 z->sock = s;
tass 68:0847e35d08a6 403 z->state = ST_OPEN;
tass 68:0847e35d08a6 404 z->ready = cb;
tass 68:0847e35d08a6 405 z->role = ROLE_PUBLISHER;
tass 68:0847e35d08a6 406 z->subs = NULL;
tass 68:0847e35d08a6 407 pico_tree_insert(&zmq_sockets, z);
tass 68:0847e35d08a6 408 dbg("zmq publisher created.\n");
tass 68:0847e35d08a6 409 return z;
tass 68:0847e35d08a6 410 }
tass 68:0847e35d08a6 411
tass 68:0847e35d08a6 412 int zmq_send(ZMQ z, char *txt, int len)
tass 68:0847e35d08a6 413 {
tass 68:0847e35d08a6 414 struct zmq_msg *msg;
tass 68:0847e35d08a6 415 struct zmq_connector *c = z->subs;
tass 68:0847e35d08a6 416 int ret = 0;
tass 68:0847e35d08a6 417
tass 68:0847e35d08a6 418 if (!c)
tass 68:0847e35d08a6 419 {
tass 68:0847e35d08a6 420 dbg("no subscribers, bailing out\n");
tass 68:0847e35d08a6 421 return 0; /* Need at least one subscriber */
tass 68:0847e35d08a6 422 }
tass 70:cd218dd180e5 423 msg = pico_zalloc((size_t)(len + 2));
tass 68:0847e35d08a6 424 msg->flags = 4;
tass 68:0847e35d08a6 425 msg->len = (uint8_t) len;
tass 70:cd218dd180e5 426 memcpy(msg->txt, txt,(size_t) len);
tass 68:0847e35d08a6 427
tass 68:0847e35d08a6 428 while (c) {
tass 68:0847e35d08a6 429 dbg("write to %u\n",c->state);
tass 68:0847e35d08a6 430 if ((ST_RDY == c->state) && (pico_socket_write(c->sock, msg, len + 2) > 0))
tass 68:0847e35d08a6 431 ret++;
tass 68:0847e35d08a6 432 c = c->next;
tass 68:0847e35d08a6 433 }
tass 68:0847e35d08a6 434 pico_free(msg);
tass 68:0847e35d08a6 435 return ret;
tass 68:0847e35d08a6 436 }
tass 68:0847e35d08a6 437
tass 68:0847e35d08a6 438 int zmq_recv(ZMQ z, char *txt)
tass 68:0847e35d08a6 439 {
tass 68:0847e35d08a6 440 int ret;
tass 68:0847e35d08a6 441 struct zmq_msg msg;
tass 68:0847e35d08a6 442 struct zmq_connector *nxt, *c = z->subs;
tass 68:0847e35d08a6 443 if (z->state != ST_RDY)
tass 68:0847e35d08a6 444 return 0;
tass 68:0847e35d08a6 445 while (c) {
tass 68:0847e35d08a6 446 nxt = c->next;
tass 68:0847e35d08a6 447 ret = pico_socket_read(c->sock, &msg, 2);
tass 68:0847e35d08a6 448 if (ret < 0) {
tass 68:0847e35d08a6 449 dbg("Error reading!\n");
tass 68:0847e35d08a6 450 zmq_connector_del(c);
tass 68:0847e35d08a6 451 } else if (ret < 2) {
tass 68:0847e35d08a6 452 c->state = ST_BUSY;
tass 68:0847e35d08a6 453 } else {
tass 68:0847e35d08a6 454 return pico_socket_read(c->sock, txt, msg.len);
tass 68:0847e35d08a6 455 }
tass 68:0847e35d08a6 456 c = nxt;
tass 68:0847e35d08a6 457 }
tass 68:0847e35d08a6 458 zmq_check_state(z);
tass 68:0847e35d08a6 459 return 0;
tass 68:0847e35d08a6 460 }
tass 68:0847e35d08a6 461
tass 68:0847e35d08a6 462 void zmq_close(ZMQ z)
tass 68:0847e35d08a6 463 {
tass 68:0847e35d08a6 464 struct zmq_connector *nxt, *c = z->subs;
tass 68:0847e35d08a6 465 while(c) {
tass 68:0847e35d08a6 466 nxt = c->next;
tass 68:0847e35d08a6 467 zmq_connector_del(c);
tass 68:0847e35d08a6 468 c = nxt;
tass 68:0847e35d08a6 469 }
tass 68:0847e35d08a6 470 pico_socket_close(z->sock);
tass 68:0847e35d08a6 471 pico_free(z);
tass 68:0847e35d08a6 472 }