M2X MQTT Client for ARM MBED

Dependents:   WNCInterface_M2XMQTTdemo

Revision:
1:c4d41ff3c58e
Parent:
0:5a4798128c36
--- a/M2XMQTTClient.h	Mon Jun 27 13:25:45 2016 +0000
+++ b/M2XMQTTClient.h	Thu Sep 15 10:17:00 2016 +0000
@@ -166,6 +166,23 @@
   int updateLocation(const char* deviceId, const char* name,
                      T latitude, T longitude, T elevation);
 
+  // Delete values from a data stream
+  // You will need to provide from and end date/time strings in the ISO8601
+  // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where
+  //   yyyy: the year
+  //   mm: the month
+  //   dd: the day
+  //   HH: the hour (24 hour format)
+  //   MM: the minute
+  //   SS.SSS: the seconds (to the millisecond)
+  // NOTE: the time is given in Zulu (GMT)
+  // M2X will delete all values within the from to end date/time range.
+  // The status code is 204 on success and 400 on a bad request (e.g. the
+  // timestamp is not in ISO8601 format or the from timestamp is not less than
+  // or equal to the end timestamp.
+  int deleteValues(const char* deviceId, const char* streamName,
+                   const char* from, const char* end);
+
   // Following fields are public so mmqtt callback functions can access directly
   Client* _client;
 private:
@@ -205,6 +222,10 @@
                                  const char* deviceId, const char* name,
                                  T latitude, T longitude, T elevation);
 
+  int printDeleteValuesPayload(Print* print,
+                               const char* deviceId, const char* streamName,
+                               const char* from, const char* end);
+
   int readStatusCode();
   void close();
 };
@@ -306,6 +327,7 @@
     connect_header.flags = 0x82;
     connect_header.keepalive = 60;
     packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) +
+                    mmqtt_s_string_encoded_length(_key_length) +
                     mmqtt_s_string_encoded_length(_key_length);
     status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
                                          MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT),
@@ -323,6 +345,16 @@
       _client->stop();
       return E_DISCONNECTED;
     }
+    /* Client ID */
+    status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
+                                   (const uint8_t *) _key, _key_length);
+    if (status != MMQTT_STATUS_OK) {
+      DBG("%s", F("Error sending connect packet payload: "));
+      DBGLN("%d", status);
+      _client->stop();
+      return E_DISCONNECTED;
+    }
+    /* Username */
     status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
                                    (const uint8_t *) _key, _key_length);
     if (status != MMQTT_STATUS_OK) {
@@ -462,6 +494,7 @@
   bytes += print->print(value);
   bytes += print->print(F("\"}"));
   bytes += print->print(F("}"));
+  return bytes;
 }
 
 template <class T>
@@ -639,6 +672,51 @@
   return bytes;
 }
 
+int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName,
+                                const char* from, const char* end) {
+  int length;
+  if (!_connected) {
+    if (connectToServer() != E_OK) {
+      DBGLN("%s", "ERROR: Cannot connect to M2X server!");
+      return E_NOCONNECTION;
+    }
+  }
+  _current_id++;
+  length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end);
+  mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
+                              MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
+                              length + _key_length + 15);
+  mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
+  mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
+  mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
+  mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
+  printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end);
+  return readStatusCode();
+}
+
+int M2XMQTTClient::printDeleteValuesPayload(Print *print,
+                                            const char* deviceId, const char* streamName,
+                                            const char* from, const char* end) {
+  int bytes = 0;
+  bytes += print->print(F("{\"id\":\""));
+  bytes += print->print(_current_id);
+  bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\""));
+  if (_path_prefix) { bytes += print->print(_path_prefix); }
+  bytes += print->print(F("/v2/devices/"));
+  bytes += print->print(deviceId);
+  bytes += print->print(F("/streams/"));
+  bytes += print->print(streamName);
+  bytes += print->print(F("/values"));
+  bytes += print->print(F("\",\"agent\":\""));
+  bytes += print->print(USER_AGENT);
+  bytes += print->print(F("\",\"body\":{\"from\":\""));
+  bytes += print->print(from);
+  bytes += print->print(F("\",\"end\":\""));
+  bytes += print->print(end);
+  bytes += print->print(F(("\"}}")));
+  return bytes;
+}
+
 int M2XMQTTClient::readStatusCode() {
   mmqtt_status_t status;
   uint32_t packet_length;