Added function deleteLocations to delete the location history.
Dependents: WNCInterface_M2Xdemo ATT_WNCInterface_Info WNCInterface_HTTP_example Public_IoT_M2X_Cellular_Demo
Fork of M2XStreamClient by
M2XStreamClient.h
- Committer:
- JMF
- Date:
- 2016-09-18
- Revision:
- 25:67079c11d1ea
- Parent:
- 24:6a5ba4cfa19a
File content as of revision 25:67079c11d1ea:
#ifndef M2XStreamClient_h
#define M2XStreamClient_h
#if (!defined(ARDUINO_PLATFORM)) && (!defined(ESP8266_PLATFORM)) && (!defined(MBED_PLATFORM))
#error "Platform definition is missing!"
#endif
#define M2X_VERSION "2.2.0"
#ifdef ARDUINO_PLATFORM
#include "m2x-arduino.h"
#endif /* ARDUINO_PLATFORM */
#ifdef ESP8266_PLATFORM
#include "m2x-esp8266.h"
#endif /* ESP8266_PLATFORM */
#ifdef MBED_PLATFORM
#include "m2x-mbed.h"
#endif /* MBED_PLATFORM */
/* If DBG is not defined, provide no-op (dummy) implementations of the debug macros */
#ifndef DBG
#define DBG(fmt_, data_)
#define DBGLN(fmt_, data_)
#define DBGLNEND
#endif /* DBG */
// Evaluates to the smaller of the two arguments. Being a macro, each
// argument may be evaluated more than once, so avoid side effects.
#define MIN(a, b) (((a) > (b))?(b):(a))
// Maps a small integer to an uppercase hexadecimal digit character:
// values greater than 9 become 'A' + (value - 10), everything else
// becomes '0' + value.
#define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
// Digit count passed to Print::print() when the double version of the
// location writer prints latitude and longitude.
#define MAX_DOUBLE_DIGITS 7
/* For tolower */
#include <ctype.h>
// Library error codes. They are zero or negative, while HTTP status codes
// returned by the same functions are positive.
static const int E_OK = 0;
static const int E_NOCONNECTION = -1;
static const int E_DISCONNECTED = -2;
static const int E_NOTREACHABLE = -3;
static const int E_INVALID = -4;
static const int E_JSON_INVALID = -5;
static const int E_BUFFER_TOO_SMALL = -6;
static const int E_TIMESTAMP_ERROR = -8;
// Default server host and port, used as default constructor arguments of
// M2XStreamClient.
static const char* DEFAULT_M2X_HOST = "api-m2x.att.com";
static const int DEFAULT_M2X_PORT = 80;
// True when +status+ is E_OK or an HTTP 2xx status code.
static inline bool m2x_status_is_success(int status) {
  if (status == E_OK) { return true; }
  return !(status < 200 || status > 299);
}
// True when +status+ is an HTTP 4xx status code.
static inline bool m2x_status_is_client_error(int status) {
  return !(status < 400 || status > 499);
}
// True when +status+ is an HTTP 5xx status code.
static inline bool m2x_status_is_server_error(int status) {
  return !(status < 500 || status > 599);
}
// True when +status+ is an HTTP client (4xx) or server (5xx) error code.
static inline bool m2x_status_is_error(int status) {
  if (m2x_status_is_client_error(status)) { return true; }
  return m2x_status_is_server_error(status);
}
// Null Print class used to calculate length to print: everything written
// to it is discarded, only the number of bytes is accumulated in +counter+.
class NullPrint : public Print {
public:
  // Running total of bytes "written" so far. Callers reset it to 0 when
  // they want to measure a fresh payload.
  size_t counter;

  // Start counting from zero so the counter never holds an indeterminate
  // value, however the object is created.
  NullPrint() : counter(0) {}

  // Counts a single byte; the byte itself is ignored.
  virtual size_t write(uint8_t b) {
    (void) b;
    counter++;
    return 1;
  }

  // Counts +size+ bytes; the buffer contents are ignored.
  virtual size_t write(const uint8_t* buf, size_t size) {
    (void) buf;
    counter += size;
    return size;
  }
};
// Encodes and prints string using Percent-encoding specified
// in RFC 1738, Section 2.2.
// ASCII letters, digits, '-', '_', '.' and '~' are printed unchanged; every
// other byte is printed as '%' followed by two uppercase hex digits.
// Returns the sum of the values returned by the print calls.
static inline int print_encoded_string(Print* print, const char* str) {
  int bytes = 0;
  for (int i = 0; str[i] != 0; i++) {
    const char c = str[i];
    const bool unreserved =
        ((c >= 'A') && (c <= 'Z')) ||
        ((c >= 'a') && (c <= 'z')) ||
        ((c >= '0') && (c <= '9')) ||
        (c == '-') || (c == '_') ||
        (c == '.') || (c == '~');
    if (unreserved) {
      bytes += print->print(c);
    } else {
      // Work on the unsigned byte value: where char is signed, bytes
      // >= 0x80 are negative and would yield invalid hex digits.
      const unsigned char byte = (unsigned char) c;
      bytes += print->print('%');
      bytes += print->print(TO_HEX(byte >> 4));
      bytes += print->print(TO_HEX(byte & 0x0F));
    }
  }
  return bytes;
}
#ifdef M2X_ENABLE_READER
/*
 * Callback type accepted by M2XStreamClient::listStreamValues.
 * +type+ indicates the value type: 1 for string, 2 for number
 * NOTE that the value type here only contains a hint on how
 * you can use the value. Even though 2 is returned, the value
 * is still stored in (const char *), and atoi/atof is needed to
 * get the actual value
 */
