M2XStreamClient fork with a workaround in M2XStreamClient.cpp for the MTS_Wifi_Connect_M2X example.

Dependents:   STM32_MTS_Wifi_Connect_M2X M2X_STM32_MTS_Temp MTS_WiFi_Connect_M2X_Example

Fork of M2XStreamClient by AT&T M2X Team

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers M2XStreamClient.cpp Source File

M2XStreamClient.cpp

00001 #include "M2XStreamClient.h"
00002 
00003 #include <jsonlite.h>
00004 
00005 #include "StreamParseFunctions.h"
00006 #include "LocationParseFunctions.h"
00007 
00008 const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
00009 
00010 int print_encoded_string(Print* print, const char* str);
00011 int tolower(int ch);
00012 
00013 #if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
// ASCII-only replacement for libc's tolower, used on Arduino and mbed.
// Returns the lowercase form of an uppercase ASCII letter and every other
// value unchanged.
int tolower(int ch)
{
  const bool is_upper = (ch >= 'A') && (ch <= 'Z');
  // In ASCII each lowercase letter sits 32 code points above its uppercase form.
  return is_upper ? (ch + 32) : ch;
}
00022 #else
00023 // For other platform, we use libc's tolower by default
00024 #include <ctype.h>
00025 #endif
00026 
00027 M2XStreamClient::M2XStreamClient(Client* client,
00028                                  const char* key,
00029                                  int case_insensitive,
00030                                  const char* host,
00031                                  int port) : _client(client),
00032                                              _key(key),
00033                                              _case_insensitive(case_insensitive),
00034                                              _host(host),
00035                                              _port(port),
00036                                              _null_print() {
00037 }
00038 
// Appends one "name=value" pair to the query string of the request that is
// currently being written to client_.
//   client_  - pointer to an object providing print(); receives the output
//   started_ - bool lvalue, false until the first pair has been written. The
//              first pair is prefixed with "?" (and sets started_ to true),
//              every later pair with "&".
//   name_    - string literal naming the parameter (it is concatenated with
//              "=" at compile time)
//   str_     - value to print; when it is null the pair is skipped entirely.
//              Note that str_ is evaluated twice and is written verbatim
//              (no percent-encoding is applied).
// The body is wrapped in do { } while (0) so the macro expands to exactly one
// statement and stays correct inside an unbraced if/else.
#define WRITE_QUERY_PART(client_, started_, name_, str_) do { \
  if ((str_)) { \
    if ((started_)) { \
      (client_)->print("&"); \
    } else { \
      (client_)->print("?"); \
      (started_) = true; \
    } \
    (client_)->print(name_ "="); \
    (client_)->print((str_)); \
  } \
  } while (0)
