ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.

Dependents:   m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers M2XStreamClient.h Source File

M2XStreamClient.h

00001 #ifndef M2XStreamClient_h
00002 #define M2XStreamClient_h
00003 
00004 #if (!defined(ARDUINO_PLATFORM)) && (!defined(ESP8266_PLATFORM)) && (!defined(MBED_PLATFORM))
00005 #error "Platform definition is missing!"
00006 #endif
00007 
00008 #define M2X_VERSION "2.2.0"
00009 
00010 #ifdef ARDUINO_PLATFORM
00011 #include "m2x-arduino.h"
00012 #endif  /* ARDUINO_PLATFORM */
00013 
00014 #ifdef ESP8266_PLATFORM
00015 #include "m2x-esp8266.h"
00016 #endif /* ESP8266_PLATFORM */
00017 
00018 #ifdef MBED_PLATFORM
00019 #include "m2x-mbed.h"
00020 #endif /* MBED_PLATFORM */
00021 
00022 /* If we don't have DBG defined, provide dump implementation */
00023 #ifndef DBG
00024 #define DBG(fmt_, data_)
00025 #define DBGLN(fmt_, data_)
00026 #define DBGLNEND
00027 #endif  /* DBG */
00028 
00029 #define MIN(a, b) (((a) > (b))?(b):(a))
00030 #define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
00031 #define MAX_DOUBLE_DIGITS 7
00032 
00033 /* For tolower */
00034 #include <ctype.h>
00035 
00036 static const int E_OK = 0;
00037 static const int E_NOCONNECTION = -1;
00038 static const int E_DISCONNECTED = -2;
00039 static const int E_NOTREACHABLE = -3;
00040 static const int E_INVALID = -4;
00041 static const int E_JSON_INVALID = -5;
00042 static const int E_BUFFER_TOO_SMALL = -6;
00043 static const int E_TIMESTAMP_ERROR = -8;
00044 
00045 static const char* DEFAULT_M2X_HOST = "api-m2x.att.com";
00046 static const int DEFAULT_M2X_PORT = 80;
00047 
00048 static inline bool m2x_status_is_success(int status) {
00049   return (status == E_OK) || (status >= 200 && status <= 299);
00050 }
00051 
00052 static inline bool m2x_status_is_client_error(int status) {
00053   return status >= 400 && status <= 499;
00054 }
00055 
00056 static inline bool m2x_status_is_server_error(int status) {
00057   return status >= 500 && status <= 599;
00058 }
00059 
00060 static inline bool m2x_status_is_error(int status) {
00061   return m2x_status_is_client_error(status) ||
00062       m2x_status_is_server_error(status);
00063 }
00064 
00065 // Null Print class used to calculate length to print
00066 class NullPrint : public Print {
00067 public:
00068   size_t counter;
00069 
00070   virtual size_t write(uint8_t b) {
00071     counter++;
00072     return 1;
00073   }
00074 
00075   virtual size_t write(const uint8_t* buf, size_t size) {
00076     counter += size;
00077     return size;
00078   }
00079 };
00080 
00081 // Encodes and prints string using Percent-encoding specified
00082 // in RFC 1738, Section 2.2
00083 static inline int print_encoded_string(Print* print, const char* str) {
00084   int bytes = 0;
00085   for (int i = 0; str[i] != 0; i++) {
00086     if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
00087         ((str[i] >= 'a') && (str[i] <= 'z')) ||
00088         ((str[i] >= '0') && (str[i] <= '9')) ||
00089         (str[i] == '-') || (str[i] == '_') ||
00090         (str[i] == '.') || (str[i] == '~')) {
00091       bytes += print->print(str[i]);
00092     } else {
00093       // Encode all other characters
00094       bytes += print->print('%');
00095       bytes += print->print(TO_HEX(str[i] / 16));
00096       bytes += print->print(TO_HEX(str[i] % 16));
00097     }
00098   }
00099   return bytes;
00100 }
00101 
00102 #ifdef M2X_ENABLE_READER
00103 /*
00104  * +type+ indicates the value type: 1 for string, 2 for number
00105  * NOTE that the value type here only contains a hint on how
00106  * you can use the value. Even though 2 is returned, the value
00107  * is still stored in (const char *), and atoi/atof is needed to
00108  * get the actual value
00109  */
00110 typedef void (*stream_value_read_callback)(const char* at,
00111                                            const char* value,
00112                                            int index,
00113                                            void* context,
00114                                            int type);
00115 
00116 typedef void (*location_read_callback)(const char* name,
00117                                        double latitude,
00118                                        double longitude,
00119                                        double elevation,
00120                                        const char* timestamp,
00121                                        int index,
00122                                        void* context);
00123 
00124 typedef void (*m2x_command_read_callback)(const char* id,
00125                                           const char* name,
00126                                           int index,
00127                                           void *context);
00128 #endif  /* M2X_ENABLE_READER */
00129 
00130 typedef void (*m2x_fill_data_callback)(Print *print, void *context);
00131 
00132 class M2XStreamClient {
00133 public:
00134   M2XStreamClient(Client* client,
00135                   const char* key,
00136                   void (* idlefunc)(void) = NULL,
00137                   int case_insensitive = 1,
00138                   const char* host = DEFAULT_M2X_HOST,
00139                   int port = DEFAULT_M2X_PORT,
00140                   const char* path_prefix = NULL);
00141 
00142   // Push data stream value using PUT request, returns the HTTP status code
00143   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00144   // the device ID here.
00145   template <class T>
00146   int updateStreamValue(const char* deviceId, const char* streamName, T value);
00147 
00148   // Post multiple values to M2X all at once.
00149   // +deviceId+ - id of the device to post values
00150   // +streamNum+ - Number of streams to post
00151   // +names+ - Array of stream names, the length of the array should
00152   // be exactly +streamNum+
00153   // +counts+ - Array of +streamNum+ length, each item in this array
00154   // containing the number of values we want to post for each stream
00155   // +ats+ - Timestamps for each value, the length of this array should
00156   // be the some of all values in +counts+, for the first +counts[0]+
00157   // items, the values belong to the first stream, for the following
00158   // +counts[1]+ number of items, the values belong to the second stream,
00159   // etc. Notice that timestamps are required here: you must provide
00160   // a timestamp for each value posted.
00161   // +values+ - Values to post. This works the same way as +ats+, the
00162   // first +counts[0]+ number of items contain values to post to the first
00163   // stream, the succeeding +counts[1]+ number of items contain values
00164   // for the second stream, etc. The length of this array should be
00165   // the sum of all values in +counts+ array.
00166   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00167   // the device ID here.
00168   template <class T>
00169   int postDeviceUpdates(const char* deviceId, int streamNum,
00170                         const char* names[], const int counts[],
00171                         const char* ats[], T values[]);
00172 
00173   // Post multiple values of a single device at once.
00174   // +deviceId+ - id of the device to post values
00175   // +streamNum+ - Number of streams to post
00176   // +names+ - Array of stream names, the length of the array should
00177   // be exactly +streamNum+
00178   // +values+ - Array of values to post, the length of the array should
00179   // be exactly +streamNum+. Notice that the array of +values+ should
00180   // match the array of +names+, and that the ith value in +values+ is
00181   // exactly the value to post for the ith stream name in +names+
00182   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00183   // the device ID here.
00184   template <class T>
00185   int postDeviceUpdate(const char* deviceId, int streamNum,
00186                        const char* names[], T values[],
00187                        const char* at = NULL);
00188 
00189 #ifdef M2X_ENABLE_READER
00190   // Fetch values for a particular data stream. Since memory is
00191   // very limited on an Arduino, we cannot parse and get all the
00192   // data points in memory. Instead, we use callbacks here: whenever
00193   // a new data point is parsed, we call the callback using the values,
00194   // after that, the values will be thrown away to make space for new
00195   // values.
00196   // Note that you can also pass in a user-specified context in this
00197   // function, this context will be passed to the callback function
00198   // each time we get a data point.
00199   // For each data point, the callback will be called once. The HTTP
00200   // status code will be returned. And the content is only parsed when
00201   // the status code is 200.
00202   int listStreamValues(const char* deviceId, const char* streamName,
00203                        stream_value_read_callback callback, void* context,
00204                        const char* query = NULL);
00205 #endif  /* M2X_ENABLE_READER */
00206 
00207   // Update datasource location
00208   // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
00209   // 4-byte (32 bits) precision, which is the same as float. So there's
00210   // no natural double-precision floating number on these boards. With
00211   // a float value, we have a precision of roughly 7 digits, that means
00212   // either 5 or 6 digits after the floating point. According to wikipedia,
00213   // a difference of 0.00001 will give us ~1.1132m distance. If this
00214   // precision is good for you, you can use the double-version we provided
00215   // here. Otherwise, you may need to use the string-version and do the
00216   // actual conversion by yourselves.
00217   // However, with an Arduino Due board, double has 8-bytes (64 bits)
00218   // precision, which means you are free to use the double-version only
00219   // without any precision problems.
00220   // Returned value is the http status code.
00221   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00222   // the device ID here.
00223   template <class T>
00224   int updateLocation(const char* deviceId, const char* name,
00225                      T latitude, T longitude, T elevation);
00226 
00227 #ifdef M2X_ENABLE_READER
00228   // Read location information for a device. Also used callback to process
00229   // data points for memory reasons. The HTTP status code is returned,
00230   // response is only parsed when the HTTP status code is 200
00231   int readLocation(const char* deviceId, location_read_callback callback,
00232                    void* context);
00233 #endif  /* M2X_ENABLE_READER */
00234 
00235   // Delete values from a data stream
00236   // You will need to provide from and end date/time strings in the ISO8601
00237   // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where
00238   //   yyyy: the year
00239   //   mm: the month
00240   //   dd: the day
00241   //   HH: the hour (24 hour format)
00242   //   MM: the minute
00243   //   SS.SSS: the seconds (to the millisecond)
00244   // NOTE: the time is given in Zulu (GMT)
00245   // M2X will delete all values within the from to end date/time range.
00246   // The status code is 204 on success and 400 on a bad request (e.g. the
00247   // timestamp is not in ISO8601 format or the from timestamp is not less than
00248   // or equal to the end timestamp.
00249   int deleteValues(const char* deviceId, const char* streamName,
00250                    const char* from, const char* end);
00251 
00252 #ifdef M2X_ENABLE_READER
00253   // Fetch commands available for this device, notice that for memory constraints,
00254   // we only keep ID and name of the command received here.
00255   // You can tweak the command receiving via the query parameter.
00256   int listCommands(const char* deviceId,
00257                    m2x_command_read_callback callback, void* context,
00258                    const char* query = NULL);
00259 #endif  /* M2X_ENABLE_READER */
00260 
00261   // Mark a command as processed.
00262   // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
00263   // To make sure the minimal amount of memory is needed, this API works with
00264   // a callback function. The callback function is then used to fill the request
00265   // data, note that in order to correctly set the content length in HTTP header,
00266   // the callback function will be called twice, the caller must make sure both
00267   // calls fill the Print object with exactly the same data.
00268   // If you have a pre-allocated buffer filled with the data to post, you can
00269   // use markCommandProcessedWithData API below.
00270   int markCommandProcessed(const char* deviceId, const char* commandId,
00271                            m2x_fill_data_callback callback, void *context);
00272 
00273   // Mark a command as processed with a data buffer
00274   // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Processed
00275   // This is exactly like markCommandProcessed, except that a buffer is use to
00276   // contain the data to post as the request body.
00277   int markCommandProcessedWithData(const char* deviceId, const char* commandId,
00278                                    const char* data);
00279 
00280   // Mark a command as rejected.
00281   // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
00282   // To make sure the minimal amount of memory is needed, this API works with
00283   // a callback function. The callback function is then used to fill the request
00284   // data, note that in order to correctly set the content length in HTTP header,
00285   // the callback function will be called twice, the caller must make sure both
00286   // calls fill the Print object with exactly the same data.
00287   // If you have a pre-allocated buffer filled with the data to post, you can
00288   // use markCommandRejectedWithData API below.
00289   int markCommandRejected(const char* deviceId, const char* commandId,
00290                           m2x_fill_data_callback callback, void *context);
00291 
00292   // Mark a command as rejected with a data buffer
00293   // Link: https://m2x.att.com/developer/documentation/v2/commands#Device-Marks-a-Command-as-Rejected
00294   // This is exactly like markCommandRejected, except that a buffer is use to
00295   // contain the data to post as the request body.
00296   int markCommandRejectedWithData(const char* deviceId, const char* commandId,
00297                                   const char* data);
00298 
00299   // Fetches current timestamp in seconds from M2X server. Since we
00300   // are using signed 32-bit integer as return value, this will only
00301   // return valid results before 03:14:07 UTC on 19 January 2038. If
00302   // the device is supposed to work after that, this function should
00303   // not be used.
00304   //
00305   // The returned value will contain the status code(positive values)
00306   // or the error code(negative values).
00307   // In case of success, the current timestamp will be filled in the
00308   // +ts+ pointer passed in as argument.
00309   //
00310   // NOTE: although returning uint32_t can give us a larger space,
00311   // we prefer to cope with the unix convention here.
00312   int getTimestamp32(int32_t* ts);
00313 
00314   // Fetches current timestamp in seconds from M2X server.
00315   // This function will return the timestamp as an integer literal
00316   // in the provided buffer. Hence there's no problem working after
00317   // 03:14:07 UTC on 19 January 2038. The drawback part here, is that
00318   // you will have to work with 64-bit integer, which is not available
00319   // on certain platform(such as Arduino), a bignum library or alike
00320   // is needed in this case.
00321   //
00322   // Notice +bufferLength+ is supposed to contain the length of the
00323   // buffer when calling this function. It is also the caller's
00324   // responsibility to ensure the buffer is big enough, otherwise
00325   // the library will return an error indicating the buffer is too
00326   // small.
00327   // While this is not accurate all the time, one trick here is to
00328   // pass in 0 as the bufferLength, in which case we will always return
00329   // the buffer-too-small error. However, the correct buffer length
00330   // can be found this way so a secound execution is most likely to work
00331   // (unless we are at the edge of the buffer length increasing, for
00332   // example, when the timestamp jumps from 9999999999 to 10000000000,
00333   // which is highly unlikely to happend). However, given that the
00334   // maximum 64-bit integer can be stored in 19 bytes, there's not
00335   // much need to use this trick.)
00336   //
00337   // The returned value will contain the status code(positive values)
00338   // or the error code(negative values).
00339   // In case of success, the current timestamp will be filled in the
00340   // passed +buffer+ pointer, and the actual used buffer length will
00341   // be returned in +bufferLength+ pointer.
00342   // NOTE: as long as we can read the returned buffer length, it will
00343   // be used to fill in the +bufferLength+ variable even though other
00344   // errors occur(buffer is not enough, network is shutdown before
00345   // reading the whole buffer, etc.)
00346   int getTimestamp(char* buffer, int* bufferLength);
00347 private:
00348   Client* _client;
00349   const char* _key;
00350   int _case_insensitive;
00351   const char* _host;
00352   int _port;
00353   void (* _idlefunc)(void);
00354   const char* _path_prefix;
00355   NullPrint _null_print;
00356 
00357   // Writes the HTTP header part for updating a stream value
00358   void writePutHeader(const char* deviceId,
00359                       const char* streamName,
00360                       int contentLength);
00361   // Writes the HTTP header part for deleting stream values
00362   void writeDeleteHeader(const char* deviceId,
00363                          const char* streamName,
00364                          int contentLength);
00365   // Writes HTTP header lines including M2X API Key, host, content
00366   // type and content length(if the body exists)
00367   void writeHttpHeader(int contentLength);
00368   // Parses HTTP response header and return the content length.
00369   // Note that this function does not parse all http headers, as long
00370   // as the content length is found, this function will return
00371   int readContentLength();
00372   // Skips all HTTP response header part. Return minus value in case
00373   // the connection is closed before we got all headers
00374   int skipHttpHeader();
00375   // Parses and returns the HTTP status code, note this function will
00376   // return immediately once it gets the status code
00377   int readStatusCode(bool closeClient);
00378   // Waits for a certain string pattern in the HTTP header, and returns
00379   // once the pattern is found. In the pattern, you can use '*' to denote
00380   // any character
00381   int waitForString(const char* str);
00382   // Closes the connection
00383   void close();
00384 
00385 #ifdef M2X_ENABLE_READER
00386   // Parses JSON response of stream value API, and calls callback function
00387   // once we get a data point
00388   int readStreamValue(stream_value_read_callback callback, void* context);
00389   // Parses JSON response of location API, and calls callback function once
00390   // we get a data point
00391   int readLocation(location_read_callback callback, void* context);
00392   // Parses JSON response of command API, and calls callback function once
00393   // we get a data point
00394   int readCommand(m2x_command_read_callback callback, void* context);
00395 #endif  /* M2X_ENABLE_READER */
00396 };
00397 
00398 
00399 // A ISO8601 timestamp generation service for M2X.
00400 // It uses the Time API provided by the M2X server to initialize
00401 // clock, then uses millis() function provided by Arduino to calculate
00402 // time advancements so as to reduce API query times.
00403 //
00404 // Right now, this service only works with 32-bit timestamp, meaning that
00405 // this service won't work after 03:14:07 UTC on 19 January 2038. However,
00406 // a similar service that uses 64-bit timestamp can be implemented following
00407 // the logic here.
00408 class TimeService {
00409 public:
00410   TimeService(M2XStreamClient* client);
00411 
00412   // Initialize the time service. Notice the TimeService instance is only
00413   // working after calling this function successfully.
00414   int init();
00415 
00416   // Reset the internal recorded time by calling M2X Time API again. Normally,
00417   // you don't need to call this manually. TimeService will handle Arduino clock
00418   // overflow automatically
00419   int reset();
00420 
00421   // Fills ISO8601 formatted timestamp into the buffer provided. +length+ should
00422   // contains the maximum supported length of the buffer when calling. For now,
00423   // the buffer should be able to store 25 characters for a full ISO8601 formatted
00424   // timestamp, otherwise, an error will be returned.
00425   int getTimestamp(char* buffer, int* length);
00426 private:
00427   M2XStreamClient* _client;
00428   int32_t _server_timestamp;
00429   uint32_t _local_last_milli;
00430   M2XTimer _timer;
00431 };
00432 
00433 
00434 // Implementations
00435 M2XStreamClient::M2XStreamClient(Client* client,
00436                                  const char* key,
00437                                  void (* idlefunc)(void),
00438                                  int case_insensitive,
00439                                  const char* host,
00440                                  int port,
00441                                  const char* path_prefix) : _client(client),
00442                                              _key(key),
00443                                              _idlefunc(idlefunc),
00444                                              _case_insensitive(case_insensitive),
00445                                              _host(host),
00446                                              _port(port),
00447                                              _path_prefix(path_prefix),
00448                                              _null_print() {
00449 }
00450 
00451 template <class T>
00452 int M2XStreamClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
00453   if (_client->connect(_host, _port)) {
00454     DBGLN("%s", "Connected to M2X server!");
00455     writePutHeader(deviceId, streamName,
00456                    //  for {"value": and }
00457                    _null_print.print(value) + 12);
00458     _client->print("{\"value\":\"");
00459     _client->print(value);
00460     _client->print("\"}");
00461   } else {
00462     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00463     return E_NOCONNECTION;
00464   }
00465 
00466   return readStatusCode(true);
00467 }
00468 
00469 template <class T>
00470 inline int write_multiple_values(Print* print, int streamNum,
00471                                  const char* names[], const int counts[],
00472                                  const char* ats[], T values[]) {
00473   int bytes = 0, value_index = 0;
00474   bytes += print->print("{\"values\":{");
00475   for (int i = 0; i < streamNum; i++) {
00476     bytes += print->print("\"");
00477     bytes += print->print(names[i]);
00478     bytes += print->print("\":[");
00479     for (int j = 0; j < counts[i]; j++) {
00480       bytes += print->print("{\"timestamp\": \"");
00481       bytes += print->print(ats[value_index]);
00482       bytes += print->print("\",\"value\": \"");
00483       bytes += print->print(values[value_index]);
00484       bytes += print->print("\"}");
00485       if (j < counts[i] - 1) { bytes += print->print(","); }
00486       value_index++;
00487     }
00488     bytes += print->print("]");
00489     if (i < streamNum - 1) { bytes += print->print(","); }
00490   }
00491   bytes += print->print("}}");
00492   return bytes;
00493 }
00494 
00495 template <class T>
00496 int M2XStreamClient::postDeviceUpdates(const char* deviceId, int streamNum,
00497                                        const char* names[], const int counts[],
00498                                        const char* ats[], T values[]) {
00499   if (_client->connect(_host, _port)) {
00500     DBGLN("%s", "Connected to M2X server!");
00501     int length = write_multiple_values(&_null_print, streamNum, names,
00502                                        counts, ats, values);
00503     _client->print("POST ");
00504     if (_path_prefix) { _client->print(_path_prefix); }
00505     _client->print("/v2/devices/");
00506     _client->print(deviceId);
00507     _client->println("/updates HTTP/1.0");
00508     writeHttpHeader(length);
00509     write_multiple_values(_client, streamNum, names, counts, ats, values);
00510   } else {
00511     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00512     return E_NOCONNECTION;
00513   }
00514   return readStatusCode(true);
00515 }
00516 
00517 template <class T>
00518 inline int write_single_device_values(Print* print, int streamNum,
00519                                       const char* names[], T values[],
00520                                       const char* at) {
00521   int bytes = 0;
00522   bytes += print->print("{\"values\":{");
00523   for (int i = 0; i < streamNum; i++) {
00524     bytes += print->print("\"");
00525     bytes += print->print(names[i]);
00526     bytes += print->print("\": \"");
00527     bytes += print->print(values[i]);
00528     bytes += print->print("\"");
00529     if (i < streamNum - 1) { bytes += print->print(","); }
00530   }
00531   bytes += print->print("}");
00532   if (at != NULL) {
00533     bytes += print->print(",\"timestamp\":\"");
00534     bytes += print->print(at);
00535     bytes += print->print("\"");
00536   }
00537   bytes += print->print("}");
00538   return bytes;
00539 }
00540 
00541 template <class T>
00542 int M2XStreamClient::postDeviceUpdate(const char* deviceId, int streamNum,
00543                                       const char* names[], T values[],
00544                                       const char* at) {
00545   if (_client->connect(_host, _port)) {
00546     DBGLN("%s", "Connected to M2X server!");
00547     int length = write_single_device_values(&_null_print, streamNum, names,
00548                                             values, at);
00549     _client->print("POST ");
00550     if (_path_prefix) { _client->print(_path_prefix); }
00551     _client->print("/v2/devices/");
00552     _client->print(deviceId);
00553     _client->println("/update HTTP/1.0");
00554     writeHttpHeader(length);
00555     write_single_device_values(_client, streamNum, names, values, at);
00556   } else {
00557     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00558     return E_NOCONNECTION;
00559   }
00560   return readStatusCode(true);
00561 }
00562 
00563 template <class T>
00564 static int write_location_data(Print* print, const char* name,
00565                                T latitude, T longitude,
00566                                T elevation) {
00567   int bytes = 0;
00568   bytes += print->print("{\"name\":\"");
00569   bytes += print->print(name);
00570   bytes += print->print("\",\"latitude\":\"");
00571   bytes += print->print(latitude);
00572   bytes += print->print("\",\"longitude\":\"");
00573   bytes += print->print(longitude);
00574   bytes += print->print("\",\"elevation\":\"");
00575   bytes += print->print(elevation);
00576   bytes += print->print("\"}");
00577   return bytes;
00578 }
00579 
00580 static int write_location_data(Print* print, const char* name,
00581                                double latitude, double longitude,
00582                                double elevation) {
00583   int bytes = 0;
00584   bytes += print->print("{\"name\":\"");
00585   bytes += print->print(name);
00586   bytes += print->print("\",\"latitude\":\"");
00587   bytes += print->print(latitude, MAX_DOUBLE_DIGITS);
00588   bytes += print->print("\",\"longitude\":\"");
00589   bytes += print->print(longitude, MAX_DOUBLE_DIGITS);
00590   bytes += print->print("\",\"elevation\":\"");
00591   bytes += print->print(elevation);
00592   bytes += print->print("\"}");
00593   return bytes;
00594 }
00595 
00596 template <class T>
00597 int M2XStreamClient::updateLocation(const char* deviceId,
00598                                     const char* name,
00599                                     T latitude,
00600                                     T longitude,
00601                                     T elevation) {
00602   if (_client->connect(_host, _port)) {
00603     DBGLN("%s", "Connected to M2X server!");
00604 
00605     int length = write_location_data(&_null_print, name, latitude, longitude,
00606                                      elevation);
00607     _client->print("PUT ");
00608     if (_path_prefix) { _client->print(_path_prefix); }
00609     _client->print("/v2/devices/");
00610     _client->print(deviceId);
00611     _client->println("/location HTTP/1.0");
00612 
00613     writeHttpHeader(length);
00614     write_location_data(_client, name, latitude, longitude, elevation);
00615   } else {
00616     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00617     return E_NOCONNECTION;
00618   }
00619   return readStatusCode(true);
00620 }
00621 
00622 static inline int write_delete_values(Print* print, const char* from,
00623                                       const char* end) {
00624   int bytes = 0;
00625   bytes += print->print("{\"from\":\"");
00626   bytes += print->print(from);
00627   bytes += print->print("\",\"end\":\"");
00628   bytes += print->print(end);
00629   bytes += print->print("\"}");
00630   return bytes;
00631 }
00632 
00633 int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName,
00634                                   const char* from, const char* end) {
00635   if (_client->connect(_host, _port)) {
00636     DBGLN("%s", "Connected to M2X server!");
00637     int length = write_delete_values(&_null_print, from, end);
00638     writeDeleteHeader(deviceId, streamName, length);
00639     write_delete_values(_client, from, end);
00640   } else {
00641     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00642     return E_NOCONNECTION;
00643   }
00644 
00645   return readStatusCode(true);
00646 }
00647 
00648 int M2XStreamClient::markCommandProcessed(const char* deviceId,
00649                                           const char* commandId,
00650                                           m2x_fill_data_callback callback,
00651                                           void *context) {
00652   if (_client->connect(_host, _port)) {
00653     DBGLN("%s", "Connected to M2X server!");
00654     _null_print.counter = 0;
00655     callback(&_null_print, context);
00656     int length = _null_print.counter;
00657     _client->print("POST ");
00658     if (_path_prefix) { _client->print(_path_prefix); }
00659     _client->print("/v2/devices/");
00660     _client->print(deviceId);
00661     _client->print("/commands/");
00662     _client->print(commandId);
00663     _client->print("/process");
00664     _client->println(" HTTP/1.0");
00665     writeHttpHeader(length);
00666     callback(_client, context);
00667   } else {
00668     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00669     return E_NOCONNECTION;
00670   }
00671   return readStatusCode(true);
00672 }
00673 
00674 static void m2x_fixed_buffer_filling_callback(Print *print, void *context) {
00675   if (context) {
00676     print->print((const char *) context);
00677   }
00678 }
00679 
00680 int M2XStreamClient::markCommandProcessedWithData(const char* deviceId,
00681                                                   const char* commandId,
00682                                                   const char* data) {
00683   return markCommandProcessed(deviceId, commandId,
00684                               m2x_fixed_buffer_filling_callback, (void *) data);
00685 }
00686 
00687 int M2XStreamClient::markCommandRejected(const char* deviceId,
00688                                          const char* commandId,
00689                                          m2x_fill_data_callback callback,
00690                                          void *context) {
00691   if (_client->connect(_host, _port)) {
00692     DBGLN("%s", "Connected to M2X server!");
00693     _null_print.counter = 0;
00694     callback(&_null_print, context);
00695     int length = _null_print.counter;
00696     _client->print("POST ");
00697     if (_path_prefix) { _client->print(_path_prefix); }
00698     _client->print("/v2/devices/");
00699     _client->print(deviceId);
00700     _client->print("/commands/");
00701     _client->print(commandId);
00702     _client->print("/reject");
00703     _client->println(" HTTP/1.0");
00704     writeHttpHeader(length);
00705     callback(_client, context);
00706   } else {
00707     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00708     return E_NOCONNECTION;
00709   }
00710   return readStatusCode(true);
00711 }
00712 
00713 int M2XStreamClient::markCommandRejectedWithData(const char* deviceId,
00714                                                  const char* commandId,
00715                                                  const char* data) {
00716   return markCommandRejected(deviceId, commandId,
00717                              m2x_fixed_buffer_filling_callback, (void *) data);
00718 }
00719 
00720 int M2XStreamClient::getTimestamp32(int32_t *ts) {
00721   // The maximum value of signed 64-bit integer is 0x7fffffffffffffff,
00722   // which is 9223372036854775807. It consists of 19 characters, so a
00723   // buffer of 20 is definitely enough here
00724   int length = 20;
00725   char buffer[20];
00726   int status = getTimestamp(buffer, &length);
00727   if (status == 200) {
00728     int32_t result = 0;
00729     for (int i = 0; i < length; i++) {
00730       result = result * 10 + (buffer[i] - '0');
00731     }
00732     if (ts != NULL) { *ts = result; }
00733   }
00734   return status;
00735 }
00736 
00737 int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) {
00738   if (bufferLength == NULL) { return E_INVALID; }
00739   if (_client->connect(_host, _port)) {
00740     DBGLN("%s", "Connected to M2X server!");
00741     _client->print("GET ");
00742     if (_path_prefix) { _client->print(_path_prefix); }
00743     _client->println("/v2/time/seconds HTTP/1.0");
00744 
00745     writeHttpHeader(-1);
00746   } else {
00747     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00748     return E_NOCONNECTION;
00749   }
00750   int status = readStatusCode(false);
00751   if (status == 200) {
00752     int length = readContentLength();
00753     if (length < 0) {
00754       close();
00755       return length;
00756     }
00757     if (*bufferLength < length) {
00758       *bufferLength = length;
00759       return E_BUFFER_TOO_SMALL;
00760     }
00761     *bufferLength = length;
00762     int index = skipHttpHeader();
00763     if (index != E_OK) {
00764       close();
00765       return index;
00766     }
00767     index = 0;
00768     while (index < length) {
00769       DBG("%s", "Received Data: ");
00770       while ((index < length) && _client->available()) {
00771         buffer[index++] = _client->read();
00772         DBG("%c", buffer[index - 1]);
00773       }
00774       DBGLNEND;
00775 
00776       if ((!_client->connected()) &&
00777           (index < length)) {
00778         close();
00779         return E_NOCONNECTION;
00780       }
00781 
00782       if (_idlefunc!=NULL) {
00783         _idlefunc();
00784       } else {
00785         delay(200);
00786       }
00787     }
00788   }
00789   close();
00790   return status;
00791 }
00792 
00793 
00794 void M2XStreamClient::writePutHeader(const char* deviceId,
00795                                      const char* streamName,
00796                                      int contentLength) {
00797   _client->print("PUT ");
00798   if (_path_prefix) { _client->print(_path_prefix); }
00799   _client->print("/v2/devices/");
00800   _client->print(deviceId);
00801   _client->print("/streams/");
00802   print_encoded_string(_client, streamName);
00803   _client->println("/value HTTP/1.0");
00804 
00805   writeHttpHeader(contentLength);
00806 }
00807 
00808 void M2XStreamClient::writeDeleteHeader(const char* deviceId,
00809                                         const char* streamName,
00810                                         int contentLength) {
00811   _client->print("DELETE ");
00812   if (_path_prefix) { _client->print(_path_prefix); }
00813   _client->print("/v2/devices/");
00814   _client->print(deviceId);
00815   _client->print("/streams/");
00816   print_encoded_string(_client, streamName);
00817   _client->print("/values");
00818   _client->println(" HTTP/1.0");
00819 
00820   writeHttpHeader(contentLength);
00821 }
00822 
00823 void M2XStreamClient::writeHttpHeader(int contentLength) {
00824   _client->println(USER_AGENT);
00825   _client->print("X-M2X-KEY: ");
00826   _client->println(_key);
00827 
00828   _client->print("Host: ");
00829   print_encoded_string(_client, _host);
00830   if (_port != DEFAULT_M2X_PORT) {
00831     _client->print(":");
00832     // port is an integer, does not need encoding
00833     _client->print(_port);
00834   }
00835   _client->println();
00836 
00837   if (contentLength > 0) {
00838     _client->println("Content-Type: application/json");
00839     DBG("%s", "Content Length: ");
00840     DBGLN("%d", contentLength);
00841 
00842     _client->print("Content-Length: ");
00843     _client->println(contentLength);
00844   }
00845   _client->println();
00846 }
00847 
00848 int M2XStreamClient::waitForString(const char* str) {
00849   int currentIndex = 0;
00850   if (str[currentIndex] == '\0') return E_OK;
00851 
00852   while (true) {
00853     while (_client->available()) {
00854       char c = _client->read();
00855       DBG("%c", c);
00856 
00857       int cmp;
00858       if (_case_insensitive) {
00859         cmp = tolower(c) - tolower(str[currentIndex]);
00860       } else {
00861         cmp = c - str[currentIndex];
00862       }
00863 
00864       if ((str[currentIndex] == '*') || (cmp == 0)) {
00865         currentIndex++;
00866         if (str[currentIndex] == '\0') {
00867           return E_OK;
00868         }
00869       } else {
00870         // start from the beginning
00871         currentIndex = 0;
00872       }
00873     }
00874 
00875     if (!_client->connected()) {
00876       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00877 
00878       close();
00879       return E_DISCONNECTED;
00880     }
00881     if (_idlefunc!=NULL)
00882       _idlefunc();
00883     else
00884       delay(200);
00885   }
00886   // never reached here
00887   return E_NOTREACHABLE;
00888 }
00889 
00890 int M2XStreamClient::readStatusCode(bool closeClient) {
00891   int responseCode = 0;
00892   int ret = waitForString("HTTP/*.* ");
00893   if (ret != E_OK) {
00894     if (closeClient) close();
00895     return ret;
00896   }
00897 
00898   // ret is not needed from here(since it must be E_OK), so we can use it
00899   // as a regular variable now.
00900   ret = 0;
00901   while (true) {
00902     while (_client->available()) {
00903       char c = _client->read();
00904       DBG("%c", c);
00905 
00906       responseCode = responseCode * 10 + (c - '0');
00907       ret++;
00908       if (ret == 3) {
00909         if (closeClient) close();
00910         return responseCode;
00911       }
00912     }
00913 
00914     if (!_client->connected()) {
00915       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00916 
00917       if (closeClient) close();
00918       return E_DISCONNECTED;
00919     }
00920     if (_idlefunc!=NULL)
00921       _idlefunc();
00922     else
00923       delay(200);
00924   }
00925 
00926   // never reached here
00927   return E_NOTREACHABLE;
00928 }
00929 
00930 int M2XStreamClient::readContentLength() {
00931   int ret = waitForString("Content-Length: ");
00932   if (ret != E_OK) {
00933     return ret;
00934   }
00935 
00936   // From now on, ret is not needed, we can use it
00937   // to keep the final result
00938   ret = 0;
00939   while (true) {
00940     while (_client->available()) {
00941       char c = _client->read();
00942       DBG("%c", c);
00943 
00944       if ((c == '\r') || (c == '\n')) {
00945         return (ret == 0) ? (E_INVALID) : (ret);
00946       } else {
00947         ret = ret * 10 + (c - '0');
00948       }
00949     }
00950 
00951     if (!_client->connected()) {
00952       DBGLN("%s", "ERROR: The client is disconnected from the server!");
00953 
00954       return E_DISCONNECTED;
00955     }
00956     if (_idlefunc!=NULL)
00957       _idlefunc();
00958     else
00959       delay(200);
00960   }
00961 
00962   // never reached here
00963   return E_NOTREACHABLE;
00964 }
00965 
00966 int M2XStreamClient::skipHttpHeader() {
00967   return waitForString("\n\r\n");
00968 }
00969 
00970 void M2XStreamClient::close() {
00971   // Eats up buffered data before closing
00972   _client->flush();
00973   _client->stop();
00974 }
00975 
00976 static inline int fill_iso8601_timestamp(int32_t seconds, int32_t milli,
00977                                          char* buffer, int* length);
00978 
00979 TimeService::TimeService(M2XStreamClient* client) : _client(client) {
00980 }
00981 
00982 int TimeService::init() {
00983   _timer.start();
00984   return reset();
00985 }
00986 
00987 int TimeService::reset() {
00988   int32_t ts;
00989   int status = _client->getTimestamp32(&ts);
00990 
00991   if (m2x_status_is_success(status)) {
00992     _server_timestamp = ts;
00993     _local_last_milli = _timer.read_ms();
00994   }
00995 
00996   return status;
00997 }
00998 
00999 int TimeService::getTimestamp(char* buffer, int* length) {
01000   uint32_t now = _timer.read_ms();
01001   if (now < _local_last_milli) {
01002     // In case of a timestamp overflow(happens once every 50 days on
01003     // Arduino), we reset the server timestamp recorded.
01004     // NOTE: while on an Arduino this might be okay, the situation is worse
01005     // on mbed, see the notes in m2x-mbed.h for details
01006     int status = reset();
01007     if (!m2x_status_is_success(status)) { return status; }
01008     now = _timer.read_ms();
01009   }
01010   if (now < _local_last_milli) {
01011     // We have already reseted the timestamp, so this cannot happen
01012     // (an HTTP request can take longer than 50 days to finished? You
01013     // must be kidding here). Something else must be wrong here
01014     return E_TIMESTAMP_ERROR;
01015   }
01016   uint32_t diff = now - _local_last_milli;
01017   _local_last_milli = now;
01018   _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds
01019   return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000),
01020                                 buffer, length);
01021 }
01022 
01023 #define SIZE_ISO_8601 25
01024 static inline bool is_leap_year(int16_t y) {
01025   return ((1970 + y) > 0) &&
01026       !((1970 + y) % 4) &&
01027       (((1970 + y) % 100) || !((1970 + y) % 400));
01028 }
01029 static inline int32_t days_in_year(int16_t y) {
01030   return is_leap_year(y) ? 366 : 365;
01031 }
01032 static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31};
01033 
01034 static inline int fill_iso8601_timestamp(int32_t timestamp, int32_t milli,
01035                                          char* buffer, int* length) {
01036   int16_t year;
01037   int8_t month, month_length;
01038   int32_t day;
01039   int8_t hour, minute, second;
01040 
01041   if (*length < SIZE_ISO_8601) {
01042     *length = SIZE_ISO_8601;
01043     return E_BUFFER_TOO_SMALL;
01044   }
01045 
01046   second = timestamp % 60;
01047   timestamp /= 60; // now it is minutes
01048 
01049   minute = timestamp % 60;
01050   timestamp /= 60; // now it is hours
01051 
01052   hour = timestamp % 24;
01053   timestamp /= 24; // now it is days
01054 
01055   year = 0;
01056   day = 0;
01057   while ((day += days_in_year(year)) <= timestamp) {
01058     year++;
01059   }
01060   day -= days_in_year(year);
01061   timestamp -= day; // now it is days in this year, starting at 0
01062 
01063   day = 0;
01064   month_length = 0;
01065   for (month = 0; month < 12; month++) {
01066     if (month == 1) {
01067       // February
01068       month_length = is_leap_year(year) ? 29 : 28;
01069     } else {
01070       month_length = MONTH_DAYS[month];
01071     }
01072 
01073     if (timestamp >= month_length) {
01074       timestamp -= month_length;
01075     } else {
01076       break;
01077     }
01078   }
01079   year = 1970 + year;
01080   month++; // offset by 1
01081   day = timestamp + 1;
01082 
01083   int i = 0, j = 0;
01084 
01085   // NOTE: It seems the snprintf implementation in Arduino has bugs,
01086   // we have to manually piece the string together here.
01087 #define INT_TO_STR(v_, width_) \
01088   for (j = 0; j < (width_); j++) { \
01089     buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \
01090     (v_) /= 10; \
01091   } \
01092   i += (width_)
01093 
01094   INT_TO_STR(year, 4);
01095   buffer[i++] = '-';
01096   INT_TO_STR(month, 2);
01097   buffer[i++] = '-';
01098   INT_TO_STR(day, 2);
01099   buffer[i++] = 'T';
01100   INT_TO_STR(hour, 2);
01101   buffer[i++] = ':';
01102   INT_TO_STR(minute, 2);
01103   buffer[i++] = ':';
01104   INT_TO_STR(second, 2);
01105   buffer[i++] = '.';
01106   INT_TO_STR(milli, 3);
01107   buffer[i++] = 'Z';
01108   buffer[i++] = '\0';
01109 
01110 #undef INT_TO_STR
01111 
01112   *length = i;
01113   return E_OK;
01114 }
01115 
01116 /* Reader functions */
01117 #ifdef M2X_ENABLE_READER
01118 
01119 // Data structures and functions used to parse stream values
01120 
01121 #define STREAM_BUF_LEN 32
01122 
01123 typedef struct {
01124   uint8_t state;
01125   char at_str[STREAM_BUF_LEN + 1];
01126   char value_str[STREAM_BUF_LEN + 1];
01127   int index;
01128 
01129   stream_value_read_callback callback;
01130   void* context;
01131 } stream_parsing_context_state;
01132 
01133 #define WAITING_AT 0x1
01134 #define GOT_AT 0x2
01135 #define WAITING_VALUE 0x4
01136 #define GOT_VALUE 0x8
01137 
01138 #define GOT_STREAM (GOT_AT | GOT_VALUE)
01139 #define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM)
01140 
01141 #define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT)
01142 #define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \
01143                                WAITING_VALUE)
01144 
01145 static void on_stream_key_found(jsonlite_callback_context* context,
01146                                 jsonlite_token* token)
01147 {
01148   stream_parsing_context_state* state =
01149       (stream_parsing_context_state*) context->client_state;
01150   if (strncmp((const char*) token->start, "timestamp", 9) == 0) {
01151     state->state |= WAITING_AT;
01152   } else if ((strncmp((const char*) token->start, "value", 5) == 0) &&
01153              (token->start[5] != 's')) { // get rid of "values"
01154     state->state |= WAITING_VALUE;
01155   }
01156 }
01157 
01158 static void on_stream_value_found(jsonlite_callback_context* context,
01159                                   jsonlite_token* token,
01160                                   int type)
01161 {
01162   stream_parsing_context_state* state =
01163       (stream_parsing_context_state*) context->client_state;
01164 
01165   if (TEST_IS_AT(state->state)) {
01166     strncpy(state->at_str, (const char*) token->start,
01167             MIN(token->end - token->start, STREAM_BUF_LEN));
01168     state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
01169     state->state |= GOT_AT;
01170   } else if (TEST_IS_VALUE(state->state)) {
01171     strncpy(state->value_str, (const char*) token->start,
01172             MIN(token->end - token->start, STREAM_BUF_LEN));
01173     state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
01174     state->state |= GOT_VALUE;
01175   }
01176 
01177   if (TEST_GOT_STREAM(state->state)) {
01178     state->callback(state->at_str, state->value_str,
01179                     state->index++, state->context, type);
01180     state->state = 0;
01181   }
01182 }
01183 
01184 static void on_stream_string_found(jsonlite_callback_context* context,
01185                                    jsonlite_token* token)
01186 {
01187   on_stream_value_found(context, token, 1);
01188 }
01189 
01190 static void on_stream_number_found(jsonlite_callback_context* context,
01191                                    jsonlite_token* token)
01192 {
01193   on_stream_value_found(context, token, 2);
01194 }
01195 
01196 // Data structures and functions used to parse locations
01197 
01198 #define LOCATION_BUF_LEN 20
01199 
01200 typedef struct {
01201   uint16_t state;
01202   char name_str[LOCATION_BUF_LEN + 1];
01203   double latitude;
01204   double longitude;
01205   double elevation;
01206   char timestamp_str[LOCATION_BUF_LEN + 1];
01207   int index;
01208 
01209   location_read_callback callback;
01210   void* context;
01211 } location_parsing_context_state;
01212 
01213 #define WAITING_NAME 0x1
01214 #define WAITING_LATITUDE 0x2
01215 #define WAITING_LONGITUDE 0x4
01216 #define WAITING_ELEVATION 0x8
01217 #define WAITING_TIMESTAMP 0x10
01218 
01219 #define GOT_NAME 0x20
01220 #define GOT_LATITUDE 0x40
01221 #define GOT_LONGITUDE 0x80
01222 #define GOT_ELEVATION 0x100
01223 #define GOT_TIMESTAMP 0x200
01224 
01225 #define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP)
01226 #define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION)
01227 
01228 #define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME)
01229 #define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \
01230                                   == WAITING_LATITUDE)
01231 #define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \
01232                                    == WAITING_LONGITUDE)
01233 #define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \
01234                                    == WAITING_ELEVATION)
01235 #define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \
01236                                    == WAITING_TIMESTAMP)
01237 
01238 static void on_location_key_found(jsonlite_callback_context* context,
01239                                   jsonlite_token* token) {
01240   location_parsing_context_state* state =
01241       (location_parsing_context_state*) context->client_state;
01242   if (strncmp((const char*) token->start, "waypoints", 9) == 0) {
01243     // only parses those locations in waypoints, skip the outer one
01244     state->state = 0;
01245   } else if (strncmp((const char*) token->start, "name", 4) == 0) {
01246     state->state |= WAITING_NAME;
01247   } else if (strncmp((const char*) token->start, "latitude", 8) == 0) {
01248     state->state |= WAITING_LATITUDE;
01249   } else if (strncmp((const char*) token->start, "longitude", 9) == 0) {
01250     state->state |= WAITING_LONGITUDE;
01251   } else if (strncmp((const char*) token->start, "elevation", 9) == 0) {
01252     state->state |= WAITING_ELEVATION;
01253   } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) {
01254     state->state |= WAITING_TIMESTAMP;
01255   }
01256 }
01257 
01258 static void on_location_string_found(jsonlite_callback_context* context,
01259                                      jsonlite_token* token) {
01260   location_parsing_context_state* state =
01261       (location_parsing_context_state*) context->client_state;
01262 
01263   if (TEST_IS_NAME(state->state)) {
01264     strncpy(state->name_str, (const char*) token->start,
01265             MIN(token->end - token->start, LOCATION_BUF_LEN));
01266     state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
01267     state->state |= GOT_NAME;
01268   } else if (TEST_IS_LATITUDE(state->state)) {
01269     state->latitude = atof((const char*) token->start);
01270     state->state |= GOT_LATITUDE;
01271   } else if (TEST_IS_LONGITUDE(state->state)) {
01272     state->longitude = atof((const char*) token->start);
01273     state->state |= GOT_LONGITUDE;
01274   } else if (TEST_IS_ELEVATION(state->state)) {
01275     state->elevation = atof((const char*) token->start);
01276     state->state |= GOT_ELEVATION;
01277   } else if (TEST_IS_TIMESTAMP(state->state)) {
01278     strncpy(state->timestamp_str, (const char*) token->start,
01279             MIN(token->end - token->start, LOCATION_BUF_LEN));
01280     state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
01281     state->state |= GOT_TIMESTAMP;
01282   }
01283 
01284   if (TEST_GOT_LOCATION(state->state)) {
01285     state->callback(state->name_str, state->latitude, state->longitude,
01286                     state->elevation, state->timestamp_str, state->index++,
01287                     state->context);
01288     state->state = 0;
01289   }
01290 }
01291 
01292 #ifndef M2X_COMMAND_BUF_LEN
01293 #define M2X_COMMAND_BUF_LEN 40
01294 #endif  /* M2X_COMMAND_BUF_LEN */
01295 
01296 typedef struct {
01297   uint8_t state;
01298   char id_str[M2X_COMMAND_BUF_LEN + 1];
01299   char name_str[M2X_COMMAND_BUF_LEN + 1];
01300   int index;
01301 
01302   m2x_command_read_callback callback;
01303   void* context;
01304 } m2x_command_parsing_context_state;
01305 
01306 #define M2X_COMMAND_WAITING_ID 0x1
01307 #define M2X_COMMAND_GOT_ID 0x2
01308 #define M2X_COMMAND_WAITING_NAME 0x4
01309 #define M2X_COMMAND_GOT_NAME 0x8
01310 
01311 #define M2X_COMMAND_GOT_COMMAND (M2X_COMMAND_GOT_ID | M2X_COMMAND_GOT_NAME)
01312 #define M2X_COMMAND_TEST_GOT_COMMAND(state_) \
01313   (((state_) & M2X_COMMAND_GOT_COMMAND) == M2X_COMMAND_GOT_COMMAND)
01314 
01315 #define M2X_COMMAND_TEST_IS_ID(state_) \
01316   (((state_) & (M2X_COMMAND_WAITING_ID | M2X_COMMAND_GOT_ID)) == \
01317    M2X_COMMAND_WAITING_ID)
01318 #define M2X_COMMAND_TEST_IS_NAME(state_) \
01319   (((state_) & (M2X_COMMAND_WAITING_NAME | M2X_COMMAND_GOT_NAME)) == \
01320    M2X_COMMAND_WAITING_NAME)
01321 
01322 static void m2x_on_command_key_found(jsonlite_callback_context* context,
01323                                      jsonlite_token* token)
01324 {
01325   m2x_command_parsing_context_state* state =
01326       (m2x_command_parsing_context_state*) context->client_state;
01327   if (strncmp((const char*) token->start, "id", 2) == 0) {
01328     state->state |= M2X_COMMAND_WAITING_ID;
01329   } else if (strncmp((const char*) token->start, "name", 4) == 0) {
01330     state->state |= M2X_COMMAND_WAITING_NAME;
01331   }
01332 }
01333 
01334 static void m2x_on_command_value_found(jsonlite_callback_context* context,
01335                                        jsonlite_token* token)
01336 {
01337   m2x_command_parsing_context_state* state =
01338       (m2x_command_parsing_context_state*) context->client_state;
01339   if (M2X_COMMAND_TEST_IS_ID(state->state)) {
01340     strncpy(state->id_str, (const char*) token->start,
01341             MIN(token->end - token->start, M2X_COMMAND_BUF_LEN));
01342     state->id_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0';
01343     state->state |= M2X_COMMAND_GOT_ID;
01344   } else if (M2X_COMMAND_TEST_IS_NAME(state->state)) {
01345     strncpy(state->name_str, (const char*) token->start,
01346             MIN(token->end - token->start, M2X_COMMAND_BUF_LEN));
01347     state->name_str[MIN(token->end - token->start, M2X_COMMAND_BUF_LEN)] = '\0';
01348     state->state |= M2X_COMMAND_GOT_NAME;
01349   }
01350 
01351   if (M2X_COMMAND_TEST_GOT_COMMAND(state->state)) {
01352     state->callback(state->id_str, state->name_str,
01353                     state->index++, state->context);
01354     state->state = 0;
01355   }
01356 }
01357 
01358 int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName,
01359                                       stream_value_read_callback callback, void* context,
01360                                       const char* query) {
01361   if (_client->connect(_host, _port)) {
01362     DBGLN("%s", "Connected to M2X server!");
01363     _client->print("GET ");
01364     if (_path_prefix) { _client->print(_path_prefix); }
01365     _client->print("/v2/devices/");
01366     _client->print(deviceId);
01367     _client->print("/streams/");
01368     print_encoded_string(_client, streamName);
01369     _client->print("/values");
01370 
01371     if (query) {
01372       if (query[0] != '?') {
01373         _client->print('?');
01374       }
01375       _client->print(query);
01376     }
01377 
01378     _client->println(" HTTP/1.0");
01379     writeHttpHeader(-1);
01380   } else {
01381     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
01382     return E_NOCONNECTION;
01383   }
01384   int status = readStatusCode(false);
01385   if (status == 200) {
01386     readStreamValue(callback, context);
01387   }
01388 
01389   close();
01390   return status;
01391 }
01392 
01393 int M2XStreamClient::readLocation(const char* deviceId,
01394                                   location_read_callback callback,
01395                                   void* context) {
01396   if (_client->connect(_host, _port)) {
01397     DBGLN("%s", "Connected to M2X server!");
01398     _client->print("GET ");
01399     if (_path_prefix) { _client->print(_path_prefix); }
01400     _client->print("/v2/devices/");
01401     _client->print(deviceId);
01402     _client->println("/location HTTP/1.0");
01403 
01404     writeHttpHeader(-1);
01405   } else {
01406     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
01407     return E_NOCONNECTION;
01408   }
01409   int status = readStatusCode(false);
01410   if (status == 200) {
01411     readLocation(callback, context);
01412   }
01413 
01414   close();
01415   return status;
01416 }
01417 
01418 int M2XStreamClient::listCommands(const char* deviceId,
01419                                   m2x_command_read_callback callback,
01420                                   void* context,
01421                                   const char* query) {
01422   if (_client->connect(_host, _port)) {
01423     DBGLN("%s", "Connected to M2X server!");
01424     _client->print("GET ");
01425     if (_path_prefix) { _client->print(_path_prefix); }
01426     _client->print("/v2/devices/");
01427     _client->print(deviceId);
01428     _client->print("/commands");
01429 
01430     if (query) {
01431       if (query[0] != '?') {
01432         _client->print('?');
01433       }
01434       _client->print(query);
01435     }
01436 
01437     _client->println(" HTTP/1.0");
01438     writeHttpHeader(-1);
01439   } else {
01440     DBGLN("%s", "ERROR: Cannot connect to M2X server!");
01441     return E_NOCONNECTION;
01442   }
01443   int status = readStatusCode(false);
01444   if (status == 200) {
01445     readCommand(callback, context);
01446   }
01447 
01448   close();
01449   return status;
01450 }
01451 
01452 
01453 int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
01454                                      void* context) {
01455   const int BUF_LEN = 64;
01456   char buf[BUF_LEN];
01457 
01458   int length = readContentLength();
01459   if (length < 0) {
01460     close();
01461     return length;
01462   }
01463 
01464   int index = skipHttpHeader();
01465   if (index != E_OK) {
01466     close();
01467     return index;
01468   }
01469   index = 0;
01470 
01471   stream_parsing_context_state state;
01472   state.state = state.index = 0;
01473   state.callback = callback;
01474   state.context = context;
01475 
01476   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
01477   cbs.key_found = on_stream_key_found;
01478   cbs.number_found = on_stream_number_found;
01479   cbs.string_found = on_stream_string_found;
01480   cbs.context.client_state = &state;
01481 
01482   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
01483   jsonlite_parser_set_callback(p, &cbs);
01484 
01485   jsonlite_result result = jsonlite_result_unknown;
01486   while (index < length) {
01487     int i = 0;
01488 
01489     DBG("%s", "Received Data: ");
01490     while ((i < BUF_LEN) && _client->available()) {
01491       buf[i++] = _client->read();
01492       DBG("%c", buf[i - 1]);
01493     }
01494     DBGLNEND;
01495 
01496     if ((!_client->connected()) &&
01497         (!_client->available()) &&
01498         ((index + i) < length)) {
01499       jsonlite_parser_release(p);
01500       close();
01501       return E_NOCONNECTION;
01502     }
01503 
01504     result = jsonlite_parser_tokenize(p, buf, i);
01505     if ((result != jsonlite_result_ok) &&
01506         (result != jsonlite_result_end_of_stream)) {
01507       jsonlite_parser_release(p);
01508       close();
01509       return E_JSON_INVALID;
01510     }
01511 
01512     index += i;
01513   }
01514 
01515   jsonlite_parser_release(p);
01516   close();
01517   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
01518 }
01519 
01520 int M2XStreamClient::readLocation(location_read_callback callback,
01521                                   void* context) {
01522   const int BUF_LEN = 40;
01523   char buf[BUF_LEN];
01524 
01525   int length = readContentLength();
01526   if (length < 0) {
01527     close();
01528     return length;
01529   }
01530 
01531   int index = skipHttpHeader();
01532   if (index != E_OK) {
01533     close();
01534     return index;
01535   }
01536   index = 0;
01537 
01538   location_parsing_context_state state;
01539   state.state = state.index = 0;
01540   state.callback = callback;
01541   state.context = context;
01542 
01543   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
01544   cbs.key_found = on_location_key_found;
01545   cbs.string_found = on_location_string_found;
01546   cbs.context.client_state = &state;
01547 
01548   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
01549   jsonlite_parser_set_callback(p, &cbs);
01550 
01551   jsonlite_result result = jsonlite_result_unknown;
01552   while (index < length) {
01553     int i = 0;
01554 
01555     DBG("%s", "Received Data: ");
01556     while ((i < BUF_LEN) && _client->available()) {
01557       buf[i++] = _client->read();
01558       DBG("%c", buf[i - 1]);
01559     }
01560     DBGLNEND;
01561 
01562     if ((!_client->connected()) &&
01563         (!_client->available()) &&
01564         ((index + i) < length)) {
01565       jsonlite_parser_release(p);
01566       close();
01567       return E_NOCONNECTION;
01568     }
01569 
01570     result = jsonlite_parser_tokenize(p, buf, i);
01571     if ((result != jsonlite_result_ok) &&
01572         (result != jsonlite_result_end_of_stream)) {
01573       jsonlite_parser_release(p);
01574       close();
01575       return E_JSON_INVALID;
01576     }
01577 
01578     index += i;
01579   }
01580 
01581   jsonlite_parser_release(p);
01582   close();
01583   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
01584 }
01585 
01586 int M2XStreamClient::readCommand(m2x_command_read_callback callback,
01587                                  void* context) {
01588   const int BUF_LEN = 60;
01589   char buf[BUF_LEN];
01590 
01591   int length = readContentLength();
01592   if (length < 0) {
01593     close();
01594     return length;
01595   }
01596 
01597   int index = skipHttpHeader();
01598   if (index != E_OK) {
01599     close();
01600     return index;
01601   }
01602   index = 0;
01603 
01604   m2x_command_parsing_context_state state;
01605   state.state = 0;
01606   state.index = 0;
01607   state.callback = callback;
01608   state.context = context;
01609 
01610   jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
01611   cbs.key_found = m2x_on_command_key_found;
01612   cbs.string_found = m2x_on_command_value_found;
01613   cbs.context.client_state = &state;
01614 
01615   jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
01616   jsonlite_parser_set_callback(p, &cbs);
01617 
01618   jsonlite_result result = jsonlite_result_unknown;
01619   while (index < length) {
01620     int i = 0;
01621 
01622     DBG("%s", "Received Data: ");
01623     while ((i < BUF_LEN) && _client->available()) {
01624       buf[i++] = _client->read();
01625       DBG("%c", buf[i - 1]);
01626     }
01627     DBGLNEND;
01628 
01629     if ((!_client->connected()) &&
01630         (!_client->available()) &&
01631         ((index + i) < length)) {
01632       jsonlite_parser_release(p);
01633       close();
01634       return E_NOCONNECTION;
01635     }
01636 
01637     result = jsonlite_parser_tokenize(p, buf, i);
01638     if ((result != jsonlite_result_ok) &&
01639         (result != jsonlite_result_end_of_stream)) {
01640       jsonlite_parser_release(p);
01641       close();
01642       return E_JSON_INVALID;
01643     }
01644 
01645     index += i;
01646   }
01647 
01648   jsonlite_parser_release(p);
01649   close();
01650   return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
01651 }
01652 
01653 #endif  /* M2X_ENABLE_READER */
01654 
01655 #endif  /* M2XStreamClient_h */