Jim Flynn / M2XStreamClient-JMF

Dependents:   WNCInterface_M2Xdemo ATT_WNCInterface_Info WNCInterface_HTTP_example Public_IoT_M2X_Cellular_Demo

Fork of M2XStreamClient by AT&T M2X Team

Files at this revision

API Documentation at this revision

Comitter:
citrusbyte
Date:
Fri Jan 01 15:48:18 2016 +0000
Parent:
18:510895884768
Child:
20:32d24a6355b5
Commit message:
Use correct M2XStreamClient.cpp file

Changed in this revision

M2XStreamClient.cpp Show annotated file Show diff for this revision Revisions of this file
M2XStreamClient.h Show annotated file Show diff for this revision Revisions of this file
M2XStreamClient_template.h Show annotated file Show diff for this revision Revisions of this file
--- a/M2XStreamClient.cpp	Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.cpp	Fri Jan 01 15:48:18 2016 +0000
@@ -1,147 +1,527 @@
 #include "M2XStreamClient.h"
 
-static int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
-                                  char* buffer, int* length);
+#include <jsonlite.h>
+
+#include "StreamParseFunctions.h"
+#include "LocationParseFunctions.h"
+
+const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
+
+static int write_delete_values(Print* print, const char* from, const char* end);
+int print_encoded_string(Print* print, const char* str);
+int tolower(int ch);
+
+#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
+int tolower(int ch)
+{
+  // Arduino and mbed use ASCII table, so we can simplify the implementation
+  if ((ch >= 'A') && (ch <= 'Z')) {
+    return (ch + 32);
+  }
+  return ch;
+}
+#else
+// For other platform, we use libc's tolower by default
+#include <ctype.h>
+#endif
 
-TimeService::TimeService(M2XStreamClient* client) : _client(client) {
+M2XStreamClient::M2XStreamClient(Client* client,
+                                 const char* key,
+                                 int case_insensitive,
+                                 const char* host,
+                                 int port,
+                                 const char* path_prefix) : _client(client),
+                                             _key(key),
+                                             _case_insensitive(case_insensitive),
+                                             _host(host),
+                                             _port(port),
+                                             _path_prefix(path_prefix),
+                                             _null_print() {
+}
+
+int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
+                                      stream_value_read_callback callback, void* context,
+                                      const char* query) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix)
+        _client->print(_path_prefix);
+    _client->print("/v2/devices/");
+    print_encoded_string(_client, deviceId);
+    _client->print("/streams/");
+    print_encoded_string(_client, streamName);
+    _client->print("/values.json");
+
+    if (query) {
+      if (query[0] != '?') {
+        _client->print('?');
+      }
+      _client->print(query);
+    }
+
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readStreamValue(callback, context);
+  }
+
+  close();
+  return status;
 }
 
-int TimeService::init() {
-  _timer.start();
-  return reset();
+int M2XStreamClient::readLocation(const char* deviceId,
+                                  location_read_callback callback,
+                                  void* context) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix)
+        _client->print(_path_prefix);
+    _client->print("/v2/devices/");
+    print_encoded_string(_client, deviceId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readLocation(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
+                                  const char* from, const char* end) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_delete_values(&_null_print, from, end);
+    writeDeleteHeader(deviceId, streamName, length);
+    write_delete_values(_client, from, end);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
 }
 
