Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of M2XStreamClient by
Diff: M2XStreamClient.cpp
- Revision:
- 19:4dfa28d37b8f
- Parent:
- 17:9db4a86b876a
--- a/M2XStreamClient.cpp Mon Dec 28 14:30:33 2015 +0000
+++ b/M2XStreamClient.cpp Fri Jan 01 15:48:18 2016 +0000
@@ -1,147 +1,527 @@
#include "M2XStreamClient.h"
-static int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
- char* buffer, int* length);
+#include <jsonlite.h>
+
+#include "StreamParseFunctions.h"
+#include "LocationParseFunctions.h"
+
+const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
+
+static int write_delete_values(Print* print, const char* from, const char* end);
+int print_encoded_string(Print* print, const char* str);
+int tolower(int ch);
+
+#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM)
+int tolower(int ch)
+{
+ // Arduino and mbed use ASCII table, so we can simplify the implementation
+ if ((ch >= 'A') && (ch <= 'Z')) {
+ return (ch + 32);
+ }
+ return ch;
+}
+#else
+// For other platform, we use libc's tolower by default
+#include <ctype.h>
+#endif
-TimeService::TimeService(M2XStreamClient* client) : _client(client) {
+M2XStreamClient::M2XStreamClient(Client* client,
+ const char* key,
+ int case_insensitive,
+ const char* host,
+ int port,
+ const char* path_prefix) : _client(client),
+ _key(key),
+ _case_insensitive(case_insensitive),
+ _host(host),
+ _port(port),
+ _path_prefix(path_prefix),
+ _null_print() {
+}
+
+int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
+ stream_value_read_callback callback, void* context,
+ const char* query) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->print("GET ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->print("/values.json");
+
+ if (query) {
+ if (query[0] != '?') {
+ _client->print('?');
+ }
+ _client->print(query);
+ }
+
+ _client->println(" HTTP/1.0");
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+ int status = readStatusCode(false);
+ if (status == 200) {
+ readStreamValue(callback, context);
+ }
+
+ close();
+ return status;
}
-int TimeService::init() {
- _timer.start();
- return reset();
+int M2XStreamClient::readLocation(const char* deviceId,
+ location_read_callback callback,
+ void* context) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->print("GET ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->println("/location HTTP/1.0");
+
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+ int status = readStatusCode(false);
+ if (status == 200) {
+ readLocation(callback, context);
+ }
+
+ close();
+ return status;
+}
+
+int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
+ const char* from, const char* end) {
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ int length = write_delete_values(&_null_print, from, end);
+ writeDeleteHeader(deviceId, streamName, length);
+ write_delete_values(_client, from, end);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
+ }
+
+ return readStatusCode(true);
}
-int TimeService::reset() {
- int32_t ts;
- int status = _client->getTimestamp32(&ts);
+int M2XStreamClient::getTimestamp32(int32_t *ts) {
+ // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
+ // which is 9223372036854775807. It consists of 19 characters, so a
+ // buffer of 20 is definitely enough here
+ int length = 20;
+ char buffer[20];
+ int status = getTimestamp(buffer, &length);
+ if (status == 200) {
+ int32_t result = 0;
+ for (int i = 0; i < length; i++) {
+ result = result * 10 + (buffer[i] - '0');
+ }
+ if (ts != NULL) { *ts = result; }
+ }
+ return status;
+}
+
+int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
+ if (bufferLength == NULL) { return E_INVALID; }
+ if (_client->connect(_host, _port)) {
+ DBGLN("%s", "Connected to M2X server!");
+ _client->println("GET /v2/time/seconds HTTP/1.0");
- if (m2x_status_is_success(status)) {
- _server_timestamp = ts;
- _local_last_milli = _timer.read_ms();
+ writeHttpHeader(-1);
+ } else {
+ DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+ return E_NOCONNECTION;
}
+ int status = readStatusCode(false);
+ if (status == 200) {
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
+ if (*bufferLength < length) {
+ *bufferLength = length;
+ return E_BUFFER_TOO_SMALL;
+ }
+ *bufferLength = length;
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+ while (index < length) {
+ DBG("%s", "Received Data: ");
+ while ((index < length) && _client->available()) {
+ buffer[index++] = _client->read();
+ DBG("%c", buffer[index - 1]);
+ }
+ DBGLNEND;
+ if ((!_client->connected()) &&
+ (index < length)) {
+ close();
+ return E_NOCONNECTION;
+ }
+
+ delay(200);
+ }
+ }
+ close();
return status;
}
-int TimeService::getTimestamp(char* buffer, int* length) {
- uint32_t now = _timer.read_ms();
- if (now < _local_last_milli) {
- // In case of a timestamp overflow, we reset the server timestamp recorded.
- // Notice that unlike Arduino, mbed would overflow every 30 minutes,
- // so if 2 calls to this API are more than 30 minutes apart, we are
- // likely to run into troubles. This is a limitation of the current
- // mbed platform. However, we argue that it might be a rare case that
- // 2 calls to this are 30 minutes apart. In most cases, we would call
- // this API every few seconds or minutes, this won't be a huge problem.
- // However, if you have a use case that would require 2 intervening
- // calls to this be 30 minutes apart, you might want to leverage a RTC
- // clock instead of the simple ticker here.
- int status = reset();
- if (!m2x_status_is_success(status)) { return status; }
- now = _timer.read_ms();
+static int write_delete_values(Print* print, const char* from,
+ const char* end) {
+ int bytes = 0;
+ bytes += print->print("{\"from\":\"");
+ bytes += print->print(from);
+ bytes += print->print("\",\"end\":\"");
+ bytes += print->print(end);
+ bytes += print->print("\"}");
+ return bytes;
+}
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+int print_encoded_string(Print* print, const char* str) {
+ int bytes = 0;
+ for (int i = 0; str[i] != 0; i++) {
+ if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+ ((str[i] >= 'a') && (str[i] <= 'z')) ||
+ ((str[i] >= '0') && (str[i] <= '9')) ||
+ (str[i] == '-') || (str[i] == '_') ||
+ (str[i] == '.') || (str[i] == '~')) {
+ bytes += print->print(str[i]);
+ } else {
+ // Encode all other characters
+ bytes += print->print('%');
+ bytes += print->print(HEX(str[i] / 16));
+ bytes += print->print(HEX(str[i] % 16));
+ }
}
- if (now < _local_last_milli) {
- // We have already reseted the timestamp, so this cannot happen,
- // an HTTP request cannot last 30 minutes, something else must be wrong
- // here.
- return E_TIMESTAMP_ERROR;
- }
- uint32_t diff = now - _local_last_milli;
- _local_last_milli = now;
- _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
- return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
- buffer, length);
+ return bytes;
+}
+
+void M2XStreamClient::writePutHeader(const char* deviceId,
+ const char* streamName,
+ int contentLength) {
+ _client->print("PUT ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->println("/value HTTP/1.0");
+
+ writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeDeleteHeader(const char* deviceId,
+ const char* streamName,
+ int contentLength) {
+ _client->print("DELETE ");
+ if (_path_prefix)
+ _client->print(_path_prefix);
+ _client->print("/v2/devices/");
+ print_encoded_string(_client, deviceId);
+ _client->print("/streams/");
+ print_encoded_string(_client, streamName);
+ _client->print("/values");
+ _client->println(" HTTP/1.0");
+
+ writeHttpHeader(contentLength);
}
-#define SIZE_ISO_8601 25
-static inline bool is_leap_year(int16_t y) {
- return ((1970 + y) > 0) &&
- !((1970 + y) % 4) &&
- (((1970 + y) % 100) || !((1970 + y) % 400));
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+ _client->println(USER_AGENT);
+ _client->print("X-M2X-KEY: ");
+ _client->println(_key);
+
+ _client->print("Host: ");
+ print_encoded_string(_client, _host);
+ if (_port != kDefaultM2XPort) {
+ _client->print(":");
+ // port is an integer, does not need encoding
+ _client->print(_port);
+ }
+ _client->println();
+
+ if (contentLength > 0) {
+ _client->println("Content-Type: application/json");
+ DBG("%s", "Content Length: ");
+ DBGLN("%d", contentLength);
+
+ _client->print("Content-Length: ");
+ _client->println(contentLength);
+ }
+ _client->println();
}
-static inline int32_t days_in_year(int16_t y) {
- return is_leap_year(y) ? 366 : 365;
-}
-static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
+
+int M2XStreamClient::waitForString(const char* str) {
+ int currentIndex = 0;
+ if (str[currentIndex] == '\0') return E_OK;
+
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
+
+ int cmp;
+ if (_case_insensitive) {
+ cmp = tolower(c) - tolower(str[currentIndex]);
+ } else {
+ cmp = c - str[currentIndex];
+ }
-static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
- char* buffer, int* length) {
- int16_t year;
- int8_t month, month_length;
- int32_t day;
- int8_t hour, minute, second;
+ if ((str[currentIndex] == '*') || (cmp == 0)) {
+ currentIndex++;
+ if (str[currentIndex] == '\0') {
+ return E_OK;
+ }
+ } else {
+ // start from the beginning
+ currentIndex = 0;
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ close();
+ return E_DISCONNECTED;
+ }
+
+ delay(1000);
+ }
+ // never reached here
+ return E_NOTREACHABLE;
+}
- if (*length < SIZE_ISO_8601) {
- *length = SIZE_ISO_8601;
- return E_BUFFER_TOO_SMALL;
+int M2XStreamClient::readStatusCode(bool closeClient) {
+ int responseCode = 0;
+ int ret = waitForString("HTTP/*.* ");
+ if (ret != E_OK) {
+ if (closeClient) close();
+ return ret;
+ }
+
+ // ret is not needed from here(since it must be E_OK), so we can use it
+ // as a regular variable now.
+ ret = 0;
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
+
+ responseCode = responseCode * 10 + (c - '0');
+ ret++;
+ if (ret == 3) {
+ if (closeClient) close();
+ return responseCode;
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ if (closeClient) close();
+ return E_DISCONNECTED;
+ }
+
+ delay(1000);
}
- second = timestamp % 60;
- timestamp /= 60; // now it is minutes
+ // never reached here
+ return E_NOTREACHABLE;
+}
- minute = timestamp % 60;
- timestamp /= 60; // now it is hours
-
- hour = timestamp % 24;
- timestamp /= 24; // now it is days
+int M2XStreamClient::readContentLength() {
+ int ret = waitForString("Content-Length: ");
+ if (ret != E_OK) {
+ return ret;
+ }
- year = 0;
- day = 0;
- while ((day += days_in_year(year)) <= timestamp) {
- year++;
- }
- day -= days_in_year(year);
- timestamp -= day; // now it is days in this year, starting at 0
+ // From now on, ret is not needed, we can use it
+ // to keep the final result
+ ret = 0;
+ while (true) {
+ while (_client->available()) {
+ char c = _client->read();
+ DBG("%c", c);
- day = 0;
- month_length = 0;
- for (month = 0; month < 12; month++) {
- if (month == 1) {
- // February
- month_length = is_leap_year(year) ? 29 : 28;
- } else {
- month_length = MONTH_DAYS[month];
+ if ((c == '\r') || (c == '\n')) {
+ return (ret == 0) ? (E_INVALID) : (ret);
+ } else {
+ ret = ret * 10 + (c - '0');
+ }
+ }
+
+ if (!_client->connected()) {
+ DBGLN("%s", "ERROR: The client is disconnected from the server!");
+
+ return E_DISCONNECTED;
}
- if (timestamp >= month_length) {
- timestamp -= month_length;
- } else {
- break;
- }
+ delay(1000);
}
- year = 1970 + year;
- month++; // offset by 1
- day = timestamp + 1;
+
+ // never reached here
+ return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+ return waitForString("\n\r\n");
+}
+
+void M2XStreamClient::close() {
+ // Eats up buffered data before closing
+ _client->flush();
+ _client->stop();
+}
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+ void* context) {
+ const int BUF_LEN = 64;
+ char buf[BUF_LEN];
+
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
- int i = 0, j = 0;
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+
+ stream_parsing_context_state state;
+ state.state = state.index = 0;
+ state.callback = callback;
+ state.context = context;
+
+ jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+ cbs.key_found = on_stream_key_found;
+ cbs.number_found = on_stream_number_found;
+ cbs.string_found = on_stream_string_found;
+ cbs.context.client_state = &state;
- // NOTE: It seems the snprintf implementation in Arduino has bugs,
- // we have to manually piece the string together here.
-#define INT_TO_STR(v_, width_) \
- for (j = 0; j < (width_); j++) { \
- buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
- (v_) /= 10; \
- } \
- i += (width_)
+ jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+ jsonlite_parser_set_callback(p, &cbs);
+
+ jsonlite_result result = jsonlite_result_unknown;
+ while (index < length) {
+ int i = 0;
+
+ DBG("%s", "Received Data: ");
+ while ((i < BUF_LEN) && _client->available()) {
+ buf[i++] = _client->read();
+ DBG("%c", buf[i - 1]);
+ }
+ DBGLNEND;
+
+ if ((!_client->connected()) &&
+ (!_client->available()) &&
+ ((index + i) < length)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_NOCONNECTION;
+ }
- INT_TO_STR(year, 4);
- buffer[i++] = '-';
- INT_TO_STR(month, 2);
- buffer[i++] = '-';
- INT_TO_STR(day, 2);
- buffer[i++] = 'T';
- INT_TO_STR(hour, 2);
- buffer[i++] = ':';
- INT_TO_STR(minute, 2);
- buffer[i++] = ':';
- INT_TO_STR(second, 2);
- buffer[i++] = '.';
- INT_TO_STR(milli, 3);
- buffer[i++] = 'Z';
- buffer[i++] = '\0';
+ result = jsonlite_parser_tokenize(p, buf, i);
+ if ((result != jsonlite_result_ok) &&
+ (result != jsonlite_result_end_of_stream)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_JSON_INVALID;
+ }
+
+ index += i;
+ }
+
+ jsonlite_parser_release(p);
+ close();
+ return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+ void* context) {
+ const int BUF_LEN = 40;
+ char buf[BUF_LEN];
+
+ int length = readContentLength();
+ if (length < 0) {
+ close();
+ return length;
+ }
+
+ int index = skipHttpHeader();
+ if (index != E_OK) {
+ close();
+ return index;
+ }
+ index = 0;
+
+ location_parsing_context_state state;
+ state.state = state.index = 0;
+ state.callback = callback;
+ state.context = context;
-#undef INT_TO_STR
+ jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+ cbs.key_found = on_location_key_found;
+ cbs.string_found = on_location_string_found;
+ cbs.context.client_state = &state;
+
+ jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+ jsonlite_parser_set_callback(p, &cbs);
+
+ jsonlite_result result = jsonlite_result_unknown;
+ while (index < length) {
+ int i = 0;
+
+ DBG("%s", "Received Data: ");
+ while ((i < BUF_LEN) && _client->available()) {
+ buf[i++] = _client->read();
+ DBG("%c", buf[i - 1]);
+ }
+ DBGLNEND;
- *length = i;
- return E_OK;
+ if ((!_client->connected()) &&
+ (!_client->available()) &&
+ ((index + i) < length)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_NOCONNECTION;
+ }
+
+ result = jsonlite_parser_tokenize(p, buf, i);
+ if ((result != jsonlite_result_ok) &&
+ (result != jsonlite_result_end_of_stream)) {
+ jsonlite_parser_release(p);
+ close();
+ return E_JSON_INVALID;
+ }
+
+ index += i;
+ }
+
+ jsonlite_parser_release(p);
+ close();
+ return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}
