Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK

Dependents:   PubNubDemo Cellular_PubNubDemo lpc4088_ebb_ublox_Cellular_PubNubDemo PubNubDemo_LPC4088 ... more

Fork of PubNub by Petr Baudis

PubNub

The PubNub library enables your mbed board to communicate with the world via the PubNub cloud messaging system.

The library provides a PubNub class that is tied to a particular set of keys and offers methods that correspond to the appropriate API methods - publish, subscribe, history, etc. The JSON encoded messages are passed as raw strings to conserve memory, but at your option, you can use e.g. picojson library to deal with JSON. The API is synchronous - use multiple rtos threads to talk to PubNub on the background.

Getting Started

Can't wait to try it out? Connect your mbed application board and proceed to the demo project:

Import programPubNubDemo

Reference demo of the PubNub library for the mbed application board - control your board over the internet!

Library Usage

Import library

Public Member Functions

PubNub (const char *publish_key, const char *subscribe_key, const char *origin="http://pubsub.pubnub.com")
Init a Pubnub Client context.
PubNubRes publish (const char *channel, const char *message, char **reply=NULL)
Publish API call.
PubNubRes subscribe (const char *channel, char **reply)
Subscribe API call.
PubNubRes history (const char *channel, char **reply, int *replysize, int limit=10)
History API call.
PubNubRes time (char *ts)
Time API call.

PubNub.cpp

Committer:
pasky
Date:
2014-11-20
Revision:
8:c9f79982b5ca
Parent:
7:a7bbafe53f2d
Parent:
6:09d0a383fdde

File content as of revision 8:c9f79982b5ca:

#include <cstring>

#include "mbed.h"
#include "TCPSocketConnection.h"

#include "PubNub.h"


#define DBG if (0) // change to if (1) to enable debug prints on USB serial


/* We roll our own HTTP communication stack as donatien's HTTPClient has
 * some hard limits on URL length and this will allow a more frugal RAM
 * usage for us. */

/* Also, we pass messages as serialized JSON strings instead of parsed
 * structures.  The available JSON parsers (picojson in particular) are
 * heavy on many tiny memory allocations, which can get very troublesome
 * with mbed's malloc() - we get very high memory fragmentation.  In many
 * cases, working with raw JSON is just fine, and in our demo example,
 * we still show picojson usage for end-user parsing of complex incoming
 * messages. */


/** Custom no-frills HTTP stream. */

class PubNubHTTP: public TCPSocketConnection {
public:
    /* Connect http socket to origin. */
    PubNubRes http_connect(const char *hostname);

    /* These methods are used to send the request type and URL. */
    /* Send a given NUL-terminated string over the http socket. */
    void sendstr(const char *string);
    /* Send a given NUL-terminated string over the http socket, URL encoded. */
    void sendstr_urlenc(const char *string);

    /* A common HTTP request "bottom half" - send the rest of headers
     * and wait for reply + receive. */
    PubNubRes http_request_bh(const char *host, char **reply, int *replylen);

protected:
    /* These methods represent sub-stages of http_request_bh().
     * They share the state of m_replybuf and m_replylen. */
    PubNubRes http_send_headers(const char *host);
    PubNubRes http_recv_headers();
    PubNubRes http_recv_content();

    char *m_replybuf;
    int m_replylen;
    int m_content_length;
};

PubNubRes
PubNubHTTP::http_connect(const char *hostname)
{
    int ret = connect(hostname, 80);
    if (ret < 0) {
        close();
        return PNR_IO_ERROR;
    }
    return PNR_OK;
}

void
PubNubHTTP::sendstr(const char *s)
{
    send_all((char *) s, strlen(s));
}

