M2X MQTT Client for ARM MBED
Dependents: WNCInterface_M2XMQTTdemo
Revision 1:c4d41ff3c58e, committed 2016-09-15
- Comitter:
- citrusbyte
- Date:
- Thu Sep 15 10:17:00 2016 +0000
- Parent:
- 0:5a4798128c36
- Commit message:
- Update library to latest version
Changed in this revision
M2XMQTTClient.h | Show annotated file Show diff for this revision Revisions of this file |
diff -r 5a4798128c36 -r c4d41ff3c58e M2XMQTTClient.h --- a/M2XMQTTClient.h Mon Jun 27 13:25:45 2016 +0000 +++ b/M2XMQTTClient.h Thu Sep 15 10:17:00 2016 +0000 @@ -166,6 +166,23 @@ int updateLocation(const char* deviceId, const char* name, T latitude, T longitude, T elevation); + // Delete values from a data stream + // You will need to provide from and end date/time strings in the ISO8601 + // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where + // yyyy: the year + // mm: the month + // dd: the day + // HH: the hour (24 hour format) + // MM: the minute + // SS.SSS: the seconds (to the millisecond) + // NOTE: the time is given in Zulu (GMT) + // M2X will delete all values within the from to end date/time range. + // The status code is 204 on success and 400 on a bad request (e.g. the + // timestamp is not in ISO8601 format or the from timestamp is not less than + // or equal to the end timestamp. + int deleteValues(const char* deviceId, const char* streamName, + const char* from, const char* end); + // Following fields are public so mmqtt callback functions can access directly Client* _client; private: @@ -205,6 +222,10 @@ const char* deviceId, const char* name, T latitude, T longitude, T elevation); + int printDeleteValuesPayload(Print* print, + const char* deviceId, const char* streamName, + const char* from, const char* end); + int readStatusCode(); void close(); }; @@ -306,6 +327,7 @@ connect_header.flags = 0x82; connect_header.keepalive = 60; packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) + + mmqtt_s_string_encoded_length(_key_length) + mmqtt_s_string_encoded_length(_key_length); status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT), @@ -323,6 +345,16 @@ _client->stop(); return E_DISCONNECTED; } + /* Client ID */ + status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, + (const uint8_t *) _key, _key_length); + if (status != MMQTT_STATUS_OK) { + DBG("%s", F("Error sending connect packet payload: ")); + DBGLN("%d", status); + _client->stop(); + return E_DISCONNECTED; + } + /* Username */ status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); if (status != MMQTT_STATUS_OK) { @@ -462,6 +494,7 @@ bytes += print->print(value); bytes += print->print(F("\"}")); bytes += print->print(F("}")); + return bytes; } template <class T> @@ -639,6 +672,51 @@ return bytes; } +int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName, + const char* from, const char* end) { + int length; + if (!_connected) { + if (connectToServer() != E_OK) { + DBGLN("%s", "ERROR: Cannot connect to M2X server!"); + return E_NOCONNECTION; + } + } + _current_id++; + length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end); + mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, + MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), + length + _key_length + 15); + mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); + mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); + mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); + mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); + printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end); + return readStatusCode(); +} + +int M2XMQTTClient::printDeleteValuesPayload(Print *print, + const char* deviceId, const char* streamName, + const char* from, const char* end) { + int bytes = 0; + bytes += print->print(F("{\"id\":\"")); + bytes += print->print(_current_id); + bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\"")); + if (_path_prefix) { bytes += print->print(_path_prefix); } + bytes += print->print(F("/v2/devices/")); + bytes += print->print(deviceId); + bytes += print->print(F("/streams/")); + bytes += print->print(streamName); + bytes += print->print(F("/values")); + bytes += print->print(F("\",\"agent\":\"")); + bytes += print->print(USER_AGENT); + bytes += print->print(F("\",\"body\":{\"from\":\"")); + bytes += print->print(from); + bytes += print->print(F("\",\"end\":\"")); + bytes += print->print(end); + bytes += print->print(F(("\"}}"))); + return bytes; +} + int M2XMQTTClient::readStatusCode() { mmqtt_status_t status; uint32_t packet_length;