M2X MQTT Client for ARM MBED

Dependents:   WNCInterface_M2XMQTTdemo

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers M2XMQTTClient.h Source File

M2XMQTTClient.h

00001 #ifndef M2XMQTTCLIENT_H_
00002 #define M2XMQTTCLIENT_H_
00003 
00004 #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM))
00005 #error "Platform definition is missing!"
00006 #endif
00007 
00008 #define M2X_VERSION "0.1.0"
00009 
00010 #ifdef MBED_PLATFORM
00011 #include "m2x-mbed.h"
00012 #endif /* MBED_PLATFORM */
00013 
00014 #ifdef LINUX_PLATFORM
00015 #include "m2x-linux.h"
00016 #endif /* LINUX_PLATFORM */
00017 
/*
 * Debug hooks. When nothing included above has defined DBG, all three
 * macros expand to nothing, so debug statements compile away.
 */
#if !defined(DBG)
#  define DBG(fmt_, data_)
#  define DBGLN(fmt_, data_)
#  define DBGLNEND
#endif  /* !defined(DBG) */

/* Smaller of two values. Each argument may be evaluated more than once. */
#define MIN(a, b) (((b) < (a)) ? (b) : (a))
/* Uppercase hexadecimal digit character for a value in the range 0..15. */
#define TO_HEX(t_) ((char) (((t_) <= 9) ? ((t_) + '0') : ((t_) - 10 + 'A')))
/* NOTE(review): presumably a digit limit used when formatting doubles;
 * no use of it is visible in this part of the file -- confirm. */
#define MAX_DOUBLE_DIGITS 7
00028 
00029 /* For tolower */
00030 #include <ctype.h>
00031 
/*
 * Client-side status codes. E_OK is success; the negative values report
 * failures detected locally. E_NOCONNECTION is returned when the
 * connection to the server cannot be set up, and E_DISCONNECTED when a
 * step of the MQTT handshake fails. The value -7 is not assigned here.
 */
static int const E_OK = 0;
static int const E_NOCONNECTION = -1;
static int const E_DISCONNECTED = -2;
static int const E_NOTREACHABLE = -3;
static int const E_INVALID = -4;
static int const E_JSON_INVALID = -5;
static int const E_BUFFER_TOO_SMALL = -6;
static int const E_TIMESTAMP_ERROR = -8;

