Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK
The PubNub library enables your mbed board to communicate with the world via the PubNub cloud messaging system.
The library provides a PubNub class that is tied to a particular set of keys and offers methods that correspond to the appropriate API methods - publish, subscribe, history, etc. The JSON encoded messages are passed as raw strings to conserve memory, but at your option, you can use e.g. picojson library to deal with JSON. The API is synchronous - use multiple rtos threads to talk to PubNub on the background.
Getting Started
Can't wait to try it out? Connect your mbed application board and proceed to the demo project:
Import programPubNubDemo
Reference demo of the PubNub library for the mbed application board - control your board over the internet!
Library Usage
Import library
Public Member Functions |
|
PubNub (const char *publish_key, const char *subscribe_key, const char *origin="http://pubsub.pubnub.com") | |
Init a Pubnub Client context.
|
|
PubNubRes | publish (const char *channel, const char *message, char **reply=NULL) |
Publish API call.
|
|
PubNubRes | subscribe (const char *channel, char **reply) |
Subscribe API call.
|
|
PubNubRes | history (const char *channel, char **reply, int *replysize, int limit=10) |
History API call.
|
|
PubNubRes | time (char *ts) |
Time API call.
|
Diff: PubNub.cpp
- Revision:
- 0:9858347c382d
- Child:
- 5:1c98f24712e1
diff -r 000000000000 -r 9858347c382d PubNub.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubNub.cpp Sun Mar 02 01:32:54 2014 +0000 @@ -0,0 +1,528 @@ +#include <cstring> + +#include "mbed.h" +#include "EthernetInterface.h" + +#include "PubNub.h" + + +#define DBG if (0) // change to if (1) to enable debug prints on USB serial + + +/* We roll our own HTTP communication stack as donatien's HTTPClient has + * some hard limits on URL length and this will allow a more frugal RAM + * usage for us. */ + +/* Also, we pass messages as serialized JSON strings instead of parsed + * structures. The available JSON parsers (picojson in particular) are + * heavy on many tiny memory allocations, which can get very troublesome + * with mbed's malloc() - we get very high memory fragmentation. In many + * cases, working with raw JSON is just fine, and in our demo example, + * we still show picojson usage for end-user parsing of complex incoming + * messages. */ + + +/** Custom no-frills HTTP stream. */ + +class PubNubHTTP: public TCPSocketConnection { +public: + /* Connect http socket to origin. */ + PubNubRes http_connect(const char *hostname); + + /* These methods are used to send the request type and URL. */ + /* Send a given NUL-terminated string over the http socket. */ + void sendstr(const char *string); + /* Send a given NUL-terminated string over the http socket, URL encoded. */ + void sendstr_urlenc(const char *string); + + /* A common HTTP request "bottom half" - send the rest of headers + * and wait for reply + receive. */ + PubNubRes http_request_bh(const char *host, char **reply, int *replylen); + +protected: + /* These methods represent sub-stages of http_request_bh(). + * They share the state of m_replybuf and m_replylen. */ + PubNubRes http_send_headers(const char *host); + PubNubRes http_recv_headers(); + PubNubRes http_recv_content(); + + char *m_replybuf; + int m_replylen; + int m_content_length; +}; + +PubNubRes +PubNubHTTP::http_connect(const char *hostname) +{ + int ret = connect(hostname, 80); + if (ret < 0) { + close(); + return PNR_IO_ERROR; + } + return PNR_OK; +} + +void +PubNubHTTP::sendstr(const char *s) +{ + send_all((char *) s, strlen(s)); +} + +void +PubNubHTTP::sendstr_urlenc(const char *s) +{ + while (s[0]) { + /* RFC 3986 Unreserved characters plus few + * safe reserved ones. */ + size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]"); + if (okspan > 0) { + send_all((char *) s, okspan); + s += okspan; + } + if (s[0]) { + /* %-encode a non-ok character. */ + char enc[3] = {'%'}; + enc[1] = "0123456789ABCDEF"[s[0] / 16]; + enc[2] = "0123456789ABCDEF"[s[0] % 16]; + send_all((char *) enc, 3); + s++; + } + } +} + +PubNubRes +PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen) +{ + m_replybuf = NULL; + m_replylen = 0; + + PubNubRes res; + res = http_send_headers(host); + if (res != PNR_OK) { + close(); + return res; + } + + res = http_recv_headers(); + if (res != PNR_OK) { + if (m_replybuf) free(m_replybuf); + close(); + return res; + } + + res = http_recv_content(); + if (res != PNR_OK) { + if (m_replybuf) free(m_replybuf); + close(); + return res; + } + + close(); + *reply = (char *) realloc(m_replybuf, m_replylen); + *replylen = m_replylen; + return PNR_OK; +} + +PubNubRes +PubNubHTTP::http_send_headers(const char *host) +{ + /* Finish the first line of the request. */ + sendstr(" HTTP/1.1\r\n"); + /* Finish HTTP request. */ + sendstr("Host: "); + sendstr(host); + sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n"); + if (!is_connected()) + return PNR_IO_ERROR; + return PNR_OK; +} + +PubNubRes +PubNubHTTP::http_recv_headers() +{ + /* Now, read HTTP reply. */ + m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1); + + /* First, receive headers. */ + /* XXX: For now, we assume complete headers will fit in m_replybuf. */ + m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN); + if (m_replylen < 0) + return PNR_IO_ERROR; + m_replybuf[m_replylen] = 0; + + char *bufptr = m_replybuf; + + /* Parse the first line. */ + if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8]) + return PNR_IO_ERROR; + int http_code = atoi(bufptr+9); + if (http_code / 100 != 2) + return PNR_HTTP_ERROR; + bufptr = strstr(bufptr, "\r\n"); + if (!bufptr) + return PNR_IO_ERROR; + bufptr += 2; + + /* Parse the rest of the lines. */ + m_content_length = 0; + bool is_chunked = false; + char *newline; + for (;; bufptr = newline+2) { + newline = strstr(bufptr, "\r\n"); + if (!newline) + return PNR_IO_ERROR; + *newline = 0; + + char h_chunked[] = "Transfer-Encoding: chunked"; + char h_length[] = "Content-Length: "; + if (*bufptr == 0) { + /* Empty line. End of headers. */ + bufptr += 2; + break; + } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) { + /* Transfer-Encoding: chunked */ + is_chunked = true; + } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) { + /* Content-Length: ... */ + m_content_length = atoi(bufptr + sizeof(h_length)-1); + } + } + + /* Possibly process a chunk header. */ + if (is_chunked) { + m_content_length = atoi(bufptr); + bufptr = strstr(bufptr, "\r\n"); + if (!bufptr) + return PNR_IO_ERROR; + bufptr += 2; + } + + /* Consolidate the buffer. */ + m_replylen -= bufptr - m_replybuf; + memmove(m_replybuf, bufptr, m_replylen); + + return PNR_OK; +} + +PubNubRes +PubNubHTTP::http_recv_content() +{ + /* Now, we are ready to process data! */ + + if (m_content_length > PUBNUB_REPLY_MAXLEN) { + /* XXX: Actually, too much data, sorry. */ + DBG printf("too much data\r\n"); + return PNR_FORMAT_ERROR; + } + + if (m_replylen < m_content_length) { + /* More data coming. */ + int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen); + if (recv2len < 0) + return PNR_IO_ERROR; + m_replylen += recv2len; + } + + if (m_replylen < m_content_length) + return PNR_IO_ERROR; /* Incomplete data. */ + return PNR_OK; +} + + +/** Some utility routines. */ + +/* Split a JSON array (with arbitrary contents) to multiple NUL-terminated + * C strings. */ +static PubNubRes +split_array(char *buf, int len) +{ + bool escaped = false, in_string = false; + int bracket_level = 0; + for (int i = 0; i < len; i++) { + if (escaped) { + escaped = false; + } else if (in_string) { + switch (buf[i]) { + case '\\': escaped = true; break; + case '"': in_string = false; break; + default: break; + } + } else { + switch (buf[i]) { + case '"': in_string = true; break; + case '[': case '{': bracket_level++; break; + case ']': case '}': bracket_level--; break; + /* if at root, split! */ + case ',': if (bracket_level == 0) buf[i] = 0; break; + default: break; + } + } + } + DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level); + if (escaped || in_string || bracket_level > 0) + return PNR_FORMAT_ERROR; + return PNR_OK; +} + + +/** PubNub API. */ + +PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) : + m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_), + m_replybuf(NULL), m_replylen(0) +{ + strcpy(m_timetoken, "0"); +} + +PubNub::~PubNub() +{ + if (m_replybuf) + free(m_replybuf); +} + +const char * +PubNub::origin_hostname() +{ + /* TODO: More generic URL handling */ + return m_origin + strlen("http://"); +} + + +PubNubRes +PubNub::publish(const char *channel, const char *message, char **reply) +{ + PubNubHTTP http; + PubNubRes res = http.http_connect(origin_hostname()); + if (res != PNR_OK) + return res; + + http.sendstr("GET /publish/"); + http.sendstr(m_publish_key); + http.sendstr("/"); + http.sendstr(m_subscribe_key); + http.sendstr("/0/"); + http.sendstr(channel); + http.sendstr("/0/"); + http.sendstr_urlenc(message); + + char *locreply; + if (!reply) + reply = &locreply; + int replylen; + + res = http.http_request_bh(origin_hostname(), reply, &replylen); + if (res != PNR_OK) + return res; + bool success = (*reply)[1] == '1' && (*reply)[2] == ','; + if (reply == &locreply) + free(locreply); + return success ? PNR_OK : PNR_PUBNUB_ERROR; +} + + +PubNubRes +PubNub::subscribe(const char *channel, char **reply) +{ + if (m_replybuf) { + int prevlen = strlen(m_replybuf); + //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen); + if (prevlen < m_replylen) { + /* Next message from stash-away buffer. */ + /* XXX: We can be either memory-frugal or CPU-frugal + * here. We choose to be memory-frugal by copying + * over messages many times, but we may want to make + * this configurable. */ + m_replylen -= prevlen + 1; + memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen); + m_replybuf = (char *) realloc(m_replybuf, m_replylen); + *reply = m_replybuf; + return PNR_OK; + + } else { + /* That's all. free() and fetch new messages. */ + free(m_replybuf); + m_replybuf = NULL; + m_replylen = 0; + } + } + + PubNubHTTP http; + PubNubRes res; + res = http.http_connect(origin_hostname()); + if (res != PNR_OK) + return res; + + http.sendstr("GET /subscribe/"); + http.sendstr(m_subscribe_key); + http.sendstr("/"); + http.sendstr(channel); + http.sendstr("/0/"); + http.sendstr(m_timetoken); + + char *replybuf = NULL; + int replylen; + res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); + if (res != PNR_OK) + goto error; + + /* Process the reply, sets timetoken and m_replybuf, m_replylen. */ + res = subscribe_processjson(replybuf, replylen); + if (res != PNR_OK) + goto error; + replybuf = NULL; // freed by processjosn + + /* Split JSON array to messages. */ + res = split_array(m_replybuf, m_replylen); + if (res != PNR_OK) { + free(m_replybuf); + goto error; + } + + *reply = m_replybuf; + return res; + +error: + if (res == PNR_FORMAT_ERROR) { + /* In case of PubNub protocol error, abort an ongoing + * subscribe and start over. This means some messages + * were lost, but allows us to recover from bad + * situations, e.g. too many messages queued or + * unexpected problem caused by a particular message. */ + strcpy(m_timetoken, "0"); + } + if (reply) + free(reply); + m_replybuf = NULL; + m_replylen = 0; + return res; +} + +PubNubRes +PubNub::subscribe_processjson(char *reply, int replylen) +{ + if (reply[0] != '[' || reply[replylen-1] != ']' + || reply[replylen-2] != '"') { + DBG printf("bad reply '%s'\r\n", reply); + return PNR_FORMAT_ERROR; + } + + /* Extract timetoken. */ + reply[replylen-2] = 0; + int i; + for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--) + if (reply[i] == '"') + break; + if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) { + DBG printf("bad reply '%s'\r\n", reply); + return PNR_FORMAT_ERROR; + } + strcpy(m_timetoken, &reply[i+1]); + reply[i-1] = 0; // terminate the [] message array + + /* Empty reply? */ + if (i == 4) { /* "[[]" */ + free(reply); + m_replybuf = NULL; + m_replylen = 0; + return PNR_OK; + } + + /* Extract the messages array. */ + if (reply[1] != '[' || reply[i-2] != ']') { + DBG printf("bad reply end '%s'\r\n", reply); + return PNR_FORMAT_ERROR; + } + reply[i-2] = 0; + + /* Shrink memory buffer to bare minimum. */ + memmove(reply, reply + 2, i-2-2); + m_replylen = i-2-2; + m_replybuf = (char *) realloc(reply, m_replylen); + + return PNR_OK; +} + + +PubNubRes +PubNub::history(const char *channel, char **reply, int *replysize, int limit) +{ + PubNubHTTP http; + PubNubRes res; + res = http.http_connect(origin_hostname()); + if (res != PNR_OK) + return res; + + http.sendstr("GET /history/"); + http.sendstr(m_subscribe_key); + http.sendstr("/"); + http.sendstr(channel); + http.sendstr("/0/"); + char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit); + http.sendstr(limitbuf); + + char *replybuf = NULL; + int replylen; + res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); + if (res != PNR_OK) + goto error; + + /* Extract from the array and split it. */ + + if (replybuf[0] != '[' || replybuf[replylen-1] != ']') { + res = PNR_FORMAT_ERROR; + goto error; + } + + replylen -= 2; + if (replylen == 0) { /* The reply was [] */ + free(replybuf); + *reply = NULL; + *replysize = 0; + return PNR_OK; + } + + memmove(replybuf, replybuf + 1, replylen); + replybuf[replylen] = 0; + + res = split_array(replybuf, replylen); + if (res != PNR_OK) + goto error; + + *reply = replybuf; + *replysize = replylen; + return PNR_OK; + +error: + free(replybuf); + return res; +} + + +PubNubRes +PubNub::time(char *ts) +{ + PubNubHTTP http; + PubNubRes res = http.http_connect(origin_hostname()); + if (res != PNR_OK) + return res; + + http.sendstr("GET /time/0"); + + char *reply; + int replylen; + res = http.http_request_bh(origin_hostname(), &reply, &replylen); + if (res != PNR_OK) + return res; + + if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') { + free(reply); + return PNR_FORMAT_ERROR; + } + + replylen -= 2; + memcpy(ts, reply + 1, replylen); + ts[replylen] = 0; + + free(reply); + return PNR_OK; +} \ No newline at end of file