Modified M2XStreamClient

Fork of M2XStreamClient-JMF by Jim Flynn

Revision:
22:4d895e732765
Parent:
21:6878944d2ce2
Child:
23:f32837239193
--- a/M2XStreamClient.h	Sat Jan 02 02:29:43 2016 +0000
+++ b/M2XStreamClient.h	Wed Jun 15 12:36:55 2016 +0000
@@ -1,46 +1,38 @@
 #ifndef M2XStreamClient_h
 #define M2XStreamClient_h
 
-#define MIN(a, b) (((a) > (b))?(b):(a))
-
-#define MBED_PLATFORM
-
-#ifdef ARDUINO_PLATFORM
-#include "Arduino.h"
-
-#define USER_AGENT "User-Agent: M2X Arduino Client/2.0.2"
-#endif
-
-#ifdef MBED_PLATFORM
-#include "mbed.h"
-
-#define USER_AGENT "User-Agent: M2X Mbed Client/2.1.2"
+#if (!defined(ARDUINO_PLATFORM)) && (!defined(ESP8266_PLATFORM)) && (!defined(MBED_PLATFORM))
+#error "Platform definition is missing!"
 #endif
 
-#include "Client.h"
-#include "NullPrint.h"
+#define M2X_VERSION "2.2.0"
 
-#ifdef DEBUG
 #ifdef ARDUINO_PLATFORM
-#define DBG(fmt_, data_) Serial.print(data_)
-#define DBGLN(fmt_, data_) Serial.println(data_)
-#define DBGLNEND Serial.println()
-#endif  // ARDUINO_PLATFORM
+#include "m2x-arduino.h"
+#endif  /* ARDUINO_PLATFORM */
+
+#ifdef ESP8266_PLATFORM
+#include "m2x-esp8266.h"
+#endif /* ESP8266_PLATFORM */
 
 #ifdef MBED_PLATFORM
-#define DBG(fmt_, data_) printf((fmt_), (data_))
-#define DBGLN(fmt_, data_) printf((fmt_), (data_)); printf("\n")
-#define DBGLNEND printf("\n")
-#endif  // MBED_PLATFORM
-#else
+#include "m2x-mbed.h"
+#endif /* MBED_PLATFORM */
+
+/* If we don't have DBG defined, provide dump implementation */
+#ifndef DBG
 #define DBG(fmt_, data_)
 #define DBGLN(fmt_, data_)
 #define DBGLNEND
-#endif  // DEBUG
+#endif  /* DBG */
 
-#define HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
+#define MIN(a, b) (((a) > (b))?(b):(a))
+#define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
 #define MAX_DOUBLE_DIGITS 7
 
+/* For tolower */
+#include <ctype.h>
+
 static const int E_OK = 0;
 static const int E_NOCONNECTION = -1;
 static const int E_DISCONNECTED = -2;
@@ -50,6 +42,9 @@
 static const int E_BUFFER_TOO_SMALL = -6;
 static const int E_TIMESTAMP_ERROR = -8;
 
+static const char* DEFAULT_M2X_HOST = "api-m2x.att.com";
+static const int DEFAULT_M2X_PORT = 80;
+
 static inline bool m2x_status_is_success(int status) {
   return (status == E_OK) || (status >= 200 && status <= 299);
 }
@@ -67,6 +62,44 @@
       m2x_status_is_server_error(status);
 }
 
