Forked version
Fork of M2XStreamClient by
Diff: M2XStreamClient.cpp
- Revision:
- 19:4dfa28d37b8f
- Parent:
- 17:9db4a86b876a
--- a/M2XStreamClient.cpp Mon Dec 28 14:30:33 2015 +0000 +++ b/M2XStreamClient.cpp Fri Jan 01 15:48:18 2016 +0000 @@ -1,147 +1,527 @@ #include "M2XStreamClient.h" -static int fill_iso8601_timestamp(int32_t seconds, int32_t milli, - char* buffer, int* length); +#include <jsonlite.h> + +#include "StreamParseFunctions.h" +#include "LocationParseFunctions.h" + +const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; + +static int write_delete_values(Print* print, const char* from, const char* end); +int print_encoded_string(Print* print, const char* str); +int tolower(int ch); + +#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) +int tolower(int ch) +{ + // Arduino and mbed use ASCII table, so we can simplify the implementation + if ((ch >= 'A') && (ch <= 'Z')) { + return (ch + 32); + } + return ch; +} +#else +// For other platform, we use libc's tolower by default +#include <ctype.h> +#endif -TimeService::TimeService(M2XStreamClient* client) : _client(client) { +M2XStreamClient::M2XStreamClient(Client* client, + const char* key, + int case_insensitive, + const char* host, + int port, + const char* path_prefix) : _client(client), + _key(key), + _case_insensitive(case_insensitive), + _host(host), + _port(port), + _path_prefix(path_prefix), + _null_print() { +} + +int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName, + stream_value_read_callback callback, void* context, + const char* query) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) + _client->print(_path_prefix); + _client->print("/v2/devices/"); + print_encoded_string(_client, deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->print("/values.json"); + + if (query) { + if (query[0] != '?') { + _client->print('?'); + } + _client->print(query); + } + + _client->println(" HTTP/1.0"); + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + readStreamValue(callback, context); + } + + close(); + return status; } -int TimeService::init() { - _timer.start(); - return reset(); +int M2XStreamClient::readLocation(const char* deviceId, + location_read_callback callback, + void* context) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) + _client->print(_path_prefix); + _client->print("/v2/devices/"); + print_encoded_string(_client, deviceId); + _client->println("/location HTTP/1.0"); + + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + readLocation(callback, context); + } + + close(); + return status; +} + +int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName, + const char* from, const char* end) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + int length = write_delete_values(&_null_print, from, end); + writeDeleteHeader(deviceId, streamName, length); + write_delete_values(_client, from, end); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + + return readStatusCode(true); } -int TimeService::reset() { - int32_t ts; - int status = _client->getTimestamp32(&ts); +int M2XStreamClient::getTimestamp32(int32_t *ts) { + // The maximum value of signed 64-bit integer is 0x7fffffffffffffff, + // which is 9223372036854775807. It consists of 19 characters, so a + // buffer of 20 is definitely enough here + int length = 20; + char buffer[20]; + int status = getTimestamp(buffer, &length); + if (status == 200) { + int32_t result = 0; + for (int i = 0; i < length; i++) { + result = result * 10 + (buffer[i] - '0'); + } + if (ts != NULL) { *ts = result; } + } + return status; +} + +int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) { + if (bufferLength == NULL) { return E_INVALID; } + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->println("GET /v2/time/seconds HTTP/1.0"); - if (m2x_status_is_success(status)) { - _server_timestamp = ts; - _local_last_milli = _timer.read_ms(); + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; } + int status = readStatusCode(false); + if (status == 200) { + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + if (*bufferLength < length) { + *bufferLength = length; + return E_BUFFER_TOO_SMALL; + } + *bufferLength = length; + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + while (index < length) { + DBG("%s", "Received Data: "); + while ((index < length) && _client->available()) { + buffer[index++] = _client->read(); + DBG("%c", buffer[index - 1]); + } + DBGLNEND; + if ((!_client->connected()) && + (index < length)) { + close(); + return E_NOCONNECTION; + } + + delay(200); + } + } + close(); return status; } -int TimeService::getTimestamp(char* buffer, int* length) { - uint32_t now = _timer.read_ms(); - if (now < _local_last_milli) { - // In case of a timestamp overflow, we reset the server timestamp recorded. - // Notice that unlike Arduino, mbed would overflow every 30 minutes, - // so if 2 calls to this API are more than 30 minutes apart, we are - // likely to run into troubles. This is a limitation of the current - // mbed platform. However, we argue that it might be a rare case that - // 2 calls to this are 30 minutes apart. In most cases, we would call - // this API every few seconds or minutes, this won't be a huge problem. - // However, if you have a use case that would require 2 intervening - // calls to this be 30 minutes apart, you might want to leverage a RTC - // clock instead of the simple ticker here. - int status = reset(); - if (!m2x_status_is_success(status)) { return status; } - now = _timer.read_ms(); +static int write_delete_values(Print* print, const char* from, + const char* end) { + int bytes = 0; + bytes += print->print("{\"from\":\""); + bytes += print->print(from); + bytes += print->print("\",\"end\":\""); + bytes += print->print(end); + bytes += print->print("\"}"); + return bytes; +} + +// Encodes and prints string using Percent-encoding specified +// in RFC 1738, Section 2.2 +int print_encoded_string(Print* print, const char* str) { + int bytes = 0; + for (int i = 0; str[i] != 0; i++) { + if (((str[i] >= 'A') && (str[i] <= 'Z')) || + ((str[i] >= 'a') && (str[i] <= 'z')) || + ((str[i] >= '0') && (str[i] <= '9')) || + (str[i] == '-') || (str[i] == '_') || + (str[i] == '.') || (str[i] == '~')) { + bytes += print->print(str[i]); + } else { + // Encode all other characters + bytes += print->print('%'); + bytes += print->print(HEX(str[i] / 16)); + bytes += print->print(HEX(str[i] % 16)); + } } - if (now < _local_last_milli) { - // We have already reseted the timestamp, so this cannot happen, - // an HTTP request cannot last 30 minutes, something else must be wrong - // here. - return E_TIMESTAMP_ERROR; - } - uint32_t diff = now - _local_last_milli; - _local_last_milli = now; - _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds - return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000), - buffer, length); + return bytes; +} + +void M2XStreamClient::writePutHeader(const char* deviceId, + const char* streamName, + int contentLength) { + _client->print("PUT "); + if (_path_prefix) + _client->print(_path_prefix); + _client->print("/v2/devices/"); + print_encoded_string(_client, deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->println("/value HTTP/1.0"); + + writeHttpHeader(contentLength); +} + +void M2XStreamClient::writeDeleteHeader(const char* deviceId, + const char* streamName, + int contentLength) { + _client->print("DELETE "); + if (_path_prefix) + _client->print(_path_prefix); + _client->print("/v2/devices/"); + print_encoded_string(_client, deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->print("/values"); + _client->println(" HTTP/1.0"); + + writeHttpHeader(contentLength); } -#define SIZE_ISO_8601 25 -static inline bool is_leap_year(int16_t y) { - return ((1970 + y) > 0) && - !((1970 + y) % 4) && - (((1970 + y) % 100) || !((1970 + y) % 400)); +void M2XStreamClient::writeHttpHeader(int contentLength) { + _client->println(USER_AGENT); + _client->print("X-M2X-KEY: "); + _client->println(_key); + + _client->print("Host: "); + print_encoded_string(_client, _host); + if (_port != kDefaultM2XPort) { + _client->print(":"); + // port is an integer, does not need encoding + _client->print(_port); + } + _client->println(); + + if (contentLength > 0) { + _client->println("Content-Type: application/json"); + DBG("%s", "Content Length: "); + DBGLN("%d", contentLength); + + _client->print("Content-Length: "); + _client->println(contentLength); + } + _client->println(); } -static inline int32_t days_in_year(int16_t y) { - return is_leap_year(y) ? 366 : 365; -} -static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31}; + +int M2XStreamClient::waitForString(const char* str) { + int currentIndex = 0; + if (str[currentIndex] == '\0') return E_OK; + + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); + + int cmp; + if (_case_insensitive) { + cmp = tolower(c) - tolower(str[currentIndex]); + } else { + cmp = c - str[currentIndex]; + } -static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli, - char* buffer, int* length) { - int16_t year; - int8_t month, month_length; - int32_t day; - int8_t hour, minute, second; + if ((str[currentIndex] == '*') || (cmp == 0)) { + currentIndex++; + if (str[currentIndex] == '\0') { + return E_OK; + } + } else { + // start from the beginning + currentIndex = 0; + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + close(); + return E_DISCONNECTED; + } + + delay(1000); + } + // never reached here + return E_NOTREACHABLE; +} - if (*length < SIZE_ISO_8601) { - *length = SIZE_ISO_8601; - return E_BUFFER_TOO_SMALL; +int M2XStreamClient::readStatusCode(bool closeClient) { + int responseCode = 0; + int ret = waitForString("HTTP/*.* "); + if (ret != E_OK) { + if (closeClient) close(); + return ret; + } + + // ret is not needed from here(since it must be E_OK), so we can use it + // as a regular variable now. + ret = 0; + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); + + responseCode = responseCode * 10 + (c - '0'); + ret++; + if (ret == 3) { + if (closeClient) close(); + return responseCode; + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + if (closeClient) close(); + return E_DISCONNECTED; + } + + delay(1000); } - second = timestamp % 60; - timestamp /= 60; // now it is minutes + // never reached here + return E_NOTREACHABLE; +} - minute = timestamp % 60; - timestamp /= 60; // now it is hours - - hour = timestamp % 24; - timestamp /= 24; // now it is days +int M2XStreamClient::readContentLength() { + int ret = waitForString("Content-Length: "); + if (ret != E_OK) { + return ret; + } - year = 0; - day = 0; - while ((day += days_in_year(year)) <= timestamp) { - year++; - } - day -= days_in_year(year); - timestamp -= day; // now it is days in this year, starting at 0 + // From now on, ret is not needed, we can use it + // to keep the final result + ret = 0; + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); - day = 0; - month_length = 0; - for (month = 0; month < 12; month++) { - if (month == 1) { - // February - month_length = is_leap_year(year) ? 29 : 28; - } else { - month_length = MONTH_DAYS[month]; + if ((c == '\r') || (c == '\n')) { + return (ret == 0) ? (E_INVALID) : (ret); + } else { + ret = ret * 10 + (c - '0'); + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + return E_DISCONNECTED; } - if (timestamp >= month_length) { - timestamp -= month_length; - } else { - break; - } + delay(1000); } - year = 1970 + year; - month++; // offset by 1 - day = timestamp + 1; + + // never reached here + return E_NOTREACHABLE; +} + +int M2XStreamClient::skipHttpHeader() { + return waitForString("\n\r\n"); +} + +void M2XStreamClient::close() { + // Eats up buffered data before closing + _client->flush(); + _client->stop(); +} + +int M2XStreamClient::readStreamValue(stream_value_read_callback callback, + void* context) { + const int BUF_LEN = 64; + char buf[BUF_LEN]; + + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } - int i = 0, j = 0; + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + + stream_parsing_context_state state; + state.state = state.index = 0; + state.callback = callback; + state.context = context; + + jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; + cbs.key_found = on_stream_key_found; + cbs.number_found = on_stream_number_found; + cbs.string_found = on_stream_string_found; + cbs.context.client_state = &state; - // NOTE: It seems the snprintf implementation in Arduino has bugs, - // we have to manually piece the string together here. -#define INT_TO_STR(v_, width_) \ - for (j = 0; j < (width_); j++) { \ - buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \ - (v_) /= 10; \ - } \ - i += (width_) + jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); + jsonlite_parser_set_callback(p, &cbs); + + jsonlite_result result = jsonlite_result_unknown; + while (index < length) { + int i = 0; + + DBG("%s", "Received Data: "); + while ((i < BUF_LEN) && _client->available()) { + buf[i++] = _client->read(); + DBG("%c", buf[i - 1]); + } + DBGLNEND; + + if ((!_client->connected()) && + (!_client->available()) && + ((index + i) < length)) { + jsonlite_parser_release(p); + close(); + return E_NOCONNECTION; + } - INT_TO_STR(year, 4); - buffer[i++] = '-'; - INT_TO_STR(month, 2); - buffer[i++] = '-'; - INT_TO_STR(day, 2); - buffer[i++] = 'T'; - INT_TO_STR(hour, 2); - buffer[i++] = ':'; - INT_TO_STR(minute, 2); - buffer[i++] = ':'; - INT_TO_STR(second, 2); - buffer[i++] = '.'; - INT_TO_STR(milli, 3); - buffer[i++] = 'Z'; - buffer[i++] = '\0'; + result = jsonlite_parser_tokenize(p, buf, i); + if ((result != jsonlite_result_ok) && + (result != jsonlite_result_end_of_stream)) { + jsonlite_parser_release(p); + close(); + return E_JSON_INVALID; + } + + index += i; + } + + jsonlite_parser_release(p); + close(); + return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); +} + +int M2XStreamClient::readLocation(location_read_callback callback, + void* context) { + const int BUF_LEN = 40; + char buf[BUF_LEN]; + + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + + location_parsing_context_state state; + state.state = state.index = 0; + state.callback = callback; + state.context = context; -#undef INT_TO_STR + jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; + cbs.key_found = on_location_key_found; + cbs.string_found = on_location_string_found; + cbs.context.client_state = &state; + + jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); + jsonlite_parser_set_callback(p, &cbs); + + jsonlite_result result = jsonlite_result_unknown; + while (index < length) { + int i = 0; + + DBG("%s", "Received Data: "); + while ((i < BUF_LEN) && _client->available()) { + buf[i++] = _client->read(); + DBG("%c", buf[i - 1]); + } + DBGLNEND; - *length = i; - return E_OK; + if ((!_client->connected()) && + (!_client->available()) && + ((index + i) < length)) { + jsonlite_parser_release(p); + close(); + return E_NOCONNECTION; + } + + result = jsonlite_parser_tokenize(p, buf, i); + if ((result != jsonlite_result_ok) && + (result != jsonlite_result_end_of_stream)) { + jsonlite_parser_release(p); + close(); + return E_JSON_INVALID; + } + + index += i; + } + + jsonlite_parser_release(p); + close(); + return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); }