typedef void (*stream_value_read_callback)(const char* at,
const char* value,
int index,
void* context,
int type);
// Callback type accepted by M2XStreamClient::readLocation. +context+ is the
// user pointer given to readLocation.
typedef void (*location_read_callback)(const char* name,
double latitude,
double longitude,
double elevation,
const char* timestamp,
int index,
void* context);
// Callback type accepted by M2XStreamClient::listCommands. Only the ID and
// name of each command are provided.
typedef void (*m2x_command_read_callback)(const char* id,
const char* name,
int index,
void *context);
#endif /* M2X_ENABLE_READER */
// Callback type used to fill a request body into +print+. It is called
// twice per request (once to measure the body, once to send it) and must
// produce exactly the same data both times.
typedef void (*m2x_fill_data_callback)(Print *print, void *context);
// Client for the AT&T M2X HTTP API. Requests are sent through the Client
// instance passed to the constructor. Public methods return either the HTTP
// status code (positive) or one of the E_* error codes (zero or negative).
class M2XStreamClient {
public:
// +client+ - transport used for all requests
// +key+ - M2X API key, sent in the X-M2X-KEY header
// +idlefunc+ - optional function called while waiting for data; when NULL
// the library calls delay(200) instead
// +case_insensitive+ - non-zero to compare HTTP header text ignoring case
// +host+, +port+ - server to connect to
// +path_prefix+ - optional string printed before "/v2/..." in request paths
M2XStreamClient(Client* client,
const char* key,
void (* idlefunc)(void) = NULL,
int case_insensitive = 1,
const char* host = DEFAULT_M2X_HOST,
int port = DEFAULT_M2X_PORT,
const char* path_prefix = NULL);
// Push data stream value using PUT request, returns the HTTP status code
// NOTE: if you want to update by a serial, use "serial/<serial ID>" as
// the device ID here.
template <class T>
int updateStreamValue(const char* deviceId, const char* streamName, T value);
// Post multiple values to M2X all at once.
// +deviceId+ - id of the device to post values
// +streamNum+ - Number of streams to post
// +names+ - Array of stream names, the length of the array should
// be exactly +streamNum+
// +counts+ - Array of +streamNum+ length, each item in this array
// containing the number of values we want to post for each stream
// +ats+ - Timestamps for each value, the length of this array should
// be the sum of all values in +counts+, for the first +counts[0]+
// items, the values belong to the first stream, for the following
// +counts[1]+ number of items, the values belong to the second stream,
// etc. Notice that timestamps are required here: you must provide
// a timestamp for each value posted.
// +values+ - Values to post. This works the same way as +ats+, the
// first +counts[0]+ number of items contain values to post to the first
// stream, the succeeding +counts[1]+ number of items contain values
// for the second stream, etc. The length of this array should be
// the sum of all values in +counts+ array.
// NOTE: if you want to update by a serial, use "serial/<serial ID>" as
// the device ID here.
template <class T>
int postDeviceUpdates(const char* deviceId, int streamNum,
const char* names[], const int counts[],
const char* ats[], T values[]);
// Post multiple values of a single device at once.
// +deviceId+ - id of the device to post values
// +streamNum+ - Number of streams to post
// +names+ - Array of stream names, the length of the array should
// be exactly +streamNum+
// +values+ - Array of values to post, the length of the array should
// be exactly +streamNum+. Notice that the array of +values+ should
// match the array of +names+, and that the ith value in +values+ is
// exactly the value to post for the ith stream name in +names+
// +at+ - Optional timestamp string; when not NULL it is sent as the
// "timestamp" field of the request body.
// NOTE: if you want to update by a serial, use "serial/<serial ID>" as
// the device ID here.
template <class T>
int postDeviceUpdate(const char* deviceId, int streamNum,
const char* names[], T values[],
const char* at = NULL);
#ifdef M2X_ENABLE_READER
// Fetch values for a particular data stream. Since memory is
// very limited on an Arduino, we cannot parse and get all the
// data points in memory. Instead, we use callbacks here: whenever
// a new data point is parsed, we call the callback using the values,
// after that, the values will be thrown away to make space for new
// values.
// Note that you can also pass in a user-specified context in this
// function, this context will be passed to the callback function
// each time we get a data point.
// For each data point, the callback will be called once. The HTTP
// status code will be returned. And the content is only parsed when
// the status code is 200.
int listStreamValues(const char* deviceId, const char* streamName,
stream_value_read_callback callback, void* context,
const char* query = NULL);
#endif /* M2X_ENABLE_READER */
// Update datasource location
// NOTE: On an Arduino Uno and other ATMEGA based boards, double has
// 4-byte (32 bits) precision, which is the same as float. So there's
// no natural double-precision floating number on these boards. With
// a float value, we have a precision of roughly 7 digits, that means
// either 5 or 6 digits after the floating point. According to wikipedia,
// a difference of 0.00001 will give us ~1.1132m distance. If this
// precision is good for you, you can use the double-version we provided
// here. Otherwise, you may need to use the string-version and do the
// actual conversion by yourselves.
// However, with an Arduino Due board, double has 8-bytes (64 bits)
// precision, which means you are free to use the double-version only
// without any precision problems.
// Returned value is the http status code.
// NOTE: if you want to update by a serial, use "serial/<serial ID>" as
// the device ID here.
template <class T>
int updateLocation(const char* deviceId, const char* name,
T latitude, T longitude, T elevation);
#ifdef M2X_ENABLE_READER
// Read location information for a device. Also uses a callback to process
// data points for memory reasons. The HTTP status code is returned,
// response is only parsed when the HTTP status code is 200
int readLocation(const char* deviceId, location_read_callback callback,
void* context);
// Delete location information for a device. The HTTP status code is
// returned, response is only parsed when the HTTP status code is 200
// NOTE(review): +from+ and +end+ are presumably ISO8601 timestamps as in
// deleteValues below, and the remark about response parsing looks copied
// from readLocation -- confirm against the implementation.
int deleteLocations(const char* deviceId,
const char* from, const char* end);
#endif /* M2X_ENABLE_READER */
// Delete values from a data stream
// You will need to provide from and end date/time strings in the ISO8601
// format "yyyy-mm-ddTHH:MM:SS.SSSZ" where
// yyyy: the year
// mm: the month
// dd: the day
// HH: the hour (24 hour format)
// MM: the minute
// SS.SSS: the seconds (to the millisecond)
// NOTE: the time is given in Zulu (GMT)
// M2X will delete all values within the from to end date/time range.
// The status code is 204 on success and 400 on a bad request (e.g. the
// timestamp is not in ISO8601 format or the from timestamp is not less than
// or equal to the end timestamp).
int deleteValues(const char* deviceId, const char* streamName,
const char* from, const char* end);
#ifdef M2X_ENABLE_READER
// Fetch commands available for this device, notice that for memory constraints,
// we only keep ID and name of the command received here.
// You can tweak the command receiving via the query parameter.
int listCommands(const char* deviceId,
m2x_command_read_callback callback, void* context,
const char* query = NULL);
#endif /* M2X_ENABLE_READER */
// Mark a command as processed.
// Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
// To make sure the minimal amount of memory is needed, this API works with
// a callback function. The callback function is then used to fill the request
// data, note that in order to correctly set the content length in HTTP header,
// the callback function will be called twice, the caller must make sure both
// calls fill the Print object with exactly the same data.
// If you have a pre-allocated buffer filled with the data to post, you can
// use markCommandProcessedWithData API below.
int markCommandProcessed(const char* deviceId, const char* commandId,
m2x_fill_data_callback callback, void *context);
// Mark a command as processed with a data buffer
// Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
// This is exactly like markCommandProcessed, except that a buffer is used to
// contain the data to post as the request body.
int markCommandProcessedWithData(const char* deviceId, const char* commandId,
const char* data);
// Mark a command as rejected.
// Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
// To make sure the minimal amount of memory is needed, this API works with
// a callback function. The callback function is then used to fill the request
// data, note that in order to correctly set the content length in HTTP header,
// the callback function will be called twice, the caller must make sure both
// calls fill the Print object with exactly the same data.
// If you have a pre-allocated buffer filled with the data to post, you can
// use markCommandRejectedWithData API below.
int markCommandRejected(const char* deviceId, const char* commandId,
m2x_fill_data_callback callback, void *context);
// Mark a command as rejected with a data buffer
// Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
// This is exactly like markCommandRejected, except that a buffer is used to
// contain the data to post as the request body.
int markCommandRejectedWithData(const char* deviceId, const char* commandId,
const char* data);
// Fetches current timestamp in seconds from M2X server. Since we
// are using signed 32-bit integer as return value, this will only
// return valid results before 03:14:07 UTC on 19 January 2038. If
// the device is supposed to work after that, this function should
// not be used.
//
// The returned value will contain the status code(positive values)
// or the error code(negative values).
// In case of success, the current timestamp will be filled in the
// +ts+ pointer passed in as argument.
//
// NOTE: although returning uint32_t can give us a larger space,
// we prefer to cope with the unix convention here.
int getTimestamp32(int32_t* ts);
// Fetches current timestamp in seconds from M2X server.
// This function will return the timestamp as an integer literal
// in the provided buffer. Hence there's no problem working after
// 03:14:07 UTC on 19 January 2038. The drawback here is that
// you will have to work with 64-bit integers, which are not available
// on certain platforms (such as Arduino), a bignum library or alike
// is needed in this case.
//
// Notice +bufferLength+ is supposed to contain the length of the
// buffer when calling this function. It is also the caller's
// responsibility to ensure the buffer is big enough, otherwise
// the library will return an error indicating the buffer is too
// small.
// While this is not accurate all the time, one trick here is to
// pass in 0 as the bufferLength, in which case we will always return
// the buffer-too-small error. However, the correct buffer length
// can be found this way so a second execution is most likely to work
// (unless we are at the edge of the buffer length increasing, for
// example, when the timestamp jumps from 9999999999 to 10000000000,
// which is highly unlikely to happen). However, given that the
// maximum 64-bit integer can be stored in 19 bytes, there's not
// much need to use this trick.
//
// The returned value will contain the status code(positive values)
// or the error code(negative values).
// In case of success, the current timestamp will be filled in the
// passed +buffer+ pointer, and the actual used buffer length will
// be returned in +bufferLength+ pointer.
// NOTE: as long as we can read the returned buffer length, it will
// be used to fill in the +bufferLength+ variable even though other
// errors occur(buffer is not enough, network is shutdown before
// reading the whole buffer, etc.)
int getTimestamp(char* buffer, int* bufferLength);
private:
Client* _client;
const char* _key;
int _case_insensitive;
const char* _host;
int _port;
void (* _idlefunc)(void);
const char* _path_prefix;
// Byte-counting sink used to compute request body lengths before sending
NullPrint _null_print;
// Writes the HTTP header part for updating a stream value
void writePutHeader(const char* deviceId,
const char* streamName,
int contentLength);
// Writes the HTTP header part for deleting stream values
void writeDeleteHeader(const char* deviceId,
const char* streamName,
int contentLength);
// Writes the HTTP header part for deleting location waypoints of a device
void writeDeleteLocationHeader(const char* deviceId,
int contentLength);
// Writes HTTP header lines including M2X API Key, host, content
// type and content length(if the body exists)
void writeHttpHeader(int contentLength);
// Parses HTTP response header and return the content length.
// Note that this function does not parse all http headers, as long
// as the content length is found, this function will return
int readContentLength();
// Skips all HTTP response header part. Return minus value in case
// the connection is closed before we got all headers
int skipHttpHeader();
// Parses and returns the HTTP status code, note this function will
// return immediately once it gets the status code
int readStatusCode(bool closeClient);
// Waits for a certain string pattern in the HTTP header, and returns
// once the pattern is found. In the pattern, you can use '*' to denote
// any character
int waitForString(const char* str);
// Closes the connection
void close();
#ifdef M2X_ENABLE_READER
// Parses JSON response of stream value API, and calls callback function
// once we get a data point
int readStreamValue(stream_value_read_callback callback, void* context);
// Parses JSON response of location API, and calls callback function once
// we get a data point
int readLocation(location_read_callback callback, void* context);
// Parses JSON response of command API, and calls callback function once
// we get a data point
int readCommand(m2x_command_read_callback callback, void* context);
#endif /* M2X_ENABLE_READER */
};
// An ISO8601 timestamp generation service for M2X.
// It uses the Time API provided by the M2X server to initialize the
// clock, then uses a local millisecond timer (M2XTimer) to calculate
// time advancements so as to reduce API query times.
//
// Right now, this service only works with 32-bit timestamp, meaning that
// this service won't work after 03:14:07 UTC on 19 January 2038. However,
// a similar service that uses 64-bit timestamp can be implemented following
// the logic here.
class TimeService {
public:
// +client+ is the M2X client used to query the server time.
TimeService(M2XStreamClient* client);
// Initialize the time service. Notice the TimeService instance is only
// working after calling this function successfully.
// Returns the status of the underlying reset() call.
int init();
// Reset the internal recorded time by calling M2X Time API again. Normally,
// you don't need to call this manually. TimeService will handle Arduino clock
// overflow automatically
int reset();
// Fills ISO8601 formatted timestamp into the buffer provided. +length+ should
// contain the maximum supported length of the buffer when calling. For now,
// the buffer should be able to store 25 characters for a full ISO8601 formatted
// timestamp, otherwise, an error will be returned.
int getTimestamp(char* buffer, int* length);
private:
M2XStreamClient* _client;
// Server time in seconds, as of the local timer reading below
int32_t _server_timestamp;
// Local timer reading (milliseconds) matching _server_timestamp
uint32_t _local_last_milli;
M2XTimer _timer;
};
// Implementations
// Stores the connection parameters; no network activity happens here.
// The initializer list follows the member declaration order of the class
// (members are always initialized in declaration order, so listing them
// differently only triggers -Wreorder warnings).
M2XStreamClient::M2XStreamClient(Client* client,
                                 const char* key,
                                 void (* idlefunc)(void),
                                 int case_insensitive,
                                 const char* host,
                                 int port,
                                 const char* path_prefix)
    : _client(client),
      _key(key),
      _case_insensitive(case_insensitive),
      _host(host),
      _port(port),
      _idlefunc(idlefunc),
      _path_prefix(path_prefix),
      _null_print() {
}
// Sends a PUT request carrying {"value":"<value>"} for the given stream and
// returns the HTTP status code, or E_NOCONNECTION when connecting fails.
template <class T>
int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // The JSON wrapper around the value ({"value":" and "}) adds 12 bytes
  // to the printed length of the value itself.
  const int bodyLength = _null_print.print(value) + 12;
  writePutHeader(deviceId, streamName, bodyLength);

  _client->print("{\"value\":\"");
  _client->print(value);
  _client->print("\"}");

  return readStatusCode(true);
}
// Prints the JSON body for posting several timestamped values to several
// streams: {"values":{"<name>":[{"timestamp": "..","value": ".."},...],...}}
// +ats+ and +values+ are consumed sequentially, +counts[i]+ entries for the
// ith stream. Returns the sum of the values returned by the print calls.
template <class T>
inline int write_multiple_values(Print* print, int streamNum,
                                 const char* names[], const int counts[],
                                 const char* ats[], T values[]) {
  int written = print->print("{\"values\":{");
  int next = 0;  // position of the next entry in ats/values
  for (int stream = 0; stream < streamNum; stream++) {
    // Separator between consecutive streams
    if (stream > 0) { written += print->print(","); }
    written += print->print("\"");
    written += print->print(names[stream]);
    written += print->print("\":[");
    for (int k = 0; k < counts[stream]; k++, next++) {
      // Separator between consecutive values of one stream
      if (k > 0) { written += print->print(","); }
      written += print->print("{\"timestamp\": \"");
      written += print->print(ats[next]);
      written += print->print("\",\"value\": \"");
      written += print->print(values[next]);
      written += print->print("\"}");
    }
    written += print->print("]");
  }
  written += print->print("}}");
  return written;
}
// Sends POST <prefix>/v2/devices/<deviceId>/updates with the body produced
// by write_multiple_values. Returns the HTTP status code, or E_NOCONNECTION
// when connecting fails.
template <class T>
int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum,
                                       const char* names[], const int counts[],
                                       const char* ats[], T values[]) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // First pass into the null printer yields the Content-Length value
  const int bodyLength = write_multiple_values(&_null_print, streamNum, names,
                                               counts, ats, values);

  _client->print("POST ");
  if (_path_prefix) { _client->print(_path_prefix); }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->println("/updates HTTP/1.1");
  writeHttpHeader(bodyLength);

  // Second pass actually sends the body
  write_multiple_values(_client, streamNum, names, counts, ats, values);

  return readStatusCode(true);
}
// Prints the JSON body for posting one value per stream:
// {"values":{"<name>": "<value>",...}} followed, when +at+ is not NULL, by
// a "timestamp" field. Returns the sum of the values returned by the print
// calls.
template <class T>
inline int write_single_device_values(Print* print, int streamNum,
                                      const char* names[], T values[],
                                      const char* at) {
  int written = print->print("{\"values\":{");
  for (int idx = 0; idx < streamNum; idx++) {
    // Separator between consecutive name/value pairs
    if (idx > 0) { written += print->print(","); }
    written += print->print("\"");
    written += print->print(names[idx]);
    written += print->print("\": \"");
    written += print->print(values[idx]);
    written += print->print("\"");
  }
  written += print->print("}");
  if (at != NULL) {
    written += print->print(",\"timestamp\":\"");
    written += print->print(at);
    written += print->print("\"");
  }
  written += print->print("}");
  return written;
}
// Sends POST <prefix>/v2/devices/<deviceId>/update with the body produced
// by write_single_device_values. Returns the HTTP status code, or
// E_NOCONNECTION when connecting fails.
template <class T>
int M2XStreamClient::postDeviceUpdate(const char* deviceId, int streamNum,
                                      const char* names[], T values[],
                                      const char* at) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // First pass into the null printer yields the Content-Length value
  const int bodyLength = write_single_device_values(&_null_print, streamNum,
                                                    names, values, at);

  _client->print("POST ");
  if (_path_prefix) { _client->print(_path_prefix); }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->println("/update HTTP/1.1");
  writeHttpHeader(bodyLength);

  // Second pass actually sends the body
  write_single_device_values(_client, streamNum, names, values, at);

  return readStatusCode(true);
}
// Generic location body writer: prints
// {"name":"..","latitude":"..","longitude":"..","elevation":".."}
// using the default print() overload for each coordinate. Returns the sum
// of the values returned by the print calls.
template <class T>
static int write_location_data(Print* print, const char* name,
                               T latitude, T longitude,
                               T elevation) {
  int written = print->print("{\"name\":\"");
  written += print->print(name);

  written += print->print("\",\"latitude\":\"");
  written += print->print(latitude);

  written += print->print("\",\"longitude\":\"");
  written += print->print(longitude);

  written += print->print("\",\"elevation\":\"");
  written += print->print(elevation);

  written += print->print("\"}");
  return written;
}
// Location body writer for double coordinates. Latitude and longitude are
// printed with MAX_DOUBLE_DIGITS as the digits argument; elevation uses the
// default print() overload. Returns the sum of the values returned by the
// print calls.
static int write_location_data(Print* print, const char* name,
                               double latitude, double longitude,
                               double elevation) {
  int written = print->print("{\"name\":\"");
  written += print->print(name);

  written += print->print("\",\"latitude\":\"");
  written += print->print(latitude, MAX_DOUBLE_DIGITS);

  written += print->print("\",\"longitude\":\"");
  written += print->print(longitude, MAX_DOUBLE_DIGITS);

  written += print->print("\",\"elevation\":\"");
  written += print->print(elevation);

  written += print->print("\"}");
  return written;
}
// Sends PUT <prefix>/v2/devices/<deviceId>/location with the body produced
// by write_location_data. Returns the HTTP status code, or E_NOCONNECTION
// when connecting fails.
template <class T>
int M2XStreamClient::updateLocation(const char* deviceId,
                                    const char* name,
                                    T latitude,
                                    T longitude,
                                    T elevation) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // First pass into the null printer yields the Content-Length value
  const int bodyLength = write_location_data(&_null_print, name, latitude,
                                             longitude, elevation);

  _client->print("PUT ");
  if (_path_prefix) { _client->print(_path_prefix); }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->println("/location HTTP/1.1");
  writeHttpHeader(bodyLength);

  // Second pass actually sends the body
  write_location_data(_client, name, latitude, longitude, elevation);

  return readStatusCode(true);
}
// Prints the JSON body {"from":"<from>","end":"<end>"} and returns the sum
// of the values returned by the print calls.
static inline int write_delete_values(Print* print, const char* from,
                                      const char* end) {
  int written = print->print("{\"from\":\"");
  written += print->print(from);
  written += print->print("\",\"end\":\"");
  written += print->print(end);
  written += print->print("\"}");
  return written;
}
// Sends a DELETE request for the values of +streamName+ between +from+ and
// +end+. Returns the HTTP status code, or E_NOCONNECTION when connecting
// fails.
int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
                                  const char* from, const char* end) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // Measure the body first so the header can carry its length
  const int bodyLength = write_delete_values(&_null_print, from, end);
  writeDeleteHeader(deviceId, streamName, bodyLength);
  write_delete_values(_client, from, end);

  return readStatusCode(true);
}
// Sends POST <prefix>/v2/devices/<deviceId>/commands/<commandId>/process.
// +callback+ is invoked twice: once against the null printer to measure the
// body, once against the client to send it. Returns the HTTP status code,
// or E_NOCONNECTION when connecting fails.
int M2XStreamClient::markCommandProcessed(const char* deviceId,
                                          const char* commandId,
                                          m2x_fill_data_callback callback,
                                          void *context) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // Measuring pass: count the bytes the callback produces
  _null_print.counter = 0;
  callback(&_null_print, context);
  const int bodyLength = _null_print.counter;

  _client->print("POST ");
  if (_path_prefix) { _client->print(_path_prefix); }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->print("/commands/");
  _client->print(commandId);
  _client->print("/process");
  _client->println(" HTTP/1.1");
  writeHttpHeader(bodyLength);

  // Sending pass
  callback(_client, context);

  return readStatusCode(true);
}
// m2x_fill_data_callback that treats +context+ as a NUL-terminated string
// and prints it; a null +context+ prints nothing.
static void m2x_fixed_buffer_filling_callback(Print *print, void *context) {
  if (!context) { return; }
  const char* text = (const char *) context;
  print->print(text);
}
// Same as markCommandProcessed, with the request body taken from the
// string +data+.
int M2XStreamClient::markCommandProcessedWithData(const char* deviceId,
                                                  const char* commandId,
                                                  const char* data) {
  void* body = (void *) data;
  return markCommandProcessed(deviceId, commandId,
                              m2x_fixed_buffer_filling_callback, body);
}
// Sends POST <prefix>/v2/devices/<deviceId>/commands/<commandId>/reject.
// +callback+ is invoked twice: once against the null printer to measure the
// body, once against the client to send it. Returns the HTTP status code,
// or E_NOCONNECTION when connecting fails.
int M2XStreamClient::markCommandRejected(const char* deviceId,
                                         const char* commandId,
                                         m2x_fill_data_callback callback,
                                         void *context) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  DBGLN("%s", "Connected to M2X server!");

  // Measuring pass: count the bytes the callback produces
  _null_print.counter = 0;
  callback(&_null_print, context);
  const int bodyLength = _null_print.counter;

  _client->print("POST ");
  if (_path_prefix) { _client->print(_path_prefix); }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->print("/commands/");
  _client->print(commandId);
  _client->print("/reject");
  _client->println(" HTTP/1.1");
  writeHttpHeader(bodyLength);

  // Sending pass
  callback(_client, context);

  return readStatusCode(true);
}
// Same as markCommandRejected, with the request body taken from the
// string +data+.
int M2XStreamClient::markCommandRejectedWithData(const char* deviceId,
                                                 const char* commandId,
                                                 const char* data) {
  void* body = (void *) data;
  return markCommandRejected(deviceId, commandId,
                             m2x_fixed_buffer_filling_callback, body);
}
// Fetches the server time through getTimestamp() and converts the decimal
// text into a 32-bit value stored in +ts+ (when +ts+ is not NULL). +ts+ is
// only written when the status is 200. Returns the status of getTimestamp().
int M2XStreamClient::getTimestamp32(int32_t *ts) {
  // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
  // which is 9223372036854775807. It consists of 19 characters, so a
  // buffer of 20 is definitely enough here
  int length = 20;
  char buffer[20];
  int status = getTimestamp(buffer, &length);
  if (status == 200) {
    // Accumulate unsigned so an out-of-range value wraps instead of
    // triggering signed-overflow undefined behaviour.
    uint32_t result = 0;
    for (int i = 0; i < length; i++) {
      const char c = buffer[i];
      // Stop at the first non-digit (e.g. trailing whitespace) rather than
      // folding it into the number.
      if ((c < '0') || (c > '9')) { break; }
      result = result * 10 + (uint32_t) (c - '0');
    }
    if (ts != NULL) { *ts = (int32_t) result; }
  }
  return status;
}
// Requests GET <prefix>/v2/time/seconds and copies the response body into
// +buffer+ (not NUL-terminated).
// +bufferLength+ must hold the capacity of +buffer+ on entry; once the
// response length is known it is overwritten with that length.
// Returns the HTTP status code, or a negative error code:
//   E_INVALID           +bufferLength+ is NULL (or no valid content length)
//   E_NOCONNECTION      cannot connect, or disconnected before the whole
//                       body was read
//   E_BUFFER_TOO_SMALL  response is longer than the capacity given
// The connection is closed on every path after a successful connect.
int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
  if (bufferLength == NULL) { return E_INVALID; }
  if (_client->connect(_host, _port)) {
    DBGLN("%s", "Connected to M2X server!");
    _client->print("GET ");
    if (_path_prefix) { _client->print(_path_prefix); }
    _client->println("/v2/time/seconds HTTP/1.1");
    writeHttpHeader(-1);
  } else {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }
  // Keep the connection open: the body still has to be read
  int status = readStatusCode(false);
  if (status == 200) {
    int length = readContentLength();
    if (length < 0) {
      close();
      return length;
    }
    if (*bufferLength < length) {
      *bufferLength = length;
      // Release the connection here as well, otherwise it stays open
      close();
      return E_BUFFER_TOO_SMALL;
    }
    *bufferLength = length;
    int index = skipHttpHeader();
    if (index != E_OK) {
      close();
      return index;
    }
    index = 0;
    while (index < length) {
      DBG("%s", "Received Data: ");
      while ((index < length) && _client->available()) {
        buffer[index++] = _client->read();
        DBG("%c", buffer[index - 1]);
      }
      DBGLNEND;
      // Everything received: no need to idle once more
      if (index >= length) { break; }
      if (!_client->connected()) {
        close();
        return E_NOCONNECTION;
      }
      if (_idlefunc!=NULL) {
        _idlefunc();
      } else {
        delay(200);
      }
    }
  }
  close();
  return status;
}
void M2XStreamClient::writePutHeader(const char* deviceId,
const char* streamName,
int contentLength) {
_client->print("PUT ");
if (_path_prefix) { _client->print(_path_prefix); }
_client->print("/v2/devices/");
_client->print(deviceId);
_client->print("/streams/");
print_encoded_string(_client, streamName);
_client->println("/value HTTP/1.1");
writeHttpHeader(contentLength);
}
void M2XStreamClient::writeDeleteHeader(const char* deviceId,
const char* streamName,
int contentLength) {
_client->print("DELETE ");
if (_path_prefix) { _client->print(_path_prefix); }
_client->print("/v2/devices/");
_client->print(deviceId);
_client->print("/streams/");
print_encoded_string(_client, streamName);
_client->print("/values");
_client->println(" HTTP/1.1");
writeHttpHeader(contentLength);
}
void M2XStreamClient::writeDeleteLocationHeader(const char* deviceId,
int contentLength) {
_client->print("DELETE ");
if (_path_prefix) { _client->print(_path_prefix); }
_client->print("/v2/devices/");
_client->print(deviceId);
_client->print("/location/waypoints/");
_client->println(" HTTP/1.1");
writeHttpHeader(contentLength);
}
void M2XStreamClient::writeHttpHeader(int contentLength) {
_client->println(USER_AGENT);
_client->print("X-M2X-KEY: ");
_client->println(_key);
_client->print("Host: ");
print_encoded_string(_client, _host);
if (_port != DEFAULT_M2X_PORT) {
_client->print(":");
// port is an integer, does not need encoding
_client->print(_port);
}
_client->println();
if (contentLength > 0) {
_client->println("Content-Type: application/json");
DBG("%s", "Content Length: ");
DBGLN("%d", contentLength);
_client->print("Content-Length: ");
_client->println(contentLength);
}
_client->println();
}
// Consumes incoming bytes until the pattern +str+ has been seen. A '*' in
// the pattern matches any single character. When _case_insensitive is
// non-zero, characters are compared after tolower().
// Returns E_OK once the whole pattern matched (immediately for an empty
// pattern), or E_DISCONNECTED (after closing the client) when the
// connection drops first.
// NOTE: on a mismatch the match restarts from the beginning of the pattern
// and the mismatching character itself is not re-tested against the first
// pattern character.
int M2XStreamClient::waitForString(const char* str) {
  int currentIndex = 0;
  if (str[currentIndex] == '\0') return E_OK;
  while (true) {
    while (_client->available()) {
      char c = _client->read();
      DBG("%c", c);
      int cmp;
      if (_case_insensitive) {
        // tolower() requires a value representable as unsigned char;
        // passing a negative char is undefined behaviour.
        cmp = tolower((unsigned char) c) -
              tolower((unsigned char) str[currentIndex]);
      } else {
        cmp = c - str[currentIndex];
      }
      if ((str[currentIndex] == '*') || (cmp == 0)) {
        currentIndex++;
        if (str[currentIndex] == '\0') {
          return E_OK;
        }
      } else {
        // start from the beginning
        currentIndex = 0;
      }
    }
    if (!_client->connected()) {
      DBGLN("%s", "ERROR: The client is disconnected from the server!");
      close();
      return E_DISCONNECTED;
    }
    // Nothing to read yet: yield to the user hook or sleep briefly
    if (_idlefunc!=NULL)
      _idlefunc();
    else
      delay(200);
  }
  // never reached here
  return E_NOTREACHABLE;
}
// Waits for the "HTTP/x.y " prefix of the status line, then reads the next
// three characters as a decimal status code and returns it. When
// +closeClient+ is true the connection is closed before returning, on both
// success and failure. Returns a negative error code when the prefix is
// not found or the connection drops before three characters arrive.
int M2XStreamClient::readStatusCode(bool closeClient) {
  const int found = waitForString("HTTP/*.* ");
  if (found != E_OK) {
    if (closeClient) close();
    return found;
  }

  int code = 0;
  int digits = 0;  // number of status characters consumed so far
  for (;;) {
    while (_client->available()) {
      const char c = _client->read();
      DBG("%c", c);
      code = code * 10 + (c - '0');
      digits++;
      if (digits == 3) {
        if (closeClient) close();
        return code;
      }
    }
    if (!_client->connected()) {
      DBGLN("%s", "ERROR: The client is disconnected from the server!");
      if (closeClient) close();
      return E_DISCONNECTED;
    }
    // Nothing to read yet: yield to the user hook or sleep briefly
    if (_idlefunc!=NULL)
      _idlefunc();
    else
      delay(200);
  }
  // never reached here
  return E_NOTREACHABLE;
}
// Waits for the "Content-Length: " header and reads the decimal number
// that follows, up to the first '\r' or '\n'. Returns the parsed value,
// E_INVALID when the parsed value is 0, or a negative error code when the
// header is not found or the connection drops.
int M2XStreamClient::readContentLength() {
  const int found = waitForString("Content-Length: ");
  if (found != E_OK) {
    return found;
  }

  int value = 0;
  for (;;) {
    while (_client->available()) {
      const char c = _client->read();
      DBG("%c", c);
      const bool endOfLine = (c == '\r') || (c == '\n');
      if (endOfLine) {
        if (value == 0) { return E_INVALID; }
        return value;
      }
      value = value * 10 + (c - '0');
    }
    if (!_client->connected()) {
      DBGLN("%s", "ERROR: The client is disconnected from the server!");
      return E_DISCONNECTED;
    }
    // Nothing to read yet: yield to the user hook or sleep briefly
    if (_idlefunc!=NULL)
      _idlefunc();
    else
      delay(200);
  }
  // never reached here
  return E_NOTREACHABLE;
}
// Consumes input up to and including the "\n\r\n" sequence and returns the
// result of waitForString().
int M2XStreamClient::skipHttpHeader() {
  const char* const terminator = "\n\r\n";
  return waitForString(terminator);
}
void M2XStreamClient::close() {
// Eats up buffered data before closing
_client->flush();
_client->stop();
}
static inline int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
char* buffer, int* length);
// Stores the client and zeroes the recorded times so they never hold
// indeterminate values before init()/reset() succeeds.
TimeService::TimeService(M2XStreamClient* client)
    : _client(client),
      _server_timestamp(0),
      _local_last_milli(0) {
}
// Starts the local timer, then fetches the server time via reset().
// Returns the status reported by reset().
int TimeService::init() {
  _timer.start();
  const int status = reset();
  return status;
}
// Fetches the current server time and records it together with the local
// timer reading. Returns the status of getTimestamp32().
// The recorded time is only updated on status 200, the only status for
// which getTimestamp32() fills in the timestamp; other statuses would
// otherwise store an uninitialized value.
int TimeService::reset() {
  int32_t ts = 0;
  const int status = _client->getTimestamp32(&ts);
  if (status == 200) {
    _server_timestamp = ts;
    _local_last_milli = _timer.read_ms();
  }
  return status;
}
// Produces the current time as an ISO 8601 string.
//
// The time is derived from the server timestamp recorded by reset() plus the
// local timer time elapsed since then.
//
// buffer - destination for the formatted, NUL-terminated timestamp.
// length - in: capacity of buffer; out: set by fill_iso8601_timestamp().
//
// Returns the result of fill_iso8601_timestamp(), the failing status of
// reset() when a re-synchronisation was needed and did not succeed, or
// E_TIMESTAMP_ERROR if the local timer still reads earlier than the
// reference point after a successful re-synchronisation.
int TimeService::getTimestamp(char* buffer, int* length) {
  uint32_t now = _timer.read_ms();
  if (now < _local_last_milli) {
    // In case of a timestamp overflow(happens once every 50 days on
    // Arduino), we reset the server timestamp recorded.
    // NOTE: while on an Arduino this might be okay, the situation is worse
    // on mbed, see the notes in m2x-mbed.h for details
    int status = reset();
    if (!m2x_status_is_success(status)) { return status; }
    now = _timer.read_ms();
  }
  if (now < _local_last_milli) {
    // We have already reset the timestamp, so this cannot happen
    // (an HTTP request cannot take longer than 50 days to finish).
    // Something else must be wrong here.
    return E_TIMESTAMP_ERROR;
  }

  const uint32_t elapsed = now - _local_last_milli;
  const uint32_t wholeSeconds = elapsed / 1000;
  const uint32_t leftoverMilli = elapsed % 1000;

  _server_timestamp += (int32_t) wholeSeconds;
  // Advance the reference point only by the whole seconds that were folded
  // into _server_timestamp. Moving it all the way to `now` would throw the
  // sub-second remainder away on every call, making the reported time fall
  // further and further behind the real time.
  _local_last_milli = now - leftoverMilli;

  return fill_iso8601_timestamp(_server_timestamp, (int32_t) leftoverMilli,
                                buffer, length);
}
// Minimum buffer size accepted by fill_iso8601_timestamp(): the 24
// characters of "YYYY-MM-DDTHH:MM:SS.mmmZ" plus the terminating NUL.
#define SIZE_ISO_8601 25
// Tells whether the calendar year (1970 + y) is a Gregorian leap year.
// y is an offset from 1970; years that are not positive are never
// reported as leap years.
static inline bool is_leap_year(int16_t y) {
  const int calendar_year = 1970 + y;
  if (calendar_year <= 0) {
    return false;
  }
  if ((calendar_year % 4) != 0) {
    return false;
  }
  // Divisible by 4: leap unless it is a century not divisible by 400.
  if ((calendar_year % 100) != 0) {
    return true;
  }
  return (calendar_year % 400) == 0;
}
// Number of days in the calendar year (1970 + y): 366 for leap years as
// decided by is_leap_year(), 365 otherwise.
static inline int32_t days_in_year(int16_t y) {
  if (is_leap_year(y)) {
    return 366;
  }
  return 365;
}
// Days per month for a non-leap year, indexed 0 (January) to 11 (December).
// fill_iso8601_timestamp() overrides the February entry itself.
static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
static inline int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
char* buffer, int* length) {
int16_t year;
int8_t month, month_length;
int32_t day;
int8_t hour, minute, second;
if (*length < SIZE_ISO_8601) {
*length = SIZE_ISO_8601;
return E_BUFFER_TOO_SMALL;
}
second = timestamp % 60;
timestamp /= 60; // now it is minutes
minute = timestamp % 60;
timestamp /= 60; // now it is hours
hour = timestamp % 24;
timestamp /= 24; // now it is days
year = 0;
day = 0;
while ((day += days_in_year(year)) <= timestamp) {
year++;
}
day -= days_in_year(year);
timestamp -= day; // now it is days in this year, starting at 0
day = 0;
month_length = 0;
for (month = 0; month < 12; month++) {
if (month == 1) {
// February
month_length = is_leap_year(year) ? 29 : 28;
} else {
month_length = MONTH_DAYS[month];
}
if (timestamp >= month_length) {
timestamp -= month_length;
} else {
break;
}
}
year = 1970 + year;
month++; // offset by 1
day = timestamp + 1;
int i = 0, j = 0;
// NOTE: It seems the snprintf implementation in Arduino has bugs,
// we have to manually piece the string together here.
#define INT_TO_STR(v_, width_) \
for (j = 0; j < (width_); j++) { \
buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
(v_) /= 10; \
} \
i += (width_)
INT_TO_STR(year, 4);
buffer[i++] = '-';
INT_TO_STR(month, 2);
buffer[i++] = '-';
INT_TO_STR(day, 2);
buffer[i++] = 'T';
INT_TO_STR(hour, 2);
buffer[i++] = ':';
INT_TO_STR(minute, 2);
buffer[i++] = ':';
INT_TO_STR(second, 2);
buffer[i++] = '.';
INT_TO_STR(milli, 3);
buffer[i++] = 'Z';
buffer[i++] = '\0';
#undef INT_TO_STR
*length = i;
return E_OK;
}
/* Reader functions */
#ifdef M2X_ENABLE_READER
// Data structures and functions used to parse stream values