void
PubNubHTTP::sendstr_urlenc(const char *s)
{
    while (s[0]) {
        /* RFC 3986 Unreserved characters plus few
         * safe reserved ones. */
        size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]");
        if (okspan > 0) {
            send_all((char *) s, okspan);
            s += okspan;
        }
        if (s[0]) {
            /* %-encode a non-ok character. */
            char enc[3] = {'%'};
            enc[1] = "0123456789ABCDEF"[s[0] / 16];
            enc[2] = "0123456789ABCDEF"[s[0] % 16];
            send_all((char *) enc, 3);
            s++;
        }
    }
}

PubNubRes
PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen)
{
    m_replybuf = NULL;
    m_replylen = 0;

    PubNubRes res;
    res = http_send_headers(host);
    if (res != PNR_OK) {
        close();
        return res;
    }

    res = http_recv_headers();
    if (res != PNR_OK) {
        if (m_replybuf) free(m_replybuf);
        close();
        return res;
    }

    res = http_recv_content();
    if (res != PNR_OK) {
        if (m_replybuf) free(m_replybuf);
        close();
        return res;
    }

    close();
    *reply = (char *) realloc(m_replybuf, m_replylen);
    *replylen = m_replylen;
    return PNR_OK;
}

PubNubRes
PubNubHTTP::http_send_headers(const char *host)
{
    /* Finish the first line of the request. */
    sendstr(" HTTP/1.1\r\n");
    /* Finish HTTP request. */
    sendstr("Host: ");
    sendstr(host);
    sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n");
    if (!is_connected())
        return PNR_IO_ERROR;
    return PNR_OK;
}

PubNubRes
PubNubHTTP::http_recv_headers()
{
    /* Now, read HTTP reply. */
    m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1);

    /* First, receive headers. */
    /* XXX: For now, we assume complete headers will fit in m_replybuf. */
    m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN);
    if (m_replylen < 0)
        return PNR_IO_ERROR;
    m_replybuf[m_replylen] = 0;

    char *bufptr = m_replybuf;

    /* Parse the first line. */
    if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8])
        return PNR_IO_ERROR;
    int http_code = atoi(bufptr+9);
    if (http_code / 100 != 2)
        return PNR_HTTP_ERROR;
    bufptr = strstr(bufptr, "\r\n");
    if (!bufptr)
        return PNR_IO_ERROR;
    bufptr += 2;

    /* Parse the rest of the lines. */
    m_content_length = 0;
    bool is_chunked = false;
    char *newline;
    for (;; bufptr = newline+2) {
        newline = strstr(bufptr, "\r\n");
        if (!newline)
            return PNR_IO_ERROR;
        *newline = 0;

        char h_chunked[] = "Transfer-Encoding: chunked";
        char h_length[] = "Content-Length: ";
        if (*bufptr == 0) {
            /* Empty line. End of headers. */
            bufptr += 2;
            break;
        } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) {
            /* Transfer-Encoding: chunked */
            is_chunked = true;
        } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) {
            /* Content-Length: ... */
            m_content_length = atoi(bufptr + sizeof(h_length)-1);
        }
    }

    /* Possibly process a chunk header. */
    if (is_chunked) {
        m_content_length = atoi(bufptr);
        bufptr = strstr(bufptr, "\r\n");
        if (!bufptr)
            return PNR_IO_ERROR;
        bufptr += 2;
    }

    /* Consolidate the buffer. */
    m_replylen -= bufptr - m_replybuf;
    memmove(m_replybuf, bufptr, m_replylen);

    return PNR_OK;
}

PubNubRes
PubNubHTTP::http_recv_content()
{
    /* Now, we are ready to process data! */

    if (m_content_length > PUBNUB_REPLY_MAXLEN) {
        /* XXX: Actually, too much data, sorry. */
        DBG printf("too much data\r\n");
        return PNR_FORMAT_ERROR;
    }

    if (m_replylen < m_content_length) {
        /* More data coming. */
        int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen);
        if (recv2len < 0)
            return PNR_IO_ERROR;
        m_replylen += recv2len;
    }

    if (m_replylen < m_content_length)
        return PNR_IO_ERROR; /* Incomplete data. */
    return PNR_OK;
}


