Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK


The PubNub library enables your mbed board to communicate with the world via the PubNub cloud messaging system.

The library provides a PubNub class that is tied to a particular set of keys and offers methods that correspond to the appropriate API methods - publish, subscribe, history, etc. The JSON encoded messages are passed as raw strings to conserve memory, but at your option, you can use e.g. picojson library to deal with JSON. The API is synchronous - use multiple rtos threads to talk to PubNub on the background.

Getting Started

Can't wait to try it out? Connect your mbed application board and proceed to the demo project:

Import programPubNubDemo

Reference demo of the PubNub library for the mbed application board - control your board over the internet!

Library Usage

Import library

Public Member Functions

PubNub (const char *publish_key, const char *subscribe_key, const char *origin="")
Init a Pubnub Client context.
PubNubRes publish (const char *channel, const char *message, char **reply=NULL)
Publish API call.
PubNubRes subscribe (const char *channel, char **reply)
Subscribe API call.
PubNubRes history (const char *channel, char **reply, int *replysize, int limit=10)
History API call.
PubNubRes time (char *ts)
Time API call.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/PubNub.cpp	Sun Mar 02 01:32:54 2014 +0000
@@ -0,0 +1,528 @@
+#include <cstring>
+#include "mbed.h"
+#include "EthernetInterface.h"
+#include "PubNub.h"
+#define DBG if (0) // change to if (1) to enable debug prints on USB serial
+/* We roll our own HTTP communication stack as donatien's HTTPClient has
+ * some hard limits on URL length and this will allow a more frugal RAM
+ * usage for us. */
+/* Also, we pass messages as serialized JSON strings instead of parsed
+ * structures.  The available JSON parsers (picojson in particular) are
+ * heavy on many tiny memory allocations, which can get very troublesome
+ * with mbed's malloc() - we get very high memory fragmentation.  In many
+ * cases, working with raw JSON is just fine, and in our demo example,
+ * we still show picojson usage for end-user parsing of complex incoming
+ * messages. */
+/** Custom no-frills HTTP stream. */
+class PubNubHTTP: public TCPSocketConnection {
+    /* Connect http socket to origin. */
+    PubNubRes http_connect(const char *hostname);
+    /* These methods are used to send the request type and URL. */
+    /* Send a given NUL-terminated string over the http socket. */
+    void sendstr(const char *string);
+    /* Send a given NUL-terminated string over the http socket, URL encoded. */
+    void sendstr_urlenc(const char *string);
+    /* A common HTTP request "bottom half" - send the rest of headers
+     * and wait for reply + receive. */
+    PubNubRes http_request_bh(const char *host, char **reply, int *replylen);
+    /* These methods represent sub-stages of http_request_bh().
+     * They share the state of m_replybuf and m_replylen. */
+    PubNubRes http_send_headers(const char *host);
+    PubNubRes http_recv_headers();
+    PubNubRes http_recv_content();
+    char *m_replybuf;
+    int m_replylen;
+    int m_content_length;
+PubNubHTTP::http_connect(const char *hostname)
+    int ret = connect(hostname, 80);
+    if (ret < 0) {
+        close();
+        return PNR_IO_ERROR;
+    }
+    return PNR_OK;
+PubNubHTTP::sendstr(const char *s)
+    send_all((char *) s, strlen(s));
+PubNubHTTP::sendstr_urlenc(const char *s)
+    while (s[0]) {
+        /* RFC 3986 Unreserved characters plus few
+         * safe reserved ones. */
+        size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]");
+        if (okspan > 0) {
+            send_all((char *) s, okspan);
+            s += okspan;
+        }
+        if (s[0]) {
+            /* %-encode a non-ok character. */
+            char enc[3] = {'%'};
+            enc[1] = "0123456789ABCDEF"[s[0] / 16];
+            enc[2] = "0123456789ABCDEF"[s[0] % 16];
+            send_all((char *) enc, 3);
+            s++;
+        }
+    }
+PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen)
+    m_replybuf = NULL;
+    m_replylen = 0;
+    PubNubRes res;
+    res = http_send_headers(host);
+    if (res != PNR_OK) {
+        close();
+        return res;
+    }
+    res = http_recv_headers();
+    if (res != PNR_OK) {
+        if (m_replybuf) free(m_replybuf);
+        close();
+        return res;
+    }
+    res = http_recv_content();
+    if (res != PNR_OK) {
+        if (m_replybuf) free(m_replybuf);
+        close();
+        return res;
+    }
+    close();
+    *reply = (char *) realloc(m_replybuf, m_replylen);
+    *replylen = m_replylen;
+    return PNR_OK;
+PubNubHTTP::http_send_headers(const char *host)
+    /* Finish the first line of the request. */
+    sendstr(" HTTP/1.1\r\n");
+    /* Finish HTTP request. */
+    sendstr("Host: ");
+    sendstr(host);
+    sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n");
+    if (!is_connected())
+        return PNR_IO_ERROR;
+    return PNR_OK;
+    /* Now, read HTTP reply. */
+    m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1);
+    /* First, receive headers. */
+    /* XXX: For now, we assume complete headers will fit in m_replybuf. */
+    m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN);
+    if (m_replylen < 0)
+        return PNR_IO_ERROR;
+    m_replybuf[m_replylen] = 0;
+    char *bufptr = m_replybuf;
+    /* Parse the first line. */
+    if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8])
+        return PNR_IO_ERROR;
+    int http_code = atoi(bufptr+9);
+    if (http_code / 100 != 2)
+        return PNR_HTTP_ERROR;
+    bufptr = strstr(bufptr, "\r\n");
+    if (!bufptr)
+        return PNR_IO_ERROR;
+    bufptr += 2;
+    /* Parse the rest of the lines. */
+    m_content_length = 0;
+    bool is_chunked = false;
+    char *newline;
+    for (;; bufptr = newline+2) {
+        newline = strstr(bufptr, "\r\n");
+        if (!newline)
+            return PNR_IO_ERROR;
+        *newline = 0;
+        char h_chunked[] = "Transfer-Encoding: chunked";
+        char h_length[] = "Content-Length: ";
+        if (*bufptr == 0) {
+            /* Empty line. End of headers. */
+            bufptr += 2;
+            break;
+        } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) {
+            /* Transfer-Encoding: chunked */
+            is_chunked = true;
+        } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) {
+            /* Content-Length: ... */
+            m_content_length = atoi(bufptr + sizeof(h_length)-1);
+        }
+    }
+    /* Possibly process a chunk header. */
+    if (is_chunked) {
+        m_content_length = atoi(bufptr);
+        bufptr = strstr(bufptr, "\r\n");
+        if (!bufptr)
+            return PNR_IO_ERROR;
+        bufptr += 2;
+    }
+    /* Consolidate the buffer. */
+    m_replylen -= bufptr - m_replybuf;
+    memmove(m_replybuf, bufptr, m_replylen);
+    return PNR_OK;
+    /* Now, we are ready to process data! */
+    if (m_content_length > PUBNUB_REPLY_MAXLEN) {
+        /* XXX: Actually, too much data, sorry. */
+        DBG printf("too much data\r\n");
+        return PNR_FORMAT_ERROR;
+    }
+    if (m_replylen < m_content_length) {
+        /* More data coming. */
+        int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen);
+        if (recv2len < 0)
+            return PNR_IO_ERROR;
+        m_replylen += recv2len;
+    }
+    if (m_replylen < m_content_length)
+        return PNR_IO_ERROR; /* Incomplete data. */
+    return PNR_OK;
+/** Some utility routines. */
+/* Split a JSON array (with arbitrary contents) to multiple NUL-terminated
+ * C strings. */
+static PubNubRes
+split_array(char *buf, int len)
+    bool escaped = false, in_string = false;
+    int bracket_level = 0;
+    for (int i = 0; i < len; i++) {
+        if (escaped) {
+            escaped = false;
+        } else if (in_string) {
+            switch (buf[i]) {
+                case '\\': escaped = true; break;
+                case '"': in_string = false; break;
+                default: break;
+            }
+        } else {
+            switch (buf[i]) {
+                case '"': in_string = true; break;
+                case '[': case '{': bracket_level++; break;
+                case ']': case '}': bracket_level--; break;
+                /* if at root, split! */
+                case ',': if (bracket_level == 0) buf[i] = 0; break;
+                default: break;
+            }
+        }
+    }
+    DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level);
+    if (escaped || in_string || bracket_level > 0)
+        return PNR_FORMAT_ERROR;
+    return PNR_OK;
+/** PubNub API. */
+PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) :
+    m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_),
+    m_replybuf(NULL), m_replylen(0)
+    strcpy(m_timetoken, "0");
+    if (m_replybuf)
+        free(m_replybuf);
+const char *
+    /* TODO: More generic URL handling */
+    return m_origin + strlen("http://");
+PubNub::publish(const char *channel, const char *message, char **reply)
+    PubNubHTTP http;
+    PubNubRes res = http.http_connect(origin_hostname());
+    if (res != PNR_OK)
+        return res;
+    http.sendstr("GET /publish/");
+    http.sendstr(m_publish_key);
+    http.sendstr("/");
+    http.sendstr(m_subscribe_key);
+    http.sendstr("/0/");
+    http.sendstr(channel);
+    http.sendstr("/0/");
+    http.sendstr_urlenc(message);
+    char *locreply;
+    if (!reply)
+        reply = &locreply;
+    int replylen;
+    res = http.http_request_bh(origin_hostname(), reply, &replylen);
+    if (res != PNR_OK)
+        return res;
+    bool success = (*reply)[1] == '1' && (*reply)[2] == ',';
+    if (reply == &locreply)
+        free(locreply);
+    return success ? PNR_OK : PNR_PUBNUB_ERROR;
+PubNub::subscribe(const char *channel, char **reply)
+    if (m_replybuf) {
+        int prevlen = strlen(m_replybuf);
+        //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen);
+        if (prevlen < m_replylen) {
+            /* Next message from stash-away buffer. */
+            /* XXX: We can be either memory-frugal or CPU-frugal
+             * here. We choose to be memory-frugal by copying
+             * over messages many times, but we may want to make
+             * this configurable. */
+            m_replylen -= prevlen + 1;
+            memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen);
+            m_replybuf = (char *) realloc(m_replybuf, m_replylen);
+            *reply = m_replybuf;
+            return PNR_OK;
+        } else {
+            /* That's all. free() and fetch new messages. */
+            free(m_replybuf);
+            m_replybuf = NULL;
+            m_replylen = 0;
+        }
+    }
+    PubNubHTTP http;
+    PubNubRes res;
+    res = http.http_connect(origin_hostname());
+    if (res != PNR_OK)
+        return res;
+    http.sendstr("GET /subscribe/");
+    http.sendstr(m_subscribe_key);
+    http.sendstr("/");
+    http.sendstr(channel);
+    http.sendstr("/0/");
+    http.sendstr(m_timetoken);
+    char *replybuf = NULL;
+    int replylen;
+    res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
+    if (res != PNR_OK)
+        goto error;
+    /* Process the reply, sets timetoken and m_replybuf, m_replylen. */
+    res = subscribe_processjson(replybuf, replylen);
+    if (res != PNR_OK)
+        goto error;
+    replybuf = NULL; // freed by processjosn
+    /* Split JSON array to messages. */
+    res = split_array(m_replybuf, m_replylen);
+    if (res != PNR_OK) {
+        free(m_replybuf);
+        goto error;
+    }
+    *reply = m_replybuf;
+    return res;
+    if (res == PNR_FORMAT_ERROR) {
+        /* In case of PubNub protocol error, abort an ongoing
+         * subscribe and start over. This means some messages
+         * were lost, but allows us to recover from bad
+         * situations, e.g. too many messages queued or
+         * unexpected problem caused by a particular message. */
+        strcpy(m_timetoken, "0");
+    }
+    if (reply)
+        free(reply);
+    m_replybuf = NULL;
+    m_replylen = 0;
+    return res;
+PubNub::subscribe_processjson(char *reply, int replylen)
+    if (reply[0] != '[' || reply[replylen-1] != ']'
+        || reply[replylen-2] != '"') {
+        DBG printf("bad reply '%s'\r\n", reply);
+        return PNR_FORMAT_ERROR;
+    }
+    /* Extract timetoken. */
+    reply[replylen-2] = 0;
+    int i;
+    for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--)
+        if (reply[i] == '"')
+            break;
+    if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) {
+        DBG printf("bad reply '%s'\r\n", reply);
+        return PNR_FORMAT_ERROR;
+    }
+    strcpy(m_timetoken, &reply[i+1]);
+    reply[i-1] = 0; // terminate the [] message array
+    /* Empty reply? */
+    if (i == 4) { /* "[[]" */
+        free(reply);
+        m_replybuf = NULL;
+        m_replylen = 0;
+        return PNR_OK;
+    }
+    /* Extract the messages array. */
+    if (reply[1] != '[' || reply[i-2] != ']') {
+        DBG printf("bad reply end '%s'\r\n", reply);
+        return PNR_FORMAT_ERROR;
+    }
+    reply[i-2] = 0;
+    /* Shrink memory buffer to bare minimum. */
+    memmove(reply, reply + 2, i-2-2);
+    m_replylen = i-2-2;
+    m_replybuf = (char *) realloc(reply, m_replylen);
+    return PNR_OK;
+PubNub::history(const char *channel, char **reply, int *replysize, int limit)
+    PubNubHTTP http;
+    PubNubRes res;
+    res = http.http_connect(origin_hostname());
+    if (res != PNR_OK)
+        return res;
+    http.sendstr("GET /history/");
+    http.sendstr(m_subscribe_key);
+    http.sendstr("/");
+    http.sendstr(channel);
+    http.sendstr("/0/");
+    char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit);
+    http.sendstr(limitbuf);
+    char *replybuf = NULL;
+    int replylen;
+    res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
+    if (res != PNR_OK)
+        goto error;
+    /* Extract from the array and split it. */
+    if (replybuf[0] != '[' || replybuf[replylen-1] != ']') {
+        res = PNR_FORMAT_ERROR;
+        goto error;
+    }
+    replylen -= 2;
+    if (replylen == 0) { /* The reply was [] */
+        free(replybuf);
+        *reply = NULL;
+        *replysize = 0;
+        return PNR_OK;
+    }
+    memmove(replybuf, replybuf + 1, replylen);
+    replybuf[replylen] = 0;
+    res = split_array(replybuf, replylen);
+    if (res != PNR_OK)
+        goto error;
+    *reply = replybuf;
+    *replysize = replylen;
+    return PNR_OK;
+    free(replybuf);
+    return res;
+PubNub::time(char *ts)
+    PubNubHTTP http;
+    PubNubRes res = http.http_connect(origin_hostname());
+    if (res != PNR_OK)
+        return res;
+    http.sendstr("GET /time/0");
+    char *reply;
+    int replylen;
+    res = http.http_request_bh(origin_hostname(), &reply, &replylen);
+    if (res != PNR_OK)
+        return res;
+    if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') {
+        free(reply);
+        return PNR_FORMAT_ERROR;
+    }
+    replylen -= 2;
+    memcpy(ts, reply + 1, replylen);
+    ts[replylen] = 0;
+    free(reply);
+    return PNR_OK;
\ No newline at end of file