// Maximum number of characters kept from a timestamp or value string;
// longer tokens are truncated when copied.
#define STREAM_BUF_LEN 32
// State shared by the jsonlite callbacks while a stream value list is parsed.
typedef struct {
// Bit set of WAITING_AT / GOT_AT / WAITING_VALUE / GOT_VALUE flags.
uint8_t state;
// NUL-terminated copy of the most recent "timestamp" value.
char at_str[STREAM_BUF_LEN + 1];
// NUL-terminated copy of the most recent "value" value.
char value_str[STREAM_BUF_LEN + 1];
// Running count of entries handed to the callback so far.
int index;
// User callback invoked once both the timestamp and the value are known.
stream_value_read_callback callback;
// Opaque pointer passed through to the callback unchanged.
void* context;
} stream_parsing_context_state;
// Flags stored in stream_parsing_context_state::state.
// WAITING_* is set when the matching key has been seen, GOT_* once the
// value following it has been stored.
#define WAITING_AT 0x1
#define GOT_AT 0x2
#define WAITING_VALUE 0x4
#define GOT_VALUE 0x8
// Both fields of one entry have been collected.
#define GOT_STREAM (GOT_AT | GOT_VALUE)
#define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM)
// True while a field's key has been seen but its value has not arrived yet.
#define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT)
#define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \
WAITING_VALUE)
// jsonlite key callback used while parsing a list of stream values.
//
// Marks the parsing state as waiting for a timestamp when the key is exactly
// "timestamp", and as waiting for a value when the key is exactly "value".
// All other keys (including "values") are ignored.
//
// The key is matched on its full length rather than on a prefix: the token
// is delimited by start/end and is not NUL-terminated, so a prefix-only
// comparison would also accept longer keys that merely begin with one of
// these names.
static void on_stream_key_found(jsonlite_callback_context* context,
                                jsonlite_token* token)
{
  stream_parsing_context_state* state =
      (stream_parsing_context_state*) context->client_state;
  const char* key = (const char*) token->start;
  const long key_len = (long) (token->end - token->start);

  if ((key_len == 9) && (strncmp(key, "timestamp", 9) == 0)) {
    state->state |= WAITING_AT;
  } else if ((key_len == 5) && (strncmp(key, "value", 5) == 0)) {
    state->state |= WAITING_VALUE;
  }
}
// Shared handler for string and number tokens found while parsing a list of
// stream values.
//
// If the state is waiting for a timestamp, the token text is copied into
// at_str; otherwise, if it is waiting for a value, into value_str. Copies
// are truncated to STREAM_BUF_LEN characters and NUL-terminated. Once both
// fields are present the user callback is invoked with the two strings, the
// running index, the user context and `type`, and the state is cleared for
// the next entry.
//
// type - tag forwarded to the callback describing the kind of token.
static void on_stream_value_found(jsonlite_callback_context* context,
                                  jsonlite_token* token,
                                  int type)
{
  stream_parsing_context_state* parsed =
      (stream_parsing_context_state*) context->client_state;

  // Decide which field (if any) this token belongs to.
  char* target = NULL;
  uint8_t got_flag = 0;
  if (TEST_IS_AT(parsed->state)) {
    target = parsed->at_str;
    got_flag = GOT_AT;
  } else if (TEST_IS_VALUE(parsed->state)) {
    target = parsed->value_str;
    got_flag = GOT_VALUE;
  }

  if (target != NULL) {
    const int copy_len = MIN(token->end - token->start, STREAM_BUF_LEN);
    strncpy(target, (const char*) token->start, copy_len);
    target[copy_len] = '\0';
    parsed->state |= got_flag;
  }

  if (TEST_GOT_STREAM(parsed->state)) {
    parsed->callback(parsed->at_str, parsed->value_str,
                     parsed->index++, parsed->context, type);
    parsed->state = 0;
  }
}
// jsonlite string callback for stream values: forwards the token to
// on_stream_value_found() tagged with type 1.
static void on_stream_string_found(jsonlite_callback_context* context,
                                   jsonlite_token* token)
{
  const int string_type_tag = 1;
  on_stream_value_found(context, token, string_type_tag);
}
// jsonlite number callback for stream values: forwards the token to
// on_stream_value_found() tagged with type 2.
static void on_stream_number_found(jsonlite_callback_context* context,
                                   jsonlite_token* token)
{
  const int number_type_tag = 2;
  on_stream_value_found(context, token, number_type_tag);
}
// Data structures and functions used to parse locations