+// Null Print class used to calculate length to print
+class NullPrint : public Print {
+public:
+  size_t counter;
+
+  virtual size_t write(uint8_t b) {
+    counter++;
+    return 1;
+  }
+
+  virtual size_t write(const uint8_t* buf, size_t size) {
+    counter += size;
+    return size;
+  }
+};
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+static inline int print_encoded_string(Print* print, const char* str) {
+  int bytes = 0;
+  for (int i = 0; str[i] != 0; i++) {
+    if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+        ((str[i] >= 'a') && (str[i] <= 'z')) ||
+        ((str[i] >= '0') && (str[i] <= '9')) ||
+        (str[i] == '-') || (str[i] == '_') ||
+        (str[i] == '.') || (str[i] == '~')) {
+      bytes += print->print(str[i]);
+    } else {
+      // Encode all other characters
+      bytes += print->print('%');
+      bytes += print->print(TO_HEX(str[i] / 16));
+      bytes += print->print(TO_HEX(str[i] % 16));
+    }
+  }
+  return bytes;
+}
+
+#ifdef M2X_ENABLE_READER
 /*
  * +type+ indicates the value type: 1 for string, 2 for number
  * NOTE that the value type here only contains a hint on how
@@ -88,19 +121,27 @@
                                        int index,
                                        void* context);
 
+typedef void (*m2x_command_read_callback)(const char* id,
+                                          const char* name,
+                                          int index,
+                                          void *context);
+#endif  /* M2X_ENABLE_READER */
+
+typedef void (*m2x_fill_data_callback)(Print *print, void *context);
+
 class M2XStreamClient {
 public:
-  static const char* kDefaultM2XHost;
-  static const int kDefaultM2XPort = 80;
-
   M2XStreamClient(Client* client,
                   const char* key,
+                  void (* idlefunc)(void) = NULL,
                   int case_insensitive = 1,
-                  const char* host = kDefaultM2XHost,
-                  int port = kDefaultM2XPort,
+                  const char* host = DEFAULT_M2X_HOST,
+                  int port = DEFAULT_M2X_PORT,
                   const char* path_prefix = NULL);
 
   // Push data stream value using PUT request, returns the HTTP status code
+  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
+  // the device ID here.
   template <class T>
   int updateStreamValue(const char* deviceId, const char* streamName, T value);
 
@@ -122,6 +163,8 @@
   // stream, the succeeding +counts[1]+ number of items contain values
   // for the second stream, etc. The length of this array should be
   // the sum of all values in +counts+ array.
+  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
+  // the device ID here.
   template <class T>
   int postDeviceUpdates(const char* deviceId, int streamNum,
                         const char* names[], const int counts[],
@@ -136,11 +179,14 @@
   // be exactly +streamNum+. Notice that the array of +values+ should
   // match the array of +names+, and that the ith value in +values+ is
   // exactly the value to post for the ith stream name in +names+
+  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
+  // the device ID here.
   template <class T>
-  int postSingleDeviceUpdate(const char* deviceId, int streamNum,
-                             const char* names[], T values[],
-                             const char* at = NULL);
+  int postDeviceUpdate(const char* deviceId, int streamNum,
+                       const char* names[], T values[],
+                       const char* at = NULL);
 
+#ifdef M2X_ENABLE_READER
   // Fetch values for a particular data stream. Since memory is
   // very limited on an Arduino, we cannot parse and get all the
   // data points in memory. Instead, we use callbacks here: whenever
@@ -156,6 +202,7 @@
   int listStreamValues(const char* deviceId, const char* streamName,
                        stream_value_read_callback callback, void* context,
                        const char* query = NULL);
+#endif  /* M2X_ENABLE_READER */
 
   // Update datasource location
   // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
@@ -171,15 +218,19 @@
   // precision, which means you are free to use the double-version only
   // without any precision problems.
   // Returned value is the http status code.
+  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
+  // the device ID here.
   template <class T>
   int updateLocation(const char* deviceId, const char* name,
                      T latitude, T longitude, T elevation);
 
+#ifdef M2X_ENABLE_READER
   // Read location information for a device. Also used callback to process
   // data points for memory reasons. The HTTP status code is returned,
   // response is only parsed when the HTTP status code is 200
   int readLocation(const char* deviceId, location_read_callback callback,
                    void* context);
+#endif  /* M2X_ENABLE_READER */
 
   // Delete values from a data stream
   // You will need to provide from and end date/time strings in the ISO8601
@@ -198,6 +249,53 @@
   int deleteValues(const char* deviceId, const char* streamName,
                    const char* from, const char* end);
 
+#ifdef M2X_ENABLE_READER
+  // Fetch commands available for this device, notice that for memory constraints,
+  // we only keep ID and name of the command received here.
+  // You can tweak the command receiving via the query parameter.
+  int listCommands(const char* deviceId,
+                   m2x_command_read_callback callback, void* context,
+                   const char* query = NULL);
+#endif  /* M2X_ENABLE_READER */
+
+  // Mark a command as processed.
+  // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
+  // To make sure the minimal amount of memory is needed, this API works with
+  // a callback function. The callback function is then used to fill the request
+  // data, note that in order to correctly set the content length in HTTP header,
+  // the callback function will be called twice, the caller must make sure both
+  // calls fill the Print object with exactly the same data.
+  // If you have a pre-allocated buffer filled with the data to post, you can
+  // use markCommandProcessedWithData API below.
+  int markCommandProcessed(const char* deviceId, const char* commandId,
+                           m2x_fill_data_callback callback, void *context);
+
+  // Mark a command as processed with a data buffer
+  // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
+  // This is exactly like markCommandProcessed, except that a buffer is use to
+  // contain the data to post as the request body.
+  int markCommandProcessedWithData(const char* deviceId, const char* commandId,
+                                   const char* data);
+
+  // Mark a command as rejected.
+  // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
+  // To make sure the minimal amount of memory is needed, this API works with
+  // a callback function. The callback function is then used to fill the request
+  // data, note that in order to correctly set the content length in HTTP header,
+  // the callback function will be called twice, the caller must make sure both
+  // calls fill the Print object with exactly the same data.
+  // If you have a pre-allocated buffer filled with the data to post, you can
+  // use markCommandRejectedWithData API below.
+  int markCommandRejected(const char* deviceId, const char* commandId,
+                          m2x_fill_data_callback callback, void *context);
+
+  // Mark a command as rejected with a data buffer
+  // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
+  // This is exactly like markCommandRejected, except that a buffer is use to
+  // contain the data to post as the request body.
+  int markCommandRejectedWithData(const char* deviceId, const char* commandId,
+                                  const char* data);
+
   // Fetches current timestamp in seconds from M2X server. Since we
   // are using signed 32-bit integer as return value, this will only
   // return valid results before 03:14:07 UTC on 19 January 2038. If
@@ -252,6 +350,7 @@
   int _case_insensitive;
   const char* _host;
   int _port;
+  void (* _idlefunc)(void);
   const char* _path_prefix;
   NullPrint _null_print;
 
@@ -282,15 +381,1275 @@
   int waitForString(const char* str);
   // Closes the connection
   void close();
+
+#ifdef M2X_ENABLE_READER
   // Parses JSON response of stream value API, and calls callback function
   // once we get a data point
   int readStreamValue(stream_value_read_callback callback, void* context);
   // Parses JSON response of location API, and calls callback function once
   // we get a data point
   int readLocation(location_read_callback callback, void* context);
+  // Parses JSON response of command API, and calls callback function once
+  // we get a data point
+  int readCommand(m2x_command_read_callback callback, void* context);
+#endif  /* M2X_ENABLE_READER */
+};
+
+
+// A ISO8601 timestamp generation service for M2X.
+// It uses the Time API provided by the M2X server to initialize
+// clock, then uses millis() function provided by Arduino to calculate
+// time advancements so as to reduce API query times.
+//
+// Right now, this service only works with 32-bit timestamp, meaning that
+// this service won't work after 03:14:07 UTC on 19 January 2038. However,
+// a similar service that uses 64-bit timestamp can be implemented following
+// the logic here.
+class TimeService {
+public:
+  TimeService(M2XStreamClient* client);
+
+  // Initialize the time service. Notice the TimeService instance is only
+  // working after calling this function successfully.
+  int init();
+
+  // Reset the internal recorded time by calling M2X Time API again. Normally,
+  // you don't need to call this manually. TimeService will handle Arduino clock
+  // overflow automatically
+  int reset();
+
+  // Fills ISO8601 formatted timestamp into the buffer provided. +length+ should
+  // contains the maximum supported length of the buffer when calling. For now,
+  // the buffer should be able to store 25 characters for a full ISO8601 formatted
+  // timestamp, otherwise, an error will be returned.
+  int getTimestamp(char* buffer, int* length);
+private:
+  M2XStreamClient* _client;
+  int32_t _server_timestamp;
+  uint32_t _local_last_milli;
+  M2XTimer _timer;
 };
 
-#include "M2XStreamClient_template.h"
-#include "TimeService.h"
+
+// Implementations
+M2XStreamClient::M2XStreamClient(Client* client,
+                                 const char* key,
+                                 void (* idlefunc)(void),
+                                 int case_insensitive,
+                                 const char* host,
+                                 int port,
+                                 const char* path_prefix) : _client(client),
+                                             _key(key),
+                                             _idlefunc(idlefunc),
+                                             _case_insensitive(case_insensitive),
+                                             _host(host),
+                                             _port(port),
+                                             _path_prefix(path_prefix),
+                                             _null_print() {
+}
+
+template <class T>
+int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    writePutHeader(deviceId, streamName,
+                   //  for {"value": and }
+                   _null_print.print(value) + 12);
+    _client->print("{\"value\":\"");
+    _client->print(value);
+    _client->print("\"}");
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+template <class T>
+inline int write_multiple_values(Print* print, int streamNum,
+                                 const char* names[], const int counts[],
+                                 const char* ats[], T values[]) {
+  int bytes = 0, value_index = 0;
+  bytes += print->print("{\"values\":{");
+  for (int i = 0; i < streamNum; i++) {
+    bytes += print->print("\"");
+    bytes += print->print(names[i]);
+    bytes += print->print("\":[");
+    for (int j = 0; j < counts[i]; j++) {
+      bytes += print->print("{\"timestamp\": \"");
+      bytes += print->print(ats[value_index]);
+      bytes += print->print("\",\"value\": \"");
+      bytes += print->print(values[value_index]);
+      bytes += print->print("\"}");
+      if (j < counts[i] - 1) { bytes += print->print(","); }
+      value_index++;
+    }
+    bytes += print->print("]");
+    if (i < streamNum - 1) { bytes += print->print(","); }
+  }
+  bytes += print->print("}}");
+  return bytes;
+}
+
+template <class T>
+int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum,
+                                       const char* names[], const int counts[],
+                                       const char* ats[], T values[]) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_multiple_values(&_null_print, streamNum, names,
+                                       counts, ats, values);
+    _client->print("POST ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->println("/updates HTTP/1.0");
+    writeHttpHeader(length);
+    write_multiple_values(_client, streamNum, names, counts, ats, values);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+template <class T>
+inline int write_single_device_values(Print* print, int streamNum,
+                                      const char* names[], T values[],
+                                      const char* at) {
+  int bytes = 0;
+  bytes += print->print("{\"values\":{");
+  for (int i = 0; i < streamNum; i++) {
+    bytes += print->print("\"");
+    bytes += print->print(names[i]);
+    bytes += print->print("\": \"");
+    bytes += print->print(values[i]);
+    bytes += print->print("\"");
+    if (i < streamNum - 1) { bytes += print->print(","); }
+  }
+  bytes += print->print("}");
+  if (at != NULL) {
+    bytes += print->print(",\"timestamp\":\"");
+    bytes += print->print(at);
+    bytes += print->print("\"");
+  }
+  bytes += print->print("}");
+  return bytes;
+}
+
+template <class T>
+int M2XStreamClient::postDeviceUpdate(const char* deviceId, int streamNum,
+                                      const char* names[], T values[],
+                                      const char* at) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_single_device_values(&_null_print, streamNum, names,
+                                            values, at);
+    _client->print("POST ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->println("/update HTTP/1.0");
+    writeHttpHeader(length);
+    write_single_device_values(_client, streamNum, names, values, at);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+template <class T>
+static int write_location_data(Print* print, const char* name,
+                               T latitude, T longitude,
+                               T elevation) {
+  int bytes = 0;
+  bytes += print->print("{\"name\":\"");
+  bytes += print->print(name);
+  bytes += print->print("\",\"latitude\":\"");
+  bytes += print->print(latitude);
+  bytes += print->print("\",\"longitude\":\"");
+  bytes += print->print(longitude);
+  bytes += print->print("\",\"elevation\":\"");
+  bytes += print->print(elevation);
+  bytes += print->print("\"}");
+  return bytes;
+}
+
+static int write_location_data(Print* print, const char* name,
+                               double latitude, double longitude,
+                               double elevation) {
+  int bytes = 0;
+  bytes += print->print("{\"name\":\"");
+  bytes += print->print(name);
+  bytes += print->print("\",\"latitude\":\"");
+  bytes += print->print(latitude, MAX_DOUBLE_DIGITS);
+  bytes += print->print("\",\"longitude\":\"");
+  bytes += print->print(longitude, MAX_DOUBLE_DIGITS);
+  bytes += print->print("\",\"elevation\":\"");
+  bytes += print->print(elevation);
+  bytes += print->print("\"}");
+  return bytes;
+}
+
+template <class T>
+int M2XStreamClient::updateLocation(const char* deviceId,
+                                    const char* name,
+                                    T latitude,
+                                    T longitude,
+                                    T elevation) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+
+    int length = write_location_data(&_null_print, name, latitude, longitude,
+                                     elevation);
+    _client->print("PUT ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(length);
+    write_location_data(_client, name, latitude, longitude, elevation);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+static inline int write_delete_values(Print* print, const char* from,
+                                      const char* end) {
+  int bytes = 0;
+  bytes += print->print("{\"from\":\"");
+  bytes += print->print(from);
+  bytes += print->print("\",\"end\":\"");
+  bytes += print->print(end);
+  bytes += print->print("\"}");
+  return bytes;
+}
+
+int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
+                                  const char* from, const char* end) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    int length = write_delete_values(&_null_print, from, end);
+    writeDeleteHeader(deviceId, streamName, length);
+    write_delete_values(_client, from, end);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::markCommandProcessed(const char* deviceId,
+                                          const char* commandId,
+                                          m2x_fill_data_callback callback,
+                                          void *context) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _null_print.counter = 0;
+    callback(&_null_print, context);
+    int length = _null_print.counter;
+    _client->print("POST ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->print("/commands/");
+    _client->print(commandId);
+    _client->print("/process");
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(length);
+    callback(_client, context);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+static void m2x_fixed_buffer_filling_callback(Print *print, void *context) {
+  if (context) {
+    print->print((const char *) context);
+  }
+}
+
+int M2XStreamClient::markCommandProcessedWithData(const char* deviceId,
+                                                  const char* commandId,
+                                                  const char* data) {
+  return markCommandProcessed(deviceId, commandId,
+                              m2x_fixed_buffer_filling_callback, (void *) data);
+}
+
+int M2XStreamClient::markCommandRejected(const char* deviceId,
+                                         const char* commandId,
+                                         m2x_fill_data_callback callback,
+                                         void *context) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _null_print.counter = 0;
+    callback(&_null_print, context);
+    int length = _null_print.counter;
+    _client->print("POST ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->print("/commands/");
+    _client->print(commandId);
+    _client->print("/reject");
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(length);
+    callback(_client, context);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::markCommandRejectedWithData(const char* deviceId,
+                                                 const char* commandId,
+                                                 const char* data) {
+  return markCommandRejected(deviceId, commandId,
+                             m2x_fixed_buffer_filling_callback, (void *) data);
+}
+
+int M2XStreamClient::getTimestamp32(int32_t *ts) {
+  // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
+  // which is 9223372036854775807. It consists of 19 characters, so a
+  // buffer of 20 is definitely enough here
+  int length = 20;
+  char buffer[20];
+  int status = getTimestamp(buffer, &length);
+  if (status == 200) {
+    int32_t result = 0;
+    for (int i = 0; i < length; i++) {
+      result = result * 10 + (buffer[i] - '0');
+    }
+    if (ts != NULL) { *ts = result; }
+  }
+  return status;
+}
+
+int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
+  if (bufferLength == NULL) { return E_INVALID; }
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->println("/v2/time/seconds HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    int length = readContentLength();
+    if (length < 0) {
+      close();
+      return length;
+    }
+    if (*bufferLength < length) {
+      *bufferLength = length;
+      return E_BUFFER_TOO_SMALL;
+    }
+    *bufferLength = length;
+    int index = skipHttpHeader();
+    if (index != E_OK) {
+      close();
+      return index;
+    }
+    index = 0;
+    while (index < length) {
+      DBG("%s", "Received Data: ");
+      while ((index < length) && _client->available()) {
+        buffer[index++] = _client->read();
+        DBG("%c", buffer[index - 1]);
+      }
+      DBGLNEND;
+
+      if ((!_client->connected()) &&
+          (index < length)) {
+        close();
+        return E_NOCONNECTION;
+      }
+
+      if (_idlefunc!=NULL) {
+        _idlefunc();
+      } else {
+        delay(200);
+      }
+    }
+  }
+  close();
+  return status;
+}
+
+
+void M2XStreamClient::writePutHeader(const char* deviceId,
+                                     const char* streamName,
+                                     int contentLength) {
+  _client->print("PUT ");
+  if (_path_prefix) { _client->print(_path_prefix); }
+  _client->print("/v2/devices/");
+  _client->print(deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->println("/value HTTP/1.0");
+
+  writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeDeleteHeader(const char* deviceId,
+                                        const char* streamName,
+                                        int contentLength) {
+  _client->print("DELETE ");
+  if (_path_prefix) { _client->print(_path_prefix); }
+  _client->print("/v2/devices/");
+  _client->print(deviceId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->print("/values");
+  _client->println(" HTTP/1.0");
+
+  writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+  _client->println(USER_AGENT);
+  _client->print("X-M2X-KEY: ");
+  _client->println(_key);
+
+  _client->print("Host: ");
+  print_encoded_string(_client, _host);
+  if (_port != DEFAULT_M2X_PORT) {
+    _client->print(":");
+    // port is an integer, does not need encoding
+    _client->print(_port);
+  }
+  _client->println();
+
+  if (contentLength > 0) {
+    _client->println("Content-Type: application/json");
+    DBG("%s", "Content Length: ");
+    DBGLN("%d", contentLength);
+
+    _client->print("Content-Length: ");
+    _client->println(contentLength);
+  }
+  _client->println();
+}
+
+int M2XStreamClient::waitForString(const char* str) {
+  int currentIndex = 0;
+  if (str[currentIndex] == '\0') return E_OK;
+
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      int cmp;
+      if (_case_insensitive) {
+        cmp = tolower(c) - tolower(str[currentIndex]);
+      } else {
+        cmp = c - str[currentIndex];
+      }
+
+      if ((str[currentIndex] == '*') || (cmp == 0)) {
+        currentIndex++;
+        if (str[currentIndex] == '\0') {
+          return E_OK;
+        }
+      } else {
+        // start from the beginning
+        currentIndex = 0;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      close();
+      return E_DISCONNECTED;
+    }
+    if (_idlefunc!=NULL)
+      _idlefunc();
+    else
+      delay(200);
+  }
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::readStatusCode(bool closeClient) {
+  int responseCode = 0;
+  int ret = waitForString("HTTP/*.* ");
+  if (ret != E_OK) {
+    if (closeClient) close();
+    return ret;
+  }
+
+  // ret is not needed from here(since it must be E_OK), so we can use it
+  // as a regular variable now.
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      responseCode = responseCode * 10 + (c - '0');
+      ret++;
+      if (ret == 3) {
+        if (closeClient) close();
+        return responseCode;
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      if (closeClient) close();
+      return E_DISCONNECTED;
+    }
+    if (_idlefunc!=NULL)
+      _idlefunc();
+    else
+      delay(200);
+  }
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::readContentLength() {
+  int ret = waitForString("Content-Length: ");
+  if (ret != E_OK) {
+    return ret;
+  }
+
+  // From now on, ret is not needed, we can use it
+  // to keep the final result
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+      DBG("%c", c);
+
+      if ((c == '\r') || (c == '\n')) {
+        return (ret == 0) ? (E_INVALID) : (ret);
+      } else {
+        ret = ret * 10 + (c - '0');
+      }
+    }
+
+    if (!_client->connected()) {
+      DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+      return E_DISCONNECTED;
+    }
+    if (_idlefunc!=NULL)
+      _idlefunc();
+    else
+      delay(200);
+  }
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+  return waitForString("\n\r\n");
+}
+
+void M2XStreamClient::close() {
+  // Eats up buffered data before closing
+  _client->flush();
+  _client->stop();
+}
+
+static inline int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
+                                         char* buffer, int* length);
+
+TimeService::TimeService(M2XStreamClient* client) : _client(client) {
+}
+
+int TimeService::init() {
+  _timer.start();
+  return reset();
+}
+
+int TimeService::reset() {
+  int32_t ts;
+  int status = _client->getTimestamp32(&ts);
+
+  if (m2x_status_is_success(status)) {
+    _server_timestamp = ts;
+    _local_last_milli = _timer.read_ms();
+  }
+
+  return status;
+}
+
+int TimeService::getTimestamp(char* buffer, int* length) {
+  uint32_t now = _timer.read_ms();
+  if (now < _local_last_milli) {
+    // In case of a timestamp overflow(happens once every 50 days on
+    // Arduino), we reset the server timestamp recorded.
+    // NOTE: while on an Arduino this might be okay, the situation is worse
+    // on mbed, see the notes in m2x-mbed.h for details
+    int status = reset();
+    if (!m2x_status_is_success(status)) { return status; }
+    now = _timer.read_ms();
+  }
+  if (now < _local_last_milli) {
+    // We have already reseted the timestamp, so this cannot happen
+    // (an HTTP request can take longer than 50 days to finished? You
+    // must be kidding here). Something else must be wrong here
+    return E_TIMESTAMP_ERROR;
+  }
+  uint32_t diff = now - _local_last_milli;
+  _local_last_milli = now;
+  _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
+  return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
+                                buffer, length);
+}
+
+#define SIZE_ISO_8601 25
+static inline bool is_leap_year(int16_t y) {
+  return ((1970 + y) > 0) &&
+      !((1970 + y) % 4) &&
+      (((1970 + y) % 100) || !((1970 + y) % 400));
+}
+static inline int32_t days_in_year(int16_t y) {
+  return is_leap_year(y) ? 366 : 365;
+}
+static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
+
+static inline int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
+                                         char* buffer, int* length) {
+  int16_t year;
+  int8_t month, month_length;
+  int32_t day;
+  int8_t hour, minute, second;
+
+  if (*length < SIZE_ISO_8601) {
+    *length = SIZE_ISO_8601;
+    return E_BUFFER_TOO_SMALL;
+  }
+
+  second = timestamp % 60;
+  timestamp /= 60; // now it is minutes
+
+  minute = timestamp % 60;
+  timestamp /= 60; // now it is hours
+
+  hour = timestamp % 24;
+  timestamp /= 24; // now it is days
+
+  year = 0;
+  day = 0;
+  while ((day += days_in_year(year)) <= timestamp) {
+    year++;
+  }
+  day -= days_in_year(year);
+  timestamp -= day; // now it is days in this year, starting at 0
+
+  day = 0;
+  month_length = 0;
+  for (month = 0; month < 12; month++) {
+    if (month == 1) {
+      // February
+      month_length = is_leap_year(year) ? 29 : 28;
+    } else {
+      month_length = MONTH_DAYS[month];
+    }
+
+    if (timestamp >= month_length) {
+      timestamp -= month_length;
+    } else {
+      break;
+    }
+  }
+  year = 1970 + year;
+  month++; // offset by 1
+  day = timestamp + 1;
+
+  int i = 0, j = 0;
+
+  // NOTE: It seems the snprintf implementation in Arduino has bugs,
+  // we have to manually piece the string together here.
+#define INT_TO_STR(v_, width_) \
+  for (j = 0; j < (width_); j++) { \
+    buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
+    (v_) /= 10; \
+  } \
+  i += (width_)
+
+  INT_TO_STR(year, 4);
+  buffer[i++] = '-';
+  INT_TO_STR(month, 2);
+  buffer[i++] = '-';
+  INT_TO_STR(day, 2);
+  buffer[i++] = 'T';
+  INT_TO_STR(hour, 2);
+  buffer[i++] = ':';
+  INT_TO_STR(minute, 2);
+  buffer[i++] = ':';
+  INT_TO_STR(second, 2);
+  buffer[i++] = '.';
+  INT_TO_STR(milli, 3);
+  buffer[i++] = 'Z';
+  buffer[i++] = '\0';
+
+#undef INT_TO_STR
+
+  *length = i;
+  return E_OK;
+}
+
+/* Reader functions */
+#ifdef M2X_ENABLE_READER
+
+// Data structures and functions used to parse stream values
+
+#define STREAM_BUF_LEN 32
+
+typedef struct {
+  uint8_t state;
+  char at_str[STREAM_BUF_LEN + 1];
+  char value_str[STREAM_BUF_LEN + 1];
+  int index;
+
+  stream_value_read_callback callback;
+  void* context;
+} stream_parsing_context_state;
+
+#define WAITING_AT 0x1
+#define GOT_AT 0x2
+#define WAITING_VALUE 0x4
+#define GOT_VALUE 0x8
+
+#define GOT_STREAM (GOT_AT | GOT_VALUE)
+#define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM)
+
+#define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT)
+#define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \
+                               WAITING_VALUE)
+
+static void on_stream_key_found(jsonlite_callback_context* context,
+                                jsonlite_token* token)
+{
+  stream_parsing_context_state* state =
+      (stream_parsing_context_state*) context->client_state;
+  if (strncmp((const char*) token->start, "timestamp", 9) == 0) {
+    state->state |= WAITING_AT;
+  } else if ((strncmp((const char*) token->start, "value", 5) == 0) &&
+             (token->start[5] != 's')) { // get rid of "values"
+    state->state |= WAITING_VALUE;
+  }
+}
+
+static void on_stream_value_found(jsonlite_callback_context* context,
+                                  jsonlite_token* token,
+                                  int type)
+{
+  stream_parsing_context_state* state =
+      (stream_parsing_context_state*) context->client_state;
+
+  if (TEST_IS_AT(state->state)) {
+    strncpy(state->at_str, (const char*) token->start,
+            MIN(token->end - token->start, STREAM_BUF_LEN));
+    state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
+    state->state |= GOT_AT;
+  } else if (TEST_IS_VALUE(state->state)) {
+    strncpy(state->value_str, (const char*) token->start,
+            MIN(token->end - token->start, STREAM_BUF_LEN));
+    state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
+    state->state |= GOT_VALUE;
+  }
+
+  if (TEST_GOT_STREAM(state->state)) {
+    state->callback(state->at_str, state->value_str,
+                    state->index++, state->context, type);
+    state->state = 0;
+  }
+}
+
+static void on_stream_string_found(jsonlite_callback_context* context,
+                                   jsonlite_token* token)
+{
+  on_stream_value_found(context, token, 1);
+}
+
+static void on_stream_number_found(jsonlite_callback_context* context,
+                                   jsonlite_token* token)
+{
+  on_stream_value_found(context, token, 2);
+}
+
+// Data structures and functions used to parse locations
+
+#define LOCATION_BUF_LEN 20
+
+typedef struct {
+  uint16_t state;
+  char name_str[LOCATION_BUF_LEN + 1];
+  double latitude;
+  double longitude;
+  double elevation;
+  char timestamp_str[LOCATION_BUF_LEN + 1];
+  int index;
+
+  location_read_callback callback;
+  void* context;
+} location_parsing_context_state;
+
+#define WAITING_NAME 0x1
+#define WAITING_LATITUDE 0x2
+#define WAITING_LONGITUDE 0x4
+#define WAITING_ELEVATION 0x8
+#define WAITING_TIMESTAMP 0x10
+
+#define GOT_NAME 0x20
+#define GOT_LATITUDE 0x40
+#define GOT_LONGITUDE 0x80
+#define GOT_ELEVATION 0x100
+#define GOT_TIMESTAMP 0x200
+
+#define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP)
+#define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION)
+
+#define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME)
+#define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \
+                                  == WAITING_LATITUDE)
+#define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \
+                                   == WAITING_LONGITUDE)
+#define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \
+                                   == WAITING_ELEVATION)
+#define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \
+                                   == WAITING_TIMESTAMP)
+
+static void on_location_key_found(jsonlite_callback_context* context,
+                                  jsonlite_token* token) {
+  location_parsing_context_state* state =
+      (location_parsing_context_state*) context->client_state;
+  if (strncmp((const char*) token->start, "waypoints", 9) == 0) {
+    // only parses those locations in waypoints, skip the outer one
+    state->state = 0;
+  } else if (strncmp((const char*) token->start, "name", 4) == 0) {
+    state->state |= WAITING_NAME;
+  } else if (strncmp((const char*) token->start, "latitude", 8) == 0) {
+    state->state |= WAITING_LATITUDE;
+  } else if (strncmp((const char*) token->start, "longitude", 9) == 0) {
+    state->state |= WAITING_LONGITUDE;
+  } else if (strncmp((const char*) token->start, "elevation", 9) == 0) {
+    state->state |= WAITING_ELEVATION;
+  } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) {
+    state->state |= WAITING_TIMESTAMP;
+  }
+}
+
+static void on_location_string_found(jsonlite_callback_context* context,
+                                     jsonlite_token* token) {
+  location_parsing_context_state* state =
+      (location_parsing_context_state*) context->client_state;
+
+  if (TEST_IS_NAME(state->state)) {
+    strncpy(state->name_str, (const char*) token->start,
+            MIN(token->end - token->start, LOCATION_BUF_LEN));
+    state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
+    state->state |= GOT_NAME;
+  } else if (TEST_IS_LATITUDE(state->state)) {
+    state->latitude = atof((const char*) token->start);
+    state->state |= GOT_LATITUDE;
+  } else if (TEST_IS_LONGITUDE(state->state)) {
+    state->longitude = atof((const char*) token->start);
+    state->state |= GOT_LONGITUDE;
+  } else if (TEST_IS_ELEVATION(state->state)) {
+    state->elevation = atof((const char*) token->start);
+    state->state |= GOT_ELEVATION;
+  } else if (TEST_IS_TIMESTAMP(state->state)) {
+    strncpy(state->timestamp_str, (const char*) token->start,
+            MIN(token->end - token->start, LOCATION_BUF_LEN));
+    state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
+    state->state |= GOT_TIMESTAMP;
+  }
+
+  if (TEST_GOT_LOCATION(state->state)) {
+    state->callback(state->name_str, state->latitude, state->longitude,
+                    state->elevation, state->timestamp_str, state->index++,
+                    state->context);
+    state->state = 0;
+  }
+}
+
+#ifndef M2X_COMMAND_BUF_LEN
+#define M2X_COMMAND_BUF_LEN 40
+#endif  /* M2X_COMMAND_BUF_LEN */
+
+typedef struct {
+  uint8_t state;
+  char id_str[M2X_COMMAND_BUF_LEN + 1];
+  char name_str[M2X_COMMAND_BUF_LEN + 1];
+  int index;
+
+  m2x_command_read_callback callback;
+  void* context;
+} m2x_command_parsing_context_state;
+
+#define M2X_COMMAND_WAITING_ID 0x1
+#define M2X_COMMAND_GOT_ID 0x2
+#define M2X_COMMAND_WAITING_NAME 0x4
+#define M2X_COMMAND_GOT_NAME 0x8
+
+#define M2X_COMMAND_GOT_COMMAND (M2X_COMMAND_GOT_ID | M2X_COMMAND_GOT_NAME)
+#define M2X_COMMAND_TEST_GOT_COMMAND(state_) \
+  (((state_) & M2X_COMMAND_GOT_COMMAND) == M2X_COMMAND_GOT_COMMAND)
+
+#define M2X_COMMAND_TEST_IS_ID(state_) \
+  (((state_) & (M2X_COMMAND_WAITING_ID | M2X_COMMAND_GOT_ID)) == \
+   M2X_COMMAND_WAITING_ID)
+#define M2X_COMMAND_TEST_IS_NAME(state_) \
+  (((state_) & (M2X_COMMAND_WAITING_NAME | M2X_COMMAND_GOT_NAME)) == \
+   M2X_COMMAND_WAITING_NAME)
+
+static void m2x_on_command_key_found(jsonlite_callback_context* context,
+                                     jsonlite_token* token)
+{
+  m2x_command_parsing_context_state* state =
+      (m2x_command_parsing_context_state*) context->client_state;
+  if (strncmp((const char*) token->start, "id", 2) == 0) {
+    state->state |= M2X_COMMAND_WAITING_ID;
+  } else if (strncmp((const char*) token->start, "name", 4) == 0) {
+    state->state |= M2X_COMMAND_WAITING_NAME;
+  }
+}
+
+static void m2x_on_command_value_found(jsonlite_callback_context* context,
+                                       jsonlite_token* token)
+{
+  m2x_command_parsing_context_state* state =
+      (m2x_command_parsing_context_state*) context->client_state;
+  if (M2X_COMMAND_TEST_IS_ID(state->state)) {
+    strncpy(state->id_str, (const char*) token->start,
+            MIN(token->end - token->start, M2X_COMMAND_BUF_LEN));
+    state->id_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0';
+    state->state |= M2X_COMMAND_GOT_ID;
+  } else if (M2X_COMMAND_TEST_IS_NAME(state->state)) {
+    strncpy(state->name_str, (const char*) token->start,
+            MIN(token->end - token->start, M2X_COMMAND_BUF_LEN));
+    state->name_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0';
+    state->state |= M2X_COMMAND_GOT_NAME;
+  }
+
+  if (M2X_COMMAND_TEST_GOT_COMMAND(state->state)) {
+    state->callback(state->id_str, state->name_str,
+                    state->index++, state->context);
+    state->state = 0;
+  }
+}
+
+int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
+                                      stream_value_read_callback callback, void* context,
+                                      const char* query) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->print("/streams/");
+    print_encoded_string(_client, streamName);
+    _client->print("/values");
+
+    if (query) {
+      if (query[0] != '?') {
+        _client->print('?');
+      }
+      _client->print(query);
+    }
+
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readStreamValue(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+int M2XStreamClient::readLocation(const char* deviceId,
+                                  location_read_callback callback,
+                                  void* context) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readLocation(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+int M2XStreamClient::listCommands(const char* deviceId,
+                                  m2x_command_read_callback callback,
+                                  void* context,
+                                  const char* query) {
+  if (_client->connect(_host, _port)) {
+    DBGLN("%s", "Connected to M2X server!");
+    _client->print("GET ");
+    if (_path_prefix) { _client->print(_path_prefix); }
+    _client->print("/v2/devices/");
+    _client->print(deviceId);
+    _client->print("/commands");
+
+    if (query) {
+      if (query[0] != '?') {
+        _client->print('?');
+      }
+      _client->print(query);
+    }
+
+    _client->println(" HTTP/1.0");
+    writeHttpHeader(-1);
+  } else {
+    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readCommand(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+                                     void* context) {
+  const int BUF_LEN = 64;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  stream_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_stream_key_found;
+  cbs.number_found = on_stream_number_found;
+  cbs.string_found = on_stream_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+                                  void* context) {
+  const int BUF_LEN = 40;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  location_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_location_key_found;
+  cbs.string_found = on_location_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readCommand(m2x_command_read_callback callback,
+                                 void* context) {
+  const int BUF_LEN = 60;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  m2x_command_parsing_context_state state;
+  state.state = 0;
+  state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = m2x_on_command_key_found;
+  cbs.string_found = m2x_on_command_value_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+    DBG("%s", "Received Data: ");
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+      DBG("%c", buf[i - 1]);
+    }
+    DBGLNEND;
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+#endif  /* M2X_ENABLE_READER */
 
 #endif  /* M2XStreamClient_h */