M2X MQTT Client for ARM MBED

Dependents:   WNCInterface_M2XMQTTdemo

Committer:
citrusbyte
Date:
Mon Jun 27 13:25:45 2016 +0000
Revision:
0:5a4798128c36
Child:
1:c4d41ff3c58e
Initial Commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
citrusbyte 0:5a4798128c36 1 #ifndef M2XMQTTCLIENT_H_
citrusbyte 0:5a4798128c36 2 #define M2XMQTTCLIENT_H_
citrusbyte 0:5a4798128c36 3
citrusbyte 0:5a4798128c36 4 #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM))
citrusbyte 0:5a4798128c36 5 #error "Platform definition is missing!"
citrusbyte 0:5a4798128c36 6 #endif
citrusbyte 0:5a4798128c36 7
citrusbyte 0:5a4798128c36 8 #define M2X_VERSION "0.1.0"
citrusbyte 0:5a4798128c36 9
citrusbyte 0:5a4798128c36 10 #ifdef MBED_PLATFORM
citrusbyte 0:5a4798128c36 11 #include "m2x-mbed.h"
citrusbyte 0:5a4798128c36 12 #endif /* MBED_PLATFORM */
citrusbyte 0:5a4798128c36 13
citrusbyte 0:5a4798128c36 14 #ifdef LINUX_PLATFORM
citrusbyte 0:5a4798128c36 15 #include "m2x-linux.h"
citrusbyte 0:5a4798128c36 16 #endif /* LINUX_PLATFORM */
citrusbyte 0:5a4798128c36 17
/* If the platform header did not define DBG, provide dummy implementations:
 * all three debug macros expand to nothing, so debug calls in this file
 * produce no code. */
#ifndef DBG
#define DBG(fmt_, data_)
#define DBGLN(fmt_, data_)
#define DBGLNEND
#endif /* DBG */
citrusbyte 0:5a4798128c36 24
/* Smaller of a and b. Function-like macro: an argument may be evaluated
 * more than once, so do not pass expressions with side effects. */
#define MIN(a, b) (((a) > (b))?(b):(a))
/* Maps 0..9 to '0'..'9' and 10..15 to 'A'..'F' (uppercase hex digit).
 * The input range is not checked. */
#define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the digit budget used when formatting doubles -- confirm at the use site. */
#define MAX_DOUBLE_DIGITS 7
citrusbyte 0:5a4798128c36 28
citrusbyte 0:5a4798128c36 29 /* For tolower */
citrusbyte 0:5a4798128c36 30 #include <ctype.h>
citrusbyte 0:5a4798128c36 31
/* Client-side result codes. E_OK is treated as success by
 * m2x_status_is_success(); the negative values report local failures.
 * Note that -7 is not assigned in this list. */
static const int E_OK = 0;
static const int E_NOCONNECTION = -1;   /* returned when connecting to the server fails */
static const int E_DISCONNECTED = -2;   /* returned when the MQTT handshake fails after connecting */
static const int E_NOTREACHABLE = -3;
static const int E_INVALID = -4;
static const int E_JSON_INVALID = -5;
static const int E_BUFFER_TOO_SMALL = -6;
static const int E_TIMESTAMP_ERROR = -8;

/* Default server endpoint, used as the default constructor arguments of
 * M2XMQTTClient. */
