Branch with improvements

Fork of M2XStreamClient by AT&T M2X Team

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers M2XStreamClient.cpp Source File

M2XStreamClient.cpp

00001 #include "M2XStreamClient.h"
00002 
00003 #include <jsonlite.h>
00004 
00005 #include "StreamParseFunctions.h"
00006 #include "LocationParseFunctions.h"
00007 
// Host name stored in the class-level default-host constant.
const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";

// Forward declarations of helpers that are defined later in this file.
// Writes the JSON body of a delete-values request to `print` and returns the
// accumulated result of the print calls.
static int write_delete_values(Print* print, const char* from, const char* end);
// Prints `str` percent-encoded and returns the accumulated print results.
int print_encoded_string(Print* print, const char* str);
// Declared up front so waitForString() can call it whether the definition
// comes from the platform fallback below or from libc.
int tolower(int ch);
00013 
#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
// Minimal tolower() for Arduino and mbed builds. Both use the ASCII table,
// so an upper-case letter is converted by adding 32; any other value is
// returned unchanged.
int tolower(int ch)
{
  const bool is_upper = (ch >= 'A') && (ch <= 'Z');
  return is_upper ? (ch + 32) : ch;
}
#else
// Every other platform relies on the tolower() provided by libc.
#include <ctype.h>
#endif
00027 
00028 M2XStreamClient::M2XStreamClient(Client* client,
00029                                  const char* key,
00030                                  int case_insensitive,
00031                                  const char* host,
00032                                  int port) : _client(client),
00033                                              _key(key),
00034                                              _case_insensitive(case_insensitive),
00035                                              _host(host),
00036                                              _port(port),
00037                                              _null_print() {
00038 }
00039 
// Appends one "name=value" pair to the URL query string written to client_.
// The first pair emitted is introduced by '?' (and sets started_ to true);
// every later pair is introduced by '&'. Nothing is written when str_ is
// NULL. The value is printed verbatim, i.e. it is NOT percent-encoded here.
//
// The body is wrapped in do { ... } while (0) so the macro expands to exactly
// one statement and stays correct inside an unbraced if/else.
#define WRITE_QUERY_PART(client_, started_, name_, str_) do { \
  if (str_) { \
    if (started_) { \
      (client_)->print("&"); \
    } else { \
      (client_)->print("?"); \
      (started_) = true; \
    } \
    (client_)->print(name_ "="); \
    (client_)->print(str_); \
  } \
} while (0)
00052 
00053 int M2XStreamClient::fetchValues(const char* feedId, const char* streamName,
00054                                  stream_value_read_callback callback, void* context,
00055                                  const char* startTime, const char* endTime,
00056                                  const char* limit) {
00057   if (_client->connect(_host, _port)) {
00058     bool query_started = false;
00059 
00060     DBGLN("%s", "Connected to M2X server!");
00061     _client->print("GET /v1/feeds/");
00062     print_encoded_string(_client, feedId);
00063     _client->print("/streams/");
00064     print_encoded_string(_client, streamName);
00065     _client->print("/values");
00066 
00067     WRITE_QUERY_PART(_client, query_started, "start", startTime);
00068     WRITE_QUERY_PART(_client, query_started, "end", endTime);
00069     WRITE_QUERY_PART(_client, query_started, "limit", limit);
00070 
00071     _client->println(" HTTP/1.0");
00072     writeHttpHeader(-1);
00073   } else {
00074     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00075     return E_NOCONNECTION;
00076   }
00077   int status = readStatusCode(false);
00078   if (status == 200) {
00079     readStreamValue(callback, context);
00080   }
00081 
00082   close();
00083   return status;
00084 }
00085 
00086 int M2XStreamClient::readLocation(const char* feedId,
00087                                   location_read_callback callback,
00088                                   void* context) {
00089   if (_client->connect(_host, _port)) {
00090     DBGLN("%s", "Connected to M2X server!");
00091     _client->print("GET /v1/feeds/");
00092     print_encoded_string(_client, feedId);
00093     _client->println("/location HTTP/1.0");
00094 
00095     writeHttpHeader(-1);
00096   } else {
00097     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00098     return E_NOCONNECTION;
00099   }
00100   int status = readStatusCode(false);
00101   if (status == 200) {
00102     readLocation(callback, context);
00103   }
00104 
00105   close();
00106   return status;
00107 }
00108 
00109 int M2XStreamClient::deleteValues(const char* feedId, const char* streamName,
00110                                   const char* from, const char* end) {
00111   if (_client->connect(_host, _port)) {
00112     DBGLN("%s", "Connected to M2X server!");
00113     int length = write_delete_values(&_null_print, from, end);
00114     writeDeleteHeader(feedId, streamName, length);
00115     write_delete_values(_client, from, end);
00116   } else {
00117     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00118     return E_NOCONNECTION;
00119   }
00120 
00121   return readStatusCode(true);
00122 }
00123 
00124 static int write_delete_values(Print* print, const char* from,
00125                                const char* end) {
00126   int bytes = 0;
00127   bytes += print->print("{\"from\":\"");
00128   bytes += print->print(from);
00129   bytes += print->print("\",\"end\":\"");
00130   bytes += print->print(end);
00131   bytes += print->print("\"}");
00132   return bytes;
00133 }
00134 
00135 // Encodes and prints string using Percent-encoding specified
00136 // in RFC 1738, Section 2.2
00137 int print_encoded_string(Print* print, const char* str) {
00138   int bytes = 0;
00139   for (int i = 0; str[i] != 0; i++) {
00140     if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
00141         ((str[i] >= 'a') && (str[i] <= 'z')) ||
00142         ((str[i] >= '0') && (str[i] <= '9')) ||
00143         (str[i] == '-') || (str[i] == '_') ||
00144         (str[i] == '.') || (str[i] == '~')) {
00145       bytes += print->print(str[i]);
00146     } else {
00147       // Encode all other characters
00148       bytes += print->print('%');
00149       bytes += print->print(HEX(str[i] / 16));
00150       bytes += print->print(HEX(str[i] % 16));
00151     }
00152   }
00153   return bytes;
00154 }
00155 
00156 void M2XStreamClient::writePutHeader(const char* feedId,
00157                                      const char* streamName,
00158                                      int contentLength) {
00159   _client->print("PUT /v1/feeds/");
00160   print_encoded_string(_client, feedId);
00161   _client->print("/streams/");
00162   print_encoded_string(_client, streamName);
00163   _client->println(" HTTP/1.0");
00164 
00165   writeHttpHeader(contentLength);
00166 }
00167 
00168 void M2XStreamClient::writeDeleteHeader(const char* feedId,
00169                                         const char* streamName,
00170                                         int contentLength) {
00171   _client->print("DELETE /v1/feeds/");
00172   print_encoded_string(_client, feedId);
00173   _client->print("/streams/");
00174   print_encoded_string(_client, streamName);
00175   _client->print("/values");
00176   _client->println(" HTTP/1.0");
00177 
00178   writeHttpHeader(contentLength);
00179 }
00180 
00181 void M2XStreamClient::writeHttpHeader(int contentLength) {
00182   _client->println(USER_AGENT);
00183   _client->print("X-M2X-KEY: ");
00184   _client->println(_key);
00185 
00186   _client->print("Host: ");
00187   print_encoded_string(_client, _host);
00188   if (_port != kDefaultM2XPort) {
00189     _client->print(":");
00190     // port is an integer, does not need encoding
00191     _client->print(_port);
00192   }
00193   _client->println();
00194 
00195   if (contentLength > 0) {
00196     _client->println("Content-Type: application/json");
00197     DBG("%s", "Content Length: ");
00198     DBGLN("%d", contentLength);
00199 
00200     _client->print("Content-Length: ");
00201     _client->println(contentLength);
00202   }
00203   _client->println();
00204 }
00205 
00206 int M2XStreamClient::waitForString(const char* str) {
00207   int currentIndex = 0;
00208   if (str[currentIndex] == '\0') return E_OK;
00209 
00210   while (true) {
00211     while (_client->available()) {
00212       char c = _client->read();
00213       DBG("%c", c);
00214 
00215       int cmp;
00216       if (_case_insensitive) {
00217         cmp = tolower(c) - tolower(str[currentIndex]);
00218       } else {
00219         cmp = c - str[currentIndex];
00220       }
00221 
00222       if ((str[currentIndex] == '*') || (cmp == 0)) {
00223         currentIndex++;
00224         if (str[currentIndex] == '\0') {
00225           return E_OK;
00226         }
00227       } else {
00228         // start from the beginning
00229         currentIndex = 0;
00230       }
00231     }
00232 
00233     if (!_client->connected()) {
00234       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00235 
00236       close();
00237       return E_DISCONNECTED;
00238     }
00239 
00240     delay(1000);
00241   }
00242   // never reached here
00243   return E_NOTREACHABLE;
00244 }
00245 
00246 int M2XStreamClient::readStatusCode(bool closeClient) {
00247   int responseCode = 0;
00248   int ret = waitForString("HTTP/*.* ");
00249   if (ret != E_OK) {
00250     if (closeClient) close();
00251     return ret;
00252   }
00253 
00254   // ret is not needed from here(since it must be E_OK), so we can use it
00255   // as a regular variable now.
00256   ret = 0;
00257   while (true) {
00258     while (_client->available()) {
00259       char c = _client->read();
00260       DBG("%c", c);
00261 
00262       responseCode = responseCode * 10 + (c - '0');
00263       ret++;
00264       if (ret == 3) {
00265         if (closeClient) close();
00266         return responseCode;
00267       }
00268     }
00269 
00270     if (!_client->connected()) {
00271       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00272 
00273       if (closeClient) close();
00274       return E_DISCONNECTED;
00275     }
00276 
00277     delay(1000);
00278   }
00279 
00280   // never reached here
00281   return E_NOTREACHABLE;
00282 }
00283 
00284 int M2XStreamClient::readContentLength() {
00285   int ret = waitForString("Content-Length: ");
00286   if (ret != E_OK) {
00287     return ret;
00288   }
00289 
00290   // From now on, ret is not needed, we can use it
00291   // to keep the final result
00292   ret = 0;
00293   while (true) {
00294     while (_client->available()) {
00295       char c = _client->read();
00296       DBG("%c", c);
00297 
00298       if ((c == '\r') || (c == '\n')) {
00299         return (ret == 0) ? (E_INVALID) : (ret);
00300       } else {
00301         ret = ret * 10 + (c - '0');
00302       }
00303     }
00304 
00305     if (!_client->connected()) {
00306       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00307 
00308       return E_DISCONNECTED;
00309     }
00310 
00311     delay(1000);
00312   }
00313 
00314   // never reached here
00315   return E_NOTREACHABLE;
00316 }
00317 
00318 int M2XStreamClient::skipHttpHeader() {
00319   return waitForString("\r\n\r\n");
00320 }
00321 
// Shuts down the connection held by _client: flush() first, then stop().
void M2XStreamClient::close() {
  // Eats up buffered data before closing
  // NOTE(review): the exact effect of flush() depends on the Client
  // implementation in use - confirm it discards pending input as intended.
  _client->flush();
  _client->stop();
}
00327 
00328 int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
00329                                      void* context) {
00330   const int BUF_LEN = 64;
00331   char buf[BUF_LEN];
00332 
00333   int length = readContentLength();
00334   if (length < 0) {
00335     close();
00336     return length;
00337   }
00338 
00339   int index = skipHttpHeader();
00340   if (index != E_OK) {
00341     close();
00342     return index;
00343   }
00344   index = 0;
00345 
00346   stream_parsing_context_state state;
00347   state.state = state.index = 0;
00348   state.callback = callback;
00349   state.context = context;
00350 
00351   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
00352   cbs.key_found = on_stream_key_found;
00353   cbs.number_found = on_stream_number_found;
00354   cbs.string_found = on_stream_string_found;
00355   cbs.context.client_state = &state;
00356 
00357   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
00358   jsonlite_parser_set_callback(p, &cbs);
00359 
00360   jsonlite_result result = jsonlite_result_unknown;
00361   while (index < length) {
00362     int i = 0;
00363 
00364     DBG("%s", "Received Data: ");
00365     while ((i < BUF_LEN) && _client->available()) {
00366       buf[i++] = _client->read();
00367       DBG("%c", buf[i - 1]);
00368     }
00369     DBGLNEND;
00370 
00371     if ((!_client->connected()) &&
00372         (!_client->available()) &&
00373         ((index + i) < length)) {
00374       jsonlite_parser_release(p);
00375       close();
00376       return E_NOCONNECTION;
00377     }
00378 
00379     result = jsonlite_parser_tokenize(p, buf, i);
00380     if ((result != jsonlite_result_ok) &&
00381         (result != jsonlite_result_end_of_stream)) {
00382       jsonlite_parser_release(p);
00383       close();
00384       return E_JSON_INVALID;
00385     }
00386 
00387     index += i;
00388   }
00389 
00390   jsonlite_parser_release(p);
00391   close();
00392   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
00393 }
00394 
00395 int M2XStreamClient::readLocation(location_read_callback callback,
00396                                   void* context) {
00397   const int BUF_LEN = 40;
00398   char buf[BUF_LEN];
00399 
00400   int length = readContentLength();
00401   if (length < 0) {
00402     close();
00403     return length;
00404   }
00405 
00406   int index = skipHttpHeader();
00407   if (index != E_OK) {
00408     close();
00409     return index;
00410   }
00411   index = 0;
00412 
00413   location_parsing_context_state state;
00414   state.state = state.index = 0;
00415   state.callback = callback;
00416   state.context = context;
00417 
00418   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
00419   cbs.key_found = on_location_key_found;
00420   cbs.string_found = on_location_string_found;
00421   cbs.context.client_state = &state;
00422 
00423   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
00424   jsonlite_parser_set_callback(p, &cbs);
00425 
00426   jsonlite_result result = jsonlite_result_unknown;
00427   while (index < length) {
00428     int i = 0;
00429 
00430     DBG("%s", "Received Data: ");
00431     while ((i < BUF_LEN) && _client->available()) {
00432       buf[i++] = _client->read();
00433       DBG("%c", buf[i - 1]);
00434     }
00435     DBGLNEND;
00436 
00437     if ((!_client->connected()) &&
00438         (!_client->available()) &&
00439         ((index + i) < length)) {
00440       jsonlite_parser_release(p);
00441       close();
00442       return E_NOCONNECTION;
00443     }
00444 
00445     result = jsonlite_parser_tokenize(p, buf, i);
00446     if ((result != jsonlite_result_ok) &&
00447         (result != jsonlite_result_end_of_stream)) {
00448       jsonlite_parser_release(p);
00449       close();
00450       return E_JSON_INVALID;
00451     }
00452 
00453     index += i;
00454   }
00455 
00456   jsonlite_parser_release(p);
00457   close();
00458   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
00459 }