/* Endpoint used when the constructor is not given a host and port. */
static char const *DEFAULT_M2X_HOST = "api-m2x.att.com";
static int const DEFAULT_M2X_PORT = 1883;
00043 
00044 static inline bool m2x_status_is_success(int status) {
00045   return (status == E_OK) || (status >= 200 && status <= 299);
00046 }
00047 
/* True when status lies in the 400..499 range. */
static inline bool m2x_status_is_client_error(int status) {
  if (status < 400) {
    return false;
  }
  return status < 500;
}
00051 
/* True when status lies in the 500..599 range. */
static inline bool m2x_status_is_server_error(int status) {
  if (status < 500) {
    return false;
  }
  return status < 600;
}
00055 
00056 static inline bool m2x_status_is_error(int status) {
00057   return m2x_status_is_client_error(status) ||
00058       m2x_status_is_server_error(status);
00059 }
00060 
00061 // Null Print class used to calculate length to print
00062 class NullPrint : public Print {
00063 public:
00064   size_t counter;
00065 
00066   virtual size_t write(uint8_t b) {
00067     counter++;
00068     return 1;
00069   }
00070 
00071   virtual size_t write(const uint8_t* buf, size_t size) {
00072     counter += size;
00073     return size;
00074   }
00075 };
00076 
00077 // Handy helper class for printing MQTT payload using a Print
00078 class MMQTTPrint : public Print {
00079 public:
00080   mmqtt_connection *connection;
00081   mmqtt_s_puller puller;
00082 
00083   virtual size_t write(uint8_t b) {
00084     return write(&b, 1);
00085   }
00086 
00087   virtual size_t write(const uint8_t* buf, size_t size) {
00088     return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1;
00089   }
00090 };
00091 
00092 class M2XMQTTClient {
00093 public:
00094   M2XMQTTClient(Client* client,
00095                 const char* key,
00096                 void (* idlefunc)(void) = NULL,
00097                 bool keepalive = true,
00098                 const char* host = DEFAULT_M2X_HOST,
00099                 int port = DEFAULT_M2X_PORT,
00100                 const char* path_prefix = NULL);
00101 
00102   // Push data stream value using PUT request, returns the HTTP status code
00103   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00104   // the device ID here.
00105   template <class T>
00106   int updateStreamValue(const char* deviceId, const char* streamName, T value);
00107 
00108   // Post multiple values to M2X all at once.
00109   // +deviceId+ - id of the device to post values
00110   // +streamNum+ - Number of streams to post
00111   // +names+ - Array of stream names, the length of the array should
00112   // be exactly +streamNum+
00113   // +counts+ - Array of +streamNum+ length, each item in this array
00114   // containing the number of values we want to post for each stream
00115   // +ats+ - Timestamps for each value, the length of this array should
00116   // be the some of all values in +counts+, for the first +counts[0]+
00117   // items, the values belong to the first stream, for the following
00118   // +counts[1]+ number of items, the values belong to the second stream,
00119   // etc. Notice that timestamps are required here: you must provide
00120   // a timestamp for each value posted.
00121   // +values+ - Values to post. This works the same way as +ats+, the
00122   // first +counts[0]+ number of items contain values to post to the first
00123   // stream, the succeeding +counts[1]+ number of items contain values
00124   // for the second stream, etc. The length of this array should be
00125   // the sum of all values in +counts+ array.
00126   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00127   // the device ID here.
00128   template <class T>
00129   int postDeviceUpdates(const char* deviceId, int streamNum,
00130                         const char* names[], const int counts[],
00131                         const char* ats[], T values[]);
00132 
00133   // Post multiple values of a single device at once.
00134   // +deviceId+ - id of the device to post values
00135   // +streamNum+ - Number of streams to post
00136   // +names+ - Array of stream names, the length of the array should
00137   // be exactly +streamNum+
00138   // +values+ - Array of values to post, the length of the array should
00139   // be exactly +streamNum+. Notice that the array of +values+ should
00140   // match the array of +names+, and that the ith value in +values+ is
00141   // exactly the value to post for the ith stream name in +names+
00142   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00143   // the device ID here.
00144   template <class T>
00145   int postDeviceUpdate(const char* deviceId, int streamNum,
00146                        const char* names[], T values[],
00147                        const char* at = NULL);
00148 
00149   // Update datasource location
00150   // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
00151   // 4-byte (32 bits) precision, which is the same as float. So there's
00152   // no natural double-precision floating number on these boards. With
00153   // a float value, we have a precision of roughly 7 digits, that means
00154   // either 5 or 6 digits after the floating point. According to wikipedia,
00155   // a difference of 0.00001 will give us ~1.1132m distance. If this
00156   // precision is good for you, you can use the double-version we provided
00157   // here. Otherwise, you may need to use the string-version and do the
00158   // actual conversion by yourselves.
00159   // However, with an Arduino Due board, double has 8-bytes (64 bits)
00160   // precision, which means you are free to use the double-version only
00161   // without any precision problems.
00162   // Returned value is the http status code.
00163   // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
00164   // the device ID here.
00165   template <class T>
00166   int updateLocation(const char* deviceId, const char* name,
00167                      T latitude, T longitude, T elevation);
00168 
00169   // Delete values from a data stream
00170   // You will need to provide from and end date/time strings in the ISO8601
00171   // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where
00172   //   yyyy: the year
00173   //   mm: the month
00174   //   dd: the day
00175   //   HH: the hour (24 hour format)
00176   //   MM: the minute
00177   //   SS.SSS: the seconds (to the millisecond)
00178   // NOTE: the time is given in Zulu (GMT)
00179   // M2X will delete all values within the from to end date/time range.
00180   // The status code is 204 on success and 400 on a bad request (e.g. the
00181   // timestamp is not in ISO8601 format or the from timestamp is not less than
00182   // or equal to the end timestamp.
00183   int deleteValues(const char* deviceId, const char* streamName,
00184                    const char* from, const char* end);
00185 
00186   // Following fields are public so mmqtt callback functions can access directly
00187   Client* _client;
00188 private:
00189   struct mmqtt_connection _connection;
00190   const char* _key;
00191   uint16_t _key_length;
00192   bool _connected;
00193   bool _keepalive;
00194   const char* _host;
00195   int _port;
00196   void (* _idlefunc)(void);
00197   const char* _path_prefix;
00198   NullPrint _null_print;
00199   MMQTTPrint _mmqtt_print;
00200   int16_t _current_id;
00201 
00202   int connectToServer();
00203 
00204   template <class T>
00205   int printUpdateStreamValuePayload(Print* print, const char* deviceId,
00206                                     const char* streamName, T value);
00207 
00208   template <class T>
00209   int printPostDeviceUpdatesPayload(Print* print,
00210                                     const char* deviceId, int streamNum,
00211                                     const char* names[], const int counts[],
00212                                     const char* ats[], T values[]);
00213 
00214   template <class T>
00215   int printPostDeviceUpdatePayload(Print* print,
00216                                    const char* deviceId, int streamNum,
00217                                    const char* names[], T values[],
00218                                    const char* at = NULL);
00219 
00220   template <class T>
00221   int printUpdateLocationPayload(Print* print,
00222                                  const char* deviceId, const char* name,
00223                                  T latitude, T longitude, T elevation);
00224 
00225   int printDeleteValuesPayload(Print* print,
00226                                const char* deviceId, const char* streamName,
00227                                const char* from, const char* end);
00228 
00229   int readStatusCode();
00230   void close();
00231 };
00232 
00233 // Implementations
00234 M2XMQTTClient::M2XMQTTClient(Client* client,
00235                              const char* key,
00236                              void (* idlefunc)(void),
00237                              bool keepalive,
00238                              const char* host,
00239                              int port,
00240                              const char* path_prefix) : _client(client),
00241                                                         _key(key),
00242                                                         _idlefunc(idlefunc),
00243                                                         _keepalive(keepalive),
00244                                                         _connected(false),
00245                                                         _host(host),
00246                                                         _port(port),
00247                                                         _path_prefix(path_prefix),
00248                                                         _null_print(),
00249                                                         _mmqtt_print(),
00250                                                         _current_id(0) {
00251   _key_length = strlen(_key);
00252 }
00253 
00254 mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) {
00255   const uint8_t *data = NULL;
00256   mmqtt_ssize_t length = 0;
00257   mmqtt_status_t status;
00258   M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
00259   Client *c = client->_client;
00260   struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection);
00261   if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; }
00262 
00263   status = mmqtt_stream_external_pullable(stream, &data, &length);
00264   if (status != MMQTT_STATUS_OK) { return status; }
00265 
00266   length = c->write(data, length);
00267   if (length < 0) {
00268     c->stop();
00269     return MMQTT_STATUS_BROKEN_CONNECTION;
00270   }
00271   mmqtt_stream_external_pull(stream, length);
00272   if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
00273     mmqtt_connection_release_write_stream(connection, stream);
00274   }
00275   return MMQTT_STATUS_OK;
00276 }
00277 
00278 mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) {
00279   uint8_t *data = NULL;
00280   mmqtt_ssize_t length = 0, i = 0;
00281   mmqtt_status_t status;
00282   M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
00283   Client *c = client->_client;
00284   struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection);
00285   if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; }
00286 
00287   status = mmqtt_stream_external_pushable(stream, &data, &length);
00288   if (status != MMQTT_STATUS_OK) { return status; }
00289   length = min(length, max_size);
00290 
00291   /* Maybe we need another field in signature documenting how much data we want,
00292    * so we can handle end condition gracefully? Not 100% if `left` field in
00293    * mmqtt_stream is enough
00294    */
00295   while (i < length && c->available()) {
00296     data[i++] = c->read();
00297   }
00298   mmqtt_stream_external_push(stream, i);
00299   return MMQTT_STATUS_OK;
00300 }
00301 
00302 size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit)
00303 {
00304   mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata;
00305   return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit,
00306                                limit) == MMQTT_STATUS_OK ? limit : 0;
00307 }
00308 
00309 int M2XMQTTClient::connectToServer() {
00310   mmqtt_status_t status;
00311   struct mmqtt_p_connect_header connect_header;
00312   struct mmqtt_p_connack_header connack_header;
00313   uint32_t packet_length;
00314   uint8_t flag;
00315   uint16_t length;
00316   uint8_t name[6];
00317 
00318   if (_client->connect(_host, _port)) {
00319     DBGLN("%s", F("Connected to M2X MQTT server!"));
00320     mmqtt_connection_init(&_connection, this);
00321     /* Send CONNECT packet first */
00322     connect_header.name = name;
00323     strncpy((char *)name, F("MQIsdp"), 6);
00324     connect_header.name_length = connect_header.name_max_length = 6;
00325     connect_header.protocol_version = 3;
00326     /* Clean session with username set */
00327     connect_header.flags = 0x82;
00328     connect_header.keepalive = 60;
00329     packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) +
00330                     mmqtt_s_string_encoded_length(_key_length) +
00331                     mmqtt_s_string_encoded_length(_key_length);
00332     status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00333                                          MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT),
00334                                          packet_length);
00335     if (status != MMQTT_STATUS_OK) {
00336       DBG("%s", F("Error sending connect packet fixed header: "));
00337       DBGLN("%d", status);
00338       _client->stop();
00339       return E_DISCONNECTED;
00340     }
00341     status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header);
00342     if (status != MMQTT_STATUS_OK) {
00343       DBG("%s", F("Error sending connect packet variable header: "));
00344       DBGLN("%d", status);
00345       _client->stop();
00346       return E_DISCONNECTED;
00347     }
00348     /* Client ID */
00349     status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
00350                                    (const uint8_t *) _key, _key_length);
00351     if (status != MMQTT_STATUS_OK) {
00352       DBG("%s", F("Error sending connect packet payload: "));
00353       DBGLN("%d", status);
00354       _client->stop();
00355       return E_DISCONNECTED;
00356     }
00357     /* Username */
00358     status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
00359                                    (const uint8_t *) _key, _key_length);
00360     if (status != MMQTT_STATUS_OK) {
00361       DBG("%s", F("Error sending connect packet payload: "));
00362       DBGLN("%d", status);
00363       _client->stop();
00364       return E_DISCONNECTED;
00365     }
00366     /* Check CONNACK packet */
00367     do {
00368       status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
00369                                            &flag, &packet_length);
00370       if (status != MMQTT_STATUS_OK) {
00371         DBG("%s", F("Error decoding connack fixed header: "));
00372         DBGLN("%d", status);
00373         _client->stop();
00374         return E_DISCONNECTED;
00375       }
00376       if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) {
00377         status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
00378         if (status != MMQTT_STATUS_OK) {
00379           DBG("%s", F("Error skipping non-connack packet: "));
00380           DBGLN("%d", status);
00381           _client->stop();
00382           return E_DISCONNECTED;
00383         }
00384       }
00385     } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK);
00386     status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher,
00387                                            &connack_header);
00388     if (status != MMQTT_STATUS_OK) {
00389       DBG("%s", F("Error decoding connack variable header: "));
00390       DBGLN("%d", status);
00391       _client->stop();
00392       return E_DISCONNECTED;
00393     }
00394     if (connack_header.return_code != 0x0) {
00395       DBG("%s", F("CONNACK return code is not accepted: "));
00396       DBGLN("%d", connack_header.return_code);
00397       _client->stop();
00398       return E_DISCONNECTED;
00399     }
00400     /* Send SUBSCRIBE packet*/
00401     length = _key_length + 15 + 4;
00402     status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00403                                          MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2,
00404                                          length);
00405     if (status != MMQTT_STATUS_OK) {
00406       DBG("%s", F("Error sending subscribe packet fixed header: "));
00407       DBGLN("%d", status);
00408       _client->stop();
00409       return E_DISCONNECTED;
00410     }
00411     // Subscribe packet must use QoS 1
00412     mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0);
00413     mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14);
00414     mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00415     mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00416     // The extra one is QoS, added here to save a function call
00417     mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11);
00418     /* Check SUBACK packet */
00419     do {
00420       status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
00421                                            &flag, &packet_length);
00422       if (status != MMQTT_STATUS_OK) {
00423         DBG("%s", F("Error decoding suback fixed header: "));
00424         DBGLN("%d", status);
00425         _client->stop();
00426         return E_DISCONNECTED;
00427       }
00428       if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) {
00429         status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
00430         if (status != MMQTT_STATUS_OK) {
00431           DBG("%s", F("Error skipping packet: "));
00432           DBGLN("%d", status);
00433           _client->stop();
00434           return E_DISCONNECTED;
00435         }
00436       }
00437     } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK);
00438     status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
00439     if (status != MMQTT_STATUS_OK) {
00440       DBG("%s", F("Error skipping suback packet: "));
00441       DBGLN("%d", status);
00442       _client->stop();
00443       return E_DISCONNECTED;
00444     }
00445     _mmqtt_print.connection = &_connection;
00446     _mmqtt_print.puller = m2x_mmqtt_puller;
00447     _connected = true;
00448     return E_OK;
00449   } else {
00450     DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!"));
00451     return E_NOCONNECTION;
00452   }
00453 }
00454 
00455 template <class T>
00456 int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
00457   int length;
00458   if (!_connected) {
00459     if (connectToServer() != E_OK) {
00460       DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00461       return E_NOCONNECTION;
00462     }
00463   }
00464   _current_id++;
00465   length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value);
00466   mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00467                               MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
00468                               length + _key_length + 15);
00469   mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
00470   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00471   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00472   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
00473   printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value);
00474   return readStatusCode();
00475 }
00476 
00477 template <class T>
00478 int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId,
00479                                                  const char* streamName, T value) {
00480   int bytes = 0;
00481   bytes += print->print(F("{\"id\":\""));
00482   bytes += print->print(_current_id);
00483   bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
00484   if (_path_prefix) { bytes += print->print(_path_prefix); }
00485   bytes += print->print(F("/v2/devices/"));
00486   bytes += print->print(deviceId);
00487   bytes += print->print(F("/streams/"));
00488   bytes += print->print(streamName);
00489   bytes += print->print(F("/value"));
00490   bytes += print->print(F("\",\"agent\":\""));
00491   bytes += print->print(USER_AGENT);
00492   bytes += print->print(F("\",\"body\":"));
00493   bytes += print->print(F("{\"value\":\""));
00494   bytes += print->print(value);
00495   bytes += print->print(F("\"}"));
00496   bytes += print->print(F("}"));
00497   return bytes;
00498 }
00499 
00500 template <class T>
00501 int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum,
00502                                      const char* names[], const int counts[],
00503                                      const char* ats[], T values[]) {
00504   int length;
00505   if (!_connected) {
00506     if (connectToServer() != E_OK) {
00507       DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00508       return E_NOCONNECTION;
00509     }
00510   }
00511   _current_id++;
00512   length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum,
00513                                          names, counts, ats, values);
00514   mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00515                               MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
00516                               length + _key_length + 15);
00517   mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
00518   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00519   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00520   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
00521   printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum,
00522                                 names, counts, ats, values);
00523   return readStatusCode();
00524 }
00525 
00526 template <class T>
00527 int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print,
00528                                                  const char* deviceId, int streamNum,
00529                                                  const char* names[], const int counts[],
00530                                                  const char* ats[], T values[]) {
00531   int bytes = 0, value_index = 0, i, j;
00532   bytes += print->print(F("{\"id\":\""));
00533   bytes += print->print(_current_id);
00534   bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
00535   if (_path_prefix) { bytes += print->print(_path_prefix); }
00536   bytes += print->print(F("/v2/devices/"));
00537   bytes += print->print(deviceId);
00538   bytes += print->print(F("/updates"));
00539   bytes += print->print(F("\",\"agent\":\""));
00540   bytes += print->print(USER_AGENT);
00541   bytes += print->print(F("\",\"body\":"));
00542   bytes += print->print(F("{\"values\":{"));
00543   for (i = 0; i < streamNum; i++) {
00544     bytes += print->print(F("\""));
00545     bytes += print->print(names[i]);
00546     bytes += print->print(F("\":["));
00547     for (j = 0; j < counts[i]; j++) {
00548       bytes += print->print(F("{\"timestamp\": \""));
00549       bytes += print->print(ats[value_index]);
00550       bytes += print->print(F("\",\"value\": \""));
00551       bytes += print->print(values[value_index]);
00552       bytes += print->print(F("\"}"));
00553       if (j < counts[i] - 1) { bytes += print->print(F(",")); }
00554       value_index++;
00555     }
00556     bytes += print->print(F("]"));
00557     if (i < streamNum - 1) { bytes += print->print(F(",")); }
00558   }
00559   bytes += print->print(F(("}}}")));
00560   return bytes;
00561 }
00562 
00563 template <class T>
00564 int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum,
00565                                     const char* names[], T values[],
00566                                     const char* at) {
00567   int length;
00568   if (!_connected) {
00569     if (connectToServer() != E_OK) {
00570       DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00571       return E_NOCONNECTION;
00572     }
00573   }
00574   _current_id++;
00575   length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum,
00576                                         names, values, at);
00577   mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00578                               MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
00579                               length + _key_length + 15);
00580   mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
00581   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00582   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00583   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
00584   printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum,
00585                                names, values, at);
00586   return readStatusCode();
00587 }
00588 
00589 template <class T>
00590 int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print,
00591                                                 const char* deviceId, int streamNum,
00592                                                 const char* names[], T values[],
00593                                                 const char* at) {
00594   int bytes = 0, i;
00595   bytes += print->print(F("{\"id\":\""));
00596   bytes += print->print(_current_id);
00597   bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
00598   if (_path_prefix) { bytes += print->print(_path_prefix); }
00599   bytes += print->print(F("/v2/devices/"));
00600   bytes += print->print(deviceId);
00601   bytes += print->print(F("/update"));
00602   bytes += print->print(F("\",\"agent\":\""));
00603   bytes += print->print(USER_AGENT);
00604   bytes += print->print(F("\",\"body\":"));
00605   bytes += print->print(F("{\"values\":{"));
00606   for (int i = 0; i < streamNum; i++) {
00607     bytes += print->print(F("\""));
00608     bytes += print->print(names[i]);
00609     bytes += print->print(F("\": \""));
00610     bytes += print->print(values[i]);
00611     bytes += print->print(F("\""));
00612     if (i < streamNum - 1) { bytes += print->print(F(",")); }
00613   }
00614   bytes += print->print(F("}"));
00615   if (at != NULL) {
00616     bytes += print->print(F(",\"timestamp\":\""));
00617     bytes += print->print(at);
00618     bytes += print->print(F("\""));
00619   }
00620   bytes += print->print(F(("}")));
00621   return bytes;
00622 }
00623 
00624 template <class T>
00625 int M2XMQTTClient::updateLocation(const char* deviceId, const char* name,
00626                                   T latitude, T longitude, T elevation) {
00627   int length;
00628   if (!_connected) {
00629     if (connectToServer() != E_OK) {
00630       DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00631       return E_NOCONNECTION;
00632     }
00633   }
00634   _current_id++;
00635   length = printUpdateLocationPayload(&_null_print, deviceId, name,
00636                                       latitude, longitude, elevation);
00637   mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00638                               MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
00639                               length + _key_length + 15);
00640   mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
00641   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00642   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00643   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
00644   printUpdateLocationPayload(&_mmqtt_print, deviceId, name,
00645                              latitude, longitude, elevation);
00646   return readStatusCode();
00647 }
00648 
00649 template <class T>
00650 int M2XMQTTClient::printUpdateLocationPayload(Print* print,
00651                                               const char* deviceId, const char* name,
00652                                               T latitude, T longitude, T elevation) {
00653   int bytes = 0;
00654   bytes += print->print(F("{\"id\":\""));
00655   bytes += print->print(_current_id);
00656   bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
00657   if (_path_prefix) { bytes += print->print(_path_prefix); }
00658   bytes += print->print(F("/v2/devices/"));
00659   bytes += print->print(deviceId);
00660   bytes += print->print(F("/location"));
00661   bytes += print->print(F("\",\"agent\":\""));
00662   bytes += print->print(USER_AGENT);
00663   bytes += print->print(F("\",\"body\":{\"name\":\""));
00664   bytes += print->print(name);
00665   bytes += print->print(F("\",\"latitude\":\""));
00666   bytes += print->print(latitude);
00667   bytes += print->print(F("\",\"longitude\":\""));
00668   bytes += print->print(longitude);
00669   bytes += print->print(F("\",\"elevation\":\""));
00670   bytes += print->print(elevation);
00671   bytes += print->print(F(("\"}}")));
00672   return bytes;
00673 }
00674 
00675 int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName,
00676                                 const char* from, const char* end) {
00677   int length;
00678   if (!_connected) {
00679     if (connectToServer() != E_OK) {
00680       DBGLN("%s", "ERROR: Cannot connect to M2X server!");
00681       return E_NOCONNECTION;
00682     }
00683   }
00684   _current_id++;
00685   length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end);
00686   mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
00687                               MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
00688                               length + _key_length + 15);
00689   mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
00690   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
00691   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
00692   mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
00693   printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end);
00694   return readStatusCode();
00695 }
00696 
00697 int M2XMQTTClient::printDeleteValuesPayload(Print *print,
00698                                             const char* deviceId, const char* streamName,
00699                                             const char* from, const char* end) {
00700   int bytes = 0;
00701   bytes += print->print(F("{\"id\":\""));
00702   bytes += print->print(_current_id);
00703   bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\""));
00704   if (_path_prefix) { bytes += print->print(_path_prefix); }
00705   bytes += print->print(F("/v2/devices/"));
00706   bytes += print->print(deviceId);
00707   bytes += print->print(F("/streams/"));
00708   bytes += print->print(streamName);
00709   bytes += print->print(F("/values"));
00710   bytes += print->print(F("\",\"agent\":\""));
00711   bytes += print->print(USER_AGENT);
00712   bytes += print->print(F("\",\"body\":{\"from\":\""));
00713   bytes += print->print(from);
00714   bytes += print->print(F("\",\"end\":\""));
00715   bytes += print->print(end);
00716   bytes += print->print(F(("\"}}")));
00717   return bytes;
00718 }
00719 
00720 int M2XMQTTClient::readStatusCode() {
00721   mmqtt_status_t status;
00722   uint32_t packet_length;
00723   uint8_t flag;
00724   int16_t parsed_id, response_status;
00725   struct mjson_ctx ctx;
00726   char buf[6];
00727   size_t buf_length;
00728 
00729   while (true) {
00730     status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
00731                                          &flag, &packet_length);
00732     if (status != MMQTT_STATUS_OK) {
00733       DBG("%s", F("Error decoding publish fixed header: "));
00734       DBGLN("%d", status);
00735       close();
00736       return E_DISCONNECTED;
00737     }
00738     if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) {
00739       status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
00740       if (status != MMQTT_STATUS_OK) {
00741         DBG("%s", F("Error skipping non-publish packet: "));
00742         DBGLN("%d", status);
00743         close();
00744         return E_DISCONNECTED;
00745       }
00746     } else {
00747       /*
00748        * Since we only subscribe to one channel, there's no need to check channel name.
00749        */
00750       status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0,
00751                                      NULL, NULL);
00752       if (status != MMQTT_STATUS_OK) {
00753         DBG("%s", F("Error skipping channel name string: "));
00754         DBGLN("%d", status);
00755         close();
00756         return E_DISCONNECTED;
00757       }
00758       /*
00759        * Parse JSON body for ID and status
00760        */
00761       mjson_init(&ctx, &_connection, m2x_mjson_reader);
00762       parsed_id = -1;
00763       if (mjson_readcheck_object_start(&ctx) != MJSON_OK) {
00764         /* Oops we have an error */
00765         DBG("%s", F("Publish Packet is not a JSON object!"));
00766         close();
00767         return E_DISCONNECTED;
00768       }
00769       while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) {
00770         if (mjson_readcheck_string_start(&ctx) != MJSON_OK) {
00771           DBG("%s", F("Object key is not string!"));
00772           close();
00773           return E_DISCONNECTED;
00774         }
00775         mjson_read_full_string(&ctx, buf, 6, &buf_length);
00776         mjson_read_object_key_separator(&ctx);
00777         if (strncmp(buf, F("status"), 6) == 0) {
00778           mjson_read_int16(&ctx, &response_status);
00779         } else if (strncmp(buf, F("id"), 2) == 0) {
00780           /* Hack since we know the ID passed is actually an integer*/
00781           mjson_readcheck_string_start(&ctx);
00782           mjson_read_int16(&ctx, &parsed_id);
00783           mjson_read_string_end(&ctx);
00784         } else {
00785           mjson_skip_value(&ctx);
00786         }
00787       }
00788       if (parsed_id == _current_id) { return response_status; }
00789     }
00790   }
00791   return E_NOTREACHABLE;
00792 }
00793 
00794 void M2XMQTTClient::close() {
00795   _client->stop();
00796   _connected = false;
00797 }
00798 
00799 #endif  /* M2XMQTTCLIENT_H_ */