Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers PubNub.cpp Source File

PubNub.cpp

00001 #include <cstring>
00002 
00003 #include "mbed.h"
00004 #include "EthernetInterface.h"
00005 
00006 #include "PubNub.h"
00007 
00008 
00009 #define DBG if (0) // change to if (1) to enable debug prints on USB serial
00010 
00011 
00012 /* We roll our own HTTP communication stack as donatien's HTTPClient has
00013  * some hard limits on URL length and this will allow a more frugal RAM
00014  * usage for us. */
00015 
00016 /* Also, we pass messages as serialized JSON strings instead of parsed
00017  * structures.  The available JSON parsers (picojson in particular) are
00018  * heavy on many tiny memory allocations, which can get very troublesome
00019  * with mbed's malloc() - we get very high memory fragmentation.  In many
00020  * cases, working with raw JSON is just fine, and in our demo example,
00021  * we still show picojson usage for end-user parsing of complex incoming
00022  * messages. */
00023 
00024 
00025 /** Custom no-frills HTTP stream. */
00026 
00027 class PubNubHTTP: public TCPSocketConnection {
00028 public:
00029     /* Connect http socket to origin. */
00030     PubNubRes http_connect(const char *hostname);
00031 
00032     /* These methods are used to send the request type and URL. */
00033     /* Send a given NUL-terminated string over the http socket. */
00034     void sendstr(const char *string);
00035     /* Send a given NUL-terminated string over the http socket, URL encoded. */
00036     void sendstr_urlenc(const char *string);
00037 
00038     /* A common HTTP request "bottom half" - send the rest of headers
00039      * and wait for reply + receive. */
00040     PubNubRes http_request_bh(const char *host, char **reply, int *replylen);
00041 
00042 protected:
00043     /* These methods represent sub-stages of http_request_bh().
00044      * They share the state of m_replybuf and m_replylen. */
00045     PubNubRes http_send_headers(const char *host);
00046     PubNubRes http_recv_headers();
00047     PubNubRes http_recv_content();
00048 
00049     char *m_replybuf;
00050     int m_replylen;
00051     int m_content_length;
00052 };
00053 
00054 PubNubRes
00055 PubNubHTTP::http_connect(const char *hostname)
00056 {
00057     int ret = connect(hostname, 80);
00058     if (ret < 0) {
00059         close();
00060         return PNR_IO_ERROR;
00061     }
00062     return PNR_OK;
00063 }
00064 
00065 void
00066 PubNubHTTP::sendstr(const char *s)
00067 {
00068     send_all((char *) s, strlen(s));
00069 }
00070 
00071 void
00072 PubNubHTTP::sendstr_urlenc(const char *s)
00073 {
00074     while (s[0]) {
00075         /* RFC 3986 Unreserved characters plus few
00076          * safe reserved ones. */
00077         size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]");
00078         if (okspan > 0) {
00079             send_all((char *) s, okspan);
00080             s += okspan;
00081         }
00082         if (s[0]) {
00083             /* %-encode a non-ok character. */
00084             char enc[3] = {'%'};
00085             enc[1] = "0123456789ABCDEF"[s[0] / 16];
00086             enc[2] = "0123456789ABCDEF"[s[0] % 16];
00087             send_all((char *) enc, 3);
00088             s++;
00089         }
00090     }
00091 }
00092 
00093 PubNubRes
00094 PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen)
00095 {
00096     m_replybuf = NULL;
00097     m_replylen = 0;
00098 
00099     PubNubRes res;
00100     res = http_send_headers(host);
00101     if (res != PNR_OK) {
00102         close();
00103         return res;
00104     }
00105 
00106     res = http_recv_headers();
00107     if (res != PNR_OK) {
00108         if (m_replybuf) free(m_replybuf);
00109         close();
00110         return res;
00111     }
00112 
00113     res = http_recv_content();
00114     if (res != PNR_OK) {
00115         if (m_replybuf) free(m_replybuf);
00116         close();
00117         return res;
00118     }
00119 
00120     close();
00121     *reply = (char *) realloc(m_replybuf, m_replylen);
00122     *replylen = m_replylen;
00123     return PNR_OK;
00124 }
00125 
00126 PubNubRes
00127 PubNubHTTP::http_send_headers(const char *host)
00128 {
00129     /* Finish the first line of the request. */
00130     sendstr("?pnsdk=PubNub-mbed/0.1 HTTP/1.1\r\n");
00131     /* Finish HTTP request. */
00132     sendstr("Host: ");
00133     sendstr(host);
00134     sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n");
00135     if (!is_connected())
00136         return PNR_IO_ERROR;
00137     return PNR_OK;
00138 }
00139 
00140 PubNubRes
00141 PubNubHTTP::http_recv_headers()
00142 {
00143     /* Now, read HTTP reply. */
00144     m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1);
00145 
00146     /* First, receive headers. */
00147     /* XXX: For now, we assume complete headers will fit in m_replybuf. */
00148     m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN);
00149     if (m_replylen < 0)
00150         return PNR_IO_ERROR;
00151     m_replybuf[m_replylen] = 0;
00152 
00153     char *bufptr = m_replybuf;
00154 
00155     /* Parse the first line. */
00156     if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8])
00157         return PNR_IO_ERROR;
00158     int http_code = atoi(bufptr+9);
00159     if (http_code / 100 != 2)
00160         return PNR_HTTP_ERROR;
00161     bufptr = strstr(bufptr, "\r\n");
00162     if (!bufptr)
00163         return PNR_IO_ERROR;
00164     bufptr += 2;
00165 
00166     /* Parse the rest of the lines. */
00167     m_content_length = 0;
00168     bool is_chunked = false;
00169     char *newline;
00170     for (;; bufptr = newline+2) {
00171         newline = strstr(bufptr, "\r\n");
00172         if (!newline)
00173             return PNR_IO_ERROR;
00174         *newline = 0;
00175 
00176         char h_chunked[] = "Transfer-Encoding: chunked";
00177         char h_length[] = "Content-Length: ";
00178         if (*bufptr == 0) {
00179             /* Empty line. End of headers. */
00180             bufptr += 2;
00181             break;
00182         } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) {
00183             /* Transfer-Encoding: chunked */
00184             is_chunked = true;
00185         } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) {
00186             /* Content-Length: ... */
00187             m_content_length = atoi(bufptr + sizeof(h_length)-1);
00188         }
00189     }
00190 
00191     /* Possibly process a chunk header. */
00192     if (is_chunked) {
00193         m_content_length = atoi(bufptr);
00194         bufptr = strstr(bufptr, "\r\n");
00195         if (!bufptr)
00196             return PNR_IO_ERROR;
00197         bufptr += 2;
00198     }
00199 
00200     /* Consolidate the buffer. */
00201     m_replylen -= bufptr - m_replybuf;
00202     memmove(m_replybuf, bufptr, m_replylen);
00203 
00204     return PNR_OK;
00205 }
00206 
00207 PubNubRes
00208 PubNubHTTP::http_recv_content()
00209 {
00210     /* Now, we are ready to process data! */
00211 
00212     if (m_content_length > PUBNUB_REPLY_MAXLEN) {
00213         /* XXX: Actually, too much data, sorry. */
00214         DBG printf("too much data\r\n");
00215         return PNR_FORMAT_ERROR;
00216     }
00217 
00218     if (m_replylen < m_content_length) {
00219         /* More data coming. */
00220         int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen);
00221         if (recv2len < 0)
00222             return PNR_IO_ERROR;
00223         m_replylen += recv2len;
00224     }
00225 
00226     if (m_replylen < m_content_length)
00227         return PNR_IO_ERROR; /* Incomplete data. */
00228     return PNR_OK;
00229 }
00230 
00231 
00232 /** Some utility routines. */
00233 
00234 /* Split a JSON array (with arbitrary contents) to multiple NUL-terminated
00235  * C strings. */
00236 static PubNubRes
00237 split_array(char *buf, int len)
00238 {
00239     bool escaped = false, in_string = false;
00240     int bracket_level = 0;
00241     for (int i = 0; i < len; i++) {
00242         if (escaped) {
00243             escaped = false;
00244         } else if (in_string) {
00245             switch (buf[i]) {
00246                 case '\\': escaped = true; break;
00247                 case '"': in_string = false; break;
00248                 default: break;
00249             }
00250         } else {
00251             switch (buf[i]) {
00252                 case '"': in_string = true; break;
00253                 case '[': case '{': bracket_level++; break;
00254                 case ']': case '}': bracket_level--; break;
00255                 /* if at root, split! */
00256                 case ',': if (bracket_level == 0) buf[i] = 0; break;
00257                 default: break;
00258             }
00259         }
00260     }
00261     DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level);
00262     if (escaped || in_string || bracket_level > 0)
00263         return PNR_FORMAT_ERROR;
00264     return PNR_OK;
00265 }
00266 
00267 
00268 /** PubNub API. */
00269 
00270 PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) :
00271     m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_),
00272     m_replybuf(NULL), m_replylen(0)
00273 {
00274     strcpy(m_timetoken, "0");
00275 }
00276 
00277 PubNub::~PubNub()
00278 {
00279     if (m_replybuf)
00280         free(m_replybuf);
00281 }
00282 
00283 const char *
00284 PubNub::origin_hostname()
00285 {
00286     /* TODO: More generic URL handling */
00287     return m_origin + strlen("http://");
00288 }
00289 
00290 
00291 PubNubRes
00292 PubNub::publish(const char *channel, const char *message, char **reply)
00293 {
00294     PubNubHTTP http;
00295     PubNubRes res = http.http_connect(origin_hostname());
00296     if (res != PNR_OK)
00297         return res;
00298 
00299     http.sendstr("GET /publish/");
00300     http.sendstr(m_publish_key);
00301     http.sendstr("/");
00302     http.sendstr(m_subscribe_key);
00303     http.sendstr("/0/");
00304     http.sendstr(channel);
00305     http.sendstr("/0/");
00306     http.sendstr_urlenc(message);
00307 
00308     char *locreply;
00309     if (!reply)
00310         reply = &locreply;
00311     int replylen;
00312 
00313     res = http.http_request_bh(origin_hostname(), reply, &replylen);
00314     if (res != PNR_OK)
00315         return res;
00316     bool success = (*reply)[1] == '1' && (*reply)[2] == ',';
00317     if (reply == &locreply)
00318         free(locreply);
00319     return success ? PNR_OK : PNR_PUBNUB_ERROR;
00320 }
00321 
00322 
00323 PubNubRes
00324 PubNub::subscribe(const char *channel, char **reply)
00325 {
00326     if (m_replybuf) {
00327         int prevlen = strlen(m_replybuf);
00328         //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen);
00329         if (prevlen < m_replylen) {
00330             /* Next message from stash-away buffer. */
00331             /* XXX: We can be either memory-frugal or CPU-frugal
00332              * here. We choose to be memory-frugal by copying
00333              * over messages many times, but we may want to make
00334              * this configurable. */
00335             m_replylen -= prevlen + 1;
00336             memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen);
00337             m_replybuf = (char *) realloc(m_replybuf, m_replylen);
00338             *reply = m_replybuf;
00339             return PNR_OK;
00340 
00341         } else {
00342             /* That's all. free() and fetch new messages. */
00343             free(m_replybuf);
00344             m_replybuf = NULL;
00345             m_replylen = 0;
00346         }
00347     }
00348 
00349     PubNubHTTP http;
00350     PubNubRes res;
00351     res = http.http_connect(origin_hostname());
00352     if (res != PNR_OK)
00353         return res;
00354 
00355     http.sendstr("GET /subscribe/");
00356     http.sendstr(m_subscribe_key);
00357     http.sendstr("/");
00358     http.sendstr(channel);
00359     http.sendstr("/0/");
00360     http.sendstr(m_timetoken);
00361 
00362     char *replybuf = NULL;
00363     int replylen;
00364     res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
00365     if (res != PNR_OK)
00366         goto error;
00367 
00368     /* Process the reply, sets timetoken and m_replybuf, m_replylen. */
00369     res = subscribe_processjson(replybuf, replylen);
00370     if (res != PNR_OK)
00371         goto error;
00372     replybuf = NULL; // freed by processjosn
00373 
00374     /* Split JSON array to messages. */
00375     res = split_array(m_replybuf, m_replylen);
00376     if (res != PNR_OK) {
00377         free(m_replybuf);
00378         goto error;
00379     }
00380 
00381     *reply = m_replybuf;
00382     return res;
00383 
00384 error:
00385     if (res == PNR_FORMAT_ERROR) {
00386         /* In case of PubNub protocol error, abort an ongoing
00387          * subscribe and start over. This means some messages
00388          * were lost, but allows us to recover from bad
00389          * situations, e.g. too many messages queued or
00390          * unexpected problem caused by a particular message. */
00391         strcpy(m_timetoken, "0");
00392     }
00393     if (reply)
00394         free(reply);
00395     m_replybuf = NULL;
00396     m_replylen = 0;
00397     return res;
00398 }
00399 
00400 PubNubRes
00401 PubNub::subscribe_processjson(char *reply, int replylen)
00402 {
00403     if (reply[0] != '[' || reply[replylen-1] != ']'
00404         || reply[replylen-2] != '"') {
00405         DBG printf("bad reply '%s'\r\n", reply);
00406         return PNR_FORMAT_ERROR;
00407     }
00408 
00409     /* Extract timetoken. */
00410     reply[replylen-2] = 0;
00411     int i;
00412     for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--)
00413         if (reply[i] == '"')
00414             break;
00415     if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) {
00416         DBG printf("bad reply '%s'\r\n", reply);
00417         return PNR_FORMAT_ERROR;
00418     }
00419     strcpy(m_timetoken, &reply[i+1]);
00420     reply[i-1] = 0; // terminate the [] message array
00421 
00422     /* Empty reply? */
00423     if (i == 4) { /* "[[]" */
00424         free(reply);
00425         m_replybuf = NULL;
00426         m_replylen = 0;
00427         return PNR_OK;
00428     }
00429 
00430     /* Extract the messages array. */
00431     if (reply[1] != '[' || reply[i-2] != ']') {
00432         DBG printf("bad reply end '%s'\r\n", reply);
00433         return PNR_FORMAT_ERROR;
00434     }
00435     reply[i-2] = 0;
00436 
00437     /* Shrink memory buffer to bare minimum. */
00438     memmove(reply, reply + 2, i-2-2);
00439     m_replylen = i-2-2;
00440     m_replybuf = (char *) realloc(reply, m_replylen);
00441 
00442     return PNR_OK;
00443 }
00444 
00445 
00446 PubNubRes
00447 PubNub::history(const char *channel, char **reply, int *replysize, int limit)
00448 {
00449     PubNubHTTP http;
00450     PubNubRes res;
00451     res = http.http_connect(origin_hostname());
00452     if (res != PNR_OK)
00453         return res;
00454 
00455     http.sendstr("GET /history/");
00456     http.sendstr(m_subscribe_key);
00457     http.sendstr("/");
00458     http.sendstr(channel);
00459     http.sendstr("/0/");
00460     char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit);
00461     http.sendstr(limitbuf);
00462 
00463     char *replybuf = NULL;
00464     int replylen;
00465     res = http.http_request_bh(origin_hostname(), &replybuf, &replylen);
00466     if (res != PNR_OK)
00467         goto error;
00468 
00469     /* Extract from the array and split it. */
00470 
00471     if (replybuf[0] != '[' || replybuf[replylen-1] != ']') {
00472         res = PNR_FORMAT_ERROR;
00473         goto error;
00474     }
00475 
00476     replylen -= 2;
00477     if (replylen == 0) { /* The reply was [] */
00478         free(replybuf);
00479         *reply = NULL;
00480         *replysize = 0;
00481         return PNR_OK;
00482     }
00483 
00484     memmove(replybuf, replybuf + 1, replylen);
00485     replybuf[replylen] = 0;
00486 
00487     res = split_array(replybuf, replylen);
00488     if (res != PNR_OK)
00489         goto error;
00490 
00491     *reply = replybuf;
00492     *replysize = replylen;
00493     return PNR_OK;
00494     
00495 error:
00496     free(replybuf);
00497     return res;
00498 }
00499 
00500 
00501 PubNubRes
00502 PubNub::time(char *ts)
00503 {
00504     PubNubHTTP http;
00505     PubNubRes res = http.http_connect(origin_hostname());
00506     if (res != PNR_OK)
00507         return res;
00508 
00509     http.sendstr("GET /time/0");
00510 
00511     char *reply;
00512     int replylen;
00513     res = http.http_request_bh(origin_hostname(), &reply, &replylen);
00514     if (res != PNR_OK)
00515         return res;
00516 
00517     if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') {
00518         free(reply);
00519         return PNR_FORMAT_ERROR;
00520     }
00521 
00522     replylen -= 2;
00523     memcpy(ts, reply + 1, replylen);
00524     ts[replylen] = 0;
00525 
00526     free(reply);
00527     return PNR_OK;
00528 }