Build Realtime Apps With The Real-Time Network - the mbed PubNub API+SDK
Embed:
(wiki syntax)
Show/hide line numbers
PubNub.cpp
00001 #include <cstring> 00002 00003 #include "mbed.h" 00004 #include "EthernetInterface.h" 00005 00006 #include "PubNub.h" 00007 00008 00009 #define DBG if (0) // change to if (1) to enable debug prints on USB serial 00010 00011 00012 /* We roll our own HTTP communication stack as donatien's HTTPClient has 00013 * some hard limits on URL length and this will allow a more frugal RAM 00014 * usage for us. */ 00015 00016 /* Also, we pass messages as serialized JSON strings instead of parsed 00017 * structures. The available JSON parsers (picojson in particular) are 00018 * heavy on many tiny memory allocations, which can get very troublesome 00019 * with mbed's malloc() - we get very high memory fragmentation. In many 00020 * cases, working with raw JSON is just fine, and in our demo example, 00021 * we still show picojson usage for end-user parsing of complex incoming 00022 * messages. */ 00023 00024 00025 /** Custom no-frills HTTP stream. */ 00026 00027 class PubNubHTTP: public TCPSocketConnection { 00028 public: 00029 /* Connect http socket to origin. */ 00030 PubNubRes http_connect(const char *hostname); 00031 00032 /* These methods are used to send the request type and URL. */ 00033 /* Send a given NUL-terminated string over the http socket. */ 00034 void sendstr(const char *string); 00035 /* Send a given NUL-terminated string over the http socket, URL encoded. */ 00036 void sendstr_urlenc(const char *string); 00037 00038 /* A common HTTP request "bottom half" - send the rest of headers 00039 * and wait for reply + receive. */ 00040 PubNubRes http_request_bh(const char *host, char **reply, int *replylen); 00041 00042 protected: 00043 /* These methods represent sub-stages of http_request_bh(). 00044 * They share the state of m_replybuf and m_replylen. */ 00045 PubNubRes http_send_headers(const char *host); 00046 PubNubRes http_recv_headers(); 00047 PubNubRes http_recv_content(); 00048 00049 char *m_replybuf; 00050 int m_replylen; 00051 int m_content_length; 00052 }; 00053 00054 PubNubRes 00055 PubNubHTTP::http_connect(const char *hostname) 00056 { 00057 int ret = connect(hostname, 80); 00058 if (ret < 0) { 00059 close(); 00060 return PNR_IO_ERROR; 00061 } 00062 return PNR_OK; 00063 } 00064 00065 void 00066 PubNubHTTP::sendstr(const char *s) 00067 { 00068 send_all((char *) s, strlen(s)); 00069 } 00070 00071 void 00072 PubNubHTTP::sendstr_urlenc(const char *s) 00073 { 00074 while (s[0]) { 00075 /* RFC 3986 Unreserved characters plus few 00076 * safe reserved ones. */ 00077 size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]"); 00078 if (okspan > 0) { 00079 send_all((char *) s, okspan); 00080 s += okspan; 00081 } 00082 if (s[0]) { 00083 /* %-encode a non-ok character. */ 00084 char enc[3] = {'%'}; 00085 enc[1] = "0123456789ABCDEF"[s[0] / 16]; 00086 enc[2] = "0123456789ABCDEF"[s[0] % 16]; 00087 send_all((char *) enc, 3); 00088 s++; 00089 } 00090 } 00091 } 00092 00093 PubNubRes 00094 PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen) 00095 { 00096 m_replybuf = NULL; 00097 m_replylen = 0; 00098 00099 PubNubRes res; 00100 res = http_send_headers(host); 00101 if (res != PNR_OK) { 00102 close(); 00103 return res; 00104 } 00105 00106 res = http_recv_headers(); 00107 if (res != PNR_OK) { 00108 if (m_replybuf) free(m_replybuf); 00109 close(); 00110 return res; 00111 } 00112 00113 res = http_recv_content(); 00114 if (res != PNR_OK) { 00115 if (m_replybuf) free(m_replybuf); 00116 close(); 00117 return res; 00118 } 00119 00120 close(); 00121 *reply = (char *) realloc(m_replybuf, m_replylen); 00122 *replylen = m_replylen; 00123 return PNR_OK; 00124 } 00125 00126 PubNubRes 00127 PubNubHTTP::http_send_headers(const char *host) 00128 { 00129 /* Finish the first line of the request. */ 00130 sendstr("?pnsdk=PubNub-mbed/0.1 HTTP/1.1\r\n"); 00131 /* Finish HTTP request. */ 00132 sendstr("Host: "); 00133 sendstr(host); 00134 sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n"); 00135 if (!is_connected()) 00136 return PNR_IO_ERROR; 00137 return PNR_OK; 00138 } 00139 00140 PubNubRes 00141 PubNubHTTP::http_recv_headers() 00142 { 00143 /* Now, read HTTP reply. */ 00144 m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1); 00145 00146 /* First, receive headers. */ 00147 /* XXX: For now, we assume complete headers will fit in m_replybuf. */ 00148 m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN); 00149 if (m_replylen < 0) 00150 return PNR_IO_ERROR; 00151 m_replybuf[m_replylen] = 0; 00152 00153 char *bufptr = m_replybuf; 00154 00155 /* Parse the first line. */ 00156 if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8]) 00157 return PNR_IO_ERROR; 00158 int http_code = atoi(bufptr+9); 00159 if (http_code / 100 != 2) 00160 return PNR_HTTP_ERROR; 00161 bufptr = strstr(bufptr, "\r\n"); 00162 if (!bufptr) 00163 return PNR_IO_ERROR; 00164 bufptr += 2; 00165 00166 /* Parse the rest of the lines. */ 00167 m_content_length = 0; 00168 bool is_chunked = false; 00169 char *newline; 00170 for (;; bufptr = newline+2) { 00171 newline = strstr(bufptr, "\r\n"); 00172 if (!newline) 00173 return PNR_IO_ERROR; 00174 *newline = 0; 00175 00176 char h_chunked[] = "Transfer-Encoding: chunked"; 00177 char h_length[] = "Content-Length: "; 00178 if (*bufptr == 0) { 00179 /* Empty line. End of headers. */ 00180 bufptr += 2; 00181 break; 00182 } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) { 00183 /* Transfer-Encoding: chunked */ 00184 is_chunked = true; 00185 } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) { 00186 /* Content-Length: ... */ 00187 m_content_length = atoi(bufptr + sizeof(h_length)-1); 00188 } 00189 } 00190 00191 /* Possibly process a chunk header. */ 00192 if (is_chunked) { 00193 m_content_length = atoi(bufptr); 00194 bufptr = strstr(bufptr, "\r\n"); 00195 if (!bufptr) 00196 return PNR_IO_ERROR; 00197 bufptr += 2; 00198 } 00199 00200 /* Consolidate the buffer. */ 00201 m_replylen -= bufptr - m_replybuf; 00202 memmove(m_replybuf, bufptr, m_replylen); 00203 00204 return PNR_OK; 00205 } 00206 00207 PubNubRes 00208 PubNubHTTP::http_recv_content() 00209 { 00210 /* Now, we are ready to process data! */ 00211 00212 if (m_content_length > PUBNUB_REPLY_MAXLEN) { 00213 /* XXX: Actually, too much data, sorry. */ 00214 DBG printf("too much data\r\n"); 00215 return PNR_FORMAT_ERROR; 00216 } 00217 00218 if (m_replylen < m_content_length) { 00219 /* More data coming. */ 00220 int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen); 00221 if (recv2len < 0) 00222 return PNR_IO_ERROR; 00223 m_replylen += recv2len; 00224 } 00225 00226 if (m_replylen < m_content_length) 00227 return PNR_IO_ERROR; /* Incomplete data. */ 00228 return PNR_OK; 00229 } 00230 00231 00232 /** Some utility routines. */ 00233 00234 /* Split a JSON array (with arbitrary contents) to multiple NUL-terminated 00235 * C strings. */ 00236 static PubNubRes 00237 split_array(char *buf, int len) 00238 { 00239 bool escaped = false, in_string = false; 00240 int bracket_level = 0; 00241 for (int i = 0; i < len; i++) { 00242 if (escaped) { 00243 escaped = false; 00244 } else if (in_string) { 00245 switch (buf[i]) { 00246 case '\\': escaped = true; break; 00247 case '"': in_string = false; break; 00248 default: break; 00249 } 00250 } else { 00251 switch (buf[i]) { 00252 case '"': in_string = true; break; 00253 case '[': case '{': bracket_level++; break; 00254 case ']': case '}': bracket_level--; break; 00255 /* if at root, split! */ 00256 case ',': if (bracket_level == 0) buf[i] = 0; break; 00257 default: break; 00258 } 00259 } 00260 } 00261 DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level); 00262 if (escaped || in_string || bracket_level > 0) 00263 return PNR_FORMAT_ERROR; 00264 return PNR_OK; 00265 } 00266 00267 00268 /** PubNub API. */ 00269 00270 PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) : 00271 m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_), 00272 m_replybuf(NULL), m_replylen(0) 00273 { 00274 strcpy(m_timetoken, "0"); 00275 } 00276 00277 PubNub::~PubNub() 00278 { 00279 if (m_replybuf) 00280 free(m_replybuf); 00281 } 00282 00283 const char * 00284 PubNub::origin_hostname() 00285 { 00286 /* TODO: More generic URL handling */ 00287 return m_origin + strlen("http://"); 00288 } 00289 00290 00291 PubNubRes 00292 PubNub::publish(const char *channel, const char *message, char **reply) 00293 { 00294 PubNubHTTP http; 00295 PubNubRes res = http.http_connect(origin_hostname()); 00296 if (res != PNR_OK) 00297 return res; 00298 00299 http.sendstr("GET /publish/"); 00300 http.sendstr(m_publish_key); 00301 http.sendstr("/"); 00302 http.sendstr(m_subscribe_key); 00303 http.sendstr("/0/"); 00304 http.sendstr(channel); 00305 http.sendstr("/0/"); 00306 http.sendstr_urlenc(message); 00307 00308 char *locreply; 00309 if (!reply) 00310 reply = &locreply; 00311 int replylen; 00312 00313 res = http.http_request_bh(origin_hostname(), reply, &replylen); 00314 if (res != PNR_OK) 00315 return res; 00316 bool success = (*reply)[1] == '1' && (*reply)[2] == ','; 00317 if (reply == &locreply) 00318 free(locreply); 00319 return success ? PNR_OK : PNR_PUBNUB_ERROR; 00320 } 00321 00322 00323 PubNubRes 00324 PubNub::subscribe(const char *channel, char **reply) 00325 { 00326 if (m_replybuf) { 00327 int prevlen = strlen(m_replybuf); 00328 //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen); 00329 if (prevlen < m_replylen) { 00330 /* Next message from stash-away buffer. */ 00331 /* XXX: We can be either memory-frugal or CPU-frugal 00332 * here. We choose to be memory-frugal by copying 00333 * over messages many times, but we may want to make 00334 * this configurable. */ 00335 m_replylen -= prevlen + 1; 00336 memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen); 00337 m_replybuf = (char *) realloc(m_replybuf, m_replylen); 00338 *reply = m_replybuf; 00339 return PNR_OK; 00340 00341 } else { 00342 /* That's all. free() and fetch new messages. */ 00343 free(m_replybuf); 00344 m_replybuf = NULL; 00345 m_replylen = 0; 00346 } 00347 } 00348 00349 PubNubHTTP http; 00350 PubNubRes res; 00351 res = http.http_connect(origin_hostname()); 00352 if (res != PNR_OK) 00353 return res; 00354 00355 http.sendstr("GET /subscribe/"); 00356 http.sendstr(m_subscribe_key); 00357 http.sendstr("/"); 00358 http.sendstr(channel); 00359 http.sendstr("/0/"); 00360 http.sendstr(m_timetoken); 00361 00362 char *replybuf = NULL; 00363 int replylen; 00364 res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); 00365 if (res != PNR_OK) 00366 goto error; 00367 00368 /* Process the reply, sets timetoken and m_replybuf, m_replylen. */ 00369 res = subscribe_processjson(replybuf, replylen); 00370 if (res != PNR_OK) 00371 goto error; 00372 replybuf = NULL; // freed by processjosn 00373 00374 /* Split JSON array to messages. */ 00375 res = split_array(m_replybuf, m_replylen); 00376 if (res != PNR_OK) { 00377 free(m_replybuf); 00378 goto error; 00379 } 00380 00381 *reply = m_replybuf; 00382 return res; 00383 00384 error: 00385 if (res == PNR_FORMAT_ERROR) { 00386 /* In case of PubNub protocol error, abort an ongoing 00387 * subscribe and start over. This means some messages 00388 * were lost, but allows us to recover from bad 00389 * situations, e.g. too many messages queued or 00390 * unexpected problem caused by a particular message. */ 00391 strcpy(m_timetoken, "0"); 00392 } 00393 if (reply) 00394 free(reply); 00395 m_replybuf = NULL; 00396 m_replylen = 0; 00397 return res; 00398 } 00399 00400 PubNubRes 00401 PubNub::subscribe_processjson(char *reply, int replylen) 00402 { 00403 if (reply[0] != '[' || reply[replylen-1] != ']' 00404 || reply[replylen-2] != '"') { 00405 DBG printf("bad reply '%s'\r\n", reply); 00406 return PNR_FORMAT_ERROR; 00407 } 00408 00409 /* Extract timetoken. */ 00410 reply[replylen-2] = 0; 00411 int i; 00412 for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--) 00413 if (reply[i] == '"') 00414 break; 00415 if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) { 00416 DBG printf("bad reply '%s'\r\n", reply); 00417 return PNR_FORMAT_ERROR; 00418 } 00419 strcpy(m_timetoken, &reply[i+1]); 00420 reply[i-1] = 0; // terminate the [] message array 00421 00422 /* Empty reply? */ 00423 if (i == 4) { /* "[[]" */ 00424 free(reply); 00425 m_replybuf = NULL; 00426 m_replylen = 0; 00427 return PNR_OK; 00428 } 00429 00430 /* Extract the messages array. */ 00431 if (reply[1] != '[' || reply[i-2] != ']') { 00432 DBG printf("bad reply end '%s'\r\n", reply); 00433 return PNR_FORMAT_ERROR; 00434 } 00435 reply[i-2] = 0; 00436 00437 /* Shrink memory buffer to bare minimum. */ 00438 memmove(reply, reply + 2, i-2-2); 00439 m_replylen = i-2-2; 00440 m_replybuf = (char *) realloc(reply, m_replylen); 00441 00442 return PNR_OK; 00443 } 00444 00445 00446 PubNubRes 00447 PubNub::history(const char *channel, char **reply, int *replysize, int limit) 00448 { 00449 PubNubHTTP http; 00450 PubNubRes res; 00451 res = http.http_connect(origin_hostname()); 00452 if (res != PNR_OK) 00453 return res; 00454 00455 http.sendstr("GET /history/"); 00456 http.sendstr(m_subscribe_key); 00457 http.sendstr("/"); 00458 http.sendstr(channel); 00459 http.sendstr("/0/"); 00460 char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit); 00461 http.sendstr(limitbuf); 00462 00463 char *replybuf = NULL; 00464 int replylen; 00465 res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); 00466 if (res != PNR_OK) 00467 goto error; 00468 00469 /* Extract from the array and split it. */ 00470 00471 if (replybuf[0] != '[' || replybuf[replylen-1] != ']') { 00472 res = PNR_FORMAT_ERROR; 00473 goto error; 00474 } 00475 00476 replylen -= 2; 00477 if (replylen == 0) { /* The reply was [] */ 00478 free(replybuf); 00479 *reply = NULL; 00480 *replysize = 0; 00481 return PNR_OK; 00482 } 00483 00484 memmove(replybuf, replybuf + 1, replylen); 00485 replybuf[replylen] = 0; 00486 00487 res = split_array(replybuf, replylen); 00488 if (res != PNR_OK) 00489 goto error; 00490 00491 *reply = replybuf; 00492 *replysize = replylen; 00493 return PNR_OK; 00494 00495 error: 00496 free(replybuf); 00497 return res; 00498 } 00499 00500 00501 PubNubRes 00502 PubNub::time(char *ts) 00503 { 00504 PubNubHTTP http; 00505 PubNubRes res = http.http_connect(origin_hostname()); 00506 if (res != PNR_OK) 00507 return res; 00508 00509 http.sendstr("GET /time/0"); 00510 00511 char *reply; 00512 int replylen; 00513 res = http.http_request_bh(origin_hostname(), &reply, &replylen); 00514 if (res != PNR_OK) 00515 return res; 00516 00517 if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') { 00518 free(reply); 00519 return PNR_FORMAT_ERROR; 00520 } 00521 00522 replylen -= 2; 00523 memcpy(ts, reply + 1, replylen); 00524 ts[replylen] = 0; 00525 00526 free(reply); 00527 return PNR_OK; 00528 }
Generated on Tue Jul 12 2022 13:56:24 by 1.7.2