// Maximum number of characters kept from a location name or timestamp;
// longer tokens are truncated when copied.
#define LOCATION_BUF_LEN 20
// State shared by the jsonlite callbacks while a location response is parsed.
typedef struct {
// Bit set of the WAITING_* / GOT_* location flags (needs more than 8 bits).
uint16_t state;
// NUL-terminated copy of the most recent "name" value.
char name_str[LOCATION_BUF_LEN + 1];
// Coordinates converted from their string tokens with atof().
double latitude;
double longitude;
double elevation;
// NUL-terminated copy of the most recent "timestamp" value.
char timestamp_str[LOCATION_BUF_LEN + 1];
// Running count of locations handed to the callback so far.
int index;
// User callback invoked once all five fields of a location are known.
location_read_callback callback;
// Opaque pointer passed through to the callback unchanged.
void* context;
} location_parsing_context_state;
// Flags stored in location_parsing_context_state::state.
// WAITING_* is set when the matching key has been seen, GOT_* once the
// string value following it has been stored.
#define WAITING_NAME 0x1
#define WAITING_LATITUDE 0x2
#define WAITING_LONGITUDE 0x4
#define WAITING_ELEVATION 0x8
#define WAITING_TIMESTAMP 0x10
#define GOT_NAME 0x20
#define GOT_LATITUDE 0x40
#define GOT_LONGITUDE 0x80
#define GOT_ELEVATION 0x100
#define GOT_TIMESTAMP 0x200
// All five fields of one location have been collected.
#define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP)
#define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION)
// Each TEST_IS_* is true while that field's key has been seen but its value
// has not been stored yet.
#define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME)
#define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \
== WAITING_LATITUDE)
#define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \
== WAITING_LONGITUDE)
#define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \
== WAITING_ELEVATION)
#define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \
== WAITING_TIMESTAMP)
// jsonlite key callback used while parsing a location response.
//
// Sets the WAITING_* flag that corresponds to the key ("name", "latitude",
// "longitude", "elevation" or "timestamp"). The key "waypoints" clears the
// whole state so that whatever was collected from the outer object is
// discarded and only the entries inside the waypoints list are reported.
//
// Keys are matched on their full length rather than on a prefix: the token
// is delimited by start/end and is not NUL-terminated, so a prefix-only
// comparison would also accept longer keys that merely begin with one of
// these names.
static void on_location_key_found(jsonlite_callback_context* context,
                                  jsonlite_token* token) {
  location_parsing_context_state* state =
      (location_parsing_context_state*) context->client_state;
  const char* key = (const char*) token->start;
  const long key_len = (long) (token->end - token->start);

  if ((key_len == 9) && (strncmp(key, "waypoints", 9) == 0)) {
    // only parses those locations in waypoints, skip the outer one
    state->state = 0;
  } else if ((key_len == 4) && (strncmp(key, "name", 4) == 0)) {
    state->state |= WAITING_NAME;
  } else if ((key_len == 8) && (strncmp(key, "latitude", 8) == 0)) {
    state->state |= WAITING_LATITUDE;
  } else if ((key_len == 9) && (strncmp(key, "longitude", 9) == 0)) {
    state->state |= WAITING_LONGITUDE;
  } else if ((key_len == 9) && (strncmp(key, "elevation", 9) == 0)) {
    state->state |= WAITING_ELEVATION;
  } else if ((key_len == 9) && (strncmp(key, "timestamp", 9) == 0)) {
    state->state |= WAITING_TIMESTAMP;
  }
}
// jsonlite string callback used while parsing a location response.
//
// The token is assigned to the first field that is still waiting for its
// value, checked in the order name, latitude, longitude, elevation,
// timestamp. Name and timestamp are copied (truncated to LOCATION_BUF_LEN
// characters, NUL-terminated); the three coordinates are converted with
// atof(). When all five fields are present the user callback is invoked and
// the state is cleared for the next location.
static void on_location_string_found(jsonlite_callback_context* context,
                                     jsonlite_token* token) {
  location_parsing_context_state* loc =
      (location_parsing_context_state*) context->client_state;
  const char* text = (const char*) token->start;
  const int text_len = MIN(token->end - token->start, LOCATION_BUF_LEN);

  if (TEST_IS_NAME(loc->state)) {
    strncpy(loc->name_str, text, text_len);
    loc->name_str[text_len] = '\0';
    loc->state |= GOT_NAME;
  } else if (TEST_IS_LATITUDE(loc->state)) {
    loc->latitude = atof(text);
    loc->state |= GOT_LATITUDE;
  } else if (TEST_IS_LONGITUDE(loc->state)) {
    loc->longitude = atof(text);
    loc->state |= GOT_LONGITUDE;
  } else if (TEST_IS_ELEVATION(loc->state)) {
    loc->elevation = atof(text);
    loc->state |= GOT_ELEVATION;
  } else if (TEST_IS_TIMESTAMP(loc->state)) {
    strncpy(loc->timestamp_str, text, text_len);
    loc->timestamp_str[text_len] = '\0';
    loc->state |= GOT_TIMESTAMP;
  }

  if (!TEST_GOT_LOCATION(loc->state)) {
    return;
  }
  loc->callback(loc->name_str, loc->latitude, loc->longitude,
                loc->elevation, loc->timestamp_str, loc->index++,
                loc->context);
  loc->state = 0;
}
// Maximum number of characters kept from a command id or name; longer tokens
// are truncated when copied. May be overridden by defining it beforehand.
#ifndef M2X_COMMAND_BUF_LEN
#define M2X_COMMAND_BUF_LEN 40
#endif /* M2X_COMMAND_BUF_LEN */
// State shared by the jsonlite callbacks while a command list is parsed.
typedef struct {
// Bit set of the M2X_COMMAND_WAITING_* / M2X_COMMAND_GOT_* flags.
uint8_t state;
// NUL-terminated copy of the most recent "id" value.
char id_str[M2X_COMMAND_BUF_LEN + 1];
// NUL-terminated copy of the most recent "name" value.
char name_str[M2X_COMMAND_BUF_LEN + 1];
// Running count of commands handed to the callback so far.
int index;
// User callback invoked once both the id and the name are known.
m2x_command_read_callback callback;
// Opaque pointer passed through to the callback unchanged.
void* context;
} m2x_command_parsing_context_state;
// Flags stored in m2x_command_parsing_context_state::state.
// WAITING_* is set when the matching key has been seen, GOT_* once the
// string value following it has been stored.
#define M2X_COMMAND_WAITING_ID 0x1
#define M2X_COMMAND_GOT_ID 0x2
#define M2X_COMMAND_WAITING_NAME 0x4
#define M2X_COMMAND_GOT_NAME 0x8
// Both fields of one command have been collected.
#define M2X_COMMAND_GOT_COMMAND (M2X_COMMAND_GOT_ID | M2X_COMMAND_GOT_NAME)
#define M2X_COMMAND_TEST_GOT_COMMAND(state_) \
(((state_) & M2X_COMMAND_GOT_COMMAND) == M2X_COMMAND_GOT_COMMAND)
// True while a field's key has been seen but its value has not arrived yet.
#define M2X_COMMAND_TEST_IS_ID(state_) \
(((state_) & (M2X_COMMAND_WAITING_ID | M2X_COMMAND_GOT_ID)) == \
M2X_COMMAND_WAITING_ID)
#define M2X_COMMAND_TEST_IS_NAME(state_) \
(((state_) & (M2X_COMMAND_WAITING_NAME | M2X_COMMAND_GOT_NAME)) == \
M2X_COMMAND_WAITING_NAME)
// jsonlite key callback used while parsing a command list.
//
// Marks the parsing state as waiting for an id when the key is exactly "id",
// and as waiting for a name when the key is exactly "name". All other keys
// are ignored.
//
// Keys are matched on their full length rather than on a prefix: the token
// is delimited by start/end and is not NUL-terminated, and with a two
// character prefix any key that merely begins with "id" would otherwise be
// taken for the command id.
static void m2x_on_command_key_found(jsonlite_callback_context* context,
                                     jsonlite_token* token)
{
  m2x_command_parsing_context_state* state =
      (m2x_command_parsing_context_state*) context->client_state;
  const char* key = (const char*) token->start;
  const long key_len = (long) (token->end - token->start);

  if ((key_len == 2) && (strncmp(key, "id", 2) == 0)) {
    state->state |= M2X_COMMAND_WAITING_ID;
  } else if ((key_len == 4) && (strncmp(key, "name", 4) == 0)) {
    state->state |= M2X_COMMAND_WAITING_NAME;
  }
}
// jsonlite string callback used while parsing a command list.
//
// If the state is waiting for an id, the token text is copied into id_str;
// otherwise, if it is waiting for a name, into name_str. Copies are
// truncated to M2X_COMMAND_BUF_LEN characters and NUL-terminated. Once both
// fields are present the user callback is invoked with the id, the name,
// the running index and the user context, and the state is cleared for the
// next command.
static void m2x_on_command_value_found(jsonlite_callback_context* context,
                                       jsonlite_token* token)
{
  m2x_command_parsing_context_state* cmd =
      (m2x_command_parsing_context_state*) context->client_state;

  // Decide which field (if any) this token belongs to.
  char* target = NULL;
  uint8_t got_flag = 0;
  if (M2X_COMMAND_TEST_IS_ID(cmd->state)) {
    target = cmd->id_str;
    got_flag = M2X_COMMAND_GOT_ID;
  } else if (M2X_COMMAND_TEST_IS_NAME(cmd->state)) {
    target = cmd->name_str;
    got_flag = M2X_COMMAND_GOT_NAME;
  }

  if (target != NULL) {
    const int copy_len = MIN(token->end - token->start, M2X_COMMAND_BUF_LEN);
    strncpy(target, (const char*) token->start, copy_len);
    target[copy_len] = '\0';
    cmd->state |= got_flag;
  }

  if (M2X_COMMAND_TEST_GOT_COMMAND(cmd->state)) {
    cmd->callback(cmd->id_str, cmd->name_str,
                  cmd->index++, cmd->context);
    cmd->state = 0;
  }
}
// Fetches the values of a stream and reports each one through `callback`.
//
// Sends "GET <prefix>/v2/devices/<deviceId>/streams/<streamName>/values"
// (the stream name is written through print_encoded_string), optionally
// followed by `query`; a leading '?' is added when the query does not
// already start with one.
//
// callback/context - forwarded to readStreamValue() when the status is 200.
//
// Returns E_NOCONNECTION if the connection cannot be established, otherwise
// the HTTP status code (or error) from readStatusCode(). The result of
// parsing the response body is not propagated to the caller.
int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
                                      stream_value_read_callback callback, void* context,
                                      const char* query) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }

  DBGLN("%s", "Connected to M2X server!");
  _client->print("GET ");
  if (_path_prefix) {
    _client->print(_path_prefix);
  }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->print("/streams/");
  print_encoded_string(_client, streamName);
  _client->print("/values");
  if (query) {
    const bool needsQuestionMark = (query[0] != '?');
    if (needsQuestionMark) {
      _client->print('?');
    }
    _client->print(query);
  }
  _client->println(" HTTP/1.1");
  writeHttpHeader(-1);

  const int status = readStatusCode(false);
  if (status == 200) {
    readStreamValue(callback, context);
  }
  close();
  return status;
}
// Fetches the location of a device and reports it through `callback`.
//
// Sends "GET <prefix>/v2/devices/<deviceId>/location".
//
// callback/context - forwarded to readLocation(callback, context) when the
//                    status is 200.
//
// Returns E_NOCONNECTION if the connection cannot be established, otherwise
// the HTTP status code (or error) from readStatusCode(). The result of
// parsing the response body is not propagated to the caller.
int M2XStreamClient::readLocation(const char* deviceId,
                                  location_read_callback callback,
                                  void* context) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }

  DBGLN("%s", "Connected to M2X server!");
  _client->print("GET ");
  if (_path_prefix) {
    _client->print(_path_prefix);
  }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->println("/location HTTP/1.1");
  writeHttpHeader(-1);

  const int status = readStatusCode(false);
  if (status == 200) {
    readLocation(callback, context);
  }
  close();
  return status;
}
// Deletes the location history of a device between `from` and `end`.
//
// The request body is produced by write_delete_values(). It is written
// twice: first to _null_print to obtain the length passed to
// writeDeleteLocationHeader() (presumably that printer discards its output
// - confirm against its definition), then to the real client.
//
// Returns E_NOCONNECTION if the connection cannot be established, otherwise
// the result of readStatusCode(true), which also closes the connection.
int M2XStreamClient::deleteLocations(const char* deviceId,
                                     const char* from, const char* end) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }

  DBGLN("%s", "Connected to M2X server!");
  const int bodyLength = write_delete_values(&_null_print, from, end);
  writeDeleteLocationHeader(deviceId, bodyLength);
  write_delete_values(_client, from, end);

  return readStatusCode(true);
}
// Fetches the commands of a device and reports each one through `callback`.
//
// Sends "GET <prefix>/v2/devices/<deviceId>/commands", optionally followed
// by `query`; a leading '?' is added when the query does not already start
// with one.
//
// callback/context - forwarded to readCommand() when the status is 200.
//
// Returns E_NOCONNECTION if the connection cannot be established, otherwise
// the HTTP status code (or error) from readStatusCode(). The result of
// parsing the response body is not propagated to the caller.
int M2XStreamClient::listCommands(const char* deviceId,
                                  m2x_command_read_callback callback,
                                  void* context,
                                  const char* query) {
  if (!_client->connect(_host, _port)) {
    DBGLN("%s", "ERROR: Cannot connect to M2X server!");
    return E_NOCONNECTION;
  }

  DBGLN("%s", "Connected to M2X server!");
  _client->print("GET ");
  if (_path_prefix) {
    _client->print(_path_prefix);
  }
  _client->print("/v2/devices/");
  _client->print(deviceId);
  _client->print("/commands");
  if (query) {
    const bool needsQuestionMark = (query[0] != '?');
    if (needsQuestionMark) {
      _client->print('?');
    }
    _client->print(query);
  }
  _client->println(" HTTP/1.1");
  writeHttpHeader(-1);

  const int status = readStatusCode(false);
  if (status == 200) {
    readCommand(callback, context);
  }
  close();
  return status;
}
// Parses the JSON body of a stream values response.
//
// Reads the Content-Length header, skips the rest of the header section and
// then feeds the body to a jsonlite parser in chunks of up to BUF_LEN bytes
// until Content-Length bytes have been consumed. `callback` is invoked (with
// `context`) for every timestamp/value pair found.
//
// The connection is closed on every return path.
//
// Returns E_OK when the last chunk parsed cleanly; the error from
// readContentLength() or skipHttpHeader(); E_NOCONNECTION if the peer
// disconnects before the whole body has arrived; E_JSON_INVALID if the
// tokenizer rejects the data or the document is incomplete.
int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
                                     void* context) {
  const int BUF_LEN = 64;
  char buf[BUF_LEN];

  int length = readContentLength();
  if (length < 0) {
    close();
    return length;
  }

  int index = skipHttpHeader();
  if (index != E_OK) {
    close();
    return index;
  }
  index = 0;

  stream_parsing_context_state state;
  state.state = state.index = 0;
  state.callback = callback;
  state.context = context;

  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
  cbs.key_found = on_stream_key_found;
  cbs.number_found = on_stream_number_found;
  cbs.string_found = on_stream_string_found;
  cbs.context.client_state = &state;

  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
  jsonlite_parser_set_callback(p, &cbs);

  jsonlite_result result = jsonlite_result_unknown;
  while (index < length) {
    int i = 0;

    DBG("%s", "Received Data: ");
    while ((i < BUF_LEN) && _client->available()) {
      buf[i++] = _client->read();
      DBG("%c", buf[i - 1]);
    }
    DBGLNEND;

    if ((!_client->connected()) &&
        (!_client->available()) &&
        ((index + i) < length)) {
      jsonlite_parser_release(p);
      close();
      return E_NOCONNECTION;
    }

    if (i == 0) {
      // Nothing has arrived since the last poll. Do not hand the tokenizer
      // an empty chunk (it has nothing to parse and may treat a zero size
      // as an invalid argument); just poll the client again.
      continue;
    }

    result = jsonlite_parser_tokenize(p, buf, i);
    if ((result != jsonlite_result_ok) &&
        (result != jsonlite_result_end_of_stream)) {
      jsonlite_parser_release(p);
      close();
      return E_JSON_INVALID;
    }

    index += i;
  }

  jsonlite_parser_release(p);
  close();
  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}
