Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK
The PubNub library enables your mbed board to communicate with the world via the PubNub cloud messaging system.
The library provides a PubNub class that is tied to a particular set of keys and offers methods that correspond to the appropriate API methods - publish, subscribe, history, etc. The JSON encoded messages are passed as raw strings to conserve memory, but at your option, you can use e.g. picojson library to deal with JSON. The API is synchronous - use multiple rtos threads to talk to PubNub on the background.
Getting Started
Can't wait to try it out? Connect your mbed application board and proceed to the demo project:
Import programPubNubDemo
Reference demo of the PubNub library for the mbed application board - control your board over the internet!
Library Usage
Import library
Public Member Functions |
|
PubNub (const char *publish_key, const char *subscribe_key, const char *origin="http://pubsub.pubnub.com") | |
Init a Pubnub Client context.
|
|
PubNubRes | publish (const char *channel, const char *message, char **reply=NULL) |
Publish API call.
|
|
PubNubRes | subscribe (const char *channel, char **reply) |
Subscribe API call.
|
|
PubNubRes | history (const char *channel, char **reply, int *replysize, int limit=10) |
History API call.
|
|
PubNubRes | time (char *ts) |
Time API call.
|
PubNub.cpp
- Committer:
- Petr Baudis
- Date:
- 2014-09-01
- Revision:
- 5:1c98f24712e1
- Parent:
- 0:9858347c382d
File content as of revision 5:1c98f24712e1:
#include <cstring> #include "mbed.h" #include "EthernetInterface.h" #include "PubNub.h" #define DBG if (0) // change to if (1) to enable debug prints on USB serial /* We roll our own HTTP communication stack as donatien's HTTPClient has * some hard limits on URL length and this will allow a more frugal RAM * usage for us. */ /* Also, we pass messages as serialized JSON strings instead of parsed * structures. The available JSON parsers (picojson in particular) are * heavy on many tiny memory allocations, which can get very troublesome * with mbed's malloc() - we get very high memory fragmentation. In many * cases, working with raw JSON is just fine, and in our demo example, * we still show picojson usage for end-user parsing of complex incoming * messages. */ /** Custom no-frills HTTP stream. */ class PubNubHTTP: public TCPSocketConnection { public: /* Connect http socket to origin. */ PubNubRes http_connect(const char *hostname); /* These methods are used to send the request type and URL. */ /* Send a given NUL-terminated string over the http socket. */ void sendstr(const char *string); /* Send a given NUL-terminated string over the http socket, URL encoded. */ void sendstr_urlenc(const char *string); /* A common HTTP request "bottom half" - send the rest of headers * and wait for reply + receive. */ PubNubRes http_request_bh(const char *host, char **reply, int *replylen); protected: /* These methods represent sub-stages of http_request_bh(). * They share the state of m_replybuf and m_replylen. */ PubNubRes http_send_headers(const char *host); PubNubRes http_recv_headers(); PubNubRes http_recv_content(); char *m_replybuf; int m_replylen; int m_content_length; }; PubNubRes PubNubHTTP::http_connect(const char *hostname) { int ret = connect(hostname, 80); if (ret < 0) { close(); return PNR_IO_ERROR; } return PNR_OK; } void PubNubHTTP::sendstr(const char *s) { send_all((char *) s, strlen(s)); } void PubNubHTTP::sendstr_urlenc(const char *s) { while (s[0]) { /* RFC 3986 Unreserved characters plus few * safe reserved ones. */ size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]"); if (okspan > 0) { send_all((char *) s, okspan); s += okspan; } if (s[0]) { /* %-encode a non-ok character. */ char enc[3] = {'%'}; enc[1] = "0123456789ABCDEF"[s[0] / 16]; enc[2] = "0123456789ABCDEF"[s[0] % 16]; send_all((char *) enc, 3); s++; } } } PubNubRes PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen) { m_replybuf = NULL; m_replylen = 0; PubNubRes res; res = http_send_headers(host); if (res != PNR_OK) { close(); return res; } res = http_recv_headers(); if (res != PNR_OK) { if (m_replybuf) free(m_replybuf); close(); return res; } res = http_recv_content(); if (res != PNR_OK) { if (m_replybuf) free(m_replybuf); close(); return res; } close(); *reply = (char *) realloc(m_replybuf, m_replylen); *replylen = m_replylen; return PNR_OK; } PubNubRes PubNubHTTP::http_send_headers(const char *host) { /* Finish the first line of the request. */ sendstr("?pnsdk=PubNub-mbed/0.1 HTTP/1.1\r\n"); /* Finish HTTP request. */ sendstr("Host: "); sendstr(host); sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n"); if (!is_connected()) return PNR_IO_ERROR; return PNR_OK; } PubNubRes PubNubHTTP::http_recv_headers() { /* Now, read HTTP reply. */ m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1); /* First, receive headers. */ /* XXX: For now, we assume complete headers will fit in m_replybuf. */ m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN); if (m_replylen < 0) return PNR_IO_ERROR; m_replybuf[m_replylen] = 0; char *bufptr = m_replybuf; /* Parse the first line. */ if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8]) return PNR_IO_ERROR; int http_code = atoi(bufptr+9); if (http_code / 100 != 2) return PNR_HTTP_ERROR; bufptr = strstr(bufptr, "\r\n"); if (!bufptr) return PNR_IO_ERROR; bufptr += 2; /* Parse the rest of the lines. */ m_content_length = 0; bool is_chunked = false; char *newline; for (;; bufptr = newline+2) { newline = strstr(bufptr, "\r\n"); if (!newline) return PNR_IO_ERROR; *newline = 0; char h_chunked[] = "Transfer-Encoding: chunked"; char h_length[] = "Content-Length: "; if (*bufptr == 0) { /* Empty line. End of headers. */ bufptr += 2; break; } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) { /* Transfer-Encoding: chunked */ is_chunked = true; } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) { /* Content-Length: ... */ m_content_length = atoi(bufptr + sizeof(h_length)-1); } } /* Possibly process a chunk header. */ if (is_chunked) { m_content_length = atoi(bufptr); bufptr = strstr(bufptr, "\r\n"); if (!bufptr) return PNR_IO_ERROR; bufptr += 2; } /* Consolidate the buffer. */ m_replylen -= bufptr - m_replybuf; memmove(m_replybuf, bufptr, m_replylen); return PNR_OK; } PubNubRes PubNubHTTP::http_recv_content() { /* Now, we are ready to process data! */ if (m_content_length > PUBNUB_REPLY_MAXLEN) { /* XXX: Actually, too much data, sorry. */ DBG printf("too much data\r\n"); return PNR_FORMAT_ERROR; } if (m_replylen < m_content_length) { /* More data coming. */ int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen); if (recv2len < 0) return PNR_IO_ERROR; m_replylen += recv2len; } if (m_replylen < m_content_length) return PNR_IO_ERROR; /* Incomplete data. */ return PNR_OK; } /** Some utility routines. */ /* Split a JSON array (with arbitrary contents) to multiple NUL-terminated * C strings. */ static PubNubRes split_array(char *buf, int len) { bool escaped = false, in_string = false; int bracket_level = 0; for (int i = 0; i < len; i++) { if (escaped) { escaped = false; } else if (in_string) { switch (buf[i]) { case '\\': escaped = true; break; case '"': in_string = false; break; default: break; } } else { switch (buf[i]) { case '"': in_string = true; break; case '[': case '{': bracket_level++; break; case ']': case '}': bracket_level--; break; /* if at root, split! */ case ',': if (bracket_level == 0) buf[i] = 0; break; default: break; } } } DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level); if (escaped || in_string || bracket_level > 0) return PNR_FORMAT_ERROR; return PNR_OK; } /** PubNub API. */ PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) : m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_), m_replybuf(NULL), m_replylen(0) { strcpy(m_timetoken, "0"); } PubNub::~PubNub() { if (m_replybuf) free(m_replybuf); } const char * PubNub::origin_hostname() { /* TODO: More generic URL handling */ return m_origin + strlen("http://"); } PubNubRes PubNub::publish(const char *channel, const char *message, char **reply) { PubNubHTTP http; PubNubRes res = http.http_connect(origin_hostname()); if (res != PNR_OK) return res; http.sendstr("GET /publish/"); http.sendstr(m_publish_key); http.sendstr("/"); http.sendstr(m_subscribe_key); http.sendstr("/0/"); http.sendstr(channel); http.sendstr("/0/"); http.sendstr_urlenc(message); char *locreply; if (!reply) reply = &locreply; int replylen; res = http.http_request_bh(origin_hostname(), reply, &replylen); if (res != PNR_OK) return res; bool success = (*reply)[1] == '1' && (*reply)[2] == ','; if (reply == &locreply) free(locreply); return success ? PNR_OK : PNR_PUBNUB_ERROR; } PubNubRes PubNub::subscribe(const char *channel, char **reply) { if (m_replybuf) { int prevlen = strlen(m_replybuf); //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen); if (prevlen < m_replylen) { /* Next message from stash-away buffer. */ /* XXX: We can be either memory-frugal or CPU-frugal * here. We choose to be memory-frugal by copying * over messages many times, but we may want to make * this configurable. */ m_replylen -= prevlen + 1; memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen); m_replybuf = (char *) realloc(m_replybuf, m_replylen); *reply = m_replybuf; return PNR_OK; } else { /* That's all. free() and fetch new messages. */ free(m_replybuf); m_replybuf = NULL; m_replylen = 0; } } PubNubHTTP http; PubNubRes res; res = http.http_connect(origin_hostname()); if (res != PNR_OK) return res; http.sendstr("GET /subscribe/"); http.sendstr(m_subscribe_key); http.sendstr("/"); http.sendstr(channel); http.sendstr("/0/"); http.sendstr(m_timetoken); char *replybuf = NULL; int replylen; res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); if (res != PNR_OK) goto error; /* Process the reply, sets timetoken and m_replybuf, m_replylen. */ res = subscribe_processjson(replybuf, replylen); if (res != PNR_OK) goto error; replybuf = NULL; // freed by processjosn /* Split JSON array to messages. */ res = split_array(m_replybuf, m_replylen); if (res != PNR_OK) { free(m_replybuf); goto error; } *reply = m_replybuf; return res; error: if (res == PNR_FORMAT_ERROR) { /* In case of PubNub protocol error, abort an ongoing * subscribe and start over. This means some messages * were lost, but allows us to recover from bad * situations, e.g. too many messages queued or * unexpected problem caused by a particular message. */ strcpy(m_timetoken, "0"); } if (reply) free(reply); m_replybuf = NULL; m_replylen = 0; return res; } PubNubRes PubNub::subscribe_processjson(char *reply, int replylen) { if (reply[0] != '[' || reply[replylen-1] != ']' || reply[replylen-2] != '"') { DBG printf("bad reply '%s'\r\n", reply); return PNR_FORMAT_ERROR; } /* Extract timetoken. */ reply[replylen-2] = 0; int i; for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--) if (reply[i] == '"') break; if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) { DBG printf("bad reply '%s'\r\n", reply); return PNR_FORMAT_ERROR; } strcpy(m_timetoken, &reply[i+1]); reply[i-1] = 0; // terminate the [] message array /* Empty reply? */ if (i == 4) { /* "[[]" */ free(reply); m_replybuf = NULL; m_replylen = 0; return PNR_OK; } /* Extract the messages array. */ if (reply[1] != '[' || reply[i-2] != ']') { DBG printf("bad reply end '%s'\r\n", reply); return PNR_FORMAT_ERROR; } reply[i-2] = 0; /* Shrink memory buffer to bare minimum. */ memmove(reply, reply + 2, i-2-2); m_replylen = i-2-2; m_replybuf = (char *) realloc(reply, m_replylen); return PNR_OK; } PubNubRes PubNub::history(const char *channel, char **reply, int *replysize, int limit) { PubNubHTTP http; PubNubRes res; res = http.http_connect(origin_hostname()); if (res != PNR_OK) return res; http.sendstr("GET /history/"); http.sendstr(m_subscribe_key); http.sendstr("/"); http.sendstr(channel); http.sendstr("/0/"); char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit); http.sendstr(limitbuf); char *replybuf = NULL; int replylen; res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); if (res != PNR_OK) goto error; /* Extract from the array and split it. */ if (replybuf[0] != '[' || replybuf[replylen-1] != ']') { res = PNR_FORMAT_ERROR; goto error; } replylen -= 2; if (replylen == 0) { /* The reply was [] */ free(replybuf); *reply = NULL; *replysize = 0; return PNR_OK; } memmove(replybuf, replybuf + 1, replylen); replybuf[replylen] = 0; res = split_array(replybuf, replylen); if (res != PNR_OK) goto error; *reply = replybuf; *replysize = replylen; return PNR_OK; error: free(replybuf); return res; } PubNubRes PubNub::time(char *ts) { PubNubHTTP http; PubNubRes res = http.http_connect(origin_hostname()); if (res != PNR_OK) return res; http.sendstr("GET /time/0"); char *reply; int replylen; res = http.http_request_bh(origin_hostname(), &reply, &replylen); if (res != PNR_OK) return res; if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') { free(reply); return PNR_FORMAT_ERROR; } replylen -= 2; memcpy(ts, reply + 1, replylen); ts[replylen] = 0; free(reply); return PNR_OK; }