00051 
00052 int M2XStreamClient::fetchValues(const char* feedId, const char* streamName,
00053                                  stream_value_read_callback callback, void* context,
00054                                  const char* startTime, const char* endTime,
00055                                  const char* limit) {
00056   if (_client->connect(_host, _port)) {
00057     bool query_started = false;
00058 
00059     DBGLN("%s", "Connected to M2X server!");
00060     _client->print("GET /v1/feeds/");
00061     print_encoded_string(_client, feedId);
00062     _client->print("/streams/");
00063     print_encoded_string(_client, streamName);
00064     _client->print("/values");
00065 
00066     WRITE_QUERY_PART(_client, query_started, "start", startTime);
00067     WRITE_QUERY_PART(_client, query_started, "end", endTime);
00068     WRITE_QUERY_PART(_client, query_started, "limit", limit);
00069 
00070     _client->println(" HTTP/1.0");
00071     writeHttpHeader(-1);
00072   } else {
00073     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00074     return E_NOCONNECTION;
00075   }
00076   int status = readStatusCode(false);
00077   if (status == 200) {
00078     readStreamValue(callback, context);
00079   }
00080 
00081   close();
00082   return status;
00083 }
00084 
00085 int M2XStreamClient::readLocation(const char* feedId,
00086                                   location_read_callback callback,
00087                                   void* context) {
00088   if (_client->connect(_host, _port)) {
00089     DBGLN("%s", "Connected to M2X server!");
00090     _client->print("GET /v1/feeds/");
00091     print_encoded_string(_client, feedId);
00092     _client->println("/location HTTP/1.0");
00093 
00094     writeHttpHeader(-1);
00095   } else {
00096     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00097     return E_NOCONNECTION;
00098   }
00099   int status = readStatusCode(false);
00100   if (status == 200) {
00101     readLocation(callback, context);
00102   }
00103 
00104   close();
00105   return status;
00106 }
00107 
00108 // Encodes and prints string using Percent-encoding specified
00109 // in RFC 1738, Section 2.2
00110 int print_encoded_string(Print* print, const char* str) {
00111   int bytes = 0;
00112   for (int i = 0; str[i] != 0; i++) {
00113     if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
00114         ((str[i] >= 'a') && (str[i] <= 'z')) ||
00115         ((str[i] >= '0') && (str[i] <= '9')) ||
00116         (str[i] == '-') || (str[i] == '_') ||
00117         (str[i] == '.') || (str[i] == '~')) {
00118       bytes += print->print(str[i]);
00119     } else {
00120       // Encode all other characters
00121       bytes += print->print('%');
00122       bytes += print->print(HEX(str[i] / 16));
00123       bytes += print->print(HEX(str[i] % 16));
00124     }
00125   }
00126   return bytes;
00127 }
00128 
00129 void M2XStreamClient::writePostHeader(const char* feedId,
00130                                       const char* streamName,
00131                                       int contentLength) {
00132   _client->print("PUT /v1/feeds/");
00133   print_encoded_string(_client, feedId);
00134   _client->print("/streams/");
00135   print_encoded_string(_client, streamName);
00136   _client->println(" HTTP/1.0");
00137 
00138   writeHttpHeader(contentLength);
00139 }
00140 
00141 void M2XStreamClient::writeHttpHeader(int contentLength) {
00142   _client->println(USER_AGENT);
00143   _client->print("X-M2X-KEY: ");
00144   _client->println(_key);
00145 
00146   _client->print("Host: ");
00147   print_encoded_string(_client, _host);
00148   if (_port != kDefaultM2XPort) {
00149     _client->print(":");
00150     // port is an integer, does not need encoding
00151     _client->print(_port);
00152   }
00153   _client->println();
00154 
00155   if (contentLength > 0) {
00156     _client->println("Content-Type: application/json");
00157     DBG("%s", "Content Length: ");
00158     DBGLN("%d", contentLength);
00159 
00160     _client->print("Content-Length: ");
00161     _client->println(contentLength);
00162   }
00163   _client->println();
00164 }
00165 
00166 int M2XStreamClient::waitForString(const char* str) {
00167   int currentIndex = 0;
00168   if (str[currentIndex] == '\0') return E_OK;
00169 
00170   while (true) {
00171     while (_client->available()) {
00172       char c = _client->read();
00173       DBG("%c", c);
00174 
00175       int cmp;
00176       if (_case_insensitive) {
00177         cmp = tolower(c) - tolower(str[currentIndex]);
00178       } else {
00179         cmp = c - str[currentIndex];
00180       }
00181 
00182       if ((str[currentIndex] == '*') || (cmp == 0)) {
00183         currentIndex++;
00184         if (str[currentIndex] == '\0') {
00185           return E_OK;
00186         }
00187       } else {
00188         // start from the beginning
00189         currentIndex = 0;
00190       }
00191     }
00192 
00193     if (!_client->connected()) {
00194       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00195 
00196       close();
00197       return E_DISCONNECTED;
00198     }
00199 
00200     delay(1000);
00201   }
00202   // never reached here
00203   return E_NOTREACHABLE;
00204 }
00205 
00206 int M2XStreamClient::readStatusCode(bool closeClient) {
00207   int responseCode = 0;
00208   int ret = waitForString("HTTP/*.* ");
00209   if (ret != E_OK) {
00210     if (closeClient) close();
00211     return ret;
00212   }
00213 
00214   // ret is not needed from here(since it must be E_OK), so we can use it
00215   // as a regular variable now.
00216   ret = 0;
00217   while (true) {
00218     while (_client->available()) {
00219       char c = _client->read();
00220       DBG("%c", c);
00221 
00222       responseCode = responseCode * 10 + (c - '0');
00223       ret++;
00224       if (ret == 3) {
00225         if (closeClient) close();
00226         return responseCode;
00227       }
00228     }
00229 
00230     if (!_client->connected()) {
00231       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00232 
00233       if (closeClient) close();
00234       return E_DISCONNECTED;
00235     }
00236 
00237     delay(1000);
00238   }
00239 
00240   // never reached here
00241   return E_NOTREACHABLE;
00242 }
00243 
00244 int M2XStreamClient::readContentLength() {
00245   int ret = waitForString("Content-Length: ");
00246   if (ret != E_OK) {
00247     return ret;
00248   }
00249 
00250   // From now on, ret is not needed, we can use it
00251   // to keep the final result
00252   ret = 0;
00253   while (true) {
00254     while (_client->available()) {
00255       char c = _client->read();
00256       DBG("%c", c);
00257 
00258       if ((c == '\r') || (c == '\n')) {
00259         return (ret == 0) ? (E_INVALID) : (ret);
00260       } else {
00261         ret = ret * 10 + (c - '0');
00262       }
00263     }
00264 
00265     if (!_client->connected()) {
00266       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00267 
00268       return E_DISCONNECTED;
00269     }
00270 
00271     delay(1000);
00272   }
00273 
00274   // never reached here
00275   return E_NOTREACHABLE;
00276 }
00277 
00278 int M2XStreamClient::skipHttpHeader() {
00279   return waitForString("\r\n\r\n");
00280 }
00281 
00282 void M2XStreamClient::close() {
00283   // Eats up buffered data before closing
00284   _client->flush();
00285   _client->stop();
00286 }
00287 
00288 int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
00289                                      void* context) {
00290   const int BUF_LEN = 32;
00291   char buf[BUF_LEN];
00292 
00293   int length = readContentLength();
00294   if (length < 0) {
00295     close();
00296     return length;
00297   }
00298 
00299   int index = skipHttpHeader();
00300   if (index != E_OK) {
00301     close();
00302     return index;
00303   }
00304   index = 0;
00305 
00306   stream_parsing_context_state state;
00307   state.state = state.index = 0;
00308   state.callback = callback;
00309   state.context = context;
00310 
00311   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
00312   cbs.key_found = on_stream_key_found;
00313   cbs.string_found = on_stream_string_found;
00314   cbs.context.client_state = &state;
00315 
00316   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
00317   jsonlite_parser_set_callback(p, &cbs);
00318 
00319   jsonlite_result result = jsonlite_result_unknown;
00320   
00321   wait_ms(100);    // Workaround - 100ms delay necessary for data fetches to work
00322   while (index < length) {
00323     int i = 0;
00324     
00325     DBG("%s", "Received Data: ");
00326     while ((i < BUF_LEN) && _client->available()) {
00327       buf[i++] = _client->read();
00328       DBG("%c", buf[i - 1]);
00329     }
00330     DBGLNEND;
00331 
00332     if ((!_client->connected()) &&
00333         (!_client->available()) &&
00334         ((index + i) < length)) {
00335       jsonlite_parser_release(p);
00336       close();
00337       return E_NOCONNECTION;
00338     }
00339 
00340     result = jsonlite_parser_tokenize(p, buf, i);
00341     if ((result != jsonlite_result_ok) &&
00342         (result != jsonlite_result_end_of_stream)) {
00343       jsonlite_parser_release(p);
00344       close();
00345       return E_JSON_INVALID;
00346     }
00347 
00348     index += i;
00349   }
00350 
00351   jsonlite_parser_release(p);
00352   close();
00353   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
00354 }
00355 
00356 int M2XStreamClient::readLocation(location_read_callback callback,
00357                                   void* context) {
00358   const int BUF_LEN = 40;
00359   char buf[BUF_LEN];
00360     
00361   int length = readContentLength();
00362   if (length < 0) {
00363     close();
00364     return length;
00365   }
00366   
00367   int index = skipHttpHeader();
00368   if (index != E_OK) {
00369     close();
00370     return index;
00371   }
00372   index = 0;
00373   
00374   location_parsing_context_state state;
00375   state.state = state.index = 0;
00376   state.callback = callback;
00377   state.context = context;
00378 
00379   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
00380   cbs.key_found = on_location_key_found;
00381   cbs.string_found = on_location_string_found;
00382   cbs.context.client_state = &state;
00383 
00384   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
00385   jsonlite_parser_set_callback(p, &cbs);
00386 
00387   jsonlite_result result = jsonlite_result_unknown;
00388   
00389   wait_ms(100);    // Workaround - 100ms delay necessary for data fetches to work
00390   while (index < length) {
00391     int i = 0;
00392           
00393     DBG("%s", "Received Data: ");
00394     while ((i < BUF_LEN) && _client->available()) {
00395       buf[i++] = _client->read();
00396       DBG("%c", buf[i - 1]);
00397     }
00398     DBGLNEND;
00399 
00400     if ((!_client->connected()) &&
00401         (!_client->available()) &&
00402         ((index + i) < length)) {
00403       jsonlite_parser_release(p);
00404       close();
00405       return E_NOCONNECTION;
00406     }
00407 
00408     result = jsonlite_parser_tokenize(p, buf, i);
00409     if ((result != jsonlite_result_ok) &&
00410         (result != jsonlite_result_end_of_stream)) {
00411       jsonlite_parser_release(p);
00412       close();
00413       return E_JSON_INVALID;
00414     }
00415 
00416     index += i;
00417   }
00418   
00419   jsonlite_parser_release(p);
00420   close();
00421   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
00422 }