static const char* DEFAULT_M2X_HOST = "api-m2x.att.com";
static const int DEFAULT_M2X_PORT = 1883;
citrusbyte 0:5a4798128c36 43
citrusbyte 0:5a4798128c36 44 static inline bool m2x_status_is_success(int status) {
citrusbyte 0:5a4798128c36 45 return (status == E_OK) || (status >= 200 && status <= 299);
citrusbyte 0:5a4798128c36 46 }
citrusbyte 0:5a4798128c36 47
/* True when status falls in the 400..499 range. */
static inline bool m2x_status_is_client_error(int status) {
  const bool below_range = status < 400;
  const bool above_range = status > 499;
  return !(below_range || above_range);
}
citrusbyte 0:5a4798128c36 51
/* True when status falls in the 500..599 range. */
static inline bool m2x_status_is_server_error(int status) {
  if (status < 500) {
    return false;
  }
  return status <= 599;
}
citrusbyte 0:5a4798128c36 55
citrusbyte 0:5a4798128c36 56 static inline bool m2x_status_is_error(int status) {
citrusbyte 0:5a4798128c36 57 return m2x_status_is_client_error(status) ||
citrusbyte 0:5a4798128c36 58 m2x_status_is_server_error(status);
citrusbyte 0:5a4798128c36 59 }
citrusbyte 0:5a4798128c36 60
citrusbyte 0:5a4798128c36 61 // Null Print class used to calculate length to print
citrusbyte 0:5a4798128c36 62 class NullPrint : public Print {
citrusbyte 0:5a4798128c36 63 public:
citrusbyte 0:5a4798128c36 64 size_t counter;
citrusbyte 0:5a4798128c36 65
citrusbyte 0:5a4798128c36 66 virtual size_t write(uint8_t b) {
citrusbyte 0:5a4798128c36 67 counter++;
citrusbyte 0:5a4798128c36 68 return 1;
citrusbyte 0:5a4798128c36 69 }
citrusbyte 0:5a4798128c36 70
citrusbyte 0:5a4798128c36 71 virtual size_t write(const uint8_t* buf, size_t size) {
citrusbyte 0:5a4798128c36 72 counter += size;
citrusbyte 0:5a4798128c36 73 return size;
citrusbyte 0:5a4798128c36 74 }
citrusbyte 0:5a4798128c36 75 };
citrusbyte 0:5a4798128c36 76
citrusbyte 0:5a4798128c36 77 // Handy helper class for printing MQTT payload using a Print
citrusbyte 0:5a4798128c36 78 class MMQTTPrint : public Print {
citrusbyte 0:5a4798128c36 79 public:
citrusbyte 0:5a4798128c36 80 mmqtt_connection *connection;
citrusbyte 0:5a4798128c36 81 mmqtt_s_puller puller;
citrusbyte 0:5a4798128c36 82
citrusbyte 0:5a4798128c36 83 virtual size_t write(uint8_t b) {
citrusbyte 0:5a4798128c36 84 return write(&b, 1);
citrusbyte 0:5a4798128c36 85 }
citrusbyte 0:5a4798128c36 86
citrusbyte 0:5a4798128c36 87 virtual size_t write(const uint8_t* buf, size_t size) {
citrusbyte 0:5a4798128c36 88 return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1;
citrusbyte 0:5a4798128c36 89 }
citrusbyte 0:5a4798128c36 90 };
citrusbyte 0:5a4798128c36 91
// M2X client that talks to the server over MQTT. Requests are published on
// the topic "m2x/<key>/requests" and the connection subscribes to
// "m2x/<key>/responses" (see connectToServer()).
class M2XMQTTClient {
public:
  // +client+ - transport object used to reach the server; stored, not owned
  // +key+ - M2X key. Must be a non-NULL, NUL-terminated string: the
  //   constructor calls strlen() on it. Only the pointer is stored, so the
  //   string must outlive this object.
  // +idlefunc+ - optional callback, stored in _idlefunc.
  //   NOTE(review): not invoked in the visible part of this file -- confirm
  //   where it is called.
  // +keepalive+ - stored in _keepalive.
  //   NOTE(review): not read in the visible part of this file -- confirm use.
  // +host+, +port+ - server endpoint passed to client->connect()
  // +path_prefix+ - optional string printed in front of "/v2/devices/..."
  //   in the request resource; skipped when NULL
  M2XMQTTClient(Client* client,
                const char* key,
                void (* idlefunc)(void) = NULL,
                bool keepalive = true,
                const char* host = DEFAULT_M2X_HOST,
                int port = DEFAULT_M2X_PORT,
                const char* path_prefix = NULL);

  // Push data stream value using a PUT request. Returns the result of
  // readStatusCode(), or E_NOCONNECTION when the server cannot be reached.
  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
  // the device ID here.
  template <class T>
  int updateStreamValue(const char* deviceId, const char* streamName, T value);

  // Post multiple values to M2X all at once.
  // +deviceId+ - id of the device to post values
  // +streamNum+ - Number of streams to post
  // +names+ - Array of stream names, the length of the array should
  // be exactly +streamNum+
  // +counts+ - Array of +streamNum+ length, each item in this array
  // containing the number of values we want to post for each stream
  // +ats+ - Timestamps for each value, the length of this array should
  // be the sum of all values in +counts+, for the first +counts[0]+
  // items, the values belong to the first stream, for the following
  // +counts[1]+ number of items, the values belong to the second stream,
  // etc. Notice that timestamps are required here: you must provide
  // a timestamp for each value posted.
  // +values+ - Values to post. This works the same way as +ats+, the
  // first +counts[0]+ number of items contain values to post to the first
  // stream, the succeeding +counts[1]+ number of items contain values
  // for the second stream, etc. The length of this array should be
  // the sum of all values in +counts+ array.
  // Returns the result of readStatusCode(), or E_NOCONNECTION when the
  // server cannot be reached.
  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
  // the device ID here.
  template <class T>
  int postDeviceUpdates(const char* deviceId, int streamNum,
                        const char* names[], const int counts[],
                        const char* ats[], T values[]);

