ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.

Dependents:   m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more

Revision:
19:4dfa28d37b8f
Parent:
17:9db4a86b876a
--- a/M2XStreamClient.cpp	Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.cpp	Fri Jan 01 15:48:18 2016 +0000
@@ -1,147 +1,527 @@
 #include "M2XStreamClient.h"
 
-static int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
-                                  char* buffer, int* length);
+#include <jsonlite.h>
+
+#include "StreamParseFunctions.h"
+#include "LocationParseFunctions.h"
+
+const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
+
+static int write_delete_values(Print* print, const char* from, const char* end);
+int print_encoded_string(Print* print, const char* str);
+int tolower(int ch);
+
+#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
+int tolower(int ch)
+{
+  // Arduino and mbed use ASCII table, so we can simplify the implementation
+  if ((ch >= 'A') && (ch <= 'Z')) {
+    return (ch + 32);
+  }
+  return ch;
+}
+#else
+// For other platform, we use libc's tolower by default
+#include <ctype.h>
+#endif
 
-TimeService::TimeService(M2XStreamClient* client) : _client(client) {
+M2XStreamClient::M2XStreamClient(Client* client,
+                                 const char* key,
+                                 int case_insensitive,
+                                 const char* host,
+                                 int port,
+                                 const char* path_prefix) : _client(client),
+                                             _key(key),
+                                             _case_insensitive(case_insensitive),
+                                             _host(host),
+                                             _port(port),
+                                             _path_prefix(path_prefix),
+                                             _null_print() {
+}
+
+int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
+                                      stream_value_read_callback callback, void* context,
+                                      const char* query) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix)
+        _client->print(_path_prefix);
+    _client->print("/v2/devices/");
+    print_encoded_string(_client, deviceId);
+    _client->print("/streams/");
+    print_encoded_string(_client, streamName);
+    _client->print("/values.json");
+
+    if (query) {
+      if (query[0] != '?') {
+        _client->print('?');
+      }
+      _client->print(query);
+    }
+
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readStreamValue(callback, context);
+  }
+
+  close();
+  return status;
 }
 
-int TimeService::init() {
-  _timer.start();
-  return reset();
+int M2XStreamClient::readLocation(const char* deviceId,
+                                  location_read_callback callback,
+                                  void* context) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix)
+        _client->print(_path_prefix);
+    _client->print("/v2/devices/");
+    print_encoded_string(_client, deviceId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readLocation(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
+                                  const char* from, const char* end) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_delete_values(&_null_print, from, end);
+    writeDeleteHeader(deviceId, streamName, length);
+    write_delete_values(_client, from, end);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
 }
 
-int TimeService::reset() {
-  int32_t ts;
-  int status = _client->getTimestamp32(&ts);
+int M2XStreamClient::getTimestamp32(int32_t *ts) {
+  // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
+  // which is 9223372036854775807. It consists of 19 characters, so a
+  // buffer of 20 is definitely enough here
+  int length = 20;
+  char buffer[20];
+  int status = getTimestamp(buffer, &length);
+  if (status == 200) {
+    int32_t result = 0;
+    for (int i = 0; i < length; i++) {
+      result = result * 10 + (buffer[i] - '0');
+    }
+    if (ts != NULL) { *ts = result; }
+  }
+  return status;
+}
+
+int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
+  if (bufferLength == NULL) { return E_INVALID; }
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->println("GET /v2/time/seconds HTTP/1.0");
 
-  if (m2x_status_is_success(status)) {
-    _server_timestamp = ts;
-    _local_last_milli = _timer.read_ms();
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
   }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    int length = readContentLength();
+    if (length < 0) {
+      close();
+      return length;
+    }
+    if (*bufferLength < length) {
+      *bufferLength = length;
+      return E_BUFFER_TOO_SMALL;
+    }
+    *bufferLength = length;
+    int index = skipHttpHeader();
+    if (index != E_OK) {
+      close();
+      return index;
+    }
+    index = 0;
+    while (index < length) {
+      DBG("%s", "Received Data: ");
+      while ((index < length) && _client->available()) {
+        buffer[index++] = _client->read();
+        DBG("%c", buffer[index - 1]);
+      }
+      DBGLNEND;
 
+      if ((!_client->connected()) &&
+          (index < length)) {
+        close();
+        return E_NOCONNECTION;
+      }
+
+      delay(200);
+    }
+  }
+  close();
   return status;
 }
 
