Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: WNCInterface_M2Xdemo ATT_WNCInterface_Info WNCInterface_HTTP_example Public_IoT_M2X_Cellular_Demo
Fork of M2XStreamClient by
Revision 19:4dfa28d37b8f, committed 2016-01-01
- Comitter:
- citrusbyte
- Date:
- Fri Jan 01 15:48:18 2016 +0000
- Parent:
- 18:510895884768
- Child:
- 20:32d24a6355b5
- Commit message:
- Use correct M2XStreamClient.cpp file
Changed in this revision
--- a/M2XStreamClient.cpp Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.cpp Fri Jan 01 15:48:18 2016 +0000
@@ -1,147 +1,527 @@
#include "M2XStreamClient.h"
-static int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
- char* buffer, int* length);
+#include <jsonlite.h>
+
+#include "StreamParseFunctions.h"
+#include "LocationParseFunctions.h"
+
+const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
+
+static int write_delete_values(Print* print, const char* from, const char* end);
+int print_encoded_string(Print* print, const char* str);
+int tolower(int ch);
+
+#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
+int tolower(int ch)
+{
+ // Arduino and mbed use ASCII table, so we can simplify the implementation
+ if ((ch >= 'A') && (ch <= 'Z')) {
+ return (ch + 32);
+ }
+ return ch;
+}
+#else
+// For other platform, we use libc's tolower by default
+#include <ctype.h>
+#endif
-TimeService::TimeService(M2XStreamClient* client) : _client(client) {
+M2XStreamClient::M2XStreamClient(Client* client,
+ const char* key,
+ int case_insensitive,
+ const char* host,
+ int port,
+ const char* path_prefix) : _client(client),
+ _key(key),
+ _case_insensitive(case_insensitive),
+ _host(host),
+ _port(port),
+ _path_prefix(path_prefix),
+ _null_print() {
+}
+
+int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
+ stream_value_read_callback callback, void* context,
+ const char* query) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->print("GET ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->print("/values.json");
+
+ if (query) {
+ if (query[0] != '?') {
+ _client->print('?');
+ }
+ _client->print(query);
+ }
+
+ _client->println(" HTTP/1.0");
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+ int status = readStatusCode(false);
+ if (status == 200) {
+ readStreamValue(callback, context);
+ }
+
+ close();
+ return status;
}
-int TimeService::init() {
- _timer.start();
- return reset();
+int M2XStreamClient::readLocation(const char* deviceId,
+ location_read_callback callback,
+ void* context) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->print("GET ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->println("/location HTTP/1.0");
+
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+ int status = readStatusCode(false);
+ if (status == 200) {
+ readLocation(callback, context);
+ }
+
+ close();
+ return status;
+}
+
+int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
+ const char* from, const char* end) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ int length = write_delete_values(&_null_print, from, end);
+ writeDeleteHeader(deviceId, streamName, length);
+ write_delete_values(_client, from, end);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+
+ return readStatusCode(true);
}
-int TimeService::reset() {
- int32_t ts;
- int status = _client->getTimestamp32(&ts);
+int M2XStreamClient::getTimestamp32(int32_t *ts) {
+ // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
+ // which is 9223372036854775807. It consists of 19 characters, so a
+ // buffer of 20 is definitely enough here
+ int length = 20;
+ char buffer[20];
+ int status = getTimestamp(buffer, &length);
+ if (status == 200) {
+ int32_t result = 0;
+ for (int i = 0; i < length; i++) {
+ result = result * 10 + (buffer[i] - '0');
+ }
+ if (ts != NULL) { *ts = result; }
+ }
+ return status;
+}
+
+int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
+ if (bufferLength == NULL) { return E_INVALID; }
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->println("GET /v2/time/seconds HTTP/1.0");
- if (m2x_status_is_success(status)) {
- _server_timestamp = ts;
- _local_last_milli = _timer.read_ms();
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
}
+ int status = readStatusCode(false);
+ if (status == 200) {
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
+ if (*bufferLength < length) {
+ *bufferLength = length;
+ return E_BUFFER_TOO_SMALL;
+ }
+ *bufferLength = length;
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+ while (index < length) {
+ DBG("%s", "Received Data: ");
+ while ((index < length) && _client->available()) {
+ buffer[index++] = _client->read();
+ DBG("%c", buffer[index - 1]);
+ }
+ DBGLNEND;
+ if ((!_client->connected()) &&
+ (index < length)) {
+ close();
+ return E_NOCONNECTION;
+ }
+
+ delay(200);
+ }
+ }
+ close();
return status;
}
-int TimeService::getTimestamp(char* buffer, int* length) {
- uint32_t now = _timer.read_ms();
- if (now < _local_last_milli) {
- // In case of a timestamp overflow, we reset the server timestamp recorded.
- // Notice that unlike Arduino, mbed would overflow every 30 minutes,
- // so if 2 calls to this API are more than 30 minutes apart, we are
- // likely to run into troubles. This is a limitation of the current
- // mbed platform. However, we argue that it might be a rare case that
- // 2 calls to this are 30 minutes apart. In most cases, we would call
- // this API every few seconds or minutes, this won't be a huge problem.
- // However, if you have a use case that would require 2 intervening
- // calls to this be 30 minutes apart, you might want to leverage a RTC
- // clock instead of the simple ticker here.
- int status = reset();
- if (!m2x_status_is_success(status)) { return status; }
- now = _timer.read_ms();
+static int write_delete_values(Print* print, const char* from,
+ const char* end) {
+ int bytes = 0;
+ bytes += print->print("{\"from\":\"");
+ bytes += print->print(from);
+ bytes += print->print("\",\"end\":\"");
+ bytes += print->print(end);
+ bytes += print->print("\"}");
+ return bytes;
+}
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+int print_encoded_string(Print* print, const char* str) {
+ int bytes = 0;
+ for (int i = 0; str[i] != 0; i++) {
+ if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+ ((str[i] >= 'a') && (str[i] <= 'z')) ||
+ ((str[i] >= '0') && (str[i] <= '9')) ||
+ (str[i] == '-') || (str[i] == '_') ||
+ (str[i] == '.') || (str[i] == '~')) {
+ bytes += print->print(str[i]);
+ } else {
+ // Encode all other characters
+ bytes += print->print('%');
+ bytes += print->print(HEX(str[i] / 16));
+ bytes += print->print(HEX(str[i] % 16));
+ }
}
- if (now < _local_last_milli) {
- // We have already reseted the timestamp, so this cannot happen,
- // an HTTP request cannot last 30 minutes, something else must be wrong
- // here.
- return E_TIMESTAMP_ERROR;
- }
- uint32_t diff = now - _local_last_milli;
- _local_last_milli = now;
- _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
- return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
- buffer, length);
+ return bytes;
+}
+
+void M2XStreamClient::writePutHeader(const char* deviceId,
+ const char* streamName,
+ int contentLength) {
+ _client->print("PUT ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->println("/value HTTP/1.0");
+
+ writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeDeleteHeader(const char* deviceId,
+ const char* streamName,
+ int contentLength) {
+ _client->print("DELETE ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->print("/values");
+ _client->println(" HTTP/1.0");
+
+ writeHttpHeader(contentLength);
}
-#define SIZE_ISO_8601 25
-static inline bool is_leap_year(int16_t y) {
- return ((1970 + y) > 0) &&
- !((1970 + y) % 4) &&
- (((1970 + y) % 100) || !((1970 + y) % 400));
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+ _client->println(USER_AGENT);
+ _client->print("X-M2X-KEY: ");
+ _client->println(_key);
+
+ _client->print("Host: ");
+ print_encoded_string(_client, _host);
+ if (_port != kDefaultM2XPort) {
+ _client->print(":");
+ // port is an integer, does not need encoding
+ _client->print(_port);
+ }
+ _client->println();
+
+ if (contentLength > 0) {
+ _client->println("Content-Type: application/json");
+ DBG("%s", "Content Length: ");
+ DBGLN("%d", contentLength);
+
+ _client->print("Content-Length: ");
+ _client->println(contentLength);
+ }
+ _client->println();
}
-static inline int32_t days_in_year(int16_t y) {
- return is_leap_year(y) ? 366 : 365;
-}
-static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
+
+int M2XStreamClient::waitForString(const char* str) {
+ int currentIndex = 0;
+ if (str[currentIndex] == '\0') return E_OK;
+
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
+
+ int cmp;
+ if (_case_insensitive) {
+ cmp = tolower(c) - tolower(str[currentIndex]);
+ } else {
+ cmp = c - str[currentIndex];
+ }
-static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
- char* buffer, int* length) {
- int16_t year;
- int8_t month, month_length;
- int32_t day;
- int8_t hour, minute, second;
+ if ((str[currentIndex] == '*') || (cmp == 0)) {
+ currentIndex++;
+ if (str[currentIndex] == '\0') {
+ return E_OK;
+ }
+ } else {
+ // start from the beginning
+ currentIndex = 0;
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ close();
+ return E_DISCONNECTED;
+ }
+
+ delay(1000);
+ }
+ // never reached here
+ return E_NOTREACHABLE;
+}
- if (*length < SIZE_ISO_8601) {
- *length = SIZE_ISO_8601;
- return E_BUFFER_TOO_SMALL;
+int M2XStreamClient::readStatusCode(bool closeClient) {
+ int responseCode = 0;
+ int ret = waitForString("HTTP/*.* ");
+ if (ret != E_OK) {
+ if (closeClient) close();
+ return ret;
+ }
+
+ // ret is not needed from here(since it must be E_OK), so we can use it
+ // as a regular variable now.
+ ret = 0;
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
+
+ responseCode = responseCode * 10 + (c - '0');
+ ret++;
+ if (ret == 3) {
+ if (closeClient) close();
+ return responseCode;
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ if (closeClient) close();
+ return E_DISCONNECTED;
+ }
+
+ delay(1000);
}
- second = timestamp % 60;
- timestamp /= 60; // now it is minutes
+ // never reached here
+ return E_NOTREACHABLE;
+}
- minute = timestamp % 60;
- timestamp /= 60; // now it is hours
-
- hour = timestamp % 24;
- timestamp /= 24; // now it is days
+int M2XStreamClient::readContentLength() {
+ int ret = waitForString("Content-Length: ");
+ if (ret != E_OK) {
+ return ret;
+ }
- year = 0;
- day = 0;
- while ((day += days_in_year(year)) <= timestamp) {
- year++;
- }
- day -= days_in_year(year);
- timestamp -= day; // now it is days in this year, starting at 0
+ // From now on, ret is not needed, we can use it
+ // to keep the final result
+ ret = 0;
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
- day = 0;
- month_length = 0;
- for (month = 0; month < 12; month++) {
- if (month == 1) {
- // February
- month_length = is_leap_year(year) ? 29 : 28;
- } else {
- month_length = MONTH_DAYS[month];
+ if ((c == '\r') || (c == '\n')) {
+ return (ret == 0) ? (E_INVALID) : (ret);
+ } else {
+ ret = ret * 10 + (c - '0');
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ return E_DISCONNECTED;
}
- if (timestamp >= month_length) {
- timestamp -= month_length;
- } else {
- break;
- }
+ delay(1000);
}
- year = 1970 + year;
- month++; // offset by 1
- day = timestamp + 1;
+
+ // never reached here
+ return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+ return waitForString("\n\r\n");
+}
+
+void M2XStreamClient::close() {
+ // Eats up buffered data before closing
+ _client->flush();
+ _client->stop();
+}
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+ void* context) {
+ const int BUF_LEN = 64;
+ char buf[BUF_LEN];
+
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
- int i = 0, j = 0;
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+
+ stream_parsing_context_state state;
+ state.state = state.index = 0;
+ state.callback = callback;
+ state.context = context;
+
+ jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+ cbs.key_found = on_stream_key_found;
+ cbs.number_found = on_stream_number_found;
+ cbs.string_found = on_stream_string_found;
+ cbs.context.client_state = &state;
- // NOTE: It seems the snprintf implementation in Arduino has bugs,
- // we have to manually piece the string together here.
-#define INT_TO_STR(v_, width_) \
- for (j = 0; j < (width_); j++) { \
- buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
- (v_) /= 10; \
- } \
- i += (width_)
+ jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+ jsonlite_parser_set_callback(p, &cbs);
+
+ jsonlite_result result = jsonlite_result_unknown;
+ while (index < length) {
+ int i = 0;
+
+ DBG("%s", "Received Data: ");
+ while ((i < BUF_LEN) && _client->available()) {
+ buf[i++] = _client->read();
+ DBG("%c", buf[i - 1]);
+ }
+ DBGLNEND;
+
+ if ((!_client->connected()) &&
+ (!_client->available()) &&
+ ((index + i) < length)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_NOCONNECTION;
+ }
- INT_TO_STR(year, 4);
- buffer[i++] = '-';
- INT_TO_STR(month, 2);
- buffer[i++] = '-';
- INT_TO_STR(day, 2);
- buffer[i++] = 'T';
- INT_TO_STR(hour, 2);
- buffer[i++] = ':';
- INT_TO_STR(minute, 2);
- buffer[i++] = ':';
- INT_TO_STR(second, 2);
- buffer[i++] = '.';
- INT_TO_STR(milli, 3);
- buffer[i++] = 'Z';
- buffer[i++] = '\0';
+ result = jsonlite_parser_tokenize(p, buf, i);
+ if ((result != jsonlite_result_ok) &&
+ (result != jsonlite_result_end_of_stream)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_JSON_INVALID;
+ }
+
+ index += i;
+ }
+
+ jsonlite_parser_release(p);
+ close();
+ return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+ void* context) {
+ const int BUF_LEN = 40;
+ char buf[BUF_LEN];
+
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
+
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+
+ location_parsing_context_state state;
+ state.state = state.index = 0;
+ state.callback = callback;
+ state.context = context;
-#undef INT_TO_STR
+ jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+ cbs.key_found = on_location_key_found;
+ cbs.string_found = on_location_string_found;
+ cbs.context.client_state = &state;
+
+ jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+ jsonlite_parser_set_callback(p, &cbs);
+
+ jsonlite_result result = jsonlite_result_unknown;
+ while (index < length) {
+ int i = 0;
+
+ DBG("%s", "Received Data: ");
+ while ((i < BUF_LEN) && _client->available()) {
+ buf[i++] = _client->read();
+ DBG("%c", buf[i - 1]);
+ }
+ DBGLNEND;
- *length = i;
- return E_OK;
+ if ((!_client->connected()) &&
+ (!_client->available()) &&
+ ((index + i) < length)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_NOCONNECTION;
+ }
+
+ result = jsonlite_parser_tokenize(p, buf, i);
+ if ((result != jsonlite_result_ok) &&
+ (result != jsonlite_result_end_of_stream)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_JSON_INVALID;
+ }
+
+ index += i;
+ }
+
+ jsonlite_parser_release(p);
+ close();
+ return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}
--- a/M2XStreamClient.h Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.h Fri Jan 01 15:48:18 2016 +0000
@@ -127,6 +127,20 @@
const char* names[], const int counts[],
const char* ats[], T values[]);
+ // Post multiple values of a single device at once.
+ // +deviceId+ - id of the device to post values
+ // +streamNum+ - Number of streams to post
+ // +names+ - Array of stream names, the length of the array should
+ // be exactly +streamNum+
+ // +values+ - Array of values to post, the length of the array should
+ // be exactly +streamNum+. Notice that the array of +values+ should
+ // match the array of +names+, and that the ith value in +values+ is
+ // exactly the value to post for the ith stream name in +names+
+ template <class T>
+ int postSingleDeviceUpdate(const char* deviceId, int streamNum,
+ const char* names[], T values[],
+ const char* at = NULL);
+
// Fetch values for a particular data stream. Since memory is
// very limited on an Arduino, we cannot parse and get all the
// data points in memory. Instead, we use callbacks here: whenever
--- a/M2XStreamClient_template.h Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient_template.h Fri Jan 01 15:48:18 2016 +0000
@@ -73,6 +73,54 @@
}
template <class T>
+inline int write_single_device_values(Print* print, int streamNum,
+ const char* names[], T values[],
+ const char* at) {
+ int bytes = 0;
+ bytes += print->print("{\"values\":{");
+ for (int i = 0; i < streamNum; i++) {
+ bytes += print->print("\"");
+ bytes += print->print(names[i]);
+ bytes += print->print("\": \"");
+ bytes += print->print(values[value_index]);
+ bytes += print->print("\"");
+ if (i < streamNum - 1) { bytes += print->print(","); }
+ }
+ bytes += print->print("}");
+ if (at != NULL) {
+ bytes += print->print(",\"timestamp\":\"");
+ bytes += print->print(at);
+ bytes += print->print("\"");
+ }
+ bytes += print->print("}");
+ return bytes;
+}
+
+template <class T>
+int M2XStreamClient::postSingleDeviceUpdate(const char* deviceId, int streamNum,
+ const char* names[], T values[],
+ const char* at) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ int length = write_single_device_values(&_null_print, streamNum, names,
+ values, at);
+ _client->print("POST ");
+ if (_path_prefix) {
+ _client->print(_path_prefix);
+ }
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->println("/update HTTP/1.0");
+ writeHttpHeader(length);
+ write_single_device_values(_client, streamNum, names, values, at);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+ return readStatusCode(true);
+}
+
+template <class T>
static int write_location_data(Print* print, const char* name,
T latitude, T longitude,
T elevation) {