// Parses the JSON body of a location response.
//
// Reads the Content-Length header, skips the rest of the header section and
// then feeds the body to a jsonlite parser in chunks of up to BUF_LEN bytes
// until Content-Length bytes have been consumed. `callback` is invoked (with
// `context`) for every complete location found. Only string tokens are
// handled; no number callback is registered.
//
// The connection is closed on every return path.
//
// Returns E_OK when the last chunk parsed cleanly; the error from
// readContentLength() or skipHttpHeader(); E_NOCONNECTION if the peer
// disconnects before the whole body has arrived; E_JSON_INVALID if the
// tokenizer rejects the data or the document is incomplete.
int M2XStreamClient::readLocation(location_read_callback callback,
                                  void* context) {
  const int BUF_LEN = 40;
  char buf[BUF_LEN];

  int length = readContentLength();
  if (length < 0) {
    close();
    return length;
  }

  int index = skipHttpHeader();
  if (index != E_OK) {
    close();
    return index;
  }
  index = 0;

  location_parsing_context_state state;
  state.state = state.index = 0;
  state.callback = callback;
  state.context = context;

  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
  cbs.key_found = on_location_key_found;
  cbs.string_found = on_location_string_found;
  cbs.context.client_state = &state;

  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
  jsonlite_parser_set_callback(p, &cbs);

  jsonlite_result result = jsonlite_result_unknown;
  while (index < length) {
    int i = 0;

    DBG("%s", "Received Data: ");
    while ((i < BUF_LEN) && _client->available()) {
      buf[i++] = _client->read();
      DBG("%c", buf[i - 1]);
    }
    DBGLNEND;

    if ((!_client->connected()) &&
        (!_client->available()) &&
        ((index + i) < length)) {
      jsonlite_parser_release(p);
      close();
      return E_NOCONNECTION;
    }

    if (i == 0) {
      // Nothing has arrived since the last poll. Do not hand the tokenizer
      // an empty chunk (it has nothing to parse and may treat a zero size
      // as an invalid argument); just poll the client again.
      continue;
    }

    result = jsonlite_parser_tokenize(p, buf, i);
    if ((result != jsonlite_result_ok) &&
        (result != jsonlite_result_end_of_stream)) {
      jsonlite_parser_release(p);
      close();
      return E_JSON_INVALID;
    }

    index += i;
  }

  jsonlite_parser_release(p);
  close();
  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}