-int TimeService::reset() {
-  int32_t ts;
-  int status = _client->getTimestamp32(&ts);
+int M2XStreamClient::getTimestamp32(int32_t *ts) {
+  // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
+  // which is 9223372036854775807. It consists of 19 characters, so a
+  // buffer of 20 is definitely enough here
+  int length = 20;
+  char buffer[20];
+  int status = getTimestamp(buffer, &length);
+  if (status == 200) {
+    int32_t result = 0;
+    for (int i = 0; i < length; i++) {
+      result = result * 10 + (buffer[i] - '0');
+    }
+    if (ts != NULL) { *ts = result; }
+  }
+  return status;
+}
+
+int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
+  if (bufferLength == NULL) { return E_INVALID; }
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->println("GET /v2/time/seconds HTTP/1.0");
 
-  if (m2x_status_is_success(status)) {
-    _server_timestamp = ts;
-    _local_last_milli = _timer.read_ms();
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
   }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    int length = readContentLength();
+    if (length < 0) {
+      close();
+      return length;
+    }
+    if (*bufferLength < length) {
+      *bufferLength = length;
+      return E_BUFFER_TOO_SMALL;
+    }
+    *bufferLength = length;
+    int index = skipHttpHeader();
+    if (index != E_OK) {
+      close();
+      return index;
+    }
+    index = 0;
+    while (index < length) {
+      DBG("%s", "Received Data: ");
+      while ((index < length) && _client->available()) {
+        buffer[index++] = _client->read();
+        DBG("%c", buffer[index - 1]);
+      }
+      DBGLNEND;
 
+      if ((!_client->connected()) &&
+          (index < length)) {
+        close();
+        return E_NOCONNECTION;
+      }
+
+      delay(200);
+    }
+  }
+  close();
   return status;
 }
 
