ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.
Dependents: m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more
Revision 22:4d895e732765, committed 2016-06-15
- Comitter:
- defmacro
- Date:
- Wed Jun 15 12:36:55 2016 +0000
- Parent:
- 21:6878944d2ce2
- Commit message:
- Update to new single header based implementation
Changed in this revision
diff -r 6878944d2ce2 -r 4d895e732765 Client.cpp --- a/Client.cpp Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,108 +0,0 @@ -#include "Client.h" -#include "mbed.h" - -#include <stdint.h> - -Client::Client() : _incnt(0), _outcnt(0), _sock() { - _sock.set_blocking(false, 1500); -} - -Client::~Client() { -} - -int Client::connect(const char *host, uint16_t port) { - return _sock.connect(host, port) == 0; -} - -size_t Client::write(uint8_t b) { - return write(&b, 1); -} - -size_t Client::write(const uint8_t *buf, size_t size) { - size_t cnt = 0; - while (size) { - int tmp = sizeof(_outbuf) - _outcnt; - if (tmp > size) tmp = size; - memcpy(_outbuf + _outcnt, buf, tmp); - _outcnt += tmp; - buf += tmp; - size -= tmp; - cnt += tmp; - // if no space flush it - if (_outcnt == sizeof(_outbuf)) - _flushout(); - } - return cnt; -} - -void Client::_flushout(void) -{ - if (_outcnt > 0) { - // NOTE: we know it's dangerous to cast from (const uint8_t *) to (char *), - // but we are trying to maintain a stable interface between the Arduino - // one and the mbed one. What's more, while TCPSocketConnection has no - // intention of modifying the data here, it requires us to send a (char *) - // typed data. So we belive it's safe to do the cast here. - _sock.send_all(const_cast<char*>((const char*) _outbuf), _outcnt); - _outcnt = 0; - } -} - -void Client::_fillin(void) -{ - int tmp = sizeof(_inbuf) - _incnt; - if (tmp) { - tmp = _sock.receive_all((char*)_inbuf + _incnt, tmp); - if (tmp > 0) - _incnt += tmp; - } -} - -void Client::flush() { - _flushout(); -} - -int Client::available() { - if (_incnt == 0) { - _flushout(); - _fillin(); - } - return (_incnt > 0) ? 1 : 0; -} - -int Client::read() { - uint8_t ch; - return (read(&ch, 1) == 1) ? ch : -1; -} - -int Client::read(uint8_t *buf, size_t size) { - int cnt = 0; - while (size) { - // need more - if (size > _incnt) { - _flushout(); - _fillin(); - } - if (_incnt > 0) { - int tmp = _incnt; - if (tmp > size) tmp = size; - memcpy(buf, _inbuf, tmp); - if (tmp != _incnt) - memmove(_inbuf, _inbuf + tmp, _incnt - tmp); - _incnt -= tmp; - size -= tmp; - buf += tmp; - cnt += tmp; - } else // no data - break; - } - return cnt; -} - -void Client::stop() { - _sock.close(); -} - -uint8_t Client::connected() { - return _sock.is_connected() ? 1 : 0; -}
diff -r 6878944d2ce2 -r 4d895e732765 Client.h --- a/Client.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -#ifndef Client_h -#define Client_h - -#include "TCPSocketConnection.h" - -#include "Print.h" -#include "Utility.h" - -/* - * TCP Client - */ -class Client : public Print { -public: - Client(); - ~Client(); - - virtual int connect(const char *host, uint16_t port); - virtual size_t write(uint8_t); - virtual size_t write(const uint8_t *buf, size_t size); - virtual int available(); - virtual int read(); - virtual void flush(); - virtual void stop(); - virtual uint8_t connected(); -private: - virtual int read(uint8_t *buf, size_t size); - void _fillin(void); - uint8_t _inbuf[128]; - uint8_t _incnt; - void _flushout(void); - uint8_t _outbuf[128]; - uint8_t _outcnt; - TCPSocketConnection _sock; -}; - -#endif
diff -r 6878944d2ce2 -r 4d895e732765 LocationParseFunctions.h --- a/LocationParseFunctions.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,100 +0,0 @@ -#ifndef LocationParseFunctions_h -#define LocationParseFunctions_h - -// Data structures and functions used to parse locations - -#define LOCATION_BUF_LEN 20 - -typedef struct { - uint16_t state; - char name_str[LOCATION_BUF_LEN + 1]; - double latitude; - double longitude; - double elevation; - char timestamp_str[LOCATION_BUF_LEN + 1]; - int index; - - location_read_callback callback; - void* context; -} location_parsing_context_state; - -#define WAITING_NAME 0x1 -#define WAITING_LATITUDE 0x2 -#define WAITING_LONGITUDE 0x4 -#define WAITING_ELEVATION 0x8 -#define WAITING_TIMESTAMP 0x10 - -#define GOT_NAME 0x20 -#define GOT_LATITUDE 0x40 -#define GOT_LONGITUDE 0x80 -#define GOT_ELEVATION 0x100 -#define GOT_TIMESTAMP 0x200 - -#define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP) -#define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION) - -#define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME) -#define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \ - == WAITING_LATITUDE) -#define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \ - == WAITING_LONGITUDE) -#define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \ - == WAITING_ELEVATION) -#define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \ - == WAITING_TIMESTAMP) - -static void on_location_key_found(jsonlite_callback_context* context, - jsonlite_token* token) { - location_parsing_context_state* state = - (location_parsing_context_state*) context->client_state; - if (strncmp((const char*) token->start, "waypoints", 9) == 0) { - // only parses those locations in waypoints, skip the outer one - state->state = 0; - } else if (strncmp((const char*) token->start, "name", 4) == 0) { - state->state |= WAITING_NAME; - } else if (strncmp((const char*) token->start, "latitude", 8) == 0) { - state->state |= WAITING_LATITUDE; - } else if (strncmp((const char*) token->start, "longitude", 9) == 0) { - state->state |= WAITING_LONGITUDE; - } else if (strncmp((const char*) token->start, "elevation", 9) == 0) { - state->state |= WAITING_ELEVATION; - } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) { - state->state |= WAITING_TIMESTAMP; - } -} - -static void on_location_string_found(jsonlite_callback_context* context, - jsonlite_token* token) { - location_parsing_context_state* state = - (location_parsing_context_state*) context->client_state; - - if (TEST_IS_NAME(state->state)) { - strncpy(state->name_str, (const char*) token->start, - MIN(token->end - token->start, LOCATION_BUF_LEN)); - state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; - state->state |= GOT_NAME; - } else if (TEST_IS_LATITUDE(state->state)) { - state->latitude = atof((const char*) token->start); - state->state |= GOT_LATITUDE; - } else if (TEST_IS_LONGITUDE(state->state)) { - state->longitude = atof((const char*) token->start); - state->state |= GOT_LONGITUDE; - } else if (TEST_IS_ELEVATION(state->state)) { - state->elevation = atof((const char*) token->start); - state->state |= GOT_ELEVATION; - } else if (TEST_IS_TIMESTAMP(state->state)) { - strncpy(state->timestamp_str, (const char*) token->start, - MIN(token->end - token->start, LOCATION_BUF_LEN)); - state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; - state->state |= GOT_TIMESTAMP; - } - - if (TEST_GOT_LOCATION(state->state)) { - state->callback(state->name_str, state->latitude, state->longitude, - state->elevation, state->timestamp_str, state->index++, - state->context); - state->state = 0; - } -} - -#endif /* LocationParseFunctions_h */
diff -r 6878944d2ce2 -r 4d895e732765 M2XStreamClient.cpp --- a/M2XStreamClient.cpp Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,527 +0,0 @@ -#include "M2XStreamClient.h" - -#include <jsonlite.h> - -#include "StreamParseFunctions.h" -#include "LocationParseFunctions.h" - -const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; - -static int write_delete_values(Print* print, const char* from, const char* end); -int print_encoded_string(Print* print, const char* str); -int tolower(int ch); - -#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) -int tolower(int ch) -{ - // Arduino and mbed use ASCII table, so we can simplify the implementation - if ((ch >= 'A') && (ch <= 'Z')) { - return (ch + 32); - } - return ch; -} -#else -// For other platform, we use libc's tolower by default -#include <ctype.h> -#endif - -M2XStreamClient::M2XStreamClient(Client* client, - const char* key, - int case_insensitive, - const char* host, - int port, - const char* path_prefix) : _client(client), - _key(key), - _case_insensitive(case_insensitive), - _host(host), - _port(port), - _path_prefix(path_prefix), - _null_print() { -} - -int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName, - stream_value_read_callback callback, void* context, - const char* query) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->print("GET "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->print("/values.json"); - - if (query) { - if (query[0] != '?') { - _client->print('?'); - } - _client->print(query); - } - - _client->println(" HTTP/1.0"); - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - int status = readStatusCode(false); - if (status == 200) { - readStreamValue(callback, context); - } - - close(); - return status; -} - -int M2XStreamClient::readLocation(const char* deviceId, - location_read_callback callback, - void* context) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->print("GET "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->println("/location HTTP/1.0"); - - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - int status = readStatusCode(false); - if (status == 200) { - readLocation(callback, context); - } - - close(); - return status; -} - -int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName, - const char* from, const char* end) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - int length = write_delete_values(&_null_print, from, end); - writeDeleteHeader(deviceId, streamName, length); - write_delete_values(_client, from, end); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - - return readStatusCode(true); -} - -int M2XStreamClient::getTimestamp32(int32_t *ts) { - // The maximum value of signed 64-bit integer is 0x7fffffffffffffff, - // which is 9223372036854775807. It consists of 19 characters, so a - // buffer of 20 is definitely enough here - int length = 20; - char buffer[20]; - int status = getTimestamp(buffer, &length); - if (status == 200) { - int32_t result = 0; - for (int i = 0; i < length; i++) { - result = result * 10 + (buffer[i] - '0'); - } - if (ts != NULL) { *ts = result; } - } - return status; -} - -int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) { - if (bufferLength == NULL) { return E_INVALID; } - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->println("GET /v2/time/seconds HTTP/1.0"); - - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - int status = readStatusCode(false); - if (status == 200) { - int length = readContentLength(); - if (length < 0) { - close(); - return length; - } - if (*bufferLength < length) { - *bufferLength = length; - return E_BUFFER_TOO_SMALL; - } - *bufferLength = length; - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; - while (index < length) { - DBG("%s", "Received Data: "); - while ((index < length) && _client->available()) { - buffer[index++] = _client->read(); - DBG("%c", buffer[index - 1]); - } - DBGLNEND; - - if ((!_client->connected()) && - (index < length)) { - close(); - return E_NOCONNECTION; - } - - delay(200); - } - } - close(); - return status; -} - -static int write_delete_values(Print* print, const char* from, - const char* end) { - int bytes = 0; - bytes += print->print("{\"from\":\""); - bytes += print->print(from); - bytes += print->print("\",\"end\":\""); - bytes += print->print(end); - bytes += print->print("\"}"); - return bytes; -} - -// Encodes and prints string using Percent-encoding specified -// in RFC 1738, Section 2.2 -int print_encoded_string(Print* print, const char* str) { - int bytes = 0; - for (int i = 0; str[i] != 0; i++) { - if (((str[i] >= 'A') && (str[i] <= 'Z')) || - ((str[i] >= 'a') && (str[i] <= 'z')) || - ((str[i] >= '0') && (str[i] <= '9')) || - (str[i] == '-') || (str[i] == '_') || - (str[i] == '.') || (str[i] == '~')) { - bytes += print->print(str[i]); - } else { - // Encode all other characters - bytes += print->print('%'); - bytes += print->print(HEX(str[i] / 16)); - bytes += print->print(HEX(str[i] % 16)); - } - } - return bytes; -} - -void M2XStreamClient::writePutHeader(const char* deviceId, - const char* streamName, - int contentLength) { - _client->print("PUT "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->println("/value HTTP/1.0"); - - writeHttpHeader(contentLength); -} - -void M2XStreamClient::writeDeleteHeader(const char* deviceId, - const char* streamName, - int contentLength) { - _client->print("DELETE "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->print("/values"); - _client->println(" HTTP/1.0"); - - writeHttpHeader(contentLength); -} - -void M2XStreamClient::writeHttpHeader(int contentLength) { - _client->println(USER_AGENT); - _client->print("X-M2X-KEY: "); - _client->println(_key); - - _client->print("Host: "); - print_encoded_string(_client, _host); - if (_port != kDefaultM2XPort) { - _client->print(":"); - // port is an integer, does not need encoding - _client->print(_port); - } - _client->println(); - - if (contentLength > 0) { - _client->println("Content-Type: application/json"); - DBG("%s", "Content Length: "); - DBGLN("%d", contentLength); - - _client->print("Content-Length: "); - _client->println(contentLength); - } - _client->println(); -} - -int M2XStreamClient::waitForString(const char* str) { - int currentIndex = 0; - if (str[currentIndex] == '\0') return E_OK; - - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - int cmp; - if (_case_insensitive) { - cmp = tolower(c) - tolower(str[currentIndex]); - } else { - cmp = c - str[currentIndex]; - } - - if ((str[currentIndex] == '*') || (cmp == 0)) { - currentIndex++; - if (str[currentIndex] == '\0') { - return E_OK; - } - } else { - // start from the beginning - currentIndex = 0; - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - close(); - return E_DISCONNECTED; - } - - delay(1000); - } - // never reached here - return E_NOTREACHABLE; -} - -int M2XStreamClient::readStatusCode(bool closeClient) { - int responseCode = 0; - int ret = waitForString("HTTP/*.* "); - if (ret != E_OK) { - if (closeClient) close(); - return ret; - } - - // ret is not needed from here(since it must be E_OK), so we can use it - // as a regular variable now. - ret = 0; - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - responseCode = responseCode * 10 + (c - '0'); - ret++; - if (ret == 3) { - if (closeClient) close(); - return responseCode; - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - if (closeClient) close(); - return E_DISCONNECTED; - } - - delay(1000); - } - - // never reached here - return E_NOTREACHABLE; -} - -int M2XStreamClient::readContentLength() { - int ret = waitForString("Content-Length: "); - if (ret != E_OK) { - return ret; - } - - // From now on, ret is not needed, we can use it - // to keep the final result - ret = 0; - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - if ((c == '\r') || (c == '\n')) { - return (ret == 0) ? (E_INVALID) : (ret); - } else { - ret = ret * 10 + (c - '0'); - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - return E_DISCONNECTED; - } - - delay(1000); - } - - // never reached here - return E_NOTREACHABLE; -} - -int M2XStreamClient::skipHttpHeader() { - return waitForString("\n\r\n"); -} - -void M2XStreamClient::close() { - // Eats up buffered data before closing - _client->flush(); - _client->stop(); -} - -int M2XStreamClient::readStreamValue(stream_value_read_callback callback, - void* context) { - const int BUF_LEN = 64; - char buf[BUF_LEN]; - - int length = readContentLength(); - if (length < 0) { - close(); - return length; - } - - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; - - stream_parsing_context_state state; - state.state = state.index = 0; - state.callback = callback; - state.context = context; - - jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; - cbs.key_found = on_stream_key_found; - cbs.number_found = on_stream_number_found; - cbs.string_found = on_stream_string_found; - cbs.context.client_state = &state; - - jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); - jsonlite_parser_set_callback(p, &cbs); - - jsonlite_result result = jsonlite_result_unknown; - while (index < length) { - int i = 0; - - DBG("%s", "Received Data: "); - while ((i < BUF_LEN) && _client->available()) { - buf[i++] = _client->read(); - DBG("%c", buf[i - 1]); - } - DBGLNEND; - - if ((!_client->connected()) && - (!_client->available()) && - ((index + i) < length)) { - jsonlite_parser_release(p); - close(); - return E_NOCONNECTION; - } - - result = jsonlite_parser_tokenize(p, buf, i); - if ((result != jsonlite_result_ok) && - (result != jsonlite_result_end_of_stream)) { - jsonlite_parser_release(p); - close(); - return E_JSON_INVALID; - } - - index += i; - } - - jsonlite_parser_release(p); - close(); - return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); -} - -int M2XStreamClient::readLocation(location_read_callback callback, - void* context) { - const int BUF_LEN = 40; - char buf[BUF_LEN]; - - int length = readContentLength(); - if (length < 0) { - close(); - return length; - } - - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; - - location_parsing_context_state state; - state.state = state.index = 0; - state.callback = callback; - state.context = context; - - jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; - cbs.key_found = on_location_key_found; - cbs.string_found = on_location_string_found; - cbs.context.client_state = &state; - - jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); - jsonlite_parser_set_callback(p, &cbs); - - jsonlite_result result = jsonlite_result_unknown; - while (index < length) { - int i = 0; - - DBG("%s", "Received Data: "); - while ((i < BUF_LEN) && _client->available()) { - buf[i++] = _client->read(); - DBG("%c", buf[i - 1]); - } - DBGLNEND; - - if ((!_client->connected()) && - (!_client->available()) && - ((index + i) < length)) { - jsonlite_parser_release(p); - close(); - return E_NOCONNECTION; - } - - result = jsonlite_parser_tokenize(p, buf, i); - if ((result != jsonlite_result_ok) && - (result != jsonlite_result_end_of_stream)) { - jsonlite_parser_release(p); - close(); - return E_JSON_INVALID; - } - - index += i; - } - - jsonlite_parser_release(p); - close(); - return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); -}
diff -r 6878944d2ce2 -r 4d895e732765 M2XStreamClient.h --- a/M2XStreamClient.h Sat Jan 02 02:29:43 2016 +0000 +++ b/M2XStreamClient.h Wed Jun 15 12:36:55 2016 +0000 @@ -1,46 +1,38 @@ #ifndef M2XStreamClient_h #define M2XStreamClient_h -#define MIN(a, b) (((a) > (b))?(b):(a)) - -#define MBED_PLATFORM - -#ifdef ARDUINO_PLATFORM -#include "Arduino.h" - -#define USER_AGENT "User-Agent: M2X Arduino Client/2.0.2" -#endif - -#ifdef MBED_PLATFORM -#include "mbed.h" - -#define USER_AGENT "User-Agent: M2X Mbed Client/2.1.2" +#if (!defined(ARDUINO_PLATFORM)) && (!defined(ESP8266_PLATFORM)) && (!defined(MBED_PLATFORM)) +#error "Platform definition is missing!" #endif -#include "Client.h" -#include "NullPrint.h" +#define M2X_VERSION "2.2.0" -#ifdef DEBUG #ifdef ARDUINO_PLATFORM -#define DBG(fmt_, data_) Serial.print(data_) -#define DBGLN(fmt_, data_) Serial.println(data_) -#define DBGLNEND Serial.println() -#endif // ARDUINO_PLATFORM +#include "m2x-arduino.h" +#endif /* ARDUINO_PLATFORM */ + +#ifdef ESP8266_PLATFORM +#include "m2x-esp8266.h" +#endif /* ESP8266_PLATFORM */ #ifdef MBED_PLATFORM -#define DBG(fmt_, data_) printf((fmt_), (data_)) -#define DBGLN(fmt_, data_) printf((fmt_), (data_)); printf("\n") -#define DBGLNEND printf("\n") -#endif // MBED_PLATFORM -#else +#include "m2x-mbed.h" +#endif /* MBED_PLATFORM */ + +/* If we don't have DBG defined, provide dump implementation */ +#ifndef DBG #define DBG(fmt_, data_) #define DBGLN(fmt_, data_) #define DBGLNEND -#endif // DEBUG +#endif /* DBG */ -#define HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0'))) +#define MIN(a, b) (((a) > (b))?(b):(a)) +#define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0'))) #define MAX_DOUBLE_DIGITS 7 +/* For tolower */ +#include <ctype.h> + static const int E_OK = 0; static const int E_NOCONNECTION = -1; static const int E_DISCONNECTED = -2; @@ -50,6 +42,9 @@ static const int E_BUFFER_TOO_SMALL = -6; static const int E_TIMESTAMP_ERROR = -8; +static const char* DEFAULT_M2X_HOST = "api-m2x.att.com"; +static const int DEFAULT_M2X_PORT = 80; + static inline bool m2x_status_is_success(int status) { return (status == E_OK) || (status >= 200 && status <= 299); } @@ -67,6 +62,44 @@ m2x_status_is_server_error(status); } +// Null Print class used to calculate length to print +class NullPrint : public Print { +public: + size_t counter; + + virtual size_t write(uint8_t b) { + counter++; + return 1; + } + + virtual size_t write(const uint8_t* buf, size_t size) { + counter += size; + return size; + } +}; + +// Encodes and prints string using Percent-encoding specified +// in RFC 1738, Section 2.2 +static inline int print_encoded_string(Print* print, const char* str) { + int bytes = 0; + for (int i = 0; str[i] != 0; i++) { + if (((str[i] >= 'A') && (str[i] <= 'Z')) || + ((str[i] >= 'a') && (str[i] <= 'z')) || + ((str[i] >= '0') && (str[i] <= '9')) || + (str[i] == '-') || (str[i] == '_') || + (str[i] == '.') || (str[i] == '~')) { + bytes += print->print(str[i]); + } else { + // Encode all other characters + bytes += print->print('%'); + bytes += print->print(TO_HEX(str[i] / 16)); + bytes += print->print(TO_HEX(str[i] % 16)); + } + } + return bytes; +} + +#ifdef M2X_ENABLE_READER /* * +type+ indicates the value type: 1 for string, 2 for number * NOTE that the value type here only contains a hint on how @@ -88,19 +121,27 @@ int index, void* context); +typedef void (*m2x_command_read_callback)(const char* id, + const char* name, + int index, + void *context); +#endif /* M2X_ENABLE_READER */ + +typedef void (*m2x_fill_data_callback)(Print *print, void *context); + class M2XStreamClient { public: - static const char* kDefaultM2XHost; - static const int kDefaultM2XPort = 80; - M2XStreamClient(Client* client, const char* key, + void (* idlefunc)(void) = NULL, int case_insensitive = 1, - const char* host = kDefaultM2XHost, - int port = kDefaultM2XPort, + const char* host = DEFAULT_M2X_HOST, + int port = DEFAULT_M2X_PORT, const char* path_prefix = NULL); // Push data stream value using PUT request, returns the HTTP status code + // NOTE: if you want to update by a serial, use "serial/<serial ID>" as + // the device ID here. template <class T> int updateStreamValue(const char* deviceId, const char* streamName, T value); @@ -122,6 +163,8 @@ // stream, the succeeding +counts[1]+ number of items contain values // for the second stream, etc. The length of this array should be // the sum of all values in +counts+ array. + // NOTE: if you want to update by a serial, use "serial/<serial ID>" as + // the device ID here. template <class T> int postDeviceUpdates(const char* deviceId, int streamNum, const char* names[], const int counts[], @@ -136,11 +179,14 @@ // be exactly +streamNum+. Notice that the array of +values+ should // match the array of +names+, and that the ith value in +values+ is // exactly the value to post for the ith stream name in +names+ + // NOTE: if you want to update by a serial, use "serial/<serial ID>" as + // the device ID here. template <class T> - int postSingleDeviceUpdate(const char* deviceId, int streamNum, - const char* names[], T values[], - const char* at = NULL); + int postDeviceUpdate(const char* deviceId, int streamNum, + const char* names[], T values[], + const char* at = NULL); +#ifdef M2X_ENABLE_READER // Fetch values for a particular data stream. Since memory is // very limited on an Arduino, we cannot parse and get all the // data points in memory. Instead, we use callbacks here: whenever @@ -156,6 +202,7 @@ int listStreamValues(const char* deviceId, const char* streamName, stream_value_read_callback callback, void* context, const char* query = NULL); +#endif /* M2X_ENABLE_READER */ // Update datasource location // NOTE: On an Arduino Uno and other ATMEGA based boards, double has @@ -171,15 +218,19 @@ // precision, which means you are free to use the double-version only // without any precision problems. // Returned value is the http status code. + // NOTE: if you want to update by a serial, use "serial/<serial ID>" as + // the device ID here. template <class T> int updateLocation(const char* deviceId, const char* name, T latitude, T longitude, T elevation); +#ifdef M2X_ENABLE_READER // Read location information for a device. Also used callback to process // data points for memory reasons. The HTTP status code is returned, // response is only parsed when the HTTP status code is 200 int readLocation(const char* deviceId, location_read_callback callback, void* context); +#endif /* M2X_ENABLE_READER */ // Delete values from a data stream // You will need to provide from and end date/time strings in the ISO8601 @@ -198,6 +249,53 @@ int deleteValues(const char* deviceId, const char* streamName, const char* from, const char* end); +#ifdef M2X_ENABLE_READER + // Fetch commands available for this device, notice that for memory constraints, + // we only keep ID and name of the command received here. + // You can tweak the command receiving via the query parameter. + int listCommands(const char* deviceId, + m2x_command_read_callback callback, void* context, + const char* query = NULL); +#endif /* M2X_ENABLE_READER */ + + // Mark a command as processed. + // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed + // To make sure the minimal amount of memory is needed, this API works with + // a callback function. The callback function is then used to fill the request + // data, note that in order to correctly set the content length in HTTP header, + // the callback function will be called twice, the caller must make sure both + // calls fill the Print object with exactly the same data. + // If you have a pre-allocated buffer filled with the data to post, you can + // use markCommandProcessedWithData API below. + int markCommandProcessed(const char* deviceId, const char* commandId, + m2x_fill_data_callback callback, void *context); + + // Mark a command as processed with a data buffer + // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed + // This is exactly like markCommandProcessed, except that a buffer is use to + // contain the data to post as the request body. + int markCommandProcessedWithData(const char* deviceId, const char* commandId, + const char* data); + + // Mark a command as rejected. + // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected + // To make sure the minimal amount of memory is needed, this API works with + // a callback function. The callback function is then used to fill the request + // data, note that in order to correctly set the content length in HTTP header, + // the callback function will be called twice, the caller must make sure both + // calls fill the Print object with exactly the same data. + // If you have a pre-allocated buffer filled with the data to post, you can + // use markCommandRejectedWithData API below. + int markCommandRejected(const char* deviceId, const char* commandId, + m2x_fill_data_callback callback, void *context); + + // Mark a command as rejected with a data buffer + // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected + // This is exactly like markCommandRejected, except that a buffer is use to + // contain the data to post as the request body. + int markCommandRejectedWithData(const char* deviceId, const char* commandId, + const char* data); + // Fetches current timestamp in seconds from M2X server. Since we // are using signed 32-bit integer as return value, this will only // return valid results before 03:14:07 UTC on 19 January 2038. If @@ -252,6 +350,7 @@ int _case_insensitive; const char* _host; int _port; + void (* _idlefunc)(void); const char* _path_prefix; NullPrint _null_print; @@ -282,15 +381,1275 @@ int waitForString(const char* str); // Closes the connection void close(); + +#ifdef M2X_ENABLE_READER // Parses JSON response of stream value API, and calls callback function // once we get a data point int readStreamValue(stream_value_read_callback callback, void* context); // Parses JSON response of location API, and calls callback function once // we get a data point int readLocation(location_read_callback callback, void* context); + // Parses JSON response of command API, and calls callback function once + // we get a data point + int readCommand(m2x_command_read_callback callback, void* context); +#endif /* M2X_ENABLE_READER */ +}; + + +// A ISO8601 timestamp generation service for M2X. +// It uses the Time API provided by the M2X server to initialize +// clock, then uses millis() function provided by Arduino to calculate +// time advancements so as to reduce API query times. +// +// Right now, this service only works with 32-bit timestamp, meaning that +// this service won't work after 03:14:07 UTC on 19 January 2038. However, +// a similar service that uses 64-bit timestamp can be implemented following +// the logic here. +class TimeService { +public: + TimeService(M2XStreamClient* client); + + // Initialize the time service. Notice the TimeService instance is only + // working after calling this function successfully. + int init(); + + // Reset the internal recorded time by calling M2X Time API again. Normally, + // you don't need to call this manually. TimeService will handle Arduino clock + // overflow automatically + int reset(); + + // Fills ISO8601 formatted timestamp into the buffer provided. +length+ should + // contains the maximum supported length of the buffer when calling. For now, + // the buffer should be able to store 25 characters for a full ISO8601 formatted + // timestamp, otherwise, an error will be returned. + int getTimestamp(char* buffer, int* length); +private: + M2XStreamClient* _client; + int32_t _server_timestamp; + uint32_t _local_last_milli; + M2XTimer _timer; }; -#include "M2XStreamClient_template.h" -#include "TimeService.h" + +// Implementations +M2XStreamClient::M2XStreamClient(Client* client, + const char* key, + void (* idlefunc)(void), + int case_insensitive, + const char* host, + int port, + const char* path_prefix) : _client(client), + _key(key), + _idlefunc(idlefunc), + _case_insensitive(case_insensitive), + _host(host), + _port(port), + _path_prefix(path_prefix), + _null_print() { +} + +template <class T> +int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + writePutHeader(deviceId, streamName, + // for {"value": and } + _null_print.print(value) + 12); + _client->print("{\"value\":\""); + _client->print(value); + _client->print("\"}"); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + + return readStatusCode(true); +} + +template <class T> +inline int write_multiple_values(Print* print, int streamNum, + const char* names[], const int counts[], + const char* ats[], T values[]) { + int bytes = 0, value_index = 0; + bytes += print->print("{\"values\":{"); + for (int i = 0; i < streamNum; i++) { + bytes += print->print("\""); + bytes += print->print(names[i]); + bytes += print->print("\":["); + for (int j = 0; j < counts[i]; j++) { + bytes += print->print("{\"timestamp\": \""); + bytes += print->print(ats[value_index]); + bytes += print->print("\",\"value\": \""); + bytes += print->print(values[value_index]); + bytes += print->print("\"}"); + if (j < counts[i] - 1) { bytes += print->print(","); } + value_index++; + } + bytes += print->print("]"); + if (i < streamNum - 1) { bytes += print->print(","); } + } + bytes += print->print("}}"); + return bytes; +} + +template <class T> +int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum, + const char* names[], const int counts[], + const char* ats[], T values[]) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + int length = write_multiple_values(&_null_print, streamNum, names, + counts, ats, values); + _client->print("POST "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->println("/updates HTTP/1.0"); + writeHttpHeader(length); + write_multiple_values(_client, streamNum, names, counts, ats, values); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + return readStatusCode(true); +} + +template <class T> +inline int write_single_device_values(Print* print, int streamNum, + const char* names[], T values[], + const char* at) { + int bytes = 0; + bytes += print->print("{\"values\":{"); + for (int i = 0; i < streamNum; i++) { + bytes += print->print("\""); + bytes += print->print(names[i]); + bytes += print->print("\": \""); + bytes += print->print(values[i]); + bytes += print->print("\""); + if (i < streamNum - 1) { bytes += print->print(","); } + } + bytes += print->print("}"); + if (at != NULL) { + bytes += print->print(",\"timestamp\":\""); + bytes += print->print(at); + bytes += print->print("\""); + } + bytes += print->print("}"); + return bytes; +} + +template <class T> +int M2XStreamClient::postDeviceUpdate(const char* deviceId, int streamNum, + const char* names[], T values[], + const char* at) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + int length = write_single_device_values(&_null_print, streamNum, names, + values, at); + _client->print("POST "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->println("/update HTTP/1.0"); + writeHttpHeader(length); + write_single_device_values(_client, streamNum, names, values, at); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + return readStatusCode(true); +} + +template <class T> +static int write_location_data(Print* print, const char* name, + T latitude, T longitude, + T elevation) { + int bytes = 0; + bytes += print->print("{\"name\":\""); + bytes += print->print(name); + bytes += print->print("\",\"latitude\":\""); + bytes += print->print(latitude); + bytes += print->print("\",\"longitude\":\""); + bytes += print->print(longitude); + bytes += print->print("\",\"elevation\":\""); + bytes += print->print(elevation); + bytes += print->print("\"}"); + return bytes; +} + +static int write_location_data(Print* print, const char* name, + double latitude, double longitude, + double elevation) { + int bytes = 0; + bytes += print->print("{\"name\":\""); + bytes += print->print(name); + bytes += print->print("\",\"latitude\":\""); + bytes += print->print(latitude, MAX_DOUBLE_DIGITS); + bytes += print->print("\",\"longitude\":\""); + bytes += print->print(longitude, MAX_DOUBLE_DIGITS); + bytes += print->print("\",\"elevation\":\""); + bytes += print->print(elevation); + bytes += print->print("\"}"); + return bytes; +} + +template <class T> +int M2XStreamClient::updateLocation(const char* deviceId, + const char* name, + T latitude, + T longitude, + T elevation) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + + int length = write_location_data(&_null_print, name, latitude, longitude, + elevation); + _client->print("PUT "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->println("/location HTTP/1.0"); + + writeHttpHeader(length); + write_location_data(_client, name, latitude, longitude, elevation); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + return readStatusCode(true); +} + +static inline int write_delete_values(Print* print, const char* from, + const char* end) { + int bytes = 0; + bytes += print->print("{\"from\":\""); + bytes += print->print(from); + bytes += print->print("\",\"end\":\""); + bytes += print->print(end); + bytes += print->print("\"}"); + return bytes; +} + +int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName, + const char* from, const char* end) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + int length = write_delete_values(&_null_print, from, end); + writeDeleteHeader(deviceId, streamName, length); + write_delete_values(_client, from, end); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + + return readStatusCode(true); +} + +int M2XStreamClient::markCommandProcessed(const char* deviceId, + const char* commandId, + m2x_fill_data_callback callback, + void *context) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _null_print.counter = 0; + callback(&_null_print, context); + int length = _null_print.counter; + _client->print("POST "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/commands/"); + _client->print(commandId); + _client->print("/process"); + _client->println(" HTTP/1.0"); + writeHttpHeader(length); + callback(_client, context); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + return readStatusCode(true); +} + +static void m2x_fixed_buffer_filling_callback(Print *print, void *context) { + if (context) { + print->print((const char *) context); + } +} + +int M2XStreamClient::markCommandProcessedWithData(const char* deviceId, + const char* commandId, + const char* data) { + return markCommandProcessed(deviceId, commandId, + m2x_fixed_buffer_filling_callback, (void *) data); +} + +int M2XStreamClient::markCommandRejected(const char* deviceId, + const char* commandId, + m2x_fill_data_callback callback, + void *context) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _null_print.counter = 0; + callback(&_null_print, context); + int length = _null_print.counter; + _client->print("POST "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/commands/"); + _client->print(commandId); + _client->print("/reject"); + _client->println(" HTTP/1.0"); + writeHttpHeader(length); + callback(_client, context); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + return readStatusCode(true); +} + +int M2XStreamClient::markCommandRejectedWithData(const char* deviceId, + const char* commandId, + const char* data) { + return markCommandRejected(deviceId, commandId, + m2x_fixed_buffer_filling_callback, (void *) data); +} + +int M2XStreamClient::getTimestamp32(int32_t *ts) { + // The maximum value of signed 64-bit integer is 0x7fffffffffffffff, + // which is 9223372036854775807. It consists of 19 characters, so a + // buffer of 20 is definitely enough here + int length = 20; + char buffer[20]; + int status = getTimestamp(buffer, &length); + if (status == 200) { + int32_t result = 0; + for (int i = 0; i < length; i++) { + result = result * 10 + (buffer[i] - '0'); + } + if (ts != NULL) { *ts = result; } + } + return status; +} + +int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) { + if (bufferLength == NULL) { return E_INVALID; } + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->println("/v2/time/seconds HTTP/1.0"); + + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + if (*bufferLength < length) { + *bufferLength = length; + return E_BUFFER_TOO_SMALL; + } + *bufferLength = length; + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + while (index < length) { + DBG("%s", "Received Data: "); + while ((index < length) && _client->available()) { + buffer[index++] = _client->read(); + DBG("%c", buffer[index - 1]); + } + DBGLNEND; + + if ((!_client->connected()) && + (index < length)) { + close(); + return E_NOCONNECTION; + } + + if (_idlefunc!=NULL) { + _idlefunc(); + } else { + delay(200); + } + } + } + close(); + return status; +} + + +void M2XStreamClient::writePutHeader(const char* deviceId, + const char* streamName, + int contentLength) { + _client->print("PUT "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->println("/value HTTP/1.0"); + + writeHttpHeader(contentLength); +} + +void M2XStreamClient::writeDeleteHeader(const char* deviceId, + const char* streamName, + int contentLength) { + _client->print("DELETE "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->print("/values"); + _client->println(" HTTP/1.0"); + + writeHttpHeader(contentLength); +} + +void M2XStreamClient::writeHttpHeader(int contentLength) { + _client->println(USER_AGENT); + _client->print("X-M2X-KEY: "); + _client->println(_key); + + _client->print("Host: "); + print_encoded_string(_client, _host); + if (_port != DEFAULT_M2X_PORT) { + _client->print(":"); + // port is an integer, does not need encoding + _client->print(_port); + } + _client->println(); + + if (contentLength > 0) { + _client->println("Content-Type: application/json"); + DBG("%s", "Content Length: "); + DBGLN("%d", contentLength); + + _client->print("Content-Length: "); + _client->println(contentLength); + } + _client->println(); +} + +int M2XStreamClient::waitForString(const char* str) { + int currentIndex = 0; + if (str[currentIndex] == '\0') return E_OK; + + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); + + int cmp; + if (_case_insensitive) { + cmp = tolower(c) - tolower(str[currentIndex]); + } else { + cmp = c - str[currentIndex]; + } + + if ((str[currentIndex] == '*') || (cmp == 0)) { + currentIndex++; + if (str[currentIndex] == '\0') { + return E_OK; + } + } else { + // start from the beginning + currentIndex = 0; + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + close(); + return E_DISCONNECTED; + } + if (_idlefunc!=NULL) + _idlefunc(); + else + delay(200); + } + // never reached here + return E_NOTREACHABLE; +} + +int M2XStreamClient::readStatusCode(bool closeClient) { + int responseCode = 0; + int ret = waitForString("HTTP/*.* "); + if (ret != E_OK) { + if (closeClient) close(); + return ret; + } + + // ret is not needed from here(since it must be E_OK), so we can use it + // as a regular variable now. + ret = 0; + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); + + responseCode = responseCode * 10 + (c - '0'); + ret++; + if (ret == 3) { + if (closeClient) close(); + return responseCode; + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + if (closeClient) close(); + return E_DISCONNECTED; + } + if (_idlefunc!=NULL) + _idlefunc(); + else + delay(200); + } + + // never reached here + return E_NOTREACHABLE; +} + +int M2XStreamClient::readContentLength() { + int ret = waitForString("Content-Length: "); + if (ret != E_OK) { + return ret; + } + + // From now on, ret is not needed, we can use it + // to keep the final result + ret = 0; + while (true) { + while (_client->available()) { + char c = _client->read(); + DBG("%c", c); + + if ((c == '\r') || (c == '\n')) { + return (ret == 0) ? (E_INVALID) : (ret); + } else { + ret = ret * 10 + (c - '0'); + } + } + + if (!_client->connected()) { + DBGLN("%s", "ERROR: The client is disconnected from the server!"); + + return E_DISCONNECTED; + } + if (_idlefunc!=NULL) + _idlefunc(); + else + delay(200); + } + + // never reached here + return E_NOTREACHABLE; +} + +int M2XStreamClient::skipHttpHeader() { + return waitForString("\n\r\n"); +} + +void M2XStreamClient::close() { + // Eats up buffered data before closing + _client->flush(); + _client->stop(); +} + +static inline int fill_iso8601_timestamp(int32_t seconds, int32_t milli, + char* buffer, int* length); + +TimeService::TimeService(M2XStreamClient* client) : _client(client) { +} + +int TimeService::init() { + _timer.start(); + return reset(); +} + +int TimeService::reset() { + int32_t ts; + int status = _client->getTimestamp32(&ts); + + if (m2x_status_is_success(status)) { + _server_timestamp = ts; + _local_last_milli = _timer.read_ms(); + } + + return status; +} + +int TimeService::getTimestamp(char* buffer, int* length) { + uint32_t now = _timer.read_ms(); + if (now < _local_last_milli) { + // In case of a timestamp overflow(happens once every 50 days on + // Arduino), we reset the server timestamp recorded. + // NOTE: while on an Arduino this might be okay, the situation is worse + // on mbed, see the notes in m2x-mbed.h for details + int status = reset(); + if (!m2x_status_is_success(status)) { return status; } + now = _timer.read_ms(); + } + if (now < _local_last_milli) { + // We have already reseted the timestamp, so this cannot happen + // (an HTTP request can take longer than 50 days to finished? You + // must be kidding here). Something else must be wrong here + return E_TIMESTAMP_ERROR; + } + uint32_t diff = now - _local_last_milli; + _local_last_milli = now; + _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds + return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000), + buffer, length); +} + +#define SIZE_ISO_8601 25 +static inline bool is_leap_year(int16_t y) { + return ((1970 + y) > 0) && + !((1970 + y) % 4) && + (((1970 + y) % 100) || !((1970 + y) % 400)); +} +static inline int32_t days_in_year(int16_t y) { + return is_leap_year(y) ? 366 : 365; +} +static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31}; + +static inline int fill_iso8601_timestamp(int32_t timestamp, int32_t milli, + char* buffer, int* length) { + int16_t year; + int8_t month, month_length; + int32_t day; + int8_t hour, minute, second; + + if (*length < SIZE_ISO_8601) { + *length = SIZE_ISO_8601; + return E_BUFFER_TOO_SMALL; + } + + second = timestamp % 60; + timestamp /= 60; // now it is minutes + + minute = timestamp % 60; + timestamp /= 60; // now it is hours + + hour = timestamp % 24; + timestamp /= 24; // now it is days + + year = 0; + day = 0; + while ((day += days_in_year(year)) <= timestamp) { + year++; + } + day -= days_in_year(year); + timestamp -= day; // now it is days in this year, starting at 0 + + day = 0; + month_length = 0; + for (month = 0; month < 12; month++) { + if (month == 1) { + // February + month_length = is_leap_year(year) ? 29 : 28; + } else { + month_length = MONTH_DAYS[month]; + } + + if (timestamp >= month_length) { + timestamp -= month_length; + } else { + break; + } + } + year = 1970 + year; + month++; // offset by 1 + day = timestamp + 1; + + int i = 0, j = 0; + + // NOTE: It seems the snprintf implementation in Arduino has bugs, + // we have to manually piece the string together here. +#define INT_TO_STR(v_, width_) \ + for (j = 0; j < (width_); j++) { \ + buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \ + (v_) /= 10; \ + } \ + i += (width_) + + INT_TO_STR(year, 4); + buffer[i++] = '-'; + INT_TO_STR(month, 2); + buffer[i++] = '-'; + INT_TO_STR(day, 2); + buffer[i++] = 'T'; + INT_TO_STR(hour, 2); + buffer[i++] = ':'; + INT_TO_STR(minute, 2); + buffer[i++] = ':'; + INT_TO_STR(second, 2); + buffer[i++] = '.'; + INT_TO_STR(milli, 3); + buffer[i++] = 'Z'; + buffer[i++] = '\0'; + +#undef INT_TO_STR + + *length = i; + return E_OK; +} + +/* Reader functions */ +#ifdef M2X_ENABLE_READER + +// Data structures and functions used to parse stream values + +#define STREAM_BUF_LEN 32 + +typedef struct { + uint8_t state; + char at_str[STREAM_BUF_LEN + 1]; + char value_str[STREAM_BUF_LEN + 1]; + int index; + + stream_value_read_callback callback; + void* context; +} stream_parsing_context_state; + +#define WAITING_AT 0x1 +#define GOT_AT 0x2 +#define WAITING_VALUE 0x4 +#define GOT_VALUE 0x8 + +#define GOT_STREAM (GOT_AT | GOT_VALUE) +#define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM) + +#define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT) +#define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \ + WAITING_VALUE) + +static void on_stream_key_found(jsonlite_callback_context* context, + jsonlite_token* token) +{ + stream_parsing_context_state* state = + (stream_parsing_context_state*) context->client_state; + if (strncmp((const char*) token->start, "timestamp", 9) == 0) { + state->state |= WAITING_AT; + } else if ((strncmp((const char*) token->start, "value", 5) == 0) && + (token->start[5] != 's')) { // get rid of "values" + state->state |= WAITING_VALUE; + } +} + +static void on_stream_value_found(jsonlite_callback_context* context, + jsonlite_token* token, + int type) +{ + stream_parsing_context_state* state = + (stream_parsing_context_state*) context->client_state; + + if (TEST_IS_AT(state->state)) { + strncpy(state->at_str, (const char*) token->start, + MIN(token->end - token->start, STREAM_BUF_LEN)); + state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; + state->state |= GOT_AT; + } else if (TEST_IS_VALUE(state->state)) { + strncpy(state->value_str, (const char*) token->start, + MIN(token->end - token->start, STREAM_BUF_LEN)); + state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; + state->state |= GOT_VALUE; + } + + if (TEST_GOT_STREAM(state->state)) { + state->callback(state->at_str, state->value_str, + state->index++, state->context, type); + state->state = 0; + } +} + +static void on_stream_string_found(jsonlite_callback_context* context, + jsonlite_token* token) +{ + on_stream_value_found(context, token, 1); +} + +static void on_stream_number_found(jsonlite_callback_context* context, + jsonlite_token* token) +{ + on_stream_value_found(context, token, 2); +} + +// Data structures and functions used to parse locations + +#define LOCATION_BUF_LEN 20 + +typedef struct { + uint16_t state; + char name_str[LOCATION_BUF_LEN + 1]; + double latitude; + double longitude; + double elevation; + char timestamp_str[LOCATION_BUF_LEN + 1]; + int index; + + location_read_callback callback; + void* context; +} location_parsing_context_state; + +#define WAITING_NAME 0x1 +#define WAITING_LATITUDE 0x2 +#define WAITING_LONGITUDE 0x4 +#define WAITING_ELEVATION 0x8 +#define WAITING_TIMESTAMP 0x10 + +#define GOT_NAME 0x20 +#define GOT_LATITUDE 0x40 +#define GOT_LONGITUDE 0x80 +#define GOT_ELEVATION 0x100 +#define GOT_TIMESTAMP 0x200 + +#define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP) +#define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION) + +#define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME) +#define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \ + == WAITING_LATITUDE) +#define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \ + == WAITING_LONGITUDE) +#define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \ + == WAITING_ELEVATION) +#define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \ + == WAITING_TIMESTAMP) + +static void on_location_key_found(jsonlite_callback_context* context, + jsonlite_token* token) { + location_parsing_context_state* state = + (location_parsing_context_state*) context->client_state; + if (strncmp((const char*) token->start, "waypoints", 9) == 0) { + // only parses those locations in waypoints, skip the outer one + state->state = 0; + } else if (strncmp((const char*) token->start, "name", 4) == 0) { + state->state |= WAITING_NAME; + } else if (strncmp((const char*) token->start, "latitude", 8) == 0) { + state->state |= WAITING_LATITUDE; + } else if (strncmp((const char*) token->start, "longitude", 9) == 0) { + state->state |= WAITING_LONGITUDE; + } else if (strncmp((const char*) token->start, "elevation", 9) == 0) { + state->state |= WAITING_ELEVATION; + } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) { + state->state |= WAITING_TIMESTAMP; + } +} + +static void on_location_string_found(jsonlite_callback_context* context, + jsonlite_token* token) { + location_parsing_context_state* state = + (location_parsing_context_state*) context->client_state; + + if (TEST_IS_NAME(state->state)) { + strncpy(state->name_str, (const char*) token->start, + MIN(token->end - token->start, LOCATION_BUF_LEN)); + state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; + state->state |= GOT_NAME; + } else if (TEST_IS_LATITUDE(state->state)) { + state->latitude = atof((const char*) token->start); + state->state |= GOT_LATITUDE; + } else if (TEST_IS_LONGITUDE(state->state)) { + state->longitude = atof((const char*) token->start); + state->state |= GOT_LONGITUDE; + } else if (TEST_IS_ELEVATION(state->state)) { + state->elevation = atof((const char*) token->start); + state->state |= GOT_ELEVATION; + } else if (TEST_IS_TIMESTAMP(state->state)) { + strncpy(state->timestamp_str, (const char*) token->start, + MIN(token->end - token->start, LOCATION_BUF_LEN)); + state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; + state->state |= GOT_TIMESTAMP; + } + + if (TEST_GOT_LOCATION(state->state)) { + state->callback(state->name_str, state->latitude, state->longitude, + state->elevation, state->timestamp_str, state->index++, + state->context); + state->state = 0; + } +} + +#ifndef M2X_COMMAND_BUF_LEN +#define M2X_COMMAND_BUF_LEN 40 +#endif /* M2X_COMMAND_BUF_LEN */ + +typedef struct { + uint8_t state; + char id_str[M2X_COMMAND_BUF_LEN + 1]; + char name_str[M2X_COMMAND_BUF_LEN + 1]; + int index; + + m2x_command_read_callback callback; + void* context; +} m2x_command_parsing_context_state; + +#define M2X_COMMAND_WAITING_ID 0x1 +#define M2X_COMMAND_GOT_ID 0x2 +#define M2X_COMMAND_WAITING_NAME 0x4 +#define M2X_COMMAND_GOT_NAME 0x8 + +#define M2X_COMMAND_GOT_COMMAND (M2X_COMMAND_GOT_ID | M2X_COMMAND_GOT_NAME) +#define M2X_COMMAND_TEST_GOT_COMMAND(state_) \ + (((state_) & M2X_COMMAND_GOT_COMMAND) == M2X_COMMAND_GOT_COMMAND) + +#define M2X_COMMAND_TEST_IS_ID(state_) \ + (((state_) & (M2X_COMMAND_WAITING_ID | M2X_COMMAND_GOT_ID)) == \ + M2X_COMMAND_WAITING_ID) +#define M2X_COMMAND_TEST_IS_NAME(state_) \ + (((state_) & (M2X_COMMAND_WAITING_NAME | M2X_COMMAND_GOT_NAME)) == \ + M2X_COMMAND_WAITING_NAME) + +static void m2x_on_command_key_found(jsonlite_callback_context* context, + jsonlite_token* token) +{ + m2x_command_parsing_context_state* state = + (m2x_command_parsing_context_state*) context->client_state; + if (strncmp((const char*) token->start, "id", 2) == 0) { + state->state |= M2X_COMMAND_WAITING_ID; + } else if (strncmp((const char*) token->start, "name", 4) == 0) { + state->state |= M2X_COMMAND_WAITING_NAME; + } +} + +static void m2x_on_command_value_found(jsonlite_callback_context* context, + jsonlite_token* token) +{ + m2x_command_parsing_context_state* state = + (m2x_command_parsing_context_state*) context->client_state; + if (M2X_COMMAND_TEST_IS_ID(state->state)) { + strncpy(state->id_str, (const char*) token->start, + MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)); + state->id_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0'; + state->state |= M2X_COMMAND_GOT_ID; + } else if (M2X_COMMAND_TEST_IS_NAME(state->state)) { + strncpy(state->name_str, (const char*) token->start, + MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)); + state->name_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0'; + state->state |= M2X_COMMAND_GOT_NAME; + } + + if (M2X_COMMAND_TEST_GOT_COMMAND(state->state)) { + state->callback(state->id_str, state->name_str, + state->index++, state->context); + state->state = 0; + } +} + +int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName, + stream_value_read_callback callback, void* context, + const char* query) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/streams/"); + print_encoded_string(_client, streamName); + _client->print("/values"); + + if (query) { + if (query[0] != '?') { + _client->print('?'); + } + _client->print(query); + } + + _client->println(" HTTP/1.0"); + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + readStreamValue(callback, context); + } + + close(); + return status; +} + +int M2XStreamClient::readLocation(const char* deviceId, + location_read_callback callback, + void* context) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->println("/location HTTP/1.0"); + + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + readLocation(callback, context); + } + + close(); + return status; +} + +int M2XStreamClient::listCommands(const char* deviceId, + m2x_command_read_callback callback, + void* context, + const char* query) { + if (_client->connect(_host, _port)) { + DBGLN("%s", "Connected to M2X server!"); + _client->print("GET "); + if (_path_prefix) { _client->print(_path_prefix); } + _client->print("/v2/devices/"); + _client->print(deviceId); + _client->print("/commands"); + + if (query) { + if (query[0] != '?') { + _client->print('?'); + } + _client->print(query); + } + + _client->println(" HTTP/1.0"); + writeHttpHeader(-1); + } else { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + int status = readStatusCode(false); + if (status == 200) { + readCommand(callback, context); + } + + close(); + return status; +} + + +int M2XStreamClient::readStreamValue(stream_value_read_callback callback, + void* context) { + const int BUF_LEN = 64; + char buf[BUF_LEN]; + + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + + stream_parsing_context_state state; + state.state = state.index = 0; + state.callback = callback; + state.context = context; + + jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; + cbs.key_found = on_stream_key_found; + cbs.number_found = on_stream_number_found; + cbs.string_found = on_stream_string_found; + cbs.context.client_state = &state; + + jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); + jsonlite_parser_set_callback(p, &cbs); + + jsonlite_result result = jsonlite_result_unknown; + while (index < length) { + int i = 0; + + DBG("%s", "Received Data: "); + while ((i < BUF_LEN) && _client->available()) { + buf[i++] = _client->read(); + DBG("%c", buf[i - 1]); + } + DBGLNEND; + + if ((!_client->connected()) && + (!_client->available()) && + ((index + i) < length)) { + jsonlite_parser_release(p); + close(); + return E_NOCONNECTION; + } + + result = jsonlite_parser_tokenize(p, buf, i); + if ((result != jsonlite_result_ok) && + (result != jsonlite_result_end_of_stream)) { + jsonlite_parser_release(p); + close(); + return E_JSON_INVALID; + } + + index += i; + } + + jsonlite_parser_release(p); + close(); + return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); +} + +int M2XStreamClient::readLocation(location_read_callback callback, + void* context) { + const int BUF_LEN = 40; + char buf[BUF_LEN]; + + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + + location_parsing_context_state state; + state.state = state.index = 0; + state.callback = callback; + state.context = context; + + jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; + cbs.key_found = on_location_key_found; + cbs.string_found = on_location_string_found; + cbs.context.client_state = &state; + + jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); + jsonlite_parser_set_callback(p, &cbs); + + jsonlite_result result = jsonlite_result_unknown; + while (index < length) { + int i = 0; + + DBG("%s", "Received Data: "); + while ((i < BUF_LEN) && _client->available()) { + buf[i++] = _client->read(); + DBG("%c", buf[i - 1]); + } + DBGLNEND; + + if ((!_client->connected()) && + (!_client->available()) && + ((index + i) < length)) { + jsonlite_parser_release(p); + close(); + return E_NOCONNECTION; + } + + result = jsonlite_parser_tokenize(p, buf, i); + if ((result != jsonlite_result_ok) && + (result != jsonlite_result_end_of_stream)) { + jsonlite_parser_release(p); + close(); + return E_JSON_INVALID; + } + + index += i; + } + + jsonlite_parser_release(p); + close(); + return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); +} + +int M2XStreamClient::readCommand(m2x_command_read_callback callback, + void* context) { + const int BUF_LEN = 60; + char buf[BUF_LEN]; + + int length = readContentLength(); + if (length < 0) { + close(); + return length; + } + + int index = skipHttpHeader(); + if (index != E_OK) { + close(); + return index; + } + index = 0; + + m2x_command_parsing_context_state state; + state.state = 0; + state.index = 0; + state.callback = callback; + state.context = context; + + jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; + cbs.key_found = m2x_on_command_key_found; + cbs.string_found = m2x_on_command_value_found; + cbs.context.client_state = &state; + + jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); + jsonlite_parser_set_callback(p, &cbs); + + jsonlite_result result = jsonlite_result_unknown; + while (index < length) { + int i = 0; + + DBG("%s", "Received Data: "); + while ((i < BUF_LEN) && _client->available()) { + buf[i++] = _client->read(); + DBG("%c", buf[i - 1]); + } + DBGLNEND; + + if ((!_client->connected()) && + (!_client->available()) && + ((index + i) < length)) { + jsonlite_parser_release(p); + close(); + return E_NOCONNECTION; + } + + result = jsonlite_parser_tokenize(p, buf, i); + if ((result != jsonlite_result_ok) && + (result != jsonlite_result_end_of_stream)) { + jsonlite_parser_release(p); + close(); + return E_JSON_INVALID; + } + + index += i; + } + + jsonlite_parser_release(p); + close(); + return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); +} + +#endif /* M2X_ENABLE_READER */ #endif /* M2XStreamClient_h */
diff -r 6878944d2ce2 -r 4d895e732765 M2XStreamClient_template.h --- a/M2XStreamClient_template.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,183 +0,0 @@ -#ifndef M2XStreamClient_template_h -#define M2XStreamClient_template_h - -// Implementations of template functions - -int print_encoded_string(Print* print, const char* str); - -template <class T> -int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - writePutHeader(deviceId, streamName, - // for {"value":" and "} - _null_print.print(value) + 12); - _client->print("{\"value\":\""); - _client->print(value); - _client->print("\"}"); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - - return readStatusCode(true); -} - -template <class T> -inline int write_multiple_values(Print* print, int streamNum, - const char* names[], const int counts[], - const char* ats[], T values[]) { - int bytes = 0, value_index = 0; - bytes += print->print("{\"values\":{"); - for (int i = 0; i < streamNum; i++) { - bytes += print->print("\""); - bytes += print->print(names[i]); - bytes += print->print("\":["); - for (int j = 0; j < counts[i]; j++) { - bytes += print->print("{\"timestamp\": \""); - bytes += print->print(ats[value_index]); - bytes += print->print("\", \"value\": \""); - bytes += print->print(values[value_index]); - bytes += print->print("\"}"); - if (j < counts[i] - 1) { bytes += print->print(","); } - value_index++; - } - bytes += print->print("]"); - if (i < streamNum - 1) { bytes += print->print(","); } - } - bytes += print->print("}}"); - return bytes; -} - -template <class T> -int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum, - const char* names[], const int counts[], - const char* ats[], T values[]) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - int length = write_multiple_values(&_null_print, streamNum, names, - counts, ats, values); - _client->print("POST "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->println("/updates HTTP/1.0"); - writeHttpHeader(length); - write_multiple_values(_client, streamNum, names, counts, ats, values); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - return readStatusCode(true); -} - -template <class T> -inline int write_single_device_values(Print* print, int streamNum, - const char* names[], T values[], - const char* at) { - int bytes = 0; - bytes += print->print("{\"values\":{"); - for (int i = 0; i < streamNum; i++) { - bytes += print->print("\""); - bytes += print->print(names[i]); - bytes += print->print("\": \""); - bytes += print->print(values[i]); - bytes += print->print("\""); - if (i < streamNum - 1) { bytes += print->print(","); } - } - bytes += print->print("}"); - if (at != NULL) { - bytes += print->print(",\"timestamp\":\""); - bytes += print->print(at); - bytes += print->print("\""); - } - bytes += print->print("}"); - return bytes; -} - -template <class T> -int M2XStreamClient::postSingleDeviceUpdate(const char* deviceId, int streamNum, - const char* names[], T values[], - const char* at) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - int length = write_single_device_values(&_null_print, streamNum, names, - values, at); - _client->print("POST "); - if (_path_prefix) { - _client->print(_path_prefix); - } - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->println("/update HTTP/1.0"); - writeHttpHeader(length); - write_single_device_values(_client, streamNum, names, values, at); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - return readStatusCode(true); -} - -template <class T> -static int write_location_data(Print* print, const char* name, - T latitude, T longitude, - T elevation) { - int bytes = 0; - bytes += print->print("{\"name\":\""); - bytes += print->print(name); - bytes += print->print("\",\"latitude\":\""); - bytes += print->print(latitude); - bytes += print->print("\",\"longitude\":\""); - bytes += print->print(longitude); - bytes += print->print("\",\"elevation\":\""); - bytes += print->print(elevation); - bytes += print->print("\"}"); - return bytes; -} - -static int write_location_data(Print* print, const char* name, - double latitude, double longitude, - double elevation) { - int bytes = 0; - bytes += print->print("{\"name\":\""); - bytes += print->print(name); - bytes += print->print("\",\"latitude\":\""); - bytes += print->print(latitude, MAX_DOUBLE_DIGITS); - bytes += print->print("\",\"longitude\":\""); - bytes += print->print(longitude, MAX_DOUBLE_DIGITS); - bytes += print->print("\",\"elevation\":\""); - bytes += print->print(elevation); - bytes += print->print("\"}"); - return bytes; -} - -template <class T> -int M2XStreamClient::updateLocation(const char* deviceId, - const char* name, - T latitude, - T longitude, - T elevation) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - - int length = write_location_data(&_null_print, name, latitude, longitude, - elevation); - _client->print("PUT "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->println("/location HTTP/1.0"); - - writeHttpHeader(length); - write_location_data(_client, name, latitude, longitude, elevation); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - return readStatusCode(true); -} - -#endif
diff -r 6878944d2ce2 -r 4d895e732765 NullPrint.h --- a/NullPrint.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -#ifndef NullPrint_h -#define NullPrint_h - -#include "Print.h" - -// Null Print class used to calculate length to print -class NullPrint : public Print { -public: - virtual size_t write(uint8_t b) { - return 1; - } - - virtual size_t write(const uint8_t* buf, size_t size) { - return size; - } -}; - -#endif /* NullPrint_h */
diff -r 6878944d2ce2 -r 4d895e732765 Print.cpp --- a/Print.cpp Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,62 +0,0 @@ -#include "Print.h" -#include "mbed.h" - -#include <stdio.h> -#include <string.h> - -size_t Print::write(const uint8_t* buf, size_t size) { - size_t ret = 0; - while (size--) { - ret += write(*buf++); - } - return ret; -} - -size_t Print::print(const char* s) { - return write((const uint8_t*)s, strlen(s)); -} - -size_t Print::print(char c) { - return write(c); -} - -size_t Print::print(int n) { - return print((long) n); -} - -size_t Print::print(long n) { - char buf[8 * sizeof(long) + 1]; - snprintf(buf, sizeof(buf), "%ld", n); - return print(buf); -} - -// Digits are ignored for now -size_t Print::print(double n, int digits) { - char buf[65]; - snprintf(buf, sizeof(buf), "%g", n); - return print(buf); -} - -size_t Print::println(const char* s) { - return print(s) + println(); -} - -size_t Print::println(char c) { - return print(c) + println(); -} - -size_t Print::println(int n) { - return print(n) + println(); -} - -size_t Print::println(long n) { - return print(n) + println(); -} - -size_t Print::println(double n, int digits) { - return print(n, digits) + println(); -} - -size_t Print::println() { - return print('\r') + print('\n'); -}
diff -r 6878944d2ce2 -r 4d895e732765 Print.h --- a/Print.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -#ifndef Print_h -#define Print_h - -#include <stddef.h> -#include <stdint.h> - -class Print { -public: - size_t print(const char* s); - size_t print(char c); - size_t print(int n); - size_t print(long n); - size_t print(double n, int digits = 2); - - size_t println(const char* s); - size_t println(char c); - size_t println(int n); - size_t println(long n); - size_t println(double n, int digits = 2); - size_t println(); - - virtual size_t write(uint8_t c) = 0; - virtual size_t write(const uint8_t* buf, size_t size); -}; - -#endif
diff -r 6878944d2ce2 -r 4d895e732765 StreamParseFunctions.h --- a/StreamParseFunctions.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,81 +0,0 @@ -#ifndef StreamParseFunctions_h -#define StreamParseFunctions_h - -// Data structures and functions used to parse stream values - -#define STREAM_BUF_LEN 32 - -typedef struct { - uint8_t state; - char at_str[STREAM_BUF_LEN + 1]; - char value_str[STREAM_BUF_LEN + 1]; - int index; - - stream_value_read_callback callback; - void* context; -} stream_parsing_context_state; - -#define WAITING_AT 0x1 -#define GOT_AT 0x2 -#define WAITING_VALUE 0x4 -#define GOT_VALUE 0x8 - -#define GOT_STREAM (GOT_AT | GOT_VALUE) -#define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM) - -#define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT) -#define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \ - WAITING_VALUE) - -static void on_stream_key_found(jsonlite_callback_context* context, - jsonlite_token* token) -{ - stream_parsing_context_state* state = - (stream_parsing_context_state*) context->client_state; - if (strncmp((const char*) token->start, "timestamp", 2) == 0) { - state->state |= WAITING_AT; - } else if ((strncmp((const char*) token->start, "value", 5) == 0) && - (token->start[5] != 's')) { // get rid of "values" - state->state |= WAITING_VALUE; - } -} - -static void on_stream_value_found(jsonlite_callback_context* context, - jsonlite_token* token, - int type) -{ - stream_parsing_context_state* state = - (stream_parsing_context_state*) context->client_state; - - if (TEST_IS_AT(state->state)) { - strncpy(state->at_str, (const char*) token->start, - MIN(token->end - token->start, STREAM_BUF_LEN)); - state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; - state->state |= GOT_AT; - } else if (TEST_IS_VALUE(state->state)) { - strncpy(state->value_str, (const char*) token->start, - MIN(token->end - token->start, STREAM_BUF_LEN)); - state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; - state->state |= GOT_VALUE; - } - - if (TEST_GOT_STREAM(state->state)) { - state->callback(state->at_str, state->value_str, - state->index++, state->context, type); - state->state = 0; - } -} - -static void on_stream_string_found(jsonlite_callback_context* context, - jsonlite_token* token) -{ - on_stream_value_found(context, token, 1); -} - -static void on_stream_number_found(jsonlite_callback_context* context, - jsonlite_token* token) -{ - on_stream_value_found(context, token, 2); -} - -#endif /* StreamParseFunctions_h */
diff -r 6878944d2ce2 -r 4d895e732765 TimeService.cpp --- a/TimeService.cpp Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,139 +0,0 @@ -#include "M2XStreamClient.h" - -static int fill_iso8601_timestamp(int32_t seconds, int32_t milli, - char* buffer, int* length); - -TimeService::TimeService(M2XStreamClient* client) : _client(client) { -} - -int TimeService::init() { - _timer.start(); - return reset(); -} - -int TimeService::reset() { - int32_t ts; - int status = _client->getTimestamp32(&ts); - - if (m2x_status_is_success(status)) { - _server_timestamp = ts; - _local_last_milli = _timer.read_ms(); - } - - return status; -} - -int TimeService::getTimestamp(char* buffer, int* length) { - uint32_t now = _timer.read_ms(); - if (now < _local_last_milli) { - // In case of a timestamp overflow(happens once every 50 days on - // Arduino), we reset the server timestamp recorded. - int status = reset(); - if (!m2x_status_is_success(status)) { return status; } - now = _timer.read_ms(); - } - if (now < _local_last_milli) { - // We have already reseted the timestamp, so this cannot happen - // (an HTTP request can take longer than 50 days to finished? You - // must be kidding here). Something else must be wrong here - return E_TIMESTAMP_ERROR; - } - uint32_t diff = now - _local_last_milli; - _local_last_milli = now; - _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds - return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000), - buffer, length); -} - -#define SIZE_ISO_8601 25 -static inline bool is_leap_year(int16_t y) { - return ((1970 + y) > 0) && - !((1970 + y) % 4) && - (((1970 + y) % 100) || !((1970 + y) % 400)); -} -static inline int32_t days_in_year(int16_t y) { - return is_leap_year(y) ? 366 : 365; -} -static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31}; - -static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli, - char* buffer, int* length) { - int16_t year; - int8_t month, month_length; - int32_t day; - int8_t hour, minute, second; - - if (*length < SIZE_ISO_8601) { - *length = SIZE_ISO_8601; - return E_BUFFER_TOO_SMALL; - } - - second = timestamp % 60; - timestamp /= 60; // now it is minutes - - minute = timestamp % 60; - timestamp /= 60; // now it is hours - - hour = timestamp % 24; - timestamp /= 24; // now it is days - - year = 0; - day = 0; - while ((day += days_in_year(year)) <= timestamp) { - year++; - } - day -= days_in_year(year); - timestamp -= day; // now it is days in this year, starting at 0 - - day = 0; - month_length = 0; - for (month = 0; month < 12; month++) { - if (month == 1) { - // February - month_length = is_leap_year(year) ? 29 : 28; - } else { - month_length = MONTH_DAYS[month]; - } - - if (timestamp >= month_length) { - timestamp -= month_length; - } else { - break; - } - } - year = 1970 + year; - month++; // offset by 1 - day = timestamp + 1; - - int i = 0, j = 0; - - // NOTE: It seems the snprintf implementation in Arduino has bugs, - // we have to manually piece the string together here. -#define INT_TO_STR(v_, width_) \ - for (j = 0; j < (width_); j++) { \ - buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \ - (v_) /= 10; \ - } \ - i += (width_) - - INT_TO_STR(year, 4); - buffer[i++] = '-'; - INT_TO_STR(month, 2); - buffer[i++] = '-'; - INT_TO_STR(day, 2); - buffer[i++] = 'T'; - INT_TO_STR(hour, 2); - buffer[i++] = ':'; - INT_TO_STR(minute, 2); - buffer[i++] = ':'; - INT_TO_STR(second, 2); - buffer[i++] = '.'; - INT_TO_STR(milli, 3); - buffer[i++] = 'Z'; - buffer[i++] = '\0'; - -#undef INT_TO_STR - - *length = i; - return E_OK; -}
diff -r 6878944d2ce2 -r 4d895e732765 TimeService.h --- a/TimeService.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -#ifndef TimeService_h -#define TimeService_h - -class M2XStreamClient; - -// A ISO8601 timestamp generation service for M2X. -// It uses the Time API provided by the M2X server to initialize -// clock, then uses millis() function provided by Arduino to calculate -// time advancements so as to reduce API query times. -// -// Right now, this service only works with 32-bit timestamp, meaning that -// this service won't work after 03:14:07 UTC on 19 January 2038. However, -// a similar service that uses 64-bit timestamp can be implemented following -// the logic here. -class TimeService { -public: - TimeService(M2XStreamClient* client); - - // Initialize the time service. Notice the TimeService instance is only - // working after calling this function successfully. - int init(); - - // Reset the internal recorded time by calling M2X Time API again. Normally, - // you don't need to call this manually. TimeService will handle Arduino clock - // overflow automatically - int reset(); - - // Fills ISO8601 formatted timestamp into the buffer provided. +length+ should - // contains the maximum supported length of the buffer when calling. For now, - // the buffer should be able to store 25 characters for a full ISO8601 formatted - // timestamp, otherwise, an error will be returned. - int getTimestamp(char* buffer, int* length); -private: - M2XStreamClient* _client; - int32_t _server_timestamp; - uint32_t _local_last_milli; - Timer _timer; -}; - -#endif /* TimeService_h */
diff -r 6878944d2ce2 -r 4d895e732765 Utility.cpp --- a/Utility.cpp Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,11 +0,0 @@ -#include "Utility.h" - -void delay(int ms) { - wait_ms(ms); -} - -char* strdup(const char* s) { - char* ret = (char*) malloc(strlen(s) + 1); - if (ret == NULL) { return ret;} - return strcpy(ret, s); -}
diff -r 6878944d2ce2 -r 4d895e732765 Utility.h --- a/Utility.h Sat Jan 02 02:29:43 2016 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -#ifndef UTILITY_H_ -#define UTILITY_H_ - -#include "mbed.h" -#include <string.h> - -#ifdef __cplusplus -extern "C" { -#endif - -void delay(int ms); -char* strdup(const char* s); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file
diff -r 6878944d2ce2 -r 4d895e732765 m2x-mbed.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/m2x-mbed.h Wed Jun 15 12:36:55 2016 +0000 @@ -0,0 +1,264 @@ +#include "mbed.h" +#define USER_AGENT "User-Agent: M2X Mbed Client/" M2X_VERSION + +#ifdef DEBUG +#define DBG(fmt_, data_) printf((fmt_), (data_)) +#define DBGLN(fmt_, data_) printf((fmt_), (data_)); printf("\n") +#define DBGLNEND printf("\n") +#endif /* DEBUG */ + +class M2XTimer { +public: + void start() { _timer.start(); } + + unsigned long read_ms() { + // In case of a timestamp overflow, we reset the server timestamp recorded. + // Notice that unlike Arduino, mbed would overflow every 30 minutes, + // so if 2 calls to this API are more than 30 minutes apart, we are + // likely to run into troubles. This is a limitation of the current + // mbed platform. However, we argue that it might be a rare case that + // 2 calls to this are 30 minutes apart. In most cases, we would call + // this API every few seconds or minutes, this won't be a huge problem. + // However, if you have a use case that would require 2 intervening + // calls to this be 30 minutes apart, you might want to leverage a RTC + // clock instead of the simple ticker here, or call TimeService::reset() + // before making an API call to sync current time. + return _timer.read_ms(); + } +private: + Timer _timer; +}; + +#include "TCPSocketConnection.h" + +#include <stddef.h> +#include <stdint.h> +#include <stdio.h> +#include <string.h> + +#ifdef __cplusplus +extern "C" { +#endif + +void delay(int ms) +{ + wait_ms(ms); +} + +char* strdup(const char* s) +{ + char* ret = (char*) malloc(strlen(s) + 1); + if (ret == NULL) { return ret;} + return strcpy(ret, s); +} + +#ifdef __cplusplus +} +#endif + +class Print { +public: + size_t print(const char* s); + size_t print(char c); + size_t print(int n); + size_t print(long n); + size_t print(double n, int digits = 2); + + size_t println(const char* s); + size_t println(char c); + size_t println(int n); + size_t println(long n); + size_t println(double n, int digits = 2); + size_t println(); + + virtual size_t write(uint8_t c) = 0; + virtual size_t write(const uint8_t* buf, size_t size); +}; + +size_t Print::write(const uint8_t* buf, size_t size) { + size_t ret = 0; + while (size--) { + ret += write(*buf++); + } + return ret; +} + +size_t Print::print(const char* s) { + return write((const uint8_t*)s, strlen(s)); +} + +size_t Print::print(char c) { + return write(c); +} + +size_t Print::print(int n) { + return print((long) n); +} + +size_t Print::print(long n) { + char buf[8 * sizeof(long) + 1]; + snprintf(buf, sizeof(buf), "%ld", n); + return print(buf); +} + +// Digits are ignored for now +size_t Print::print(double n, int digits) { + char buf[65]; + snprintf(buf, sizeof(buf), "%g", n); + return print(buf); +} + +size_t Print::println(const char* s) { + return print(s) + println(); +} + +size_t Print::println(char c) { + return print(c) + println(); +} + +size_t Print::println(int n) { + return print(n) + println(); +} + +size_t Print::println(long n) { + return print(n) + println(); +} + +size_t Print::println(double n, int digits) { + return print(n, digits) + println(); +} + +size_t Print::println() { + return print('\r') + print('\n'); +} + +/* + * TCP Client + */ +class Client : public Print { +public: + Client(); + ~Client(); + + virtual int connect(const char *host, uint16_t port); + virtual size_t write(uint8_t); + virtual size_t write(const uint8_t *buf, size_t size); + virtual int available(); + virtual int read(); + virtual void flush(); + virtual void stop(); + virtual uint8_t connected(); +private: + virtual int read(uint8_t *buf, size_t size); + void _fillin(void); + uint8_t _inbuf[128]; + uint8_t _incnt; + void _flushout(void); + uint8_t _outbuf[128]; + uint8_t _outcnt; + TCPSocketConnection _sock; +}; + +Client::Client() : _incnt(0), _outcnt(0), _sock() { + _sock.set_blocking(false, 1500); +} + +Client::~Client() { +} + +int Client::connect(const char *host, uint16_t port) { + return _sock.connect(host, port) == 0; +} + +size_t Client::write(uint8_t b) { + return write(&b, 1); +} + +size_t Client::write(const uint8_t *buf, size_t size) { + size_t cnt = 0; + while (size) { + int tmp = sizeof(_outbuf) - _outcnt; + if (tmp > size) tmp = size; + memcpy(_outbuf + _outcnt, buf, tmp); + _outcnt += tmp; + buf += tmp; + size -= tmp; + cnt += tmp; + // if no space flush it + if (_outcnt == sizeof(_outbuf)) + _flushout(); + } + return cnt; +} + +void Client::_flushout(void) +{ + if (_outcnt > 0) { + // NOTE: we know it's dangerous to cast from (const uint8_t *) to (char *), + // but we are trying to maintain a stable interface between the Arduino + // one and the mbed one. What's more, while TCPSocketConnection has no + // intention of modifying the data here, it requires us to send a (char *) + // typed data. So we belive it's safe to do the cast here. + _sock.send_all(const_cast<char*>((const char*) _outbuf), _outcnt); + _outcnt = 0; + } +} + +void Client::_fillin(void) +{ + int tmp = sizeof(_inbuf) - _incnt; + if (tmp) { + tmp = _sock.receive_all((char*)_inbuf + _incnt, tmp); + if (tmp > 0) + _incnt += tmp; + } +} + +void Client::flush() { + _flushout(); +} + +int Client::available() { + if (_incnt == 0) { + _flushout(); + _fillin(); + } + return (_incnt > 0) ? 1 : 0; +} + +int Client::read() { + uint8_t ch; + return (read(&ch, 1) == 1) ? ch : -1; +} + +int Client::read(uint8_t *buf, size_t size) { + int cnt = 0; + while (size) { + // need more + if (size > _incnt) { + _flushout(); + _fillin(); + } + if (_incnt > 0) { + int tmp = _incnt; + if (tmp > size) tmp = size; + memcpy(buf, _inbuf, tmp); + if (tmp != _incnt) + memmove(_inbuf, _inbuf + tmp, _incnt - tmp); + _incnt -= tmp; + size -= tmp; + buf += tmp; + cnt += tmp; + } else // no data + break; + } + return cnt; +} + +void Client::stop() { + _sock.close(); +} + +uint8_t Client::connected() { + return _sock.is_connected() ? 1 : 0; +}