-int TimeService::getTimestamp(char* buffer, int* length) {
-  uint32_t now = _timer.read_ms();
-  if (now < _local_last_milli) {
-    // In case of a timestamp overflow, we reset the server timestamp recorded.
-    // Notice that unlike Arduino, mbed would overflow every 30 minutes,
-    // so if 2 calls to this API are more than 30 minutes apart, we are
-    // likely to run into troubles. This is a limitation of the current
-    // mbed platform. However, we argue that it might be a rare case that
-    // 2 calls to this are 30 minutes apart. In most cases, we would call
-    // this API every few seconds or minutes, this won't be a huge problem.
-    // However, if you have a use case that would require 2 intervening
-    // calls to this be 30 minutes apart, you might want to leverage a RTC
-    // clock instead of the simple ticker here.
-    int status = reset();
-    if (!m2x_status_is_success(status)) { return status; }
-    now = _timer.read_ms();
+static int write_delete_values(Print* print, const char* from,
+                               const char* end) {
+  int bytes = 0;
+  bytes += print->print("{\"from\":\"");
+  bytes += print->print(from);
+  bytes += print->print("\",\"end\":\"");
+  bytes += print->print(end);
+  bytes += print->print("\"}");
+  return bytes;
+}
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+int print_encoded_string(Print* print, const char* str) {
+  int bytes = 0;
+  for (int i = 0; str[i] != 0; i++) {
+    if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+        ((str[i] >= 'a') && (str[i] <= 'z')) ||
+        ((str[i] >= '0') && (str[i] <= '9')) ||
+        (str[i] == '-') || (str[i] == '_') ||
+        (str[i] == '.') || (str[i] == '~')) {
+      bytes += print->print(str[i]);
+    } else {
+      // Encode all other characters
+      bytes += print->print('%');
+      bytes += print->print(HEX(str[i] / 16));
+      bytes += print->print(HEX(str[i] % 16));
+    }
   }
-  if (now < _local_last_milli) {
-    // We have already reseted the timestamp, so this cannot happen,
-    // an HTTP request cannot last 30 minutes, something else must be wrong
-    // here.
-    return E_TIMESTAMP_ERROR;
-  }
-  uint32_t diff = now - _local_last_milli;
-  _local_last_milli = now;
-  _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
-  return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
-                                buffer, length);
+  return bytes;
+}
+
+void M2XStreamClient::writePutHeader(const char* deviceId,
+                                     const char* streamName,
+                                     int contentLength) {
+  _client->print("PUT ");
+  if (_path_prefix)
+    _client->print(_path_prefix);
+  _client->print("/v2/devices/");
+  print_encoded_string(_client, deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->println("/value HTTP/1.0");
+
+  writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeDeleteHeader(const char* deviceId,
+                                        const char* streamName,
+                                        int contentLength) {
+  _client->print("DELETE ");
+  if (_path_prefix)
+    _client->print(_path_prefix);
+  _client->print("/v2/devices/");
+  print_encoded_string(_client, deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->print("/values");
+  _client->println(" HTTP/1.0");
+
+  writeHttpHeader(contentLength);
 }
 
-#define SIZE_ISO_8601 25
-static inline bool is_leap_year(int16_t y) {
-  return ((1970 + y) > 0) &&
-      !((1970 + y) % 4) &&
-      (((1970 + y) % 100) || !((1970 + y) % 400));
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+  _client->println(USER_AGENT);
+  _client->print("X-M2X-KEY: ");
+  _client->println(_key);
+
+  _client->print("Host: ");
+  print_encoded_string(_client, _host);
+  if (_port != kDefaultM2XPort) {
+    _client->print(":");
+    // port is an integer, does not need encoding
+    _client->print(_port);
+  }
+  _client->println();
+
+  if (contentLength > 0) {
+    _client->println("Content-Type: application/json");
+    DBG("%s", "Content Length: ");
+    DBGLN("%d", contentLength);
+
+    _client->print("Content-Length: ");
+    _client->println(contentLength);
+  }
+  _client->println();
 }
-static inline int32_t days_in_year(int16_t y) {
-  return is_leap_year(y) ? 366 : 365;
-}
-static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
+
+int M2XStreamClient::waitForString(const char* str) {
+  int currentIndex = 0;
+  if (str[currentIndex] == '\0') return E_OK;
+
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      int cmp;
+      if (_case_insensitive) {
+        cmp = tolower(c) - tolower(str[currentIndex]);
+      } else {
+        cmp = c - str[currentIndex];
+      }
 
-static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
-                                  char* buffer, int* length) {
-  int16_t year;
-  int8_t month, month_length;
-  int32_t day;
-  int8_t hour, minute, second;
+      if ((str[currentIndex] == '*') || (cmp == 0)) {
+        currentIndex++;
+        if (str[currentIndex] == '\0') {
+          return E_OK;
+        }
+      } else {
+        // start from the beginning
+        currentIndex = 0;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
+  }
+  // never reached here
+  return E_NOTREACHABLE;
+}
 
-  if (*length < SIZE_ISO_8601) {
-    *length = SIZE_ISO_8601;
-    return E_BUFFER_TOO_SMALL;
+int M2XStreamClient::readStatusCode(bool closeClient) {
+  int responseCode = 0;
+  int ret = waitForString("HTTP/*.* ");
+  if (ret != E_OK) {
+    if (closeClient) close();
+    return ret;
+  }
+
+  // ret is not needed from here(since it must be E_OK), so we can use it
+  // as a regular variable now.
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      responseCode = responseCode * 10 + (c - '0');
+      ret++;
+      if (ret == 3) {
+        if (closeClient) close();
+        return responseCode;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      if (closeClient) close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
   }
 
-  second = timestamp % 60;
-  timestamp /= 60; // now it is minutes
+  // never reached here
+  return E_NOTREACHABLE;
+}
 
-  minute = timestamp % 60;
-  timestamp /= 60; // now it is hours
-
-  hour = timestamp % 24;
-  timestamp /= 24; // now it is days
+int M2XStreamClient::readContentLength() {
+  int ret = waitForString("Content-Length: ");
+  if (ret != E_OK) {
+    return ret;
+  }
 
-  year = 0;
-  day = 0;
-  while ((day += days_in_year(year)) <= timestamp) {
-    year++;
-  }
-  day -= days_in_year(year);
-  timestamp -= day; // now it is days in this year, starting at 0
+  // From now on, ret is not needed, we can use it
+  // to keep the final result
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
 
-  day = 0;
-  month_length = 0;
-  for (month = 0; month < 12; month++) {
-    if (month == 1) {
-      // February
-      month_length = is_leap_year(year) ? 29 : 28;
-    } else {
-      month_length = MONTH_DAYS[month];
+      if ((c == '\r') || (c == '\n')) {
+        return (ret == 0) ? (E_INVALID) : (ret);
+      } else {
+        ret = ret * 10 + (c - '0');
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      return E_DISCONNECTED;
     }
 
-    if (timestamp >= month_length) {
-      timestamp -= month_length;
-    } else {
-      break;
-    }
+    delay(1000);
   }
-  year = 1970 + year;
-  month++; // offset by 1
-  day = timestamp + 1;
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+  return waitForString("\n\r\n");
+}
+
+void M2XStreamClient::close() {
+  // Eats up buffered data before closing
+  _client->flush();
+  _client->stop();
+}
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+                                     void* context) {
+  const int BUF_LEN = 64;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
 
-  int i = 0, j = 0;
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  stream_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_stream_key_found;
+  cbs.number_found = on_stream_number_found;
+  cbs.string_found = on_stream_string_found;
+  cbs.context.client_state = &state;
 
-  // NOTE: It seems the snprintf implementation in Arduino has bugs,
-  // we have to manually piece the string together here.
-#define INT_TO_STR(v_, width_) \
-  for (j = 0; j < (width_); j++) { \
-    buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
-    (v_) /= 10; \
-  } \
-  i += (width_)
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
 
-  INT_TO_STR(year, 4);
-  buffer[i++] = '-';
-  INT_TO_STR(month, 2);
-  buffer[i++] = '-';
-  INT_TO_STR(day, 2);
-  buffer[i++] = 'T';
-  INT_TO_STR(hour, 2);
-  buffer[i++] = ':';
-  INT_TO_STR(minute, 2);
-  buffer[i++] = ':';
-  INT_TO_STR(second, 2);
-  buffer[i++] = '.';
-  INT_TO_STR(milli, 3);
-  buffer[i++] = 'Z';
-  buffer[i++] = '\0';
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+                                  void* context) {
+  const int BUF_LEN = 40;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  location_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
 
-#undef INT_TO_STR
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_location_key_found;
+  cbs.string_found = on_location_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
 
-  *length = i;
-  return E_OK;
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
 }