// Parses the JSON body of a command list response.
//
// Reads the Content-Length header, skips the rest of the header section and
// then feeds the body to a jsonlite parser in chunks of up to BUF_LEN bytes
// until Content-Length bytes have been consumed. `callback` is invoked (with
// `context`) for every id/name pair found.
//
// The connection is closed on every return path.
//
// Returns E_OK when the last chunk parsed cleanly; the error from
// readContentLength() or skipHttpHeader(); E_NOCONNECTION if the peer
// disconnects before the whole body has arrived; E_JSON_INVALID if the
// tokenizer rejects the data or the document is incomplete.
int M2XStreamClient::readCommand(m2x_command_read_callback callback,
                                 void* context) {
  const int BUF_LEN = 60;
  char buf[BUF_LEN];

  int length = readContentLength();
  if (length < 0) {
    close();
    return length;
  }

  int index = skipHttpHeader();
  if (index != E_OK) {
    close();
    return index;
  }
  index = 0;

  m2x_command_parsing_context_state state;
  state.state = 0;
  state.index = 0;
  state.callback = callback;
  state.context = context;

  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
  cbs.key_found = m2x_on_command_key_found;
  cbs.string_found = m2x_on_command_value_found;
  cbs.context.client_state = &state;

  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
  jsonlite_parser_set_callback(p, &cbs);

  jsonlite_result result = jsonlite_result_unknown;
  while (index < length) {
    int i = 0;

    DBG("%s", "Received Data: ");
    while ((i < BUF_LEN) && _client->available()) {
      buf[i++] = _client->read();
      DBG("%c", buf[i - 1]);
    }
    DBGLNEND;

    if ((!_client->connected()) &&
        (!_client->available()) &&
        ((index + i) < length)) {
      jsonlite_parser_release(p);
      close();
      return E_NOCONNECTION;
    }

    if (i == 0) {
      // Nothing has arrived since the last poll. Do not hand the tokenizer
      // an empty chunk (it has nothing to parse and may treat a zero size
      // as an invalid argument); just poll the client again.
      continue;
    }

    result = jsonlite_parser_tokenize(p, buf, i);
    if ((result != jsonlite_result_ok) &&
        (result != jsonlite_result_end_of_stream)) {
      jsonlite_parser_release(p);
      close();
      return E_JSON_INVALID;
    }

    index += i;
  }

  jsonlite_parser_release(p);
  close();
  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}
#endif /* M2X_ENABLE_READER */
#endif /* M2XStreamClient_h */
