ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.
Dependents: m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more
M2XStreamClient.h
00001 #ifndef M2XStreamClient_h 00002 #define M2XStreamClient_h 00003 00004 #if (!defined(ARDUINO_PLATFORM)) && (!defined(ESP8266_PLATFORM)) && (!defined(MBED_PLATFORM)) 00005 #error "Platform definition is missing!" 00006 #endif 00007 00008 #define M2X_VERSION "2.2.0" 00009 00010 #ifdef ARDUINO_PLATFORM 00011 #include "m2x-arduino.h" 00012 #endif /* ARDUINO_PLATFORM */ 00013 00014 #ifdef ESP8266_PLATFORM 00015 #include "m2x-esp8266.h" 00016 #endif /* ESP8266_PLATFORM */ 00017 00018 #ifdef MBED_PLATFORM 00019 #include "m2x-mbed.h" 00020 #endif /* MBED_PLATFORM */ 00021 00022 /* If we don't have DBG defined, provide dump implementation */ 00023 #ifndef DBG 00024 #define DBG(fmt_, data_) 00025 #define DBGLN(fmt_, data_) 00026 #define DBGLNEND 00027 #endif /* DBG */ 00028 00029 #define MIN(a, b) (((a) > (b))?(b):(a)) 00030 #define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0'))) 00031 #define MAX_DOUBLE_DIGITS 7 00032 00033 /* For tolower */ 00034 #include <ctype.h> 00035 00036 static const int E_OK = 0; 00037 static const int E_NOCONNECTION = -1; 00038 static const int E_DISCONNECTED = -2; 00039 static const int E_NOTREACHABLE = -3; 00040 static const int E_INVALID = -4; 00041 static const int E_JSON_INVALID = -5; 00042 static const int E_BUFFER_TOO_SMALL = -6; 00043 static const int E_TIMESTAMP_ERROR = -8; 00044 00045 static const char* DEFAULT_M2X_HOST = "api-m2x.att.com"; 00046 static const int DEFAULT_M2X_PORT = 80; 00047 00048 static inline bool m2x_status_is_success(int status) { 00049 return (status == E_OK) || (status >= 200 && status <= 299); 00050 } 00051 00052 static inline bool m2x_status_is_client_error(int status) { 00053 return status >= 400 && status <= 499; 00054 } 00055 00056 static inline bool m2x_status_is_server_error(int status) { 00057 return status >= 500 && status <= 599; 00058 } 00059 00060 static inline bool m2x_status_is_error(int status) { 00061 return m2x_status_is_client_error(status) || 00062 m2x_status_is_server_error(status); 00063 } 00064 00065 // Null Print class used to calculate length to print 00066 class NullPrint : public Print { 00067 public: 00068 size_t counter; 00069 00070 virtual size_t write(uint8_t b) { 00071 counter++; 00072 return 1; 00073 } 00074 00075 virtual size_t write(const uint8_t* buf, size_t size) { 00076 counter += size; 00077 return size; 00078 } 00079 }; 00080 00081 // Encodes and prints string using Percent-encoding specified 00082 // in RFC 1738, Section 2.2 00083 static inline int print_encoded_string(Print* print, const char* str) { 00084 int bytes = 0; 00085 for (int i = 0; str[i] != 0; i++) { 00086 if (((str[i] >= 'A') && (str[i] <= 'Z')) || 00087 ((str[i] >= 'a') && (str[i] <= 'z')) || 00088 ((str[i] >= '0') && (str[i] <= '9')) || 00089 (str[i] == '-') || (str[i] == '_') || 00090 (str[i] == '.') || (str[i] == '~')) { 00091 bytes += print->print(str[i]); 00092 } else { 00093 // Encode all other characters 00094 bytes += print->print('%'); 00095 bytes += print->print(TO_HEX(str[i] / 16)); 00096 bytes += print->print(TO_HEX(str[i] % 16)); 00097 } 00098 } 00099 return bytes; 00100 } 00101 00102 #ifdef M2X_ENABLE_READER 00103 /* 00104 * +type+ indicates the value type: 1 for string, 2 for number 00105 * NOTE that the value type here only contains a hint on how 00106 * you can use the value. Even though 2 is returned, the value 00107 * is still stored in (const char *), and atoi/atof is needed to 00108 * get the actual value 00109 */ 00110 typedef void (*stream_value_read_callback)(const char* at, 00111 const char* value, 00112 int index, 00113 void* context, 00114 int type); 00115 00116 typedef void (*location_read_callback)(const char* name, 00117 double latitude, 00118 double longitude, 00119 double elevation, 00120 const char* timestamp, 00121 int index, 00122 void* context); 00123 00124 typedef void (*m2x_command_read_callback)(const char* id, 00125 const char* name, 00126 int index, 00127 void *context); 00128 #endif /* M2X_ENABLE_READER */ 00129 00130 typedef void (*m2x_fill_data_callback)(Print *print, void *context); 00131 00132 class M2XStreamClient { 00133 public: 00134 M2XStreamClient(Client* client, 00135 const char* key, 00136 void (* idlefunc)(void) = NULL, 00137 int case_insensitive = 1, 00138 const char* host = DEFAULT_M2X_HOST, 00139 int port = DEFAULT_M2X_PORT, 00140 const char* path_prefix = NULL); 00141 00142 // Push data stream value using PUT request, returns the HTTP status code 00143 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00144 // the device ID here. 00145 template <class T> 00146 int updateStreamValue(const char* deviceId, const char* streamName, T value); 00147 00148 // Post multiple values to M2X all at once. 00149 // +deviceId+ - id of the device to post values 00150 // +streamNum+ - Number of streams to post 00151 // +names+ - Array of stream names, the length of the array should 00152 // be exactly +streamNum+ 00153 // +counts+ - Array of +streamNum+ length, each item in this array 00154 // containing the number of values we want to post for each stream 00155 // +ats+ - Timestamps for each value, the length of this array should 00156 // be the some of all values in +counts+, for the first +counts[0]+ 00157 // items, the values belong to the first stream, for the following 00158 // +counts[1]+ number of items, the values belong to the second stream, 00159 // etc. Notice that timestamps are required here: you must provide 00160 // a timestamp for each value posted. 00161 // +values+ - Values to post. This works the same way as +ats+, the 00162 // first +counts[0]+ number of items contain values to post to the first 00163 // stream, the succeeding +counts[1]+ number of items contain values 00164 // for the second stream, etc. The length of this array should be 00165 // the sum of all values in +counts+ array. 00166 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00167 // the device ID here. 00168 template <class T> 00169 int postDeviceUpdates(const char* deviceId, int streamNum, 00170 const char* names[], const int counts[], 00171 const char* ats[], T values[]); 00172 00173 // Post multiple values of a single device at once. 00174 // +deviceId+ - id of the device to post values 00175 // +streamNum+ - Number of streams to post 00176 // +names+ - Array of stream names, the length of the array should 00177 // be exactly +streamNum+ 00178 // +values+ - Array of values to post, the length of the array should 00179 // be exactly +streamNum+. Notice that the array of +values+ should 00180 // match the array of +names+, and that the ith value in +values+ is 00181 // exactly the value to post for the ith stream name in +names+ 00182 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00183 // the device ID here. 00184 template <class T> 00185 int postDeviceUpdate(const char* deviceId, int streamNum, 00186 const char* names[], T values[], 00187 const char* at = NULL); 00188 00189 #ifdef M2X_ENABLE_READER 00190 // Fetch values for a particular data stream. Since memory is 00191 // very limited on an Arduino, we cannot parse and get all the 00192 // data points in memory. Instead, we use callbacks here: whenever 00193 // a new data point is parsed, we call the callback using the values, 00194 // after that, the values will be thrown away to make space for new 00195 // values. 00196 // Note that you can also pass in a user-specified context in this 00197 // function, this context will be passed to the callback function 00198 // each time we get a data point. 00199 // For each data point, the callback will be called once. The HTTP 00200 // status code will be returned. And the content is only parsed when 00201 // the status code is 200. 00202 int listStreamValues(const char* deviceId, const char* streamName, 00203 stream_value_read_callback callback, void* context, 00204 const char* query = NULL); 00205 #endif /* M2X_ENABLE_READER */ 00206 00207 // Update datasource location 00208 // NOTE: On an Arduino Uno and other ATMEGA based boards, double has 00209 // 4-byte (32 bits) precision, which is the same as float. So there's 00210 // no natural double-precision floating number on these boards. With 00211 // a float value, we have a precision of roughly 7 digits, that means 00212 // either 5 or 6 digits after the floating point. According to wikipedia, 00213 // a difference of 0.00001 will give us ~1.1132m distance. If this 00214 // precision is good for you, you can use the double-version we provided 00215 // here. Otherwise, you may need to use the string-version and do the 00216 // actual conversion by yourselves. 00217 // However, with an Arduino Due board, double has 8-bytes (64 bits) 00218 // precision, which means you are free to use the double-version only 00219 // without any precision problems. 00220 // Returned value is the http status code. 00221 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00222 // the device ID here. 00223 template <class T> 00224 int updateLocation(const char* deviceId, const char* name, 00225 T latitude, T longitude, T elevation); 00226 00227 #ifdef M2X_ENABLE_READER 00228 // Read location information for a device. Also used callback to process 00229 // data points for memory reasons. The HTTP status code is returned, 00230 // response is only parsed when the HTTP status code is 200 00231 int readLocation(const char* deviceId, location_read_callback callback, 00232 void* context); 00233 #endif /* M2X_ENABLE_READER */ 00234 00235 // Delete values from a data stream 00236 // You will need to provide from and end date/time strings in the ISO8601 00237 // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where 00238 // yyyy: the year 00239 // mm: the month 00240 // dd: the day 00241 // HH: the hour (24 hour format) 00242 // MM: the minute 00243 // SS.SSS: the seconds (to the millisecond) 00244 // NOTE: the time is given in Zulu (GMT) 00245 // M2X will delete all values within the from to end date/time range. 00246 // The status code is 204 on success and 400 on a bad request (e.g. the 00247 // timestamp is not in ISO8601 format or the from timestamp is not less than 00248 // or equal to the end timestamp. 00249 int deleteValues(const char* deviceId, const char* streamName, 00250 const char* from, const char* end); 00251 00252 #ifdef M2X_ENABLE_READER 00253 // Fetch commands available for this device, notice that for memory constraints, 00254 // we only keep ID and name of the command received here. 00255 // You can tweak the command receiving via the query parameter. 00256 int listCommands(const char* deviceId, 00257 m2x_command_read_callback callback, void* context, 00258 const char* query = NULL); 00259 #endif /* M2X_ENABLE_READER */ 00260 00261 // Mark a command as processed. 00262 // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed 00263 // To make sure the minimal amount of memory is needed, this API works with 00264 // a callback function. The callback function is then used to fill the request 00265 // data, note that in order to correctly set the content length in HTTP header, 00266 // the callback function will be called twice, the caller must make sure both 00267 // calls fill the Print object with exactly the same data. 00268 // If you have a pre-allocated buffer filled with the data to post, you can 00269 // use markCommandProcessedWithData API below. 00270 int markCommandProcessed(const char* deviceId, const char* commandId, 00271 m2x_fill_data_callback callback, void *context); 00272 00273 // Mark a command as processed with a data buffer 00274 // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed 00275 // This is exactly like markCommandProcessed, except that a buffer is use to 00276 // contain the data to post as the request body. 00277 int markCommandProcessedWithData(const char* deviceId, const char* commandId, 00278 const char* data); 00279 00280 // Mark a command as rejected. 00281 // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected 00282 // To make sure the minimal amount of memory is needed, this API works with 00283 // a callback function. The callback function is then used to fill the request 00284 // data, note that in order to correctly set the content length in HTTP header, 00285 // the callback function will be called twice, the caller must make sure both 00286 // calls fill the Print object with exactly the same data. 00287 // If you have a pre-allocated buffer filled with the data to post, you can 00288 // use markCommandRejectedWithData API below. 00289 int markCommandRejected(const char* deviceId, const char* commandId, 00290 m2x_fill_data_callback callback, void *context); 00291 00292 // Mark a command as rejected with a data buffer 00293 // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected 00294 // This is exactly like markCommandRejected, except that a buffer is use to 00295 // contain the data to post as the request body. 00296 int markCommandRejectedWithData(const char* deviceId, const char* commandId, 00297 const char* data); 00298 00299 // Fetches current timestamp in seconds from M2X server. Since we 00300 // are using signed 32-bit integer as return value, this will only 00301 // return valid results before 03:14:07 UTC on 19 January 2038. If 00302 // the device is supposed to work after that, this function should 00303 // not be used. 00304 // 00305 // The returned value will contain the status code(positive values) 00306 // or the error code(negative values). 00307 // In case of success, the current timestamp will be filled in the 00308 // +ts+ pointer passed in as argument. 00309 // 00310 // NOTE: although returning uint32_t can give us a larger space, 00311 // we prefer to cope with the unix convention here. 00312 int getTimestamp32(int32_t* ts); 00313 00314 // Fetches current timestamp in seconds from M2X server. 00315 // This function will return the timestamp as an integer literal 00316 // in the provided buffer. Hence there's no problem working after 00317 // 03:14:07 UTC on 19 January 2038. The drawback part here, is that 00318 // you will have to work with 64-bit integer, which is not available 00319 // on certain platform(such as Arduino), a bignum library or alike 00320 // is needed in this case. 00321 // 00322 // Notice +bufferLength+ is supposed to contain the length of the 00323 // buffer when calling this function. It is also the caller's 00324 // responsibility to ensure the buffer is big enough, otherwise 00325 // the library will return an error indicating the buffer is too 00326 // small. 00327 // While this is not accurate all the time, one trick here is to 00328 // pass in 0 as the bufferLength, in which case we will always return 00329 // the buffer-too-small error. However, the correct buffer length 00330 // can be found this way so a secound execution is most likely to work 00331 // (unless we are at the edge of the buffer length increasing, for 00332 // example, when the timestamp jumps from 9999999999 to 10000000000, 00333 // which is highly unlikely to happend). However, given that the 00334 // maximum 64-bit integer can be stored in 19 bytes, there's not 00335 // much need to use this trick.) 00336 // 00337 // The returned value will contain the status code(positive values) 00338 // or the error code(negative values). 00339 // In case of success, the current timestamp will be filled in the 00340 // passed +buffer+ pointer, and the actual used buffer length will 00341 // be returned in +bufferLength+ pointer. 00342 // NOTE: as long as we can read the returned buffer length, it will 00343 // be used to fill in the +bufferLength+ variable even though other 00344 // errors occur(buffer is not enough, network is shutdown before 00345 // reading the whole buffer, etc.) 00346 int getTimestamp(char* buffer, int* bufferLength); 00347 private: 00348 Client* _client; 00349 const char* _key; 00350 int _case_insensitive; 00351 const char* _host; 00352 int _port; 00353 void (* _idlefunc)(void); 00354 const char* _path_prefix; 00355 NullPrint _null_print; 00356 00357 // Writes the HTTP header part for updating a stream value 00358 void writePutHeader(const char* deviceId, 00359 const char* streamName, 00360 int contentLength); 00361 // Writes the HTTP header part for deleting stream values 00362 void writeDeleteHeader(const char* deviceId, 00363 const char* streamName, 00364 int contentLength); 00365 // Writes HTTP header lines including M2X API Key, host, content 00366 // type and content length(if the body exists) 00367 void writeHttpHeader(int contentLength); 00368 // Parses HTTP response header and return the content length. 00369 // Note that this function does not parse all http headers, as long 00370 // as the content length is found, this function will return 00371 int readContentLength(); 00372 // Skips all HTTP response header part. Return minus value in case 00373 // the connection is closed before we got all headers 00374 int skipHttpHeader(); 00375 // Parses and returns the HTTP status code, note this function will 00376 // return immediately once it gets the status code 00377 int readStatusCode(bool closeClient); 00378 // Waits for a certain string pattern in the HTTP header, and returns 00379 // once the pattern is found. In the pattern, you can use '*' to denote 00380 // any character 00381 int waitForString(const char* str); 00382 // Closes the connection 00383 void close(); 00384 00385 #ifdef M2X_ENABLE_READER 00386 // Parses JSON response of stream value API, and calls callback function 00387 // once we get a data point 00388 int readStreamValue(stream_value_read_callback callback, void* context); 00389 // Parses JSON response of location API, and calls callback function once 00390 // we get a data point 00391 int readLocation(location_read_callback callback, void* context); 00392 // Parses JSON response of command API, and calls callback function once 00393 // we get a data point 00394 int readCommand(m2x_command_read_callback callback, void* context); 00395 #endif /* M2X_ENABLE_READER */ 00396 }; 00397 00398 00399 // A ISO8601 timestamp generation service for M2X. 00400 // It uses the Time API provided by the M2X server to initialize 00401 // clock, then uses millis() function provided by Arduino to calculate 00402 // time advancements so as to reduce API query times. 00403 // 00404 // Right now, this service only works with 32-bit timestamp, meaning that 00405 // this service won't work after 03:14:07 UTC on 19 January 2038. However, 00406 // a similar service that uses 64-bit timestamp can be implemented following 00407 // the logic here. 00408 class TimeService { 00409 public: 00410 TimeService(M2XStreamClient* client); 00411 00412 // Initialize the time service. Notice the TimeService instance is only 00413 // working after calling this function successfully. 00414 int init(); 00415 00416 // Reset the internal recorded time by calling M2X Time API again. Normally, 00417 // you don't need to call this manually. TimeService will handle Arduino clock 00418 // overflow automatically 00419 int reset(); 00420 00421 // Fills ISO8601 formatted timestamp into the buffer provided. +length+ should 00422 // contains the maximum supported length of the buffer when calling. For now, 00423 // the buffer should be able to store 25 characters for a full ISO8601 formatted 00424 // timestamp, otherwise, an error will be returned. 00425 int getTimestamp(char* buffer, int* length); 00426 private: 00427 M2XStreamClient* _client; 00428 int32_t _server_timestamp; 00429 uint32_t _local_last_milli; 00430 M2XTimer _timer; 00431 }; 00432 00433 00434 // Implementations 00435 M2XStreamClient::M2XStreamClient(Client* client, 00436 const char* key, 00437 void (* idlefunc)(void), 00438 int case_insensitive, 00439 const char* host, 00440 int port, 00441 const char* path_prefix) : _client(client), 00442 _key(key), 00443 _idlefunc(idlefunc), 00444 _case_insensitive(case_insensitive), 00445 _host(host), 00446 _port(port), 00447 _path_prefix(path_prefix), 00448 _null_print() { 00449 } 00450 00451 template <class T> 00452 int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { 00453 if (_client->connect(_host, _port)) { 00454 DBGLN("%s", "Connected to M2X server!"); 00455 writePutHeader(deviceId, streamName, 00456 // for {"value": and } 00457 _null_print.print(value) + 12); 00458 _client->print("{\"value\":\""); 00459 _client->print(value); 00460 _client->print("\"}"); 00461 } else { 00462 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00463 return E_NOCONNECTION; 00464 } 00465 00466 return readStatusCode(true); 00467 } 00468 00469 template <class T> 00470 inline int write_multiple_values(Print* print, int streamNum, 00471 const char* names[], const int counts[], 00472 const char* ats[], T values[]) { 00473 int bytes = 0, value_index = 0; 00474 bytes += print->print("{\"values\":{"); 00475 for (int i = 0; i < streamNum; i++) { 00476 bytes += print->print("\""); 00477 bytes += print->print(names[i]); 00478 bytes += print->print("\":["); 00479 for (int j = 0; j < counts[i]; j++) { 00480 bytes += print->print("{\"timestamp\": \""); 00481 bytes += print->print(ats[value_index]); 00482 bytes += print->print("\",\"value\": \""); 00483 bytes += print->print(values[value_index]); 00484 bytes += print->print("\"}"); 00485 if (j < counts[i] - 1) { bytes += print->print(","); } 00486 value_index++; 00487 } 00488 bytes += print->print("]"); 00489 if (i < streamNum - 1) { bytes += print->print(","); } 00490 } 00491 bytes += print->print("}}"); 00492 return bytes; 00493 } 00494 00495 template <class T> 00496 int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum, 00497 const char* names[], const int counts[], 00498 const char* ats[], T values[]) { 00499 if (_client->connect(_host, _port)) { 00500 DBGLN("%s", "Connected to M2X server!"); 00501 int length = write_multiple_values(&_null_print, streamNum, names, 00502 counts, ats, values); 00503 _client->print("POST "); 00504 if (_path_prefix) { _client->print(_path_prefix); } 00505 _client->print("/v2/devices/"); 00506 _client->print(deviceId); 00507 _client->println("/updates HTTP/1.0"); 00508 writeHttpHeader(length); 00509 write_multiple_values(_client, streamNum, names, counts, ats, values); 00510 } else { 00511 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00512 return E_NOCONNECTION; 00513 } 00514 return readStatusCode(true); 00515 } 00516 00517 template <class T> 00518 inline int write_single_device_values(Print* print, int streamNum, 00519 const char* names[], T values[], 00520 const char* at) { 00521 int bytes = 0; 00522 bytes += print->print("{\"values\":{"); 00523 for (int i = 0; i < streamNum; i++) { 00524 bytes += print->print("\""); 00525 bytes += print->print(names[i]); 00526 bytes += print->print("\": \""); 00527 bytes += print->print(values[i]); 00528 bytes += print->print("\""); 00529 if (i < streamNum - 1) { bytes += print->print(","); } 00530 } 00531 bytes += print->print("}"); 00532 if (at != NULL) { 00533 bytes += print->print(",\"timestamp\":\""); 00534 bytes += print->print(at); 00535 bytes += print->print("\""); 00536 } 00537 bytes += print->print("}"); 00538 return bytes; 00539 } 00540 00541 template <class T> 00542 int M2XStreamClient::postDeviceUpdate(const char* deviceId, int streamNum, 00543 const char* names[], T values[], 00544 const char* at) { 00545 if (_client->connect(_host, _port)) { 00546 DBGLN("%s", "Connected to M2X server!"); 00547 int length = write_single_device_values(&_null_print, streamNum, names, 00548 values, at); 00549 _client->print("POST "); 00550 if (_path_prefix) { _client->print(_path_prefix); } 00551 _client->print("/v2/devices/"); 00552 _client->print(deviceId); 00553 _client->println("/update HTTP/1.0"); 00554 writeHttpHeader(length); 00555 write_single_device_values(_client, streamNum, names, values, at); 00556 } else { 00557 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00558 return E_NOCONNECTION; 00559 } 00560 return readStatusCode(true); 00561 } 00562 00563 template <class T> 00564 static int write_location_data(Print* print, const char* name, 00565 T latitude, T longitude, 00566 T elevation) { 00567 int bytes = 0; 00568 bytes += print->print("{\"name\":\""); 00569 bytes += print->print(name); 00570 bytes += print->print("\",\"latitude\":\""); 00571 bytes += print->print(latitude); 00572 bytes += print->print("\",\"longitude\":\""); 00573 bytes += print->print(longitude); 00574 bytes += print->print("\",\"elevation\":\""); 00575 bytes += print->print(elevation); 00576 bytes += print->print("\"}"); 00577 return bytes; 00578 } 00579 00580 static int write_location_data(Print* print, const char* name, 00581 double latitude, double longitude, 00582 double elevation) { 00583 int bytes = 0; 00584 bytes += print->print("{\"name\":\""); 00585 bytes += print->print(name); 00586 bytes += print->print("\",\"latitude\":\""); 00587 bytes += print->print(latitude, MAX_DOUBLE_DIGITS); 00588 bytes += print->print("\",\"longitude\":\""); 00589 bytes += print->print(longitude, MAX_DOUBLE_DIGITS); 00590 bytes += print->print("\",\"elevation\":\""); 00591 bytes += print->print(elevation); 00592 bytes += print->print("\"}"); 00593 return bytes; 00594 } 00595 00596 template <class T> 00597 int M2XStreamClient::updateLocation(const char* deviceId, 00598 const char* name, 00599 T latitude, 00600 T longitude, 00601 T elevation) { 00602 if (_client->connect(_host, _port)) { 00603 DBGLN("%s", "Connected to M2X server!"); 00604 00605 int length = write_location_data(&_null_print, name, latitude, longitude, 00606 elevation); 00607 _client->print("PUT "); 00608 if (_path_prefix) { _client->print(_path_prefix); } 00609 _client->print("/v2/devices/"); 00610 _client->print(deviceId); 00611 _client->println("/location HTTP/1.0"); 00612 00613 writeHttpHeader(length); 00614 write_location_data(_client, name, latitude, longitude, elevation); 00615 } else { 00616 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00617 return E_NOCONNECTION; 00618 } 00619 return readStatusCode(true); 00620 } 00621 00622 static inline int write_delete_values(Print* print, const char* from, 00623 const char* end) { 00624 int bytes = 0; 00625 bytes += print->print("{\"from\":\""); 00626 bytes += print->print(from); 00627 bytes += print->print("\",\"end\":\""); 00628 bytes += print->print(end); 00629 bytes += print->print("\"}"); 00630 return bytes; 00631 } 00632 00633 int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName, 00634 const char* from, const char* end) { 00635 if (_client->connect(_host, _port)) { 00636 DBGLN("%s", "Connected to M2X server!"); 00637 int length = write_delete_values(&_null_print, from, end); 00638 writeDeleteHeader(deviceId, streamName, length); 00639 write_delete_values(_client, from, end); 00640 } else { 00641 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00642 return E_NOCONNECTION; 00643 } 00644 00645 return readStatusCode(true); 00646 } 00647 00648 int M2XStreamClient::markCommandProcessed(const char* deviceId, 00649 const char* commandId, 00650 m2x_fill_data_callback callback, 00651 void *context) { 00652 if (_client->connect(_host, _port)) { 00653 DBGLN("%s", "Connected to M2X server!"); 00654 _null_print.counter = 0; 00655 callback(&_null_print, context); 00656 int length = _null_print.counter; 00657 _client->print("POST "); 00658 if (_path_prefix) { _client->print(_path_prefix); } 00659 _client->print("/v2/devices/"); 00660 _client->print(deviceId); 00661 _client->print("/commands/"); 00662 _client->print(commandId); 00663 _client->print("/process"); 00664 _client->println(" HTTP/1.0"); 00665 writeHttpHeader(length); 00666 callback(_client, context); 00667 } else { 00668 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00669 return E_NOCONNECTION; 00670 } 00671 return readStatusCode(true); 00672 } 00673 00674 static void m2x_fixed_buffer_filling_callback(Print *print, void *context) { 00675 if (context) { 00676 print->print((const char *) context); 00677 } 00678 } 00679 00680 int M2XStreamClient::markCommandProcessedWithData(const char* deviceId, 00681 const char* commandId, 00682 const char* data) { 00683 return markCommandProcessed(deviceId, commandId, 00684 m2x_fixed_buffer_filling_callback, (void *) data); 00685 } 00686 00687 int M2XStreamClient::markCommandRejected(const char* deviceId, 00688 const char* commandId, 00689 m2x_fill_data_callback callback, 00690 void *context) { 00691 if (_client->connect(_host, _port)) { 00692 DBGLN("%s", "Connected to M2X server!"); 00693 _null_print.counter = 0; 00694 callback(&_null_print, context); 00695 int length = _null_print.counter; 00696 _client->print("POST "); 00697 if (_path_prefix) { _client->print(_path_prefix); } 00698 _client->print("/v2/devices/"); 00699 _client->print(deviceId); 00700 _client->print("/commands/"); 00701 _client->print(commandId); 00702 _client->print("/reject"); 00703 _client->println(" HTTP/1.0"); 00704 writeHttpHeader(length); 00705 callback(_client, context); 00706 } else { 00707 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00708 return E_NOCONNECTION; 00709 } 00710 return readStatusCode(true); 00711 } 00712 00713 int M2XStreamClient::markCommandRejectedWithData(const char* deviceId, 00714 const char* commandId, 00715 const char* data) { 00716 return markCommandRejected(deviceId, commandId, 00717 m2x_fixed_buffer_filling_callback, (void *) data); 00718 } 00719 00720 int M2XStreamClient::getTimestamp32(int32_t *ts) { 00721 // The maximum value of signed 64-bit integer is 0x7fffffffffffffff, 00722 // which is 9223372036854775807. It consists of 19 characters, so a 00723 // buffer of 20 is definitely enough here 00724 int length = 20; 00725 char buffer[20]; 00726 int status = getTimestamp(buffer, &length); 00727 if (status == 200) { 00728 int32_t result = 0; 00729 for (int i = 0; i < length; i++) { 00730 result = result * 10 + (buffer[i] - '0'); 00731 } 00732 if (ts != NULL) { *ts = result; } 00733 } 00734 return status; 00735 } 00736 00737 int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) { 00738 if (bufferLength == NULL) { return E_INVALID; } 00739 if (_client->connect(_host, _port)) { 00740 DBGLN("%s", "Connected to M2X server!"); 00741 _client->print("GET "); 00742 if (_path_prefix) { _client->print(_path_prefix); } 00743 _client->println("/v2/time/seconds HTTP/1.0"); 00744 00745 writeHttpHeader(-1); 00746 } else { 00747 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00748 return E_NOCONNECTION; 00749 } 00750 int status = readStatusCode(false); 00751 if (status == 200) { 00752 int length = readContentLength(); 00753 if (length < 0) { 00754 close(); 00755 return length; 00756 } 00757 if (*bufferLength < length) { 00758 *bufferLength = length; 00759 return E_BUFFER_TOO_SMALL; 00760 } 00761 *bufferLength = length; 00762 int index = skipHttpHeader(); 00763 if (index != E_OK) { 00764 close(); 00765 return index; 00766 } 00767 index = 0; 00768 while (index < length) { 00769 DBG("%s", "Received Data: "); 00770 while ((index < length) && _client->available()) { 00771 buffer[index++] = _client->read(); 00772 DBG("%c", buffer[index - 1]); 00773 } 00774 DBGLNEND; 00775 00776 if ((!_client->connected()) && 00777 (index < length)) { 00778 close(); 00779 return E_NOCONNECTION; 00780 } 00781 00782 if (_idlefunc!=NULL) { 00783 _idlefunc(); 00784 } else { 00785 delay(200); 00786 } 00787 } 00788 } 00789 close(); 00790 return status; 00791 } 00792 00793 00794 void M2XStreamClient::writePutHeader(const char* deviceId, 00795 const char* streamName, 00796 int contentLength) { 00797 _client->print("PUT "); 00798 if (_path_prefix) { _client->print(_path_prefix); } 00799 _client->print("/v2/devices/"); 00800 _client->print(deviceId); 00801 _client->print("/streams/"); 00802 print_encoded_string(_client, streamName); 00803 _client->println("/value HTTP/1.0"); 00804 00805 writeHttpHeader(contentLength); 00806 } 00807 00808 void M2XStreamClient::writeDeleteHeader(const char* deviceId, 00809 const char* streamName, 00810 int contentLength) { 00811 _client->print("DELETE "); 00812 if (_path_prefix) { _client->print(_path_prefix); } 00813 _client->print("/v2/devices/"); 00814 _client->print(deviceId); 00815 _client->print("/streams/"); 00816 print_encoded_string(_client, streamName); 00817 _client->print("/values"); 00818 _client->println(" HTTP/1.0"); 00819 00820 writeHttpHeader(contentLength); 00821 } 00822 00823 void M2XStreamClient::writeHttpHeader(int contentLength) { 00824 _client->println(USER_AGENT); 00825 _client->print("X-M2X-KEY: "); 00826 _client->println(_key); 00827 00828 _client->print("Host: "); 00829 print_encoded_string(_client, _host); 00830 if (_port != DEFAULT_M2X_PORT) { 00831 _client->print(":"); 00832 // port is an integer, does not need encoding 00833 _client->print(_port); 00834 } 00835 _client->println(); 00836 00837 if (contentLength > 0) { 00838 _client->println("Content-Type: application/json"); 00839 DBG("%s", "Content Length: "); 00840 DBGLN("%d", contentLength); 00841 00842 _client->print("Content-Length: "); 00843 _client->println(contentLength); 00844 } 00845 _client->println(); 00846 } 00847 00848 int M2XStreamClient::waitForString(const char* str) { 00849 int currentIndex = 0; 00850 if (str[currentIndex] == '\0') return E_OK; 00851 00852 while (true) { 00853 while (_client->available()) { 00854 char c = _client->read(); 00855 DBG("%c", c); 00856 00857 int cmp; 00858 if (_case_insensitive) { 00859 cmp = tolower(c) - tolower(str[currentIndex]); 00860 } else { 00861 cmp = c - str[currentIndex]; 00862 } 00863 00864 if ((str[currentIndex] == '*') || (cmp == 0)) { 00865 currentIndex++; 00866 if (str[currentIndex] == '\0') { 00867 return E_OK; 00868 } 00869 } else { 00870 // start from the beginning 00871 currentIndex = 0; 00872 } 00873 } 00874 00875 if (!_client->connected()) { 00876 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00877 00878 close(); 00879 return E_DISCONNECTED; 00880 } 00881 if (_idlefunc!=NULL) 00882 _idlefunc(); 00883 else 00884 delay(200); 00885 } 00886 // never reached here 00887 return E_NOTREACHABLE; 00888 } 00889 00890 int M2XStreamClient::readStatusCode(bool closeClient) { 00891 int responseCode = 0; 00892 int ret = waitForString("HTTP/*.* "); 00893 if (ret != E_OK) { 00894 if (closeClient) close(); 00895 return ret; 00896 } 00897 00898 // ret is not needed from here(since it must be E_OK), so we can use it 00899 // as a regular variable now. 00900 ret = 0; 00901 while (true) { 00902 while (_client->available()) { 00903 char c = _client->read(); 00904 DBG("%c", c); 00905 00906 responseCode = responseCode * 10 + (c - '0'); 00907 ret++; 00908 if (ret == 3) { 00909 if (closeClient) close(); 00910 return responseCode; 00911 } 00912 } 00913 00914 if (!_client->connected()) { 00915 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00916 00917 if (closeClient) close(); 00918 return E_DISCONNECTED; 00919 } 00920 if (_idlefunc!=NULL) 00921 _idlefunc(); 00922 else 00923 delay(200); 00924 } 00925 00926 // never reached here 00927 return E_NOTREACHABLE; 00928 } 00929 00930 int M2XStreamClient::readContentLength() { 00931 int ret = waitForString("Content-Length: "); 00932 if (ret != E_OK) { 00933 return ret; 00934 } 00935 00936 // From now on, ret is not needed, we can use it 00937 // to keep the final result 00938 ret = 0; 00939 while (true) { 00940 while (_client->available()) { 00941 char c = _client->read(); 00942 DBG("%c", c); 00943 00944 if ((c == '\r') || (c == '\n')) { 00945 return (ret == 0) ? (E_INVALID) : (ret); 00946 } else { 00947 ret = ret * 10 + (c - '0'); 00948 } 00949 } 00950 00951 if (!_client->connected()) { 00952 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00953 00954 return E_DISCONNECTED; 00955 } 00956 if (_idlefunc!=NULL) 00957 _idlefunc(); 00958 else 00959 delay(200); 00960 } 00961 00962 // never reached here 00963 return E_NOTREACHABLE; 00964 } 00965 00966 int M2XStreamClient::skipHttpHeader() { 00967 return waitForString("\n\r\n"); 00968 } 00969 00970 void M2XStreamClient::close() { 00971 // Eats up buffered data before closing 00972 _client->flush(); 00973 _client->stop(); 00974 } 00975 00976 static inline int fill_iso8601_timestamp(int32_t seconds, int32_t milli, 00977 char* buffer, int* length); 00978 00979 TimeService::TimeService(M2XStreamClient* client) : _client(client) { 00980 } 00981 00982 int TimeService::init() { 00983 _timer.start(); 00984 return reset(); 00985 } 00986 00987 int TimeService::reset() { 00988 int32_t ts; 00989 int status = _client->getTimestamp32(&ts); 00990 00991 if (m2x_status_is_success(status)) { 00992 _server_timestamp = ts; 00993 _local_last_milli = _timer.read_ms(); 00994 } 00995 00996 return status; 00997 } 00998 00999 int TimeService::getTimestamp(char* buffer, int* length) { 01000 uint32_t now = _timer.read_ms(); 01001 if (now < _local_last_milli) { 01002 // In case of a timestamp overflow(happens once every 50 days on 01003 // Arduino), we reset the server timestamp recorded. 01004 // NOTE: while on an Arduino this might be okay, the situation is worse 01005 // on mbed, see the notes in m2x-mbed.h for details 01006 int status = reset(); 01007 if (!m2x_status_is_success(status)) { return status; } 01008 now = _timer.read_ms(); 01009 } 01010 if (now < _local_last_milli) { 01011 // We have already reseted the timestamp, so this cannot happen 01012 // (an HTTP request can take longer than 50 days to finished? You 01013 // must be kidding here). Something else must be wrong here 01014 return E_TIMESTAMP_ERROR; 01015 } 01016 uint32_t diff = now - _local_last_milli; 01017 _local_last_milli = now; 01018 _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds 01019 return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000), 01020 buffer, length); 01021 } 01022 01023 #define SIZE_ISO_8601 25 01024 static inline bool is_leap_year(int16_t y) { 01025 return ((1970 + y) > 0) && 01026 !((1970 + y) % 4) && 01027 (((1970 + y) % 100) || !((1970 + y) % 400)); 01028 } 01029 static inline int32_t days_in_year(int16_t y) { 01030 return is_leap_year(y) ? 366 : 365; 01031 } 01032 static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31}; 01033 01034 static inline int fill_iso8601_timestamp(int32_t timestamp, int32_t milli, 01035 char* buffer, int* length) { 01036 int16_t year; 01037 int8_t month, month_length; 01038 int32_t day; 01039 int8_t hour, minute, second; 01040 01041 if (*length < SIZE_ISO_8601) { 01042 *length = SIZE_ISO_8601; 01043 return E_BUFFER_TOO_SMALL; 01044 } 01045 01046 second = timestamp % 60; 01047 timestamp /= 60; // now it is minutes 01048 01049 minute = timestamp % 60; 01050 timestamp /= 60; // now it is hours 01051 01052 hour = timestamp % 24; 01053 timestamp /= 24; // now it is days 01054 01055 year = 0; 01056 day = 0; 01057 while ((day += days_in_year(year)) <= timestamp) { 01058 year++; 01059 } 01060 day -= days_in_year(year); 01061 timestamp -= day; // now it is days in this year, starting at 0 01062 01063 day = 0; 01064 month_length = 0; 01065 for (month = 0; month < 12; month++) { 01066 if (month == 1) { 01067 // February 01068 month_length = is_leap_year(year) ? 29 : 28; 01069 } else { 01070 month_length = MONTH_DAYS[month]; 01071 } 01072 01073 if (timestamp >= month_length) { 01074 timestamp -= month_length; 01075 } else { 01076 break; 01077 } 01078 } 01079 year = 1970 + year; 01080 month++; // offset by 1 01081 day = timestamp + 1; 01082 01083 int i = 0, j = 0; 01084 01085 // NOTE: It seems the snprintf implementation in Arduino has bugs, 01086 // we have to manually piece the string together here. 01087 #define INT_TO_STR(v_, width_) \ 01088 for (j = 0; j < (width_); j++) { \ 01089 buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \ 01090 (v_) /= 10; \ 01091 } \ 01092 i += (width_) 01093 01094 INT_TO_STR(year, 4); 01095 buffer[i++] = '-'; 01096 INT_TO_STR(month, 2); 01097 buffer[i++] = '-'; 01098 INT_TO_STR(day, 2); 01099 buffer[i++] = 'T'; 01100 INT_TO_STR(hour, 2); 01101 buffer[i++] = ':'; 01102 INT_TO_STR(minute, 2); 01103 buffer[i++] = ':'; 01104 INT_TO_STR(second, 2); 01105 buffer[i++] = '.'; 01106 INT_TO_STR(milli, 3); 01107 buffer[i++] = 'Z'; 01108 buffer[i++] = '\0'; 01109 01110 #undef INT_TO_STR 01111 01112 *length = i; 01113 return E_OK; 01114 } 01115 01116 /* Reader functions */ 01117 #ifdef M2X_ENABLE_READER 01118 01119 // Data structures and functions used to parse stream values 01120 01121 #define STREAM_BUF_LEN 32 01122 01123 typedef struct { 01124 uint8_t state; 01125 char at_str[STREAM_BUF_LEN + 1]; 01126 char value_str[STREAM_BUF_LEN + 1]; 01127 int index; 01128 01129 stream_value_read_callback callback; 01130 void* context; 01131 } stream_parsing_context_state; 01132 01133 #define WAITING_AT 0x1 01134 #define GOT_AT 0x2 01135 #define WAITING_VALUE 0x4 01136 #define GOT_VALUE 0x8 01137 01138 #define GOT_STREAM (GOT_AT | GOT_VALUE) 01139 #define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM) 01140 01141 #define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT) 01142 #define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \ 01143 WAITING_VALUE) 01144 01145 static void on_stream_key_found(jsonlite_callback_context* context, 01146 jsonlite_token* token) 01147 { 01148 stream_parsing_context_state* state = 01149 (stream_parsing_context_state*) context->client_state; 01150 if (strncmp((const char*) token->start, "timestamp", 9) == 0) { 01151 state->state |= WAITING_AT; 01152 } else if ((strncmp((const char*) token->start, "value", 5) == 0) && 01153 (token->start[5] != 's')) { // get rid of "values" 01154 state->state |= WAITING_VALUE; 01155 } 01156 } 01157 01158 static void on_stream_value_found(jsonlite_callback_context* context, 01159 jsonlite_token* token, 01160 int type) 01161 { 01162 stream_parsing_context_state* state = 01163 (stream_parsing_context_state*) context->client_state; 01164 01165 if (TEST_IS_AT(state->state)) { 01166 strncpy(state->at_str, (const char*) token->start, 01167 MIN(token->end - token->start, STREAM_BUF_LEN)); 01168 state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; 01169 state->state |= GOT_AT; 01170 } else if (TEST_IS_VALUE(state->state)) { 01171 strncpy(state->value_str, (const char*) token->start, 01172 MIN(token->end - token->start, STREAM_BUF_LEN)); 01173 state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0'; 01174 state->state |= GOT_VALUE; 01175 } 01176 01177 if (TEST_GOT_STREAM(state->state)) { 01178 state->callback(state->at_str, state->value_str, 01179 state->index++, state->context, type); 01180 state->state = 0; 01181 } 01182 } 01183 01184 static void on_stream_string_found(jsonlite_callback_context* context, 01185 jsonlite_token* token) 01186 { 01187 on_stream_value_found(context, token, 1); 01188 } 01189 01190 static void on_stream_number_found(jsonlite_callback_context* context, 01191 jsonlite_token* token) 01192 { 01193 on_stream_value_found(context, token, 2); 01194 } 01195 01196 // Data structures and functions used to parse locations 01197 01198 #define LOCATION_BUF_LEN 20 01199 01200 typedef struct { 01201 uint16_t state; 01202 char name_str[LOCATION_BUF_LEN + 1]; 01203 double latitude; 01204 double longitude; 01205 double elevation; 01206 char timestamp_str[LOCATION_BUF_LEN + 1]; 01207 int index; 01208 01209 location_read_callback callback; 01210 void* context; 01211 } location_parsing_context_state; 01212 01213 #define WAITING_NAME 0x1 01214 #define WAITING_LATITUDE 0x2 01215 #define WAITING_LONGITUDE 0x4 01216 #define WAITING_ELEVATION 0x8 01217 #define WAITING_TIMESTAMP 0x10 01218 01219 #define GOT_NAME 0x20 01220 #define GOT_LATITUDE 0x40 01221 #define GOT_LONGITUDE 0x80 01222 #define GOT_ELEVATION 0x100 01223 #define GOT_TIMESTAMP 0x200 01224 01225 #define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP) 01226 #define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION) 01227 01228 #define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME) 01229 #define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \ 01230 == WAITING_LATITUDE) 01231 #define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \ 01232 == WAITING_LONGITUDE) 01233 #define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \ 01234 == WAITING_ELEVATION) 01235 #define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \ 01236 == WAITING_TIMESTAMP) 01237 01238 static void on_location_key_found(jsonlite_callback_context* context, 01239 jsonlite_token* token) { 01240 location_parsing_context_state* state = 01241 (location_parsing_context_state*) context->client_state; 01242 if (strncmp((const char*) token->start, "waypoints", 9) == 0) { 01243 // only parses those locations in waypoints, skip the outer one 01244 state->state = 0; 01245 } else if (strncmp((const char*) token->start, "name", 4) == 0) { 01246 state->state |= WAITING_NAME; 01247 } else if (strncmp((const char*) token->start, "latitude", 8) == 0) { 01248 state->state |= WAITING_LATITUDE; 01249 } else if (strncmp((const char*) token->start, "longitude", 9) == 0) { 01250 state->state |= WAITING_LONGITUDE; 01251 } else if (strncmp((const char*) token->start, "elevation", 9) == 0) { 01252 state->state |= WAITING_ELEVATION; 01253 } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) { 01254 state->state |= WAITING_TIMESTAMP; 01255 } 01256 } 01257 01258 static void on_location_string_found(jsonlite_callback_context* context, 01259 jsonlite_token* token) { 01260 location_parsing_context_state* state = 01261 (location_parsing_context_state*) context->client_state; 01262 01263 if (TEST_IS_NAME(state->state)) { 01264 strncpy(state->name_str, (const char*) token->start, 01265 MIN(token->end - token->start, LOCATION_BUF_LEN)); 01266 state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; 01267 state->state |= GOT_NAME; 01268 } else if (TEST_IS_LATITUDE(state->state)) { 01269 state->latitude = atof((const char*) token->start); 01270 state->state |= GOT_LATITUDE; 01271 } else if (TEST_IS_LONGITUDE(state->state)) { 01272 state->longitude = atof((const char*) token->start); 01273 state->state |= GOT_LONGITUDE; 01274 } else if (TEST_IS_ELEVATION(state->state)) { 01275 state->elevation = atof((const char*) token->start); 01276 state->state |= GOT_ELEVATION; 01277 } else if (TEST_IS_TIMESTAMP(state->state)) { 01278 strncpy(state->timestamp_str, (const char*) token->start, 01279 MIN(token->end - token->start, LOCATION_BUF_LEN)); 01280 state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0'; 01281 state->state |= GOT_TIMESTAMP; 01282 } 01283 01284 if (TEST_GOT_LOCATION(state->state)) { 01285 state->callback(state->name_str, state->latitude, state->longitude, 01286 state->elevation, state->timestamp_str, state->index++, 01287 state->context); 01288 state->state = 0; 01289 } 01290 } 01291 01292 #ifndef M2X_COMMAND_BUF_LEN 01293 #define M2X_COMMAND_BUF_LEN 40 01294 #endif /* M2X_COMMAND_BUF_LEN */ 01295 01296 typedef struct { 01297 uint8_t state; 01298 char id_str[M2X_COMMAND_BUF_LEN + 1]; 01299 char name_str[M2X_COMMAND_BUF_LEN + 1]; 01300 int index; 01301 01302 m2x_command_read_callback callback; 01303 void* context; 01304 } m2x_command_parsing_context_state; 01305 01306 #define M2X_COMMAND_WAITING_ID 0x1 01307 #define M2X_COMMAND_GOT_ID 0x2 01308 #define M2X_COMMAND_WAITING_NAME 0x4 01309 #define M2X_COMMAND_GOT_NAME 0x8 01310 01311 #define M2X_COMMAND_GOT_COMMAND (M2X_COMMAND_GOT_ID | M2X_COMMAND_GOT_NAME) 01312 #define M2X_COMMAND_TEST_GOT_COMMAND(state_) \ 01313 (((state_) & M2X_COMMAND_GOT_COMMAND) == M2X_COMMAND_GOT_COMMAND) 01314 01315 #define M2X_COMMAND_TEST_IS_ID(state_) \ 01316 (((state_) & (M2X_COMMAND_WAITING_ID | M2X_COMMAND_GOT_ID)) == \ 01317 M2X_COMMAND_WAITING_ID) 01318 #define M2X_COMMAND_TEST_IS_NAME(state_) \ 01319 (((state_) & (M2X_COMMAND_WAITING_NAME | M2X_COMMAND_GOT_NAME)) == \ 01320 M2X_COMMAND_WAITING_NAME) 01321 01322 static void m2x_on_command_key_found(jsonlite_callback_context* context, 01323 jsonlite_token* token) 01324 { 01325 m2x_command_parsing_context_state* state = 01326 (m2x_command_parsing_context_state*) context->client_state; 01327 if (strncmp((const char*) token->start, "id", 2) == 0) { 01328 state->state |= M2X_COMMAND_WAITING_ID; 01329 } else if (strncmp((const char*) token->start, "name", 4) == 0) { 01330 state->state |= M2X_COMMAND_WAITING_NAME; 01331 } 01332 } 01333 01334 static void m2x_on_command_value_found(jsonlite_callback_context* context, 01335 jsonlite_token* token) 01336 { 01337 m2x_command_parsing_context_state* state = 01338 (m2x_command_parsing_context_state*) context->client_state; 01339 if (M2X_COMMAND_TEST_IS_ID(state->state)) { 01340 strncpy(state->id_str, (const char*) token->start, 01341 MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)); 01342 state->id_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0'; 01343 state->state |= M2X_COMMAND_GOT_ID; 01344 } else if (M2X_COMMAND_TEST_IS_NAME(state->state)) { 01345 strncpy(state->name_str, (const char*) token->start, 01346 MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)); 01347 state->name_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0'; 01348 state->state |= M2X_COMMAND_GOT_NAME; 01349 } 01350 01351 if (M2X_COMMAND_TEST_GOT_COMMAND(state->state)) { 01352 state->callback(state->id_str, state->name_str, 01353 state->index++, state->context); 01354 state->state = 0; 01355 } 01356 } 01357 01358 int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName, 01359 stream_value_read_callback callback, void* context, 01360 const char* query) { 01361 if (_client->connect(_host, _port)) { 01362 DBGLN("%s", "Connected to M2X server!"); 01363 _client->print("GET "); 01364 if (_path_prefix) { _client->print(_path_prefix); } 01365 _client->print("/v2/devices/"); 01366 _client->print(deviceId); 01367 _client->print("/streams/"); 01368 print_encoded_string(_client, streamName); 01369 _client->print("/values"); 01370 01371 if (query) { 01372 if (query[0] != '?') { 01373 _client->print('?'); 01374 } 01375 _client->print(query); 01376 } 01377 01378 _client->println(" HTTP/1.0"); 01379 writeHttpHeader(-1); 01380 } else { 01381 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 01382 return E_NOCONNECTION; 01383 } 01384 int status = readStatusCode(false); 01385 if (status == 200) { 01386 readStreamValue(callback, context); 01387 } 01388 01389 close(); 01390 return status; 01391 } 01392 01393 int M2XStreamClient::readLocation(const char* deviceId, 01394 location_read_callback callback, 01395 void* context) { 01396 if (_client->connect(_host, _port)) { 01397 DBGLN("%s", "Connected to M2X server!"); 01398 _client->print("GET "); 01399 if (_path_prefix) { _client->print(_path_prefix); } 01400 _client->print("/v2/devices/"); 01401 _client->print(deviceId); 01402 _client->println("/location HTTP/1.0"); 01403 01404 writeHttpHeader(-1); 01405 } else { 01406 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 01407 return E_NOCONNECTION; 01408 } 01409 int status = readStatusCode(false); 01410 if (status == 200) { 01411 readLocation(callback, context); 01412 } 01413 01414 close(); 01415 return status; 01416 } 01417 01418 int M2XStreamClient::listCommands(const char* deviceId, 01419 m2x_command_read_callback callback, 01420 void* context, 01421 const char* query) { 01422 if (_client->connect(_host, _port)) { 01423 DBGLN("%s", "Connected to M2X server!"); 01424 _client->print("GET "); 01425 if (_path_prefix) { _client->print(_path_prefix); } 01426 _client->print("/v2/devices/"); 01427 _client->print(deviceId); 01428 _client->print("/commands"); 01429 01430 if (query) { 01431 if (query[0] != '?') { 01432 _client->print('?'); 01433 } 01434 _client->print(query); 01435 } 01436 01437 _client->println(" HTTP/1.0"); 01438 writeHttpHeader(-1); 01439 } else { 01440 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 01441 return E_NOCONNECTION; 01442 } 01443 int status = readStatusCode(false); 01444 if (status == 200) { 01445 readCommand(callback, context); 01446 } 01447 01448 close(); 01449 return status; 01450 } 01451 01452 01453 int M2XStreamClient::readStreamValue(stream_value_read_callback callback, 01454 void* context) { 01455 const int BUF_LEN = 64; 01456 char buf[BUF_LEN]; 01457 01458 int length = readContentLength(); 01459 if (length < 0) { 01460 close(); 01461 return length; 01462 } 01463 01464 int index = skipHttpHeader(); 01465 if (index != E_OK) { 01466 close(); 01467 return index; 01468 } 01469 index = 0; 01470 01471 stream_parsing_context_state state; 01472 state.state = state.index = 0; 01473 state.callback = callback; 01474 state.context = context; 01475 01476 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 01477 cbs.key_found = on_stream_key_found; 01478 cbs.number_found = on_stream_number_found; 01479 cbs.string_found = on_stream_string_found; 01480 cbs.context.client_state = &state; 01481 01482 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 01483 jsonlite_parser_set_callback(p, &cbs); 01484 01485 jsonlite_result result = jsonlite_result_unknown; 01486 while (index < length) { 01487 int i = 0; 01488 01489 DBG("%s", "Received Data: "); 01490 while ((i < BUF_LEN) && _client->available()) { 01491 buf[i++] = _client->read(); 01492 DBG("%c", buf[i - 1]); 01493 } 01494 DBGLNEND; 01495 01496 if ((!_client->connected()) && 01497 (!_client->available()) && 01498 ((index + i) < length)) { 01499 jsonlite_parser_release(p); 01500 close(); 01501 return E_NOCONNECTION; 01502 } 01503 01504 result = jsonlite_parser_tokenize(p, buf, i); 01505 if ((result != jsonlite_result_ok) && 01506 (result != jsonlite_result_end_of_stream)) { 01507 jsonlite_parser_release(p); 01508 close(); 01509 return E_JSON_INVALID; 01510 } 01511 01512 index += i; 01513 } 01514 01515 jsonlite_parser_release(p); 01516 close(); 01517 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 01518 } 01519 01520 int M2XStreamClient::readLocation(location_read_callback callback, 01521 void* context) { 01522 const int BUF_LEN = 40; 01523 char buf[BUF_LEN]; 01524 01525 int length = readContentLength(); 01526 if (length < 0) { 01527 close(); 01528 return length; 01529 } 01530 01531 int index = skipHttpHeader(); 01532 if (index != E_OK) { 01533 close(); 01534 return index; 01535 } 01536 index = 0; 01537 01538 location_parsing_context_state state; 01539 state.state = state.index = 0; 01540 state.callback = callback; 01541 state.context = context; 01542 01543 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 01544 cbs.key_found = on_location_key_found; 01545 cbs.string_found = on_location_string_found; 01546 cbs.context.client_state = &state; 01547 01548 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 01549 jsonlite_parser_set_callback(p, &cbs); 01550 01551 jsonlite_result result = jsonlite_result_unknown; 01552 while (index < length) { 01553 int i = 0; 01554 01555 DBG("%s", "Received Data: "); 01556 while ((i < BUF_LEN) && _client->available()) { 01557 buf[i++] = _client->read(); 01558 DBG("%c", buf[i - 1]); 01559 } 01560 DBGLNEND; 01561 01562 if ((!_client->connected()) && 01563 (!_client->available()) && 01564 ((index + i) < length)) { 01565 jsonlite_parser_release(p); 01566 close(); 01567 return E_NOCONNECTION; 01568 } 01569 01570 result = jsonlite_parser_tokenize(p, buf, i); 01571 if ((result != jsonlite_result_ok) && 01572 (result != jsonlite_result_end_of_stream)) { 01573 jsonlite_parser_release(p); 01574 close(); 01575 return E_JSON_INVALID; 01576 } 01577 01578 index += i; 01579 } 01580 01581 jsonlite_parser_release(p); 01582 close(); 01583 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 01584 } 01585 01586 int M2XStreamClient::readCommand(m2x_command_read_callback callback, 01587 void* context) { 01588 const int BUF_LEN = 60; 01589 char buf[BUF_LEN]; 01590 01591 int length = readContentLength(); 01592 if (length < 0) { 01593 close(); 01594 return length; 01595 } 01596 01597 int index = skipHttpHeader(); 01598 if (index != E_OK) { 01599 close(); 01600 return index; 01601 } 01602 index = 0; 01603 01604 m2x_command_parsing_context_state state; 01605 state.state = 0; 01606 state.index = 0; 01607 state.callback = callback; 01608 state.context = context; 01609 01610 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 01611 cbs.key_found = m2x_on_command_key_found; 01612 cbs.string_found = m2x_on_command_value_found; 01613 cbs.context.client_state = &state; 01614 01615 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 01616 jsonlite_parser_set_callback(p, &cbs); 01617 01618 jsonlite_result result = jsonlite_result_unknown; 01619 while (index < length) { 01620 int i = 0; 01621 01622 DBG("%s", "Received Data: "); 01623 while ((i < BUF_LEN) && _client->available()) { 01624 buf[i++] = _client->read(); 01625 DBG("%c", buf[i - 1]); 01626 } 01627 DBGLNEND; 01628 01629 if ((!_client->connected()) && 01630 (!_client->available()) && 01631 ((index + i) < length)) { 01632 jsonlite_parser_release(p); 01633 close(); 01634 return E_NOCONNECTION; 01635 } 01636 01637 result = jsonlite_parser_tokenize(p, buf, i); 01638 if ((result != jsonlite_result_ok) && 01639 (result != jsonlite_result_end_of_stream)) { 01640 jsonlite_parser_release(p); 01641 close(); 01642 return E_JSON_INVALID; 01643 } 01644 01645 index += i; 01646 } 01647 01648 jsonlite_parser_release(p); 01649 close(); 01650 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 01651 } 01652 01653 #endif /* M2X_ENABLE_READER */ 01654 01655 #endif /* M2XStreamClient_h */
Generated on Tue Jul 12 2022 23:02:48 by 1.7.2