M2X MQTT Client for ARM MBED
Dependents: WNCInterface_M2XMQTTdemo
M2XMQTTClient.h
- Committer:
- citrusbyte
- Date:
- 2016-09-15
- Revision:
- 1:c4d41ff3c58e
- Parent:
- 0:5a4798128c36
File content as of revision 1:c4d41ff3c58e:
#ifndef M2XMQTTCLIENT_H_ #define M2XMQTTCLIENT_H_ #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM)) #error "Platform definition is missing!" #endif #define M2X_VERSION "0.1.0" #ifdef MBED_PLATFORM #include "m2x-mbed.h" #endif /* MBED_PLATFORM */ #ifdef LINUX_PLATFORM #include "m2x-linux.h" #endif /* LINUX_PLATFORM */ /* If we don't have DBG defined, provide dump implementation */ #ifndef DBG #define DBG(fmt_, data_) #define DBGLN(fmt_, data_) #define DBGLNEND #endif /* DBG */ #define MIN(a, b) (((a) > (b))?(b):(a)) #define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0'))) #define MAX_DOUBLE_DIGITS 7 /* For tolower */ #include <ctype.h> static const int E_OK = 0; static const int E_NOCONNECTION = -1; static const int E_DISCONNECTED = -2; static const int E_NOTREACHABLE = -3; static const int E_INVALID = -4; static const int E_JSON_INVALID = -5; static const int E_BUFFER_TOO_SMALL = -6; static const int E_TIMESTAMP_ERROR = -8; static const char* DEFAULT_M2X_HOST = "api-m2x.att.com"; static const int DEFAULT_M2X_PORT = 1883; static inline bool m2x_status_is_success(int status) { return (status == E_OK) || (status >= 200 && status <= 299); } static inline bool m2x_status_is_client_error(int status) { return status >= 400 && status <= 499; } static inline bool m2x_status_is_server_error(int status) { return status >= 500 && status <= 599; } static inline bool m2x_status_is_error(int status) { return m2x_status_is_client_error(status) || m2x_status_is_server_error(status); } // Null Print class used to calculate length to print class NullPrint : public Print { public: size_t counter; virtual size_t write(uint8_t b) { counter++; return 1; } virtual size_t write(const uint8_t* buf, size_t size) { counter += size; return size; } }; // Handy helper class for printing MQTT payload using a Print class MMQTTPrint : public Print { public: mmqtt_connection *connection; mmqtt_s_puller puller; virtual size_t write(uint8_t b) { return write(&b, 1); } virtual size_t write(const uint8_t* buf, size_t size) { return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1; } }; class M2XMQTTClient { public: M2XMQTTClient(Client* client, const char* key, void (* idlefunc)(void) = NULL, bool keepalive = true, const char* host = DEFAULT_M2X_HOST, int port = DEFAULT_M2X_PORT, const char* path_prefix = NULL); // Push data stream value using PUT request, returns the HTTP status code // NOTE: if you want to update by a serial, use "serial/<serial ID>" as // the device ID here. template <class T> int updateStreamValue(const char* deviceId, const char* streamName, T value); // Post multiple values to M2X all at once. // +deviceId+ - id of the device to post values // +streamNum+ - Number of streams to post // +names+ - Array of stream names, the length of the array should // be exactly +streamNum+ // +counts+ - Array of +streamNum+ length, each item in this array // containing the number of values we want to post for each stream // +ats+ - Timestamps for each value, the length of this array should // be the some of all values in +counts+, for the first +counts[0]+ // items, the values belong to the first stream, for the following // +counts[1]+ number of items, the values belong to the second stream, // etc. Notice that timestamps are required here: you must provide // a timestamp for each value posted. // +values+ - Values to post. This works the same way as +ats+, the // first +counts[0]+ number of items contain values to post to the first // stream, the succeeding +counts[1]+ number of items contain values // for the second stream, etc. The length of this array should be // the sum of all values in +counts+ array. // NOTE: if you want to update by a serial, use "serial/<serial ID>" as // the device ID here. template <class T> int postDeviceUpdates(const char* deviceId, int streamNum, const char* names[], const int counts[], const char* ats[], T values[]); // Post multiple values of a single device at once. // +deviceId+ - id of the device to post values // +streamNum+ - Number of streams to post // +names+ - Array of stream names, the length of the array should // be exactly +streamNum+ // +values+ - Array of values to post, the length of the array should // be exactly +streamNum+. Notice that the array of +values+ should // match the array of +names+, and that the ith value in +values+ is // exactly the value to post for the ith stream name in +names+ // NOTE: if you want to update by a serial, use "serial/<serial ID>" as // the device ID here. template <class T> int postDeviceUpdate(const char* deviceId, int streamNum, const char* names[], T values[], const char* at = NULL); // Update datasource location // NOTE: On an Arduino Uno and other ATMEGA based boards, double has // 4-byte (32 bits) precision, which is the same as float. So there's // no natural double-precision floating number on these boards. With // a float value, we have a precision of roughly 7 digits, that means // either 5 or 6 digits after the floating point. According to wikipedia, // a difference of 0.00001 will give us ~1.1132m distance. If this // precision is good for you, you can use the double-version we provided // here. Otherwise, you may need to use the string-version and do the // actual conversion by yourselves. // However, with an Arduino Due board, double has 8-bytes (64 bits) // precision, which means you are free to use the double-version only // without any precision problems. // Returned value is the http status code. // NOTE: if you want to update by a serial, use "serial/<serial ID>" as // the device ID here. template <class T> int updateLocation(const char* deviceId, const char* name, T latitude, T longitude, T elevation); // Delete values from a data stream // You will need to provide from and end date/time strings in the ISO8601 // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where // yyyy: the year // mm: the month // dd: the day // HH: the hour (24 hour format) // MM: the minute // SS.SSS: the seconds (to the millisecond) // NOTE: the time is given in Zulu (GMT) // M2X will delete all values within the from to end date/time range. // The status code is 204 on success and 400 on a bad request (e.g. the // timestamp is not in ISO8601 format or the from timestamp is not less than // or equal to the end timestamp. int deleteValues(const char* deviceId, const char* streamName, const char* from, const char* end); // Following fields are public so mmqtt callback functions can access directly Client* _client; private: struct mmqtt_connection _connection; const char* _key; uint16_t _key_length; bool _connected; bool _keepalive; const char* _host; int _port; void (* _idlefunc)(void); const char* _path_prefix; NullPrint _null_print; MMQTTPrint _mmqtt_print; int16_t _current_id; int connectToServer(); template <class T> int printUpdateStreamValuePayload(Print* print, const char* deviceId, const char* streamName, T value); template <class T> int printPostDeviceUpdatesPayload(Print* print, const char* deviceId, int streamNum, const char* names[], const int counts[], const char* ats[], T values[]); template <class T> int printPostDeviceUpdatePayload(Print* print, const char* deviceId, int streamNum, const char* names[], T values[], const char* at = NULL); template <class T> int printUpdateLocationPayload(Print* print, const char* deviceId, const char* name, T latitude, T longitude, T elevation); int printDeleteValuesPayload(Print* print, const char* deviceId, const char* streamName, const char* from, const char* end); int readStatusCode(); void close(); }; // Implementations M2XMQTTClient::M2XMQTTClient(Client* client, const char* key, void (* idlefunc)(void), bool keepalive, const char* host, int port, const char* path_prefix) : _client(client), _key(key), _idlefunc(idlefunc), _keepalive(keepalive), _connected(false), _host(host), _port(port), _path_prefix(path_prefix), _null_print(), _mmqtt_print(), _current_id(0) { _key_length = strlen(_key); } mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) { const uint8_t *data = NULL; mmqtt_ssize_t length = 0; mmqtt_status_t status; M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; Client *c = client->_client; struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection); if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; } status = mmqtt_stream_external_pullable(stream, &data, &length); if (status != MMQTT_STATUS_OK) { return status; } length = c->write(data, length); if (length < 0) { c->stop(); return MMQTT_STATUS_BROKEN_CONNECTION; } mmqtt_stream_external_pull(stream, length); if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { mmqtt_connection_release_write_stream(connection, stream); } return MMQTT_STATUS_OK; } mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) { uint8_t *data = NULL; mmqtt_ssize_t length = 0, i = 0; mmqtt_status_t status; M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; Client *c = client->_client; struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection); if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; } status = mmqtt_stream_external_pushable(stream, &data, &length); if (status != MMQTT_STATUS_OK) { return status; } length = min(length, max_size); /* Maybe we need another field in signature documenting how much data we want, * so we can handle end condition gracefully? Not 100% if `left` field in * mmqtt_stream is enough */ while (i < length && c->available()) { data[i++] = c->read(); } mmqtt_stream_external_push(stream, i); return MMQTT_STATUS_OK; } size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit) { mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata; return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit, limit) == MMQTT_STATUS_OK ? limit : 0; } int M2XMQTTClient::connectToServer() { mmqtt_status_t status; struct mmqtt_p_connect_header connect_header; struct mmqtt_p_connack_header connack_header; uint32_t packet_length; uint8_t flag; uint16_t length; uint8_t name[6]; if (_client->connect(_host, _port)) { DBGLN("%s", F("Connected to M2X MQTT server!")); mmqtt_connection_init(&_connection, this); /* Send CONNECT packet first */ connect_header.name = name; strncpy((char *)name, F("MQIsdp"), 6); connect_header.name_length = connect_header.name_max_length = 6; connect_header.protocol_version = 3; /* Clean session with username set */ connect_header.flags = 0x82; connect_header.keepalive = 60; packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) + mmqtt_s_string_encoded_length(_key_length) + mmqtt_s_string_encoded_length(_key_length); status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT), packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error sending connect packet fixed header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error sending connect packet variable header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } /* Client ID */ status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error sending connect packet payload: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } /* Username */ status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error sending connect packet payload: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } /* Check CONNACK packet */ do { status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, &flag, &packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error decoding connack fixed header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) { status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error skipping non-connack packet: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } } } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK); status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher, &connack_header); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error decoding connack variable header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } if (connack_header.return_code != 0x0) { DBG("%s", F("CONNACK return code is not accepted: ")); DBGLN("%d", connack_header.return_code); _client->stop(); return E_DISCONNECTED; } /* Send SUBSCRIBE packet*/ length = _key_length + 15 + 4; status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2, length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error sending subscribe packet fixed header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } // Subscribe packet must use QoS 1 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); // The extra one is QoS, added here to save a function call mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11); /* Check SUBACK packet */ do { status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, &flag, &packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error decoding suback fixed header: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) { status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error skipping packet: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } } } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK); status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error skipping suback packet: ")); DBGLN("%d", status); _client->stop(); return E_DISCONNECTED; } _mmqtt_print.connection = &_connection; _mmqtt_print.puller = m2x_mmqtt_puller; _connected = true; return E_OK; } else { DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!")); return E_NOCONNECTION; } } template <class T> int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { int length; if (!_connected) { if (connectToServer() != E_OK) { DBGLN("%s", "ERROR: Cannot connect to M2X server!"); return E_NOCONNECTION; } } _current_id++; length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value); mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), length + _key_length + 15); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value); return readStatusCode(); } template <class T> int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId, const char* streamName, T value) { int bytes = 0; bytes += print->print(F("{\"id\":\"")); bytes += print->print(_current_id); bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); if (_path_prefix) { bytes += print->print(_path_prefix); } bytes += print->print(F("/v2/devices/")); bytes += print->print(deviceId); bytes += print->print(F("/streams/")); bytes += print->print(streamName); bytes += print->print(F("/value")); bytes += print->print(F("\",\"agent\":\"")); bytes += print->print(USER_AGENT); bytes += print->print(F("\",\"body\":")); bytes += print->print(F("{\"value\":\"")); bytes += print->print(value); bytes += print->print(F("\"}")); bytes += print->print(F("}")); return bytes; } template <class T> int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum, const char* names[], const int counts[], const char* ats[], T values[]) { int length; if (!_connected) { if (connectToServer() != E_OK) { DBGLN("%s", "ERROR: Cannot connect to M2X server!"); return E_NOCONNECTION; } } _current_id++; length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum, names, counts, ats, values); mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), length + _key_length + 15); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum, names, counts, ats, values); return readStatusCode(); } template <class T> int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print, const char* deviceId, int streamNum, const char* names[], const int counts[], const char* ats[], T values[]) { int bytes = 0, value_index = 0, i, j; bytes += print->print(F("{\"id\":\"")); bytes += print->print(_current_id); bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); if (_path_prefix) { bytes += print->print(_path_prefix); } bytes += print->print(F("/v2/devices/")); bytes += print->print(deviceId); bytes += print->print(F("/updates")); bytes += print->print(F("\",\"agent\":\"")); bytes += print->print(USER_AGENT); bytes += print->print(F("\",\"body\":")); bytes += print->print(F("{\"values\":{")); for (i = 0; i < streamNum; i++) { bytes += print->print(F("\"")); bytes += print->print(names[i]); bytes += print->print(F("\":[")); for (j = 0; j < counts[i]; j++) { bytes += print->print(F("{\"timestamp\": \"")); bytes += print->print(ats[value_index]); bytes += print->print(F("\",\"value\": \"")); bytes += print->print(values[value_index]); bytes += print->print(F("\"}")); if (j < counts[i] - 1) { bytes += print->print(F(",")); } value_index++; } bytes += print->print(F("]")); if (i < streamNum - 1) { bytes += print->print(F(",")); } } bytes += print->print(F(("}}}"))); return bytes; } template <class T> int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum, const char* names[], T values[], const char* at) { int length; if (!_connected) { if (connectToServer() != E_OK) { DBGLN("%s", "ERROR: Cannot connect to M2X server!"); return E_NOCONNECTION; } } _current_id++; length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum, names, values, at); mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), length + _key_length + 15); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum, names, values, at); return readStatusCode(); } template <class T> int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print, const char* deviceId, int streamNum, const char* names[], T values[], const char* at) { int bytes = 0, i; bytes += print->print(F("{\"id\":\"")); bytes += print->print(_current_id); bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); if (_path_prefix) { bytes += print->print(_path_prefix); } bytes += print->print(F("/v2/devices/")); bytes += print->print(deviceId); bytes += print->print(F("/update")); bytes += print->print(F("\",\"agent\":\"")); bytes += print->print(USER_AGENT); bytes += print->print(F("\",\"body\":")); bytes += print->print(F("{\"values\":{")); for (int i = 0; i < streamNum; i++) { bytes += print->print(F("\"")); bytes += print->print(names[i]); bytes += print->print(F("\": \"")); bytes += print->print(values[i]); bytes += print->print(F("\"")); if (i < streamNum - 1) { bytes += print->print(F(",")); } } bytes += print->print(F("}")); if (at != NULL) { bytes += print->print(F(",\"timestamp\":\"")); bytes += print->print(at); bytes += print->print(F("\"")); } bytes += print->print(F(("}"))); return bytes; } template <class T> int M2XMQTTClient::updateLocation(const char* deviceId, const char* name, T latitude, T longitude, T elevation) { int length; if (!_connected) { if (connectToServer() != E_OK) { DBGLN("%s", "ERROR: Cannot connect to M2X server!"); return E_NOCONNECTION; } } _current_id++; length = printUpdateLocationPayload(&_null_print, deviceId, name, latitude, longitude, elevation); mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), length + _key_length + 15); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); printUpdateLocationPayload(&_mmqtt_print, deviceId, name, latitude, longitude, elevation); return readStatusCode(); } template <class T> int M2XMQTTClient::printUpdateLocationPayload(Print* print, const char* deviceId, const char* name, T latitude, T longitude, T elevation) { int bytes = 0; bytes += print->print(F("{\"id\":\"")); bytes += print->print(_current_id); bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); if (_path_prefix) { bytes += print->print(_path_prefix); } bytes += print->print(F("/v2/devices/")); bytes += print->print(deviceId); bytes += print->print(F("/location")); bytes += print->print(F("\",\"agent\":\"")); bytes += print->print(USER_AGENT); bytes += print->print(F("\",\"body\":{\"name\":\"")); bytes += print->print(name); bytes += print->print(F("\",\"latitude\":\"")); bytes += print->print(latitude); bytes += print->print(F("\",\"longitude\":\"")); bytes += print->print(longitude); bytes += print->print(F("\",\"elevation\":\"")); bytes += print->print(elevation); bytes += print->print(F(("\"}}"))); return bytes; } int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName, const char* from, const char* end) { int length; if (!_connected) { if (connectToServer() != E_OK) { DBGLN("%s", "ERROR: Cannot connect to M2X server!"); return E_NOCONNECTION; } } _current_id++; length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end); mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), length + _key_length + 15); mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end); return readStatusCode(); } int M2XMQTTClient::printDeleteValuesPayload(Print *print, const char* deviceId, const char* streamName, const char* from, const char* end) { int bytes = 0; bytes += print->print(F("{\"id\":\"")); bytes += print->print(_current_id); bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\"")); if (_path_prefix) { bytes += print->print(_path_prefix); } bytes += print->print(F("/v2/devices/")); bytes += print->print(deviceId); bytes += print->print(F("/streams/")); bytes += print->print(streamName); bytes += print->print(F("/values")); bytes += print->print(F("\",\"agent\":\"")); bytes += print->print(USER_AGENT); bytes += print->print(F("\",\"body\":{\"from\":\"")); bytes += print->print(from); bytes += print->print(F("\",\"end\":\"")); bytes += print->print(end); bytes += print->print(F(("\"}}"))); return bytes; } int M2XMQTTClient::readStatusCode() { mmqtt_status_t status; uint32_t packet_length; uint8_t flag; int16_t parsed_id, response_status; struct mjson_ctx ctx; char buf[6]; size_t buf_length; while (true) { status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, &flag, &packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error decoding publish fixed header: ")); DBGLN("%d", status); close(); return E_DISCONNECTED; } if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) { status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error skipping non-publish packet: ")); DBGLN("%d", status); close(); return E_DISCONNECTED; } } else { /* * Since we only subscribe to one channel, there's no need to check channel name. */ status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0, NULL, NULL); if (status != MMQTT_STATUS_OK) { DBG("%s", F("Error skipping channel name string: ")); DBGLN("%d", status); close(); return E_DISCONNECTED; } /* * Parse JSON body for ID and status */ mjson_init(&ctx, &_connection, m2x_mjson_reader); parsed_id = -1; if (mjson_readcheck_object_start(&ctx) != MJSON_OK) { /* Oops we have an error */ DBG("%s", F("Publish Packet is not a JSON object!")); close(); return E_DISCONNECTED; } while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) { if (mjson_readcheck_string_start(&ctx) != MJSON_OK) { DBG("%s", F("Object key is not string!")); close(); return E_DISCONNECTED; } mjson_read_full_string(&ctx, buf, 6, &buf_length); mjson_read_object_key_separator(&ctx); if (strncmp(buf, F("status"), 6) == 0) { mjson_read_int16(&ctx, &response_status); } else if (strncmp(buf, F("id"), 2) == 0) { /* Hack since we know the ID passed is actually an integer*/ mjson_readcheck_string_start(&ctx); mjson_read_int16(&ctx, &parsed_id); mjson_read_string_end(&ctx); } else { mjson_skip_value(&ctx); } } if (parsed_id == _current_id) { return response_status; } } } return E_NOTREACHABLE; } void M2XMQTTClient::close() { _client->stop(); _connected = false; } #endif /* M2XMQTTCLIENT_H_ */