  // Post multiple values of a single device at once.
  // +deviceId+ - id of the device to post values
  // +streamNum+ - Number of streams to post
  // +names+ - Array of stream names, the length of the array should
  // be exactly +streamNum+
  // +values+ - Array of values to post, the length of the array should
  // be exactly +streamNum+. Notice that the array of +values+ should
  // match the array of +names+, and that the ith value in +values+ is
  // exactly the value to post for the ith stream name in +names+
  // +at+ - optional timestamp string, NULL by default.
  // NOTE(review): presumably applied to the whole update -- confirm against
  // the implementation.
  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
  // the device ID here.
  template <class T>
  int postDeviceUpdate(const char* deviceId, int streamNum,
                       const char* names[], T values[],
                       const char* at = NULL);

  // Update datasource location
  // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
  // 4-byte (32 bits) precision, which is the same as float. So there's
  // no natural double-precision floating number on these boards. With
  // a float value, we have a precision of roughly 7 digits, that means
  // either 5 or 6 digits after the floating point. According to wikipedia,
  // a difference of 0.00001 will give us ~1.1132m distance. If this
  // precision is good for you, you can use the double-version we provided
  // here. Otherwise, you may need to use the string-version and do the
  // actual conversion by yourselves.
  // However, with an Arduino Due board, double has 8-bytes (64 bits)
  // precision, which means you are free to use the double-version only
  // without any precision problems.
  // Returned value is the http status code.
  // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
  // the device ID here.
  template <class T>
  int updateLocation(const char* deviceId, const char* name,
                     T latitude, T longitude, T elevation);

  // The following field is public so that the mmqtt callback functions
  // (m2x_mmqtt_puller / m2x_mmqtt_pusher) can access it directly.
  Client* _client;
private:
  // NOTE: members are initialized in the order they are declared here,
  // regardless of the order used in the constructor's initializer list.
  struct mmqtt_connection _connection;  // mmqtt state, set up in connectToServer()
  const char* _key;                     // M2X key (not owned)
  uint16_t _key_length;                 // strlen(_key), computed once in the constructor
  bool _connected;                      // true once connectToServer() has succeeded
  bool _keepalive;
  const char* _host;
  int _port;
  void (* _idlefunc)(void);
  const char* _path_prefix;
  NullPrint _null_print;                // counting sink used to measure payload length
  MMQTTPrint _mmqtt_print;              // sink that encodes the payload into _connection
  int16_t _current_id;                  // request id, incremented before each request

  // Connects, performs the MQTT CONNECT/SUBSCRIBE handshake and returns
  // E_OK, E_NOCONNECTION or E_DISCONNECTED.
  int connectToServer();

  // The print*Payload helpers write the JSON request to +print+; callers
  // use the returned byte count as the payload length.
  template <class T>
  int printUpdateStreamValuePayload(Print* print, const char* deviceId,
                                    const char* streamName, T value);

  template <class T>
  int printPostDeviceUpdatesPayload(Print* print,
                                    const char* deviceId, int streamNum,
                                    const char* names[], const int counts[],
                                    const char* ats[], T values[]);

  template <class T>
  int printPostDeviceUpdatePayload(Print* print,
                                   const char* deviceId, int streamNum,
                                   const char* names[], T values[],
                                   const char* at = NULL);

  template <class T>
  int printUpdateLocationPayload(Print* print,
                                 const char* deviceId, const char* name,
                                 T latitude, T longitude, T elevation);