-int TimeService::getTimestamp(char* buffer, int* length) {
-  uint32_t now = _timer.read_ms();
-  if (now < _local_last_milli) {
-    // In case of a timestamp overflow, we reset the server timestamp recorded.
-    // Notice that unlike Arduino, mbed would overflow every 30 minutes,
-    // so if 2 calls to this API are more than 30 minutes apart, we are
-    // likely to run into troubles. This is a limitation of the current
-    // mbed platform. However, we argue that it might be a rare case that
-    // 2 calls to this are 30 minutes apart. In most cases, we would call
-    // this API every few seconds or minutes, this won't be a huge problem.
-    // However, if you have a use case that would require 2 intervening
-    // calls to this be 30 minutes apart, you might want to leverage a RTC
-    // clock instead of the simple ticker here.
-    int status = reset();
-    if (!m2x_status_is_success(status)) { return status; }
-    now = _timer.read_ms();
+static int write_delete_values(Print* print, const char* from,
+                               const char* end) {
+  int bytes = 0;
+  bytes += print->print("{\"from\":\"");
+  bytes += print->print(from);
+  bytes += print->print("\",\"end\":\"");
+  bytes += print->print(end);
+  bytes += print->print("\"}");
+  return bytes;
+}
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+int print_encoded_string(Print* print, const char* str) {
+  int bytes = 0;
+  for (int i = 0; str[i] != 0; i++) {
+    if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+        ((str[i] >= 'a') && (str[i] <= 'z')) ||
+        ((str[i] >= '0') && (str[i] <= '9')) ||
+        (str[i] == '-') || (str[i] == '_') ||
+        (str[i] == '.') || (str[i] == '~')) {
+      bytes += print->print(str[i]);
+    } else {
+      // Encode all other characters
+      bytes += print->print('%');
+      bytes += print->print(HEX(str[i] / 16));
+      bytes += print->print(HEX(str[i] % 16));
+    }
   }
-  if (now < _local_last_milli) {
-    // We have already reseted the timestamp, so this cannot happen,
-    // an HTTP request cannot last 30 minutes, something else must be wrong
-    // here.
-    return E_TIMESTAMP_ERROR;
-  }
-  uint32_t diff = now - _local_last_milli;
-  _local_last_milli = now;
-  _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
-  return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
-                                buffer, length);
+  return bytes;
+}
+
+void M2XStreamClient::writePutHeader(const char* deviceId,
+                                     const char* streamName,
+                                     int contentLength) {
+  _client->print("PUT ");
+  if (_path_prefix)
+    _client->print(_path_prefix);
+  _client->print("/v2/devices/");
+  print_encoded_string(_client, deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->println("/value HTTP/1.0");
+
+  writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeDeleteHeader(const char* deviceId,
+                                        const char* streamName,
+                                        int contentLength) {
+  _client->print("DELETE ");
+  if (_path_prefix)
+    _client->print(_path_prefix);
+  _client->print("/v2/devices/");
+  print_encoded_string(_client, deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->print("/values");
+  _client->println(" HTTP/1.0");
+
+  writeHttpHeader(contentLength);
 }
 
-#define SIZE_ISO_8601 25
-static inline bool is_leap_year(int16_t y) {
-  return ((1970 + y) > 0) &&
-      !((1970 + y) % 4) &&
-      (((1970 + y) % 100) || !((1970 + y) % 400));
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+  _client->println(USER_AGENT);
+  _client->print("X-M2X-KEY: ");
+  _client->println(_key);
+
+  _client->print("Host: ");
+  print_encoded_string(_client, _host);
+  if (_port != kDefaultM2XPort) {
+    _client->print(":");
+    // port is an integer, does not need encoding
+    _client->print(_port);
+  }
+  _client->println();
+
+  if (contentLength > 0) {
+    _client->println("Content-Type: application/json");
+    DBG("%s", "Content Length: ");
+    DBGLN("%d", contentLength);
+
+    _client->print("Content-Length: ");
+    _client->println(contentLength);
+  }
+  _client->println();
 }
-static inline int32_t days_in_year(int16_t y) {
-  return is_leap_year(y) ? 366 : 365;
-}
-static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
+
+int M2XStreamClient::waitForString(const char* str) {
+  int currentIndex = 0;
+  if (str[currentIndex] == '\0') return E_OK;
+
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      int cmp;
+      if (_case_insensitive) {
+        cmp = tolower(c) - tolower(str[currentIndex]);
+      } else {
+        cmp = c - str[currentIndex];
+      }
 
-static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
-                                  char* buffer, int* length) {
-  int16_t year;
-  int8_t month, month_length;
-  int32_t day;
-  int8_t hour, minute, second;
+      if ((str[currentIndex] == '*') || (cmp == 0)) {
+        currentIndex++;
+        if (str[currentIndex] == '\0') {
+          return E_OK;
+        }
+      } else {
+        // start from the beginning
+        currentIndex = 0;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
+  }
+  // never reached here
+  return E_NOTREACHABLE;
+}
 
-  if (*length < SIZE_ISO_8601) {
-    *length = SIZE_ISO_8601;
-    return E_BUFFER_TOO_SMALL;
+int M2XStreamClient::readStatusCode(bool closeClient) {
+  int responseCode = 0;
+  int ret = waitForString("HTTP/*.* ");
+  if (ret != E_OK) {
+    if (closeClient) close();
+    return ret;
+  }
+
+  // ret is not needed from here(since it must be E_OK), so we can use it
+  // as a regular variable now.
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      responseCode = responseCode * 10 + (c - '0');
+      ret++;
+      if (ret == 3) {
+        if (closeClient) close();
+        return responseCode;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      if (closeClient) close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
   }
 
-  second = timestamp % 60;
-  timestamp /= 60; // now it is minutes
+  // never reached here
+  return E_NOTREACHABLE;
+}
 
-  minute = timestamp % 60;
-  timestamp /= 60; // now it is hours
-
-  hour = timestamp % 24;
-  timestamp /= 24; // now it is days
+int M2XStreamClient::readContentLength() {
+  int ret = waitForString("Content-Length: ");
+  if (ret != E_OK) {
+    return ret;
+  }
 
-  year = 0;
-  day = 0;
-  while ((day += days_in_year(year)) <= timestamp) {
-    year++;
-  }
-  day -= days_in_year(year);
-  timestamp -= day; // now it is days in this year, starting at 0
+  // From now on, ret is not needed, we can use it
+  // to keep the final result
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
 
-  day = 0;
-  month_length = 0;
-  for (month = 0; month < 12; month++) {
-    if (month == 1) {
-      // February
-      month_length = is_leap_year(year) ? 29 : 28;
-    } else {
-      month_length = MONTH_DAYS[month];
+      if ((c == '\r') || (c == '\n')) {
+        return (ret == 0) ? (E_INVALID) : (ret);
+      } else {
+        ret = ret * 10 + (c - '0');
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      return E_DISCONNECTED;
     }
 
-    if (timestamp >= month_length) {
-      timestamp -= month_length;
-    } else {
-      break;
-    }
+    delay(1000);
   }
-  year = 1970 + year;
-  month++; // offset by 1
-  day = timestamp + 1;
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+  return waitForString("\n\r\n");
+}
+
+void M2XStreamClient::close() {
+  // Eats up buffered data before closing
+  _client->flush();
+  _client->stop();
+}
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+                                     void* context) {
+  const int BUF_LEN = 64;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
 
-  int i = 0, j = 0;
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  stream_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_stream_key_found;
+  cbs.number_found = on_stream_number_found;
+  cbs.string_found = on_stream_string_found;
+  cbs.context.client_state = &state;
 
-  // NOTE: It seems the snprintf implementation in Arduino has bugs,
-  // we have to manually piece the string together here.
-#define INT_TO_STR(v_, width_) \
-  for (j = 0; j < (width_); j++) { \
-    buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
-    (v_) /= 10; \
-  } \
-  i += (width_)
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
 
-  INT_TO_STR(year, 4);
-  buffer[i++] = '-';
-  INT_TO_STR(month, 2);
-  buffer[i++] = '-';
-  INT_TO_STR(day, 2);
-  buffer[i++] = 'T';
-  INT_TO_STR(hour, 2);
-  buffer[i++] = ':';
-  INT_TO_STR(minute, 2);
-  buffer[i++] = ':';
-  INT_TO_STR(second, 2);
-  buffer[i++] = '.';
-  INT_TO_STR(milli, 3);
-  buffer[i++] = 'Z';
-  buffer[i++] = '\0';
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+                                  void* context) {
+  const int BUF_LEN = 40;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  location_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
 
-#undef INT_TO_STR
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_location_key_found;
+  cbs.string_found = on_location_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
 
-  *length = i;
-  return E_OK;
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
 }
--- a/M2XStreamClient.h	Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.h	Fri Jan 01 15:48:18 2016 +0000
@@ -127,6 +127,20 @@
                         const char* names[], const int counts[],
                         const char* ats[], T values[]);
 
+  // Post multiple values of a single device at once.
+  // +deviceId+ - id of the device to post values
+  // +streamNum+ - Number of streams to post
+  // +names+ - Array of stream names, the length of the array should
+  // be exactly +streamNum+
+  // +values+ - Array of values to post, the length of the array should
+  // be exactly +streamNum+. Notice that the array of +values+ should
+  // match the array of +names+, and that the ith value in +values+ is
+  // exactly the value to post for the ith stream name in +names+
+  template <class T>
+  int postSingleDeviceUpdate(const char* deviceId, int streamNum,
+                             const char* names[], T values[],
+                             const char* at = NULL);
+
   // Fetch values for a particular data stream. Since memory is
   // very limited on an Arduino, we cannot parse and get all the
   // data points in memory. Instead, we use callbacks here: whenever
--- a/M2XStreamClient_template.h	Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient_template.h	Fri Jan 01 15:48:18 2016 +0000
@@ -73,6 +73,54 @@
 }
 
 template <class T>
+inline int write_single_device_values(Print* print, int streamNum,
+                                      const char* names[], T values[],
+                                      const char* at) {
+  int bytes = 0;
+  bytes += print->print("{\"values\":{");
+  for (int i = 0; i < streamNum; i++) {
+    bytes += print->print("\"");
+    bytes += print->print(names[i]);
+    bytes += print->print("\": \"");
+    bytes += print->print(values[value_index]);
+    bytes += print->print("\"");
+    if (i < streamNum - 1) { bytes += print->print(","); }
+  }
+  bytes += print->print("}");
+  if (at != NULL) {
+    bytes += print->print(",\"timestamp\":\"");
+    bytes += print->print(at);
+    bytes += print->print("\"");
+  }
+  bytes += print->print("}");
+  return bytes;
+}
+
+template <class T>
+int M2XStreamClient::postSingleDeviceUpdate(const char* deviceId, int streamNum,
+                                            const char* names[], T values[],
+                                            const char* at) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_single_device_values(&_null_print, streamNum, names,
+                                            values, at);
+    _client->print("POST ");
+    if (_path_prefix) {
+      _client->print(_path_prefix);
+    }
+    _client->print("/v2/devices/");
+    print_encoded_string(_client, deviceId);
+    _client->println("/update HTTP/1.0");
+    writeHttpHeader(length);
+    write_single_device_values(_client, streamNum, names, values, at);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+template <class T>
 static int write_location_data(Print* print, const char* name,
                                T latitude, T longitude,
                                T elevation) {