/** Some utility routines. */

/* Split a JSON array (with arbitrary contents) to multiple NUL-terminated
 * C strings. */
static PubNubRes
split_array(char *buf, int len)
{
    bool escaped = false, in_string = false;
    int bracket_level = 0;
    for (int i = 0; i < len; i++) {
        if (escaped) {
            escaped = false;
        } else if (in_string) {
            switch (buf[i]) {
                case '\\': escaped = true; break;
                case '"': in_string = false; break;
                default: break;
            }
        } else {
            switch (buf[i]) {
                case '"': in_string = true; break;
                case '[': case '{': bracket_level++; break;
                case ']': case '}': bracket_level--; break;
                /* if at root, split! */
                case ',': if (bracket_level == 0) buf[i] = 0; break;
                default: break;
            }
        }
    }
    DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level);
    if (escaped || in_string || bracket_level > 0)
        return PNR_FORMAT_ERROR;
    return PNR_OK;
}


/** PubNub API. */

PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) :
    m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_),
    m_replybuf(NULL), m_replylen(0)
{
    strcpy(m_timetoken, "0");
}

PubNub::~PubNub()
{
    if (m_replybuf)
        free(m_replybuf);
}

const char *
PubNub::origin_hostname()
{
    /* TODO: More generic URL handling */
    return m_origin + strlen("http://");
}


PubNubRes
PubNub::publish(const char *channel, const char *message, char **reply)
{
    PubNubHTTP http;
    PubNubRes res = http.http_connect(origin_hostname());
    if (res != PNR_OK)
        return res;

    http.sendstr("GET /publish/");
    http.sendstr(m_publish_key);
    http.sendstr("/");
    http.sendstr(m_subscribe_key);
    http.sendstr("/0/");
    http.sendstr(channel);
    http.sendstr("/0/");
    http.sendstr_urlenc(message);

    char *locreply;
    if (!reply)
        reply = &locreply;
    int replylen;

    res = http.http_request_bh(origin_hostname(), reply, &replylen);
    if (res != PNR_OK)
        return res;
    bool success = (*reply)[1] == '1' && (*reply)[2] == ',';
    if (reply == &locreply)
        free(locreply);
    return success ? PNR_OK : PNR_PUBNUB_ERROR;
}


PubNubRes
PubNub::subscribe(const char *channel, char **reply)
{
    if (m_replybuf) {
        int prevlen = strlen(m_replybuf);
        //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen);
        if (prevlen < m_replylen) {
            /* Next message from stash-away buffer. */
            /* XXX: We can be either memory-frugal or CPU-frugal
             * here. We choose to be memory-frugal by copying
             * over messages many times, but we may want to make
             * this configurable. */
            m_replylen -= prevlen + 1;
            memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen);
            m_replybuf = (char *) realloc(m_replybuf, m_replylen);
            *reply = m_replybuf;
            return PNR_OK;

        } else {
            /* That's all. free() and fetch new messages. */
            free(m_replybuf);
            m_replybuf = NULL;
            m_replylen = 0;
        }
    }

    PubNubHTTP http;
    PubNubRes res;
    res = http.http_connect(origin_hostname());
    if (res != PNR_OK)
        return res;

    http.sendstr("GET /subscribe/");
    http.sendstr(m_subscribe_key);
    http.sendstr("/");
    http.sendstr(channel);
    http.sendstr("/0/");
    http.sendstr(m_timetoken);

    char *replybuf = NULL;
    int replylen;
    res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
    if (res != PNR_OK)
        goto error;

    /* Process the reply, sets timetoken and m_replybuf, m_replylen. */
    res = subscribe_processjson(replybuf, replylen);
    if (res != PNR_OK)
        goto error;
    replybuf = NULL; // freed by processjosn

    /* Split JSON array to messages. */
    res = split_array(m_replybuf, m_replylen);
    if (res != PNR_OK) {
        free(m_replybuf);
        goto error;
    }

    *reply = m_replybuf;
    return res;

error:
    if (res == PNR_FORMAT_ERROR) {
        /* In case of PubNub protocol error, abort an ongoing
         * subscribe and start over. This means some messages
         * were lost, but allows us to recover from bad
         * situations, e.g. too many messages queued or
         * unexpected problem caused by a particular message. */
        strcpy(m_timetoken, "0");
    }
    if (reply)
        free(reply);
    m_replybuf = NULL;
    m_replylen = 0;
    return res;
}

PubNubRes
PubNub::subscribe_processjson(char *reply, int replylen)
{
    if (reply[0] != '[' || reply[replylen-1] != ']'
        || reply[replylen-2] != '"') {
        DBG printf("bad reply '%s'\r\n", reply);
        return PNR_FORMAT_ERROR;
    }

    /* Extract timetoken. */
    reply[replylen-2] = 0;
    int i;
    for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--)
        if (reply[i] == '"')
            break;
    if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) {
        DBG printf("bad reply '%s'\r\n", reply);
        return PNR_FORMAT_ERROR;
    }
    strcpy(m_timetoken, &reply[i+1]);
    reply[i-1] = 0; // terminate the [] message array

    /* Empty reply? */
    if (i == 4) { /* "[[]" */
        free(reply);
        m_replybuf = NULL;
        m_replylen = 0;
        return PNR_OK;
    }

    /* Extract the messages array. */
    if (reply[1] != '[' || reply[i-2] != ']') {
        DBG printf("bad reply end '%s'\r\n", reply);
        return PNR_FORMAT_ERROR;
    }
    reply[i-2] = 0;

    /* Shrink memory buffer to bare minimum. */
    memmove(reply, reply + 2, i-2-2);
    m_replylen = i-2-2;
    reply[m_replylen] = 0;
    m_replybuf = (char *) realloc(reply, m_replylen);

    return PNR_OK;
}


PubNubRes
PubNub::history(const char *channel, char **reply, int *replysize, int limit)
{
    PubNubHTTP http;
    PubNubRes res;
    res = http.http_connect(origin_hostname());
    if (res != PNR_OK)
        return res;

    http.sendstr("GET /history/");
    http.sendstr(m_subscribe_key);
    http.sendstr("/");
    http.sendstr(channel);
    http.sendstr("/0/");
    char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit);
    http.sendstr(limitbuf);

    char *replybuf = NULL;
    int replylen;
    res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
    if (res != PNR_OK)
        goto error;

    /* Extract from the array and split it. */

    if (replybuf[0] != '[' || replybuf[replylen-1] != ']') {
        res = PNR_FORMAT_ERROR;
        goto error;
    }

    replylen -= 2;
    if (replylen == 0) { /* The reply was [] */
        free(replybuf);
        *reply = NULL;
        *replysize = 0;
        return PNR_OK;
    }

    memmove(replybuf, replybuf + 1, replylen);
    replybuf[replylen] = 0;

    res = split_array(replybuf, replylen);
    if (res != PNR_OK)
        goto error;

    *reply = replybuf;
    *replysize = replylen;
    return PNR_OK;
    
error:
    free(replybuf);
    return res;
}


PubNubRes
PubNub::time(char *ts)
{
    PubNubHTTP http;
    PubNubRes res = http.http_connect(origin_hostname());
    if (res != PNR_OK)
        return res;

    http.sendstr("GET /time/0");

    char *reply;
    int replylen;
    res = http.http_request_bh(origin_hostname(), &reply, &replylen);
    if (res != PNR_OK)
        return res;

    if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') {
        free(reply);
        return PNR_FORMAT_ERROR;
    }

    replylen -= 2;
    memcpy(ts, reply + 1, replylen);
    ts[replylen] = 0;

    free(reply);
    return PNR_OK;
}