  int readStatusCode();
  void close();
};
citrusbyte 0:5a4798128c36 211
citrusbyte 0:5a4798128c36 212 // Implementations
citrusbyte 0:5a4798128c36 213 M2XMQTTClient::M2XMQTTClient(Client* client,
citrusbyte 0:5a4798128c36 214 const char* key,
citrusbyte 0:5a4798128c36 215 void (* idlefunc)(void),
citrusbyte 0:5a4798128c36 216 bool keepalive,
citrusbyte 0:5a4798128c36 217 const char* host,
citrusbyte 0:5a4798128c36 218 int port,
citrusbyte 0:5a4798128c36 219 const char* path_prefix) : _client(client),
citrusbyte 0:5a4798128c36 220 _key(key),
citrusbyte 0:5a4798128c36 221 _idlefunc(idlefunc),
citrusbyte 0:5a4798128c36 222 _keepalive(keepalive),
citrusbyte 0:5a4798128c36 223 _connected(false),
citrusbyte 0:5a4798128c36 224 _host(host),
citrusbyte 0:5a4798128c36 225 _port(port),
citrusbyte 0:5a4798128c36 226 _path_prefix(path_prefix),
citrusbyte 0:5a4798128c36 227 _null_print(),
citrusbyte 0:5a4798128c36 228 _mmqtt_print(),
citrusbyte 0:5a4798128c36 229 _current_id(0) {
citrusbyte 0:5a4798128c36 230 _key_length = strlen(_key);
citrusbyte 0:5a4798128c36 231 }
citrusbyte 0:5a4798128c36 232
citrusbyte 0:5a4798128c36 233 mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) {
citrusbyte 0:5a4798128c36 234 const uint8_t *data = NULL;
citrusbyte 0:5a4798128c36 235 mmqtt_ssize_t length = 0;
citrusbyte 0:5a4798128c36 236 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 237 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
citrusbyte 0:5a4798128c36 238 Client *c = client->_client;
citrusbyte 0:5a4798128c36 239 struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection);
citrusbyte 0:5a4798128c36 240 if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; }
citrusbyte 0:5a4798128c36 241
citrusbyte 0:5a4798128c36 242 status = mmqtt_stream_external_pullable(stream, &data, &length);
citrusbyte 0:5a4798128c36 243 if (status != MMQTT_STATUS_OK) { return status; }
citrusbyte 0:5a4798128c36 244
citrusbyte 0:5a4798128c36 245 length = c->write(data, length);
citrusbyte 0:5a4798128c36 246 if (length < 0) {
citrusbyte 0:5a4798128c36 247 c->stop();
citrusbyte 0:5a4798128c36 248 return MMQTT_STATUS_BROKEN_CONNECTION;
citrusbyte 0:5a4798128c36 249 }
citrusbyte 0:5a4798128c36 250 mmqtt_stream_external_pull(stream, length);
citrusbyte 0:5a4798128c36 251 if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
citrusbyte 0:5a4798128c36 252 mmqtt_connection_release_write_stream(connection, stream);
citrusbyte 0:5a4798128c36 253 }
citrusbyte 0:5a4798128c36 254 return MMQTT_STATUS_OK;
citrusbyte 0:5a4798128c36 255 }
citrusbyte 0:5a4798128c36 256
citrusbyte 0:5a4798128c36 257 mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) {
citrusbyte 0:5a4798128c36 258 uint8_t *data = NULL;
citrusbyte 0:5a4798128c36 259 mmqtt_ssize_t length = 0, i = 0;
citrusbyte 0:5a4798128c36 260 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 261 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
citrusbyte 0:5a4798128c36 262 Client *c = client->_client;
citrusbyte 0:5a4798128c36 263 struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection);
citrusbyte 0:5a4798128c36 264 if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; }
citrusbyte 0:5a4798128c36 265
citrusbyte 0:5a4798128c36 266 status = mmqtt_stream_external_pushable(stream, &data, &length);
citrusbyte 0:5a4798128c36 267 if (status != MMQTT_STATUS_OK) { return status; }
citrusbyte 0:5a4798128c36 268 length = min(length, max_size);
citrusbyte 0:5a4798128c36 269
citrusbyte 0:5a4798128c36 270 /* Maybe we need another field in signature documenting how much data we want,
citrusbyte 0:5a4798128c36 271 * so we can handle end condition gracefully? Not 100% if `left` field in
citrusbyte 0:5a4798128c36 272 * mmqtt_stream is enough
citrusbyte 0:5a4798128c36 273 */
citrusbyte 0:5a4798128c36 274 while (i < length && c->available()) {
citrusbyte 0:5a4798128c36 275 data[i++] = c->read();
citrusbyte 0:5a4798128c36 276 }
citrusbyte 0:5a4798128c36 277 mmqtt_stream_external_push(stream, i);
citrusbyte 0:5a4798128c36 278 return MMQTT_STATUS_OK;
citrusbyte 0:5a4798128c36 279 }
citrusbyte 0:5a4798128c36 280
citrusbyte 0:5a4798128c36 281 size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit)
citrusbyte 0:5a4798128c36 282 {
citrusbyte 0:5a4798128c36 283 mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata;
citrusbyte 0:5a4798128c36 284 return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit,
citrusbyte 0:5a4798128c36 285 limit) == MMQTT_STATUS_OK ? limit : 0;
citrusbyte 0:5a4798128c36 286 }
citrusbyte 0:5a4798128c36 287
// Opens the connection to _host:_port and performs the MQTT handshake:
// sends CONNECT, waits for CONNACK, sends SUBSCRIBE for
// "m2x/<key>/responses" and waits for SUBACK.
//
// Returns E_OK on success (and sets _connected), E_NOCONNECTION when
// _client->connect() fails, and E_DISCONNECTED when any checked handshake
// step fails; in the latter case the client is stopped first.
int M2XMQTTClient::connectToServer() {
  mmqtt_status_t status;
  struct mmqtt_p_connect_header connect_header;
  struct mmqtt_p_connack_header connack_header;
  uint32_t packet_length;
  uint8_t flag;
  uint16_t length;
  // Holds the 6 characters of the protocol name; no NUL terminator is stored.
  uint8_t name[6];

  if (_client->connect(_host, _port)) {
    DBGLN("%s", F("Connected to M2X MQTT server!"));
    mmqtt_connection_init(&_connection, this);
    /* Send CONNECT packet first */
    connect_header.name = name;
    strncpy((char *)name, F("MQIsdp"), 6);
    connect_header.name_length = connect_header.name_max_length = 6;
    connect_header.protocol_version = 3;
    /* Clean session with username set */
    connect_header.flags = 0x82;
    connect_header.keepalive = 60;
    // Remaining length = variable header + the key, which is sent as the
    // only payload string.
    packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) +
      mmqtt_s_string_encoded_length(_key_length);
    status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
                                         MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT),
                                         packet_length);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error sending connect packet fixed header: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error sending connect packet variable header: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
                                   (const uint8_t *) _key, _key_length);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error sending connect packet payload: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    /* Check CONNACK packet */
    // Any packet of another type that arrives first is skipped whole.
    do {
      status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
                                           &flag, &packet_length);
      if (status != MMQTT_STATUS_OK) {
        DBG("%s", F("Error decoding connack fixed header: "));
        DBGLN("%d", status);
        _client->stop();
        return E_DISCONNECTED;
      }
      if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) {
        status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
        if (status != MMQTT_STATUS_OK) {
          DBG("%s", F("Error skipping non-connack packet: "));
          DBGLN("%d", status);
          _client->stop();
          return E_DISCONNECTED;
        }
      }
    } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK);
    status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher,
                                           &connack_header);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error decoding connack variable header: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    // Only return code 0 is treated as an accepted connection.
    if (connack_header.return_code != 0x0) {
      DBG("%s", F("CONNACK return code is not accepted: "));
      DBGLN("%d", connack_header.return_code);
      _client->stop();
      return E_DISCONNECTED;
    }
    /* Send SUBSCRIBE packet*/
    // Remaining length: 2 (first uint16) + 2 (topic length) +
    // 4 ("m2x/") + key + 10 ("/responses") + 1 (QoS byte) = key + 19.
    length = _key_length + 15 + 4;
    status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
                                         MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2,
                                         length);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error sending subscribe packet fixed header: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    // Subscribe packet must use QoS 1
    // NOTE(review): that presumably refers to the 0x2 flag OR-ed into the
    // fixed header above; the uint16 written next (0) looks like the
    // message id -- confirm against the MQTT spec.
    // NOTE: the results of the five encode calls below are not checked.
    mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0);
    // Topic length: 4 ("m2x/") + key + 10 ("/responses").
    mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14);
    mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
    mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
    // The extra one is QoS, added here to save a function call
    // (11 bytes are sent: the 10 characters plus the explicit '\0' byte).
    mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11);
    /* Check SUBACK packet */
    // Same pattern as for CONNACK: skip anything that is not a SUBACK.
    do {
      status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
                                           &flag, &packet_length);
      if (status != MMQTT_STATUS_OK) {
        DBG("%s", F("Error decoding suback fixed header: "));
        DBGLN("%d", status);
        _client->stop();
        return E_DISCONNECTED;
      }
      if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) {
        status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
        if (status != MMQTT_STATUS_OK) {
          DBG("%s", F("Error skipping packet: "));
          DBGLN("%d", status);
          _client->stop();
          return E_DISCONNECTED;
        }
      }
    } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK);
    // The SUBACK body is consumed without being inspected.
    status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
    if (status != MMQTT_STATUS_OK) {
      DBG("%s", F("Error skipping suback packet: "));
      DBGLN("%d", status);
      _client->stop();
      return E_DISCONNECTED;
    }
    // Wire the payload printer to the now-initialized connection.
    _mmqtt_print.connection = &_connection;
    _mmqtt_print.puller = m2x_mmqtt_puller;
    _connected = true;
    return E_OK;
  } else {
    DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!"));
    return E_NOCONNECTION;
  }
}
citrusbyte 0:5a4798128c36 422
citrusbyte 0:5a4798128c36 423 template <class T>
citrusbyte 0:5a4798128c36 424 int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
citrusbyte 0:5a4798128c36 425 int length;
citrusbyte 0:5a4798128c36 426 if (!_connected) {
citrusbyte 0:5a4798128c36 427 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 428 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 429 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 430 }
citrusbyte 0:5a4798128c36 431 }
citrusbyte 0:5a4798128c36 432 _current_id++;
citrusbyte 0:5a4798128c36 433 length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value);
citrusbyte 0:5a4798128c36 434 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 435 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 436 length + _key_length + 15);
citrusbyte 0:5a4798128c36 437 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 438 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 439 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 440 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 441 printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value);
citrusbyte 0:5a4798128c36 442 return readStatusCode();
citrusbyte 0:5a4798128c36 443 }
citrusbyte 0:5a4798128c36 444
citrusbyte 0:5a4798128c36 445 template <class T>
citrusbyte 0:5a4798128c36 446 int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId,
citrusbyte 0:5a4798128c36 447 const char* streamName, T value) {
citrusbyte 0:5a4798128c36 448 int bytes = 0;
citrusbyte 0:5a4798128c36 449 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 450 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 451 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
citrusbyte 0:5a4798128c36 452 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 453 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 454 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 455 bytes += print->print(F("/streams/"));
citrusbyte 0:5a4798128c36 456 bytes += print->print(streamName);
citrusbyte 0:5a4798128c36 457 bytes += print->print(F("/value"));
citrusbyte 0:5a4798128c36 458 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 459 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 460 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 461 bytes += print->print(F("{\"value\":\""));
citrusbyte 0:5a4798128c36 462 bytes += print->print(value);
citrusbyte 0:5a4798128c36 463 bytes += print->print(F("\"}"));
citrusbyte 0:5a4798128c36 464 bytes += print->print(F("}"));
citrusbyte 0:5a4798128c36 465 }
citrusbyte 0:5a4798128c36 466
citrusbyte 0:5a4798128c36 467 template <class T>
citrusbyte 0:5a4798128c36 468 int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 469 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 470 const char* ats[], T values[]) {
citrusbyte 0:5a4798128c36 471 int length;
citrusbyte 0:5a4798128c36 472 if (!_connected) {
citrusbyte 0:5a4798128c36 473 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 474 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 475 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 476 }
citrusbyte 0:5a4798128c36 477 }
citrusbyte 0:5a4798128c36 478 _current_id++;
citrusbyte 0:5a4798128c36 479 length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 480 names, counts, ats, values);
citrusbyte 0:5a4798128c36 481 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 482 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 483 length + _key_length + 15);
citrusbyte 0:5a4798128c36 484 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 485 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 486 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 487 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 488 printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 489 names, counts, ats, values);
citrusbyte 0:5a4798128c36 490 return readStatusCode();
citrusbyte 0:5a4798128c36 491 }
citrusbyte 0:5a4798128c36 492
citrusbyte 0:5a4798128c36 493 template <class T>
citrusbyte 0:5a4798128c36 494 int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print,
citrusbyte 0:5a4798128c36 495 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 496 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 497 const char* ats[], T values[]) {
citrusbyte 0:5a4798128c36 498 int bytes = 0, value_index = 0, i, j;
citrusbyte 0:5a4798128c36 499 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 500 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 501 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
citrusbyte 0:5a4798128c36 502 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 503 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 504 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 505 bytes += print->print(F("/updates"));
citrusbyte 0:5a4798128c36 506 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 507 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 508 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 509 bytes += print->print(F("{\"values\":{"));
citrusbyte 0:5a4798128c36 510 for (i = 0; i < streamNum; i++) {
citrusbyte 0:5a4798128c36 511 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 512 bytes += print->print(names[i]);
citrusbyte 0:5a4798128c36 513 bytes += print->print(F("\":["));
citrusbyte 0:5a4798128c36 514 for (j = 0; j < counts[i]; j++) {
citrusbyte 0:5a4798128c36 515 bytes += print->print(F("{\"timestamp\": \""));
citrusbyte 0:5a4798128c36 516 bytes += print->print(ats[value_index]);
citrusbyte 0:5a4798128c36 517 bytes += print->print(F("\",\"value\": \""));
citrusbyte 0:5a4798128c36 518 bytes += print->print(values[value_index]);
citrusbyte 0:5a4798128c36 519 bytes += print->print(F("\"}"));
citrusbyte 0:5a4798128c36 520 if (j < counts[i] - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 521 value_index++;
citrusbyte 0:5a4798128c36 522 }
citrusbyte 0:5a4798128c36 523 bytes += print->print(F("]"));
citrusbyte 0:5a4798128c36 524 if (i < streamNum - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 525 }
citrusbyte 0:5a4798128c36 526 bytes += print->print(F(("}}}")));
citrusbyte 0:5a4798128c36 527 return bytes;
citrusbyte 0:5a4798128c36 528 }
citrusbyte 0:5a4798128c36 529
citrusbyte 0:5a4798128c36 530 template <class T>
citrusbyte 0:5a4798128c36 531 int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 532 const char* names[], T values[],
citrusbyte 0:5a4798128c36 533 const char* at) {
citrusbyte 0:5a4798128c36 534 int length;
citrusbyte 0:5a4798128c36 535 if (!_connected) {
citrusbyte 0:5a4798128c36 536 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 537 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 538 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 539 }
citrusbyte 0:5a4798128c36 540 }
citrusbyte 0:5a4798128c36 541 _current_id++;
citrusbyte 0:5a4798128c36 542 length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 543 names, values, at);
citrusbyte 0:5a4798128c36 544 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 545 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 546 length + _key_length + 15);
citrusbyte 0:5a4798128c36 547 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 548 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 549 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 550 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 551 printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 552 names, values, at);
citrusbyte 0:5a4798128c36 553 return readStatusCode();
citrusbyte 0:5a4798128c36 554 }
citrusbyte 0:5a4798128c36 555
citrusbyte 0:5a4798128c36 556 template <class T>
citrusbyte 0:5a4798128c36 557 int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print,
citrusbyte 0:5a4798128c36 558 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 559 const char* names[], T values[],
citrusbyte 0:5a4798128c36 560 const char* at) {
citrusbyte 0:5a4798128c36 561 int bytes = 0, i;
citrusbyte 0:5a4798128c36 562 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 563 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 564 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
citrusbyte 0:5a4798128c36 565 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 566 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 567 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 568 bytes += print->print(F("/update"));
citrusbyte 0:5a4798128c36 569 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 570 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 571 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 572 bytes += print->print(F("{\"values\":{"));
citrusbyte 0:5a4798128c36 573 for (int i = 0; i < streamNum; i++) {
citrusbyte 0:5a4798128c36 574 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 575 bytes += print->print(names[i]);
citrusbyte 0:5a4798128c36 576 bytes += print->print(F("\": \""));
citrusbyte 0:5a4798128c36 577 bytes += print->print(values[i]);
citrusbyte 0:5a4798128c36 578 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 579 if (i < streamNum - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 580 }
citrusbyte 0:5a4798128c36 581 bytes += print->print(F("}"));
citrusbyte 0:5a4798128c36 582 if (at != NULL) {
citrusbyte 0:5a4798128c36 583 bytes += print->print(F(",\"timestamp\":\""));
citrusbyte 0:5a4798128c36 584 bytes += print->print(at);
citrusbyte 0:5a4798128c36 585 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 586 }
citrusbyte 0:5a4798128c36 587 bytes += print->print(F(("}")));
citrusbyte 0:5a4798128c36 588 return bytes;
citrusbyte 0:5a4798128c36 589 }
citrusbyte 0:5a4798128c36 590
citrusbyte 0:5a4798128c36 591 template <class T>
citrusbyte 0:5a4798128c36 592 int M2XMQTTClient::updateLocation(const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 593 T latitude, T longitude, T elevation) {
citrusbyte 0:5a4798128c36 594 int length;
citrusbyte 0:5a4798128c36 595 if (!_connected) {
citrusbyte 0:5a4798128c36 596 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 597 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 598 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 599 }
citrusbyte 0:5a4798128c36 600 }
citrusbyte 0:5a4798128c36 601 _current_id++;
citrusbyte 0:5a4798128c36 602 length = printUpdateLocationPayload(&_null_print, deviceId, name,
citrusbyte 0:5a4798128c36 603 latitude, longitude, elevation);
citrusbyte 0:5a4798128c36 604 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 605 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 606 length + _key_length + 15);
citrusbyte 0:5a4798128c36 607 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 608 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 609 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 610 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 611 printUpdateLocationPayload(&_mmqtt_print, deviceId, name,
citrusbyte 0:5a4798128c36 612 latitude, longitude, elevation);
citrusbyte 0:5a4798128c36 613 return readStatusCode();
citrusbyte 0:5a4798128c36 614 }
citrusbyte 0:5a4798128c36 615
citrusbyte 0:5a4798128c36 616 template <class T>
citrusbyte 0:5a4798128c36 617 int M2XMQTTClient::printUpdateLocationPayload(Print* print,
citrusbyte 0:5a4798128c36 618 const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 619 T latitude, T longitude, T elevation) {
citrusbyte 0:5a4798128c36 620 int bytes = 0;
citrusbyte 0:5a4798128c36 621 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 622 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 623 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
citrusbyte 0:5a4798128c36 624 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 625 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 626 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 627 bytes += print->print(F("/location"));
citrusbyte 0:5a4798128c36 628 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 629 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 630 bytes += print->print(F("\",\"body\":{\"name\":\""));
citrusbyte 0:5a4798128c36 631 bytes += print->print(name);
citrusbyte 0:5a4798128c36 632 bytes += print->print(F("\",\"latitude\":\""));
citrusbyte 0:5a4798128c36 633 bytes += print->print(latitude);
citrusbyte 0:5a4798128c36 634 bytes += print->print(F("\",\"longitude\":\""));
citrusbyte 0:5a4798128c36 635 bytes += print->print(longitude);
citrusbyte 0:5a4798128c36 636 bytes += print->print(F("\",\"elevation\":\""));
citrusbyte 0:5a4798128c36 637 bytes += print->print(elevation);
citrusbyte 0:5a4798128c36 638 bytes += print->print(F(("\"}}")));
citrusbyte 0:5a4798128c36 639 return bytes;
citrusbyte 0:5a4798128c36 640 }
citrusbyte 0:5a4798128c36 641
citrusbyte 0:5a4798128c36 642 int M2XMQTTClient::readStatusCode() {
citrusbyte 0:5a4798128c36 643 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 644 uint32_t packet_length;
citrusbyte 0:5a4798128c36 645 uint8_t flag;
citrusbyte 0:5a4798128c36 646 int16_t parsed_id, response_status;
citrusbyte 0:5a4798128c36 647 struct mjson_ctx ctx;
citrusbyte 0:5a4798128c36 648 char buf[6];
citrusbyte 0:5a4798128c36 649 size_t buf_length;
citrusbyte 0:5a4798128c36 650
citrusbyte 0:5a4798128c36 651 while (true) {
citrusbyte 0:5a4798128c36 652 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
citrusbyte 0:5a4798128c36 653 &flag, &packet_length);
citrusbyte 0:5a4798128c36 654 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 655 DBG("%s", F("Error decoding publish fixed header: "));
citrusbyte 0:5a4798128c36 656 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 657 close();
citrusbyte 0:5a4798128c36 658 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 659 }
citrusbyte 0:5a4798128c36 660 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) {
citrusbyte 0:5a4798128c36 661 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
citrusbyte 0:5a4798128c36 662 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 663 DBG("%s", F("Error skipping non-publish packet: "));
citrusbyte 0:5a4798128c36 664 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 665 close();
citrusbyte 0:5a4798128c36 666 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 667 }
citrusbyte 0:5a4798128c36 668 } else {
citrusbyte 0:5a4798128c36 669 /*
citrusbyte 0:5a4798128c36 670 * Since we only subscribe to one channel, there's no need to check channel name.
citrusbyte 0:5a4798128c36 671 */
citrusbyte 0:5a4798128c36 672 status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0,
citrusbyte 0:5a4798128c36 673 NULL, NULL);
citrusbyte 0:5a4798128c36 674 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 675 DBG("%s", F("Error skipping channel name string: "));
citrusbyte 0:5a4798128c36 676 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 677 close();
citrusbyte 0:5a4798128c36 678 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 679 }
citrusbyte 0:5a4798128c36 680 /*
citrusbyte 0:5a4798128c36 681 * Parse JSON body for ID and status
citrusbyte 0:5a4798128c36 682 */
citrusbyte 0:5a4798128c36 683 mjson_init(&ctx, &_connection, m2x_mjson_reader);
citrusbyte 0:5a4798128c36 684 parsed_id = -1;
citrusbyte 0:5a4798128c36 685 if (mjson_readcheck_object_start(&ctx) != MJSON_OK) {
citrusbyte 0:5a4798128c36 686 /* Oops we have an error */
citrusbyte 0:5a4798128c36 687 DBG("%s", F("Publish Packet is not a JSON object!"));
citrusbyte 0:5a4798128c36 688 close();
citrusbyte 0:5a4798128c36 689 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 690 }
citrusbyte 0:5a4798128c36 691 while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) {
citrusbyte 0:5a4798128c36 692 if (mjson_readcheck_string_start(&ctx) != MJSON_OK) {
citrusbyte 0:5a4798128c36 693 DBG("%s", F("Object key is not string!"));
citrusbyte 0:5a4798128c36 694 close();
citrusbyte 0:5a4798128c36 695 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 696 }
citrusbyte 0:5a4798128c36 697 mjson_read_full_string(&ctx, buf, 6, &buf_length);
citrusbyte 0:5a4798128c36 698 mjson_read_object_key_separator(&ctx);
citrusbyte 0:5a4798128c36 699 if (strncmp(buf, F("status"), 6) == 0) {
citrusbyte 0:5a4798128c36 700 mjson_read_int16(&ctx, &response_status);
citrusbyte 0:5a4798128c36 701 } else if (strncmp(buf, F("id"), 2) == 0) {
citrusbyte 0:5a4798128c36 702 /* Hack since we know the ID passed is actually an integer*/
citrusbyte 0:5a4798128c36 703 mjson_readcheck_string_start(&ctx);
citrusbyte 0:5a4798128c36 704 mjson_read_int16(&ctx, &parsed_id);
citrusbyte 0:5a4798128c36 705 mjson_read_string_end(&ctx);
citrusbyte 0:5a4798128c36 706 } else {
citrusbyte 0:5a4798128c36 707 mjson_skip_value(&ctx);
citrusbyte 0:5a4798128c36 708 }
citrusbyte 0:5a4798128c36 709 }
citrusbyte 0:5a4798128c36 710 if (parsed_id == _current_id) { return response_status; }
citrusbyte 0:5a4798128c36 711 }
citrusbyte 0:5a4798128c36 712 }
citrusbyte 0:5a4798128c36 713 return E_NOTREACHABLE;
citrusbyte 0:5a4798128c36 714 }
citrusbyte 0:5a4798128c36 715
citrusbyte 0:5a4798128c36 716 void M2XMQTTClient::close() {
citrusbyte 0:5a4798128c36 717 _client->stop();
citrusbyte 0:5a4798128c36 718 _connected = false;
citrusbyte 0:5a4798128c36 719 }
citrusbyte 0:5a4798128c36 720
citrusbyte 0:5a4798128c36 721 #endif /* M2XMQTTCLIENT_H_ */