Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
PubNub.cpp
00001 #include <cstring> 00002 00003 #include "mbed.h" 00004 #include "EthernetInterface.h" 00005 00006 #include "PubNub.h" 00007 00008 00009 #define DBG if (0) // change to if (1) to enable debug prints on USB serial 00010 00011 00012 /* We roll our own HTTP communication stack as donatien's HTTPClient has 00013 * some hard limits on URL length and this will allow a more frugal RAM 00014 * usage for us. */ 00015 00016 /* Also, we pass messages as serialized JSON strings instead of parsed 00017 * structures. The available JSON parsers (picojson in particular) are 00018 * heavy on many tiny memory allocations, which can get very troublesome 00019 * with mbed's malloc() - we get very high memory fragmentation. In many 00020 * cases, working with raw JSON is just fine, and in our demo example, 00021 * we still show picojson usage for end-user parsing of complex incoming 00022 * messages. */ 00023 00024 00025 /** Custom no-frills HTTP stream. */ 00026 00027 class PubNubHTTP: public TCPSocketConnection { 00028 public: 00029 /* Connect http socket to origin. */ 00030 PubNubRes http_connect(const char *hostname); 00031 00032 /* These methods are used to send the request type and URL. */ 00033 /* Send a given NUL-terminated string over the http socket. */ 00034 void sendstr(const char *string); 00035 /* Send a given NUL-terminated string over the http socket, URL encoded. */ 00036 void sendstr_urlenc(const char *string); 00037 00038 /* A common HTTP request "bottom half" - send the rest of headers 00039 * and wait for reply + receive. */ 00040 PubNubRes http_request_bh(const char *host, char **reply, int *replylen); 00041 00042 protected: 00043 /* These methods represent sub-stages of http_request_bh(). 00044 * They share the state of m_replybuf and m_replylen. */ 00045 PubNubRes http_send_headers(const char *host); 00046 PubNubRes http_recv_headers(); 00047 PubNubRes http_recv_content(); 00048 00049 char *m_replybuf; 00050 int m_replylen; 00051 int m_content_length; 00052 }; 00053 00054 PubNubRes 00055 PubNubHTTP::http_connect(const char *hostname) 00056 { 00057 int ret = connect(hostname, 80); 00058 if (ret < 0) { 00059 close(); 00060 return PNR_IO_ERROR; 00061 } 00062 return PNR_OK; 00063 } 00064 00065 void 00066 PubNubHTTP::sendstr(const char *s) 00067 { 00068 send_all((char *) s, strlen(s)); 00069 } 00070 00071 void 00072 PubNubHTTP::sendstr_urlenc(const char *s) 00073 { 00074 while (s[0]) { 00075 /* RFC 3986 Unreserved characters plus few 00076 * safe reserved ones. */ 00077 size_t okspan = strspn(s, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]"); 00078 if (okspan > 0) { 00079 send_all((char *) s, okspan); 00080 s += okspan; 00081 } 00082 if (s[0]) { 00083 /* %-encode a non-ok character. */ 00084 char enc[3] = {'%'}; 00085 enc[1] = "0123456789ABCDEF"[s[0] / 16]; 00086 enc[2] = "0123456789ABCDEF"[s[0] % 16]; 00087 send_all((char *) enc, 3); 00088 s++; 00089 } 00090 } 00091 } 00092 00093 PubNubRes 00094 PubNubHTTP::http_request_bh(const char *host, char **reply, int *replylen) 00095 { 00096 m_replybuf = NULL; 00097 m_replylen = 0; 00098 00099 PubNubRes res; 00100 res = http_send_headers(host); 00101 if (res != PNR_OK) { 00102 close(); 00103 return res; 00104 } 00105 00106 res = http_recv_headers(); 00107 if (res != PNR_OK) { 00108 if (m_replybuf) free(m_replybuf); 00109 close(); 00110 return res; 00111 } 00112 00113 res = http_recv_content(); 00114 if (res != PNR_OK) { 00115 if (m_replybuf) free(m_replybuf); 00116 close(); 00117 return res; 00118 } 00119 00120 close(); 00121 *reply = (char *) realloc(m_replybuf, m_replylen); 00122 *replylen = m_replylen; 00123 return PNR_OK; 00124 } 00125 00126 PubNubRes 00127 PubNubHTTP::http_send_headers(const char *host) 00128 { 00129 /* Finish the first line of the request. */ 00130 sendstr("?pnsdk=PubNub-mbed/0.1 HTTP/1.1\r\n"); 00131 /* Finish HTTP request. */ 00132 sendstr("Host: "); 00133 sendstr(host); 00134 sendstr("\r\nUser-Agent: PubNub-mbed/0.1\r\nConnection: close\r\n\r\n"); 00135 if (!is_connected()) 00136 return PNR_IO_ERROR; 00137 return PNR_OK; 00138 } 00139 00140 PubNubRes 00141 PubNubHTTP::http_recv_headers() 00142 { 00143 /* Now, read HTTP reply. */ 00144 m_replybuf = (char *) malloc(PUBNUB_REPLY_MAXLEN+1); 00145 00146 /* First, receive headers. */ 00147 /* XXX: For now, we assume complete headers will fit in m_replybuf. */ 00148 m_replylen = receive_all(m_replybuf, PUBNUB_REPLY_MAXLEN); 00149 if (m_replylen < 0) 00150 return PNR_IO_ERROR; 00151 m_replybuf[m_replylen] = 0; 00152 00153 char *bufptr = m_replybuf; 00154 00155 /* Parse the first line. */ 00156 if (strncmp(bufptr, "HTTP/1.", 7) || !bufptr[7] || !bufptr[8]) 00157 return PNR_IO_ERROR; 00158 int http_code = atoi(bufptr+9); 00159 if (http_code / 100 != 2) 00160 return PNR_HTTP_ERROR; 00161 bufptr = strstr(bufptr, "\r\n"); 00162 if (!bufptr) 00163 return PNR_IO_ERROR; 00164 bufptr += 2; 00165 00166 /* Parse the rest of the lines. */ 00167 m_content_length = 0; 00168 bool is_chunked = false; 00169 char *newline; 00170 for (;; bufptr = newline+2) { 00171 newline = strstr(bufptr, "\r\n"); 00172 if (!newline) 00173 return PNR_IO_ERROR; 00174 *newline = 0; 00175 00176 char h_chunked[] = "Transfer-Encoding: chunked"; 00177 char h_length[] = "Content-Length: "; 00178 if (*bufptr == 0) { 00179 /* Empty line. End of headers. */ 00180 bufptr += 2; 00181 break; 00182 } else if (!strncmp(bufptr, h_chunked, sizeof(h_chunked)-1)) { 00183 /* Transfer-Encoding: chunked */ 00184 is_chunked = true; 00185 } else if (!strncmp(bufptr, h_length, sizeof(h_length)-1)) { 00186 /* Content-Length: ... */ 00187 m_content_length = atoi(bufptr + sizeof(h_length)-1); 00188 } 00189 } 00190 00191 /* Possibly process a chunk header. */ 00192 if (is_chunked) { 00193 m_content_length = atoi(bufptr); 00194 bufptr = strstr(bufptr, "\r\n"); 00195 if (!bufptr) 00196 return PNR_IO_ERROR; 00197 bufptr += 2; 00198 } 00199 00200 /* Consolidate the buffer. */ 00201 m_replylen -= bufptr - m_replybuf; 00202 memmove(m_replybuf, bufptr, m_replylen); 00203 00204 return PNR_OK; 00205 } 00206 00207 PubNubRes 00208 PubNubHTTP::http_recv_content() 00209 { 00210 /* Now, we are ready to process data! */ 00211 00212 if (m_content_length > PUBNUB_REPLY_MAXLEN) { 00213 /* XXX: Actually, too much data, sorry. */ 00214 DBG printf("too much data\r\n"); 00215 return PNR_FORMAT_ERROR; 00216 } 00217 00218 if (m_replylen < m_content_length) { 00219 /* More data coming. */ 00220 int recv2len = receive_all(m_replybuf + m_replylen, PUBNUB_REPLY_MAXLEN - m_replylen); 00221 if (recv2len < 0) 00222 return PNR_IO_ERROR; 00223 m_replylen += recv2len; 00224 } 00225 00226 if (m_replylen < m_content_length) 00227 return PNR_IO_ERROR; /* Incomplete data. */ 00228 return PNR_OK; 00229 } 00230 00231 00232 /** Some utility routines. */ 00233 00234 /* Split a JSON array (with arbitrary contents) to multiple NUL-terminated 00235 * C strings. */ 00236 static PubNubRes 00237 split_array(char *buf, int len) 00238 { 00239 bool escaped = false, in_string = false; 00240 int bracket_level = 0; 00241 for (int i = 0; i < len; i++) { 00242 if (escaped) { 00243 escaped = false; 00244 } else if (in_string) { 00245 switch (buf[i]) { 00246 case '\\': escaped = true; break; 00247 case '"': in_string = false; break; 00248 default: break; 00249 } 00250 } else { 00251 switch (buf[i]) { 00252 case '"': in_string = true; break; 00253 case '[': case '{': bracket_level++; break; 00254 case ']': case '}': bracket_level--; break; 00255 /* if at root, split! */ 00256 case ',': if (bracket_level == 0) buf[i] = 0; break; 00257 default: break; 00258 } 00259 } 00260 } 00261 DBG printf("parse %d %d %d\r\n", escaped, in_string, bracket_level); 00262 if (escaped || in_string || bracket_level > 0) 00263 return PNR_FORMAT_ERROR; 00264 return PNR_OK; 00265 } 00266 00267 00268 /** PubNub API. */ 00269 00270 PubNub::PubNub(const char *publish_key_, const char *subscribe_key_, const char *origin_) : 00271 m_publish_key(publish_key_), m_subscribe_key(subscribe_key_), m_origin(origin_), 00272 m_replybuf(NULL), m_replylen(0) 00273 { 00274 strcpy(m_timetoken, "0"); 00275 } 00276 00277 PubNub::~PubNub() 00278 { 00279 if (m_replybuf) 00280 free(m_replybuf); 00281 } 00282 00283 const char * 00284 PubNub::origin_hostname() 00285 { 00286 /* TODO: More generic URL handling */ 00287 return m_origin + strlen("http://"); 00288 } 00289 00290 00291 PubNubRes 00292 PubNub::publish(const char *channel, const char *message, char **reply) 00293 { 00294 PubNubHTTP http; 00295 PubNubRes res = http.http_connect(origin_hostname()); 00296 if (res != PNR_OK) 00297 return res; 00298 00299 http.sendstr("GET /publish/"); 00300 http.sendstr(m_publish_key); 00301 http.sendstr("/"); 00302 http.sendstr(m_subscribe_key); 00303 http.sendstr("/0/"); 00304 http.sendstr(channel); 00305 http.sendstr("/0/"); 00306 http.sendstr_urlenc(message); 00307 00308 char *locreply; 00309 if (!reply) 00310 reply = &locreply; 00311 int replylen; 00312 00313 res = http.http_request_bh(origin_hostname(), reply, &replylen); 00314 if (res != PNR_OK) 00315 return res; 00316 bool success = (*reply)[1] == '1' && (*reply)[2] == ','; 00317 if (reply == &locreply) 00318 free(locreply); 00319 return success ? PNR_OK : PNR_PUBNUB_ERROR; 00320 } 00321 00322 00323 PubNubRes 00324 PubNub::subscribe(const char *channel, char **reply) 00325 { 00326 if (m_replybuf) { 00327 int prevlen = strlen(m_replybuf); 00328 //DBG printf("reply (%s) %d > %d: %d\r\n", m_replybuf, prevlen, m_replylen); 00329 if (prevlen < m_replylen) { 00330 /* Next message from stash-away buffer. */ 00331 /* XXX: We can be either memory-frugal or CPU-frugal 00332 * here. We choose to be memory-frugal by copying 00333 * over messages many times, but we may want to make 00334 * this configurable. */ 00335 m_replylen -= prevlen + 1; 00336 memmove(m_replybuf, m_replybuf + prevlen + 1, m_replylen); 00337 m_replybuf = (char *) realloc(m_replybuf, m_replylen); 00338 *reply = m_replybuf; 00339 return PNR_OK; 00340 00341 } else { 00342 /* That's all. free() and fetch new messages. */ 00343 free(m_replybuf); 00344 m_replybuf = NULL; 00345 m_replylen = 0; 00346 } 00347 } 00348 00349 PubNubHTTP http; 00350 PubNubRes res; 00351 res = http.http_connect(origin_hostname()); 00352 if (res != PNR_OK) 00353 return res; 00354 00355 http.sendstr("GET /subscribe/"); 00356 http.sendstr(m_subscribe_key); 00357 http.sendstr("/"); 00358 http.sendstr(channel); 00359 http.sendstr("/0/"); 00360 http.sendstr(m_timetoken); 00361 00362 char *replybuf = NULL; 00363 int replylen; 00364 res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); 00365 if (res != PNR_OK) 00366 goto error; 00367 00368 /* Process the reply, sets timetoken and m_replybuf, m_replylen. */ 00369 res = subscribe_processjson(replybuf, replylen); 00370 if (res != PNR_OK) 00371 goto error; 00372 replybuf = NULL; // freed by processjosn 00373 00374 /* Split JSON array to messages. */ 00375 res = split_array(m_replybuf, m_replylen); 00376 if (res != PNR_OK) { 00377 free(m_replybuf); 00378 goto error; 00379 } 00380 00381 *reply = m_replybuf; 00382 return res; 00383 00384 error: 00385 if (res == PNR_FORMAT_ERROR) { 00386 /* In case of PubNub protocol error, abort an ongoing 00387 * subscribe and start over. This means some messages 00388 * were lost, but allows us to recover from bad 00389 * situations, e.g. too many messages queued or 00390 * unexpected problem caused by a particular message. */ 00391 strcpy(m_timetoken, "0"); 00392 } 00393 if (reply) 00394 free(reply); 00395 m_replybuf = NULL; 00396 m_replylen = 0; 00397 return res; 00398 } 00399 00400 PubNubRes 00401 PubNub::subscribe_processjson(char *reply, int replylen) 00402 { 00403 if (reply[0] != '[' || reply[replylen-1] != ']' 00404 || reply[replylen-2] != '"') { 00405 DBG printf("bad reply '%s'\r\n", reply); 00406 return PNR_FORMAT_ERROR; 00407 } 00408 00409 /* Extract timetoken. */ 00410 reply[replylen-2] = 0; 00411 int i; 00412 for (i = replylen-3; i > 0 && i > int(replylen-3 - (sizeof(m_timetoken)-1)); i--) 00413 if (reply[i] == '"') 00414 break; 00415 if (!i || reply[i-1] != ',' || replylen-2 - (i+1) >= 64) { 00416 DBG printf("bad reply '%s'\r\n", reply); 00417 return PNR_FORMAT_ERROR; 00418 } 00419 strcpy(m_timetoken, &reply[i+1]); 00420 reply[i-1] = 0; // terminate the [] message array 00421 00422 /* Empty reply? */ 00423 if (i == 4) { /* "[[]" */ 00424 free(reply); 00425 m_replybuf = NULL; 00426 m_replylen = 0; 00427 return PNR_OK; 00428 } 00429 00430 /* Extract the messages array. */ 00431 if (reply[1] != '[' || reply[i-2] != ']') { 00432 DBG printf("bad reply end '%s'\r\n", reply); 00433 return PNR_FORMAT_ERROR; 00434 } 00435 reply[i-2] = 0; 00436 00437 /* Shrink memory buffer to bare minimum. */ 00438 memmove(reply, reply + 2, i-2-2); 00439 m_replylen = i-2-2; 00440 m_replybuf = (char *) realloc(reply, m_replylen); 00441 00442 return PNR_OK; 00443 } 00444 00445 00446 PubNubRes 00447 PubNub::history(const char *channel, char **reply, int *replysize, int limit) 00448 { 00449 PubNubHTTP http; 00450 PubNubRes res; 00451 res = http.http_connect(origin_hostname()); 00452 if (res != PNR_OK) 00453 return res; 00454 00455 http.sendstr("GET /history/"); 00456 http.sendstr(m_subscribe_key); 00457 http.sendstr("/"); 00458 http.sendstr(channel); 00459 http.sendstr("/0/"); 00460 char limitbuf[8]; snprintf(limitbuf, sizeof(limitbuf), "%d", limit); 00461 http.sendstr(limitbuf); 00462 00463 char *replybuf = NULL; 00464 int replylen; 00465 res = http.http_request_bh(origin_hostname(), &replybuf, &replylen); 00466 if (res != PNR_OK) 00467 goto error; 00468 00469 /* Extract from the array and split it. */ 00470 00471 if (replybuf[0] != '[' || replybuf[replylen-1] != ']') { 00472 res = PNR_FORMAT_ERROR; 00473 goto error; 00474 } 00475 00476 replylen -= 2; 00477 if (replylen == 0) { /* The reply was [] */ 00478 free(replybuf); 00479 *reply = NULL; 00480 *replysize = 0; 00481 return PNR_OK; 00482 } 00483 00484 memmove(replybuf, replybuf + 1, replylen); 00485 replybuf[replylen] = 0; 00486 00487 res = split_array(replybuf, replylen); 00488 if (res != PNR_OK) 00489 goto error; 00490 00491 *reply = replybuf; 00492 *replysize = replylen; 00493 return PNR_OK; 00494 00495 error: 00496 free(replybuf); 00497 return res; 00498 } 00499 00500 00501 PubNubRes 00502 PubNub::time(char *ts) 00503 { 00504 PubNubHTTP http; 00505 PubNubRes res = http.http_connect(origin_hostname()); 00506 if (res != PNR_OK) 00507 return res; 00508 00509 http.sendstr("GET /time/0"); 00510 00511 char *reply; 00512 int replylen; 00513 res = http.http_request_bh(origin_hostname(), &reply, &replylen); 00514 if (res != PNR_OK) 00515 return res; 00516 00517 if (replylen < 3 || replylen > 32 || reply[0] != '[' || reply[replylen-1] != ']') { 00518 free(reply); 00519 return PNR_FORMAT_ERROR; 00520 } 00521 00522 replylen -= 2; 00523 memcpy(ts, reply + 1, replylen); 00524 ts[replylen] = 0; 00525 00526 free(reply); 00527 return PNR_OK; 00528 }
Generated on Tue Jul 12 2022 13:56:24 by
1.7.2