M2X MQTT Client for ARM MBED
Dependents: WNCInterface_M2XMQTTdemo
M2XMQTTClient.h
00001 #ifndef M2XMQTTCLIENT_H_ 00002 #define M2XMQTTCLIENT_H_ 00003 00004 #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM)) 00005 #error "Platform definition is missing!" 00006 #endif 00007 00008 #define M2X_VERSION "0.1.0" 00009 00010 #ifdef MBED_PLATFORM 00011 #include "m2x-mbed.h" 00012 #endif /* MBED_PLATFORM */ 00013 00014 #ifdef LINUX_PLATFORM 00015 #include "m2x-linux.h" 00016 #endif /* LINUX_PLATFORM */ 00017 00018 /* If we don't have DBG defined, provide dump implementation */ 00019 #ifndef DBG 00020 #define DBG(fmt_, data_) 00021 #define DBGLN(fmt_, data_) 00022 #define DBGLNEND 00023 #endif /* DBG */ 00024 00025 #define MIN(a, b) (((a) > (b))?(b):(a)) 00026 #define TO_HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0'))) 00027 #define MAX_DOUBLE_DIGITS 7 00028 00029 /* For tolower */ 00030 #include <ctype.h> 00031 00032 static const int E_OK = 0; 00033 static const int E_NOCONNECTION = -1; 00034 static const int E_DISCONNECTED = -2; 00035 static const int E_NOTREACHABLE = -3; 00036 static const int E_INVALID = -4; 00037 static const int E_JSON_INVALID = -5; 00038 static const int E_BUFFER_TOO_SMALL = -6; 00039 static const int E_TIMESTAMP_ERROR = -8; 00040 00041 static const char* DEFAULT_M2X_HOST = "api-m2x.att.com"; 00042 static const int DEFAULT_M2X_PORT = 1883; 00043 00044 static inline bool m2x_status_is_success(int status) { 00045 return (status == E_OK) || (status >= 200 && status <= 299); 00046 } 00047 00048 static inline bool m2x_status_is_client_error(int status) { 00049 return status >= 400 && status <= 499; 00050 } 00051 00052 static inline bool m2x_status_is_server_error(int status) { 00053 return status >= 500 && status <= 599; 00054 } 00055 00056 static inline bool m2x_status_is_error(int status) { 00057 return m2x_status_is_client_error(status) || 00058 m2x_status_is_server_error(status); 00059 } 00060 00061 // Null Print class used to calculate length to print 00062 class NullPrint : public Print { 00063 public: 00064 size_t counter; 00065 00066 virtual size_t write(uint8_t b) { 00067 counter++; 00068 return 1; 00069 } 00070 00071 virtual size_t write(const uint8_t* buf, size_t size) { 00072 counter += size; 00073 return size; 00074 } 00075 }; 00076 00077 // Handy helper class for printing MQTT payload using a Print 00078 class MMQTTPrint : public Print { 00079 public: 00080 mmqtt_connection *connection; 00081 mmqtt_s_puller puller; 00082 00083 virtual size_t write(uint8_t b) { 00084 return write(&b, 1); 00085 } 00086 00087 virtual size_t write(const uint8_t* buf, size_t size) { 00088 return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1; 00089 } 00090 }; 00091 00092 class M2XMQTTClient { 00093 public: 00094 M2XMQTTClient(Client* client, 00095 const char* key, 00096 void (* idlefunc)(void) = NULL, 00097 bool keepalive = true, 00098 const char* host = DEFAULT_M2X_HOST, 00099 int port = DEFAULT_M2X_PORT, 00100 const char* path_prefix = NULL); 00101 00102 // Push data stream value using PUT request, returns the HTTP status code 00103 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00104 // the device ID here. 00105 template <class T> 00106 int updateStreamValue(const char* deviceId, const char* streamName, T value); 00107 00108 // Post multiple values to M2X all at once. 00109 // +deviceId+ - id of the device to post values 00110 // +streamNum+ - Number of streams to post 00111 // +names+ - Array of stream names, the length of the array should 00112 // be exactly +streamNum+ 00113 // +counts+ - Array of +streamNum+ length, each item in this array 00114 // containing the number of values we want to post for each stream 00115 // +ats+ - Timestamps for each value, the length of this array should 00116 // be the some of all values in +counts+, for the first +counts[0]+ 00117 // items, the values belong to the first stream, for the following 00118 // +counts[1]+ number of items, the values belong to the second stream, 00119 // etc. Notice that timestamps are required here: you must provide 00120 // a timestamp for each value posted. 00121 // +values+ - Values to post. This works the same way as +ats+, the 00122 // first +counts[0]+ number of items contain values to post to the first 00123 // stream, the succeeding +counts[1]+ number of items contain values 00124 // for the second stream, etc. The length of this array should be 00125 // the sum of all values in +counts+ array. 00126 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00127 // the device ID here. 00128 template <class T> 00129 int postDeviceUpdates(const char* deviceId, int streamNum, 00130 const char* names[], const int counts[], 00131 const char* ats[], T values[]); 00132 00133 // Post multiple values of a single device at once. 00134 // +deviceId+ - id of the device to post values 00135 // +streamNum+ - Number of streams to post 00136 // +names+ - Array of stream names, the length of the array should 00137 // be exactly +streamNum+ 00138 // +values+ - Array of values to post, the length of the array should 00139 // be exactly +streamNum+. Notice that the array of +values+ should 00140 // match the array of +names+, and that the ith value in +values+ is 00141 // exactly the value to post for the ith stream name in +names+ 00142 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00143 // the device ID here. 00144 template <class T> 00145 int postDeviceUpdate(const char* deviceId, int streamNum, 00146 const char* names[], T values[], 00147 const char* at = NULL); 00148 00149 // Update datasource location 00150 // NOTE: On an Arduino Uno and other ATMEGA based boards, double has 00151 // 4-byte (32 bits) precision, which is the same as float. So there's 00152 // no natural double-precision floating number on these boards. With 00153 // a float value, we have a precision of roughly 7 digits, that means 00154 // either 5 or 6 digits after the floating point. According to wikipedia, 00155 // a difference of 0.00001 will give us ~1.1132m distance. If this 00156 // precision is good for you, you can use the double-version we provided 00157 // here. Otherwise, you may need to use the string-version and do the 00158 // actual conversion by yourselves. 00159 // However, with an Arduino Due board, double has 8-bytes (64 bits) 00160 // precision, which means you are free to use the double-version only 00161 // without any precision problems. 00162 // Returned value is the http status code. 00163 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as 00164 // the device ID here. 00165 template <class T> 00166 int updateLocation(const char* deviceId, const char* name, 00167 T latitude, T longitude, T elevation); 00168 00169 // Delete values from a data stream 00170 // You will need to provide from and end date/time strings in the ISO8601 00171 // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where 00172 // yyyy: the year 00173 // mm: the month 00174 // dd: the day 00175 // HH: the hour (24 hour format) 00176 // MM: the minute 00177 // SS.SSS: the seconds (to the millisecond) 00178 // NOTE: the time is given in Zulu (GMT) 00179 // M2X will delete all values within the from to end date/time range. 00180 // The status code is 204 on success and 400 on a bad request (e.g. the 00181 // timestamp is not in ISO8601 format or the from timestamp is not less than 00182 // or equal to the end timestamp. 00183 int deleteValues(const char* deviceId, const char* streamName, 00184 const char* from, const char* end); 00185 00186 // Following fields are public so mmqtt callback functions can access directly 00187 Client* _client; 00188 private: 00189 struct mmqtt_connection _connection; 00190 const char* _key; 00191 uint16_t _key_length; 00192 bool _connected; 00193 bool _keepalive; 00194 const char* _host; 00195 int _port; 00196 void (* _idlefunc)(void); 00197 const char* _path_prefix; 00198 NullPrint _null_print; 00199 MMQTTPrint _mmqtt_print; 00200 int16_t _current_id; 00201 00202 int connectToServer(); 00203 00204 template <class T> 00205 int printUpdateStreamValuePayload(Print* print, const char* deviceId, 00206 const char* streamName, T value); 00207 00208 template <class T> 00209 int printPostDeviceUpdatesPayload(Print* print, 00210 const char* deviceId, int streamNum, 00211 const char* names[], const int counts[], 00212 const char* ats[], T values[]); 00213 00214 template <class T> 00215 int printPostDeviceUpdatePayload(Print* print, 00216 const char* deviceId, int streamNum, 00217 const char* names[], T values[], 00218 const char* at = NULL); 00219 00220 template <class T> 00221 int printUpdateLocationPayload(Print* print, 00222 const char* deviceId, const char* name, 00223 T latitude, T longitude, T elevation); 00224 00225 int printDeleteValuesPayload(Print* print, 00226 const char* deviceId, const char* streamName, 00227 const char* from, const char* end); 00228 00229 int readStatusCode(); 00230 void close(); 00231 }; 00232 00233 // Implementations 00234 M2XMQTTClient::M2XMQTTClient(Client* client, 00235 const char* key, 00236 void (* idlefunc)(void), 00237 bool keepalive, 00238 const char* host, 00239 int port, 00240 const char* path_prefix) : _client(client), 00241 _key(key), 00242 _idlefunc(idlefunc), 00243 _keepalive(keepalive), 00244 _connected(false), 00245 _host(host), 00246 _port(port), 00247 _path_prefix(path_prefix), 00248 _null_print(), 00249 _mmqtt_print(), 00250 _current_id(0) { 00251 _key_length = strlen(_key); 00252 } 00253 00254 mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) { 00255 const uint8_t *data = NULL; 00256 mmqtt_ssize_t length = 0; 00257 mmqtt_status_t status; 00258 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; 00259 Client *c = client->_client; 00260 struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection); 00261 if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; } 00262 00263 status = mmqtt_stream_external_pullable(stream, &data, &length); 00264 if (status != MMQTT_STATUS_OK) { return status; } 00265 00266 length = c->write(data, length); 00267 if (length < 0) { 00268 c->stop(); 00269 return MMQTT_STATUS_BROKEN_CONNECTION; 00270 } 00271 mmqtt_stream_external_pull(stream, length); 00272 if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { 00273 mmqtt_connection_release_write_stream(connection, stream); 00274 } 00275 return MMQTT_STATUS_OK; 00276 } 00277 00278 mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) { 00279 uint8_t *data = NULL; 00280 mmqtt_ssize_t length = 0, i = 0; 00281 mmqtt_status_t status; 00282 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; 00283 Client *c = client->_client; 00284 struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection); 00285 if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; } 00286 00287 status = mmqtt_stream_external_pushable(stream, &data, &length); 00288 if (status != MMQTT_STATUS_OK) { return status; } 00289 length = min(length, max_size); 00290 00291 /* Maybe we need another field in signature documenting how much data we want, 00292 * so we can handle end condition gracefully? Not 100% if `left` field in 00293 * mmqtt_stream is enough 00294 */ 00295 while (i < length && c->available()) { 00296 data[i++] = c->read(); 00297 } 00298 mmqtt_stream_external_push(stream, i); 00299 return MMQTT_STATUS_OK; 00300 } 00301 00302 size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit) 00303 { 00304 mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata; 00305 return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit, 00306 limit) == MMQTT_STATUS_OK ? limit : 0; 00307 } 00308 00309 int M2XMQTTClient::connectToServer() { 00310 mmqtt_status_t status; 00311 struct mmqtt_p_connect_header connect_header; 00312 struct mmqtt_p_connack_header connack_header; 00313 uint32_t packet_length; 00314 uint8_t flag; 00315 uint16_t length; 00316 uint8_t name[6]; 00317 00318 if (_client->connect(_host, _port)) { 00319 DBGLN("%s", F("Connected to M2X MQTT server!")); 00320 mmqtt_connection_init(&_connection, this); 00321 /* Send CONNECT packet first */ 00322 connect_header.name = name; 00323 strncpy((char *)name, F("MQIsdp"), 6); 00324 connect_header.name_length = connect_header.name_max_length = 6; 00325 connect_header.protocol_version = 3; 00326 /* Clean session with username set */ 00327 connect_header.flags = 0x82; 00328 connect_header.keepalive = 60; 00329 packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) + 00330 mmqtt_s_string_encoded_length(_key_length) + 00331 mmqtt_s_string_encoded_length(_key_length); 00332 status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00333 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT), 00334 packet_length); 00335 if (status != MMQTT_STATUS_OK) { 00336 DBG("%s", F("Error sending connect packet fixed header: ")); 00337 DBGLN("%d", status); 00338 _client->stop(); 00339 return E_DISCONNECTED; 00340 } 00341 status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header); 00342 if (status != MMQTT_STATUS_OK) { 00343 DBG("%s", F("Error sending connect packet variable header: ")); 00344 DBGLN("%d", status); 00345 _client->stop(); 00346 return E_DISCONNECTED; 00347 } 00348 /* Client ID */ 00349 status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, 00350 (const uint8_t *) _key, _key_length); 00351 if (status != MMQTT_STATUS_OK) { 00352 DBG("%s", F("Error sending connect packet payload: ")); 00353 DBGLN("%d", status); 00354 _client->stop(); 00355 return E_DISCONNECTED; 00356 } 00357 /* Username */ 00358 status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, 00359 (const uint8_t *) _key, _key_length); 00360 if (status != MMQTT_STATUS_OK) { 00361 DBG("%s", F("Error sending connect packet payload: ")); 00362 DBGLN("%d", status); 00363 _client->stop(); 00364 return E_DISCONNECTED; 00365 } 00366 /* Check CONNACK packet */ 00367 do { 00368 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, 00369 &flag, &packet_length); 00370 if (status != MMQTT_STATUS_OK) { 00371 DBG("%s", F("Error decoding connack fixed header: ")); 00372 DBGLN("%d", status); 00373 _client->stop(); 00374 return E_DISCONNECTED; 00375 } 00376 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) { 00377 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); 00378 if (status != MMQTT_STATUS_OK) { 00379 DBG("%s", F("Error skipping non-connack packet: ")); 00380 DBGLN("%d", status); 00381 _client->stop(); 00382 return E_DISCONNECTED; 00383 } 00384 } 00385 } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK); 00386 status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher, 00387 &connack_header); 00388 if (status != MMQTT_STATUS_OK) { 00389 DBG("%s", F("Error decoding connack variable header: ")); 00390 DBGLN("%d", status); 00391 _client->stop(); 00392 return E_DISCONNECTED; 00393 } 00394 if (connack_header.return_code != 0x0) { 00395 DBG("%s", F("CONNACK return code is not accepted: ")); 00396 DBGLN("%d", connack_header.return_code); 00397 _client->stop(); 00398 return E_DISCONNECTED; 00399 } 00400 /* Send SUBSCRIBE packet*/ 00401 length = _key_length + 15 + 4; 00402 status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00403 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2, 00404 length); 00405 if (status != MMQTT_STATUS_OK) { 00406 DBG("%s", F("Error sending subscribe packet fixed header: ")); 00407 DBGLN("%d", status); 00408 _client->stop(); 00409 return E_DISCONNECTED; 00410 } 00411 // Subscribe packet must use QoS 1 00412 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0); 00413 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14); 00414 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00415 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00416 // The extra one is QoS, added here to save a function call 00417 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11); 00418 /* Check SUBACK packet */ 00419 do { 00420 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, 00421 &flag, &packet_length); 00422 if (status != MMQTT_STATUS_OK) { 00423 DBG("%s", F("Error decoding suback fixed header: ")); 00424 DBGLN("%d", status); 00425 _client->stop(); 00426 return E_DISCONNECTED; 00427 } 00428 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) { 00429 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); 00430 if (status != MMQTT_STATUS_OK) { 00431 DBG("%s", F("Error skipping packet: ")); 00432 DBGLN("%d", status); 00433 _client->stop(); 00434 return E_DISCONNECTED; 00435 } 00436 } 00437 } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK); 00438 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); 00439 if (status != MMQTT_STATUS_OK) { 00440 DBG("%s", F("Error skipping suback packet: ")); 00441 DBGLN("%d", status); 00442 _client->stop(); 00443 return E_DISCONNECTED; 00444 } 00445 _mmqtt_print.connection = &_connection; 00446 _mmqtt_print.puller = m2x_mmqtt_puller; 00447 _connected = true; 00448 return E_OK; 00449 } else { 00450 DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!")); 00451 return E_NOCONNECTION; 00452 } 00453 } 00454 00455 template <class T> 00456 int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { 00457 int length; 00458 if (!_connected) { 00459 if (connectToServer() != E_OK) { 00460 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00461 return E_NOCONNECTION; 00462 } 00463 } 00464 _current_id++; 00465 length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value); 00466 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00467 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), 00468 length + _key_length + 15); 00469 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); 00470 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00471 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00472 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); 00473 printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value); 00474 return readStatusCode(); 00475 } 00476 00477 template <class T> 00478 int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId, 00479 const char* streamName, T value) { 00480 int bytes = 0; 00481 bytes += print->print(F("{\"id\":\"")); 00482 bytes += print->print(_current_id); 00483 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); 00484 if (_path_prefix) { bytes += print->print(_path_prefix); } 00485 bytes += print->print(F("/v2/devices/")); 00486 bytes += print->print(deviceId); 00487 bytes += print->print(F("/streams/")); 00488 bytes += print->print(streamName); 00489 bytes += print->print(F("/value")); 00490 bytes += print->print(F("\",\"agent\":\"")); 00491 bytes += print->print(USER_AGENT); 00492 bytes += print->print(F("\",\"body\":")); 00493 bytes += print->print(F("{\"value\":\"")); 00494 bytes += print->print(value); 00495 bytes += print->print(F("\"}")); 00496 bytes += print->print(F("}")); 00497 return bytes; 00498 } 00499 00500 template <class T> 00501 int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum, 00502 const char* names[], const int counts[], 00503 const char* ats[], T values[]) { 00504 int length; 00505 if (!_connected) { 00506 if (connectToServer() != E_OK) { 00507 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00508 return E_NOCONNECTION; 00509 } 00510 } 00511 _current_id++; 00512 length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum, 00513 names, counts, ats, values); 00514 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00515 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), 00516 length + _key_length + 15); 00517 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); 00518 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00519 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00520 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); 00521 printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum, 00522 names, counts, ats, values); 00523 return readStatusCode(); 00524 } 00525 00526 template <class T> 00527 int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print, 00528 const char* deviceId, int streamNum, 00529 const char* names[], const int counts[], 00530 const char* ats[], T values[]) { 00531 int bytes = 0, value_index = 0, i, j; 00532 bytes += print->print(F("{\"id\":\"")); 00533 bytes += print->print(_current_id); 00534 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); 00535 if (_path_prefix) { bytes += print->print(_path_prefix); } 00536 bytes += print->print(F("/v2/devices/")); 00537 bytes += print->print(deviceId); 00538 bytes += print->print(F("/updates")); 00539 bytes += print->print(F("\",\"agent\":\"")); 00540 bytes += print->print(USER_AGENT); 00541 bytes += print->print(F("\",\"body\":")); 00542 bytes += print->print(F("{\"values\":{")); 00543 for (i = 0; i < streamNum; i++) { 00544 bytes += print->print(F("\"")); 00545 bytes += print->print(names[i]); 00546 bytes += print->print(F("\":[")); 00547 for (j = 0; j < counts[i]; j++) { 00548 bytes += print->print(F("{\"timestamp\": \"")); 00549 bytes += print->print(ats[value_index]); 00550 bytes += print->print(F("\",\"value\": \"")); 00551 bytes += print->print(values[value_index]); 00552 bytes += print->print(F("\"}")); 00553 if (j < counts[i] - 1) { bytes += print->print(F(",")); } 00554 value_index++; 00555 } 00556 bytes += print->print(F("]")); 00557 if (i < streamNum - 1) { bytes += print->print(F(",")); } 00558 } 00559 bytes += print->print(F(("}}}"))); 00560 return bytes; 00561 } 00562 00563 template <class T> 00564 int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum, 00565 const char* names[], T values[], 00566 const char* at) { 00567 int length; 00568 if (!_connected) { 00569 if (connectToServer() != E_OK) { 00570 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00571 return E_NOCONNECTION; 00572 } 00573 } 00574 _current_id++; 00575 length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum, 00576 names, values, at); 00577 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00578 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), 00579 length + _key_length + 15); 00580 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); 00581 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00582 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00583 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); 00584 printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum, 00585 names, values, at); 00586 return readStatusCode(); 00587 } 00588 00589 template <class T> 00590 int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print, 00591 const char* deviceId, int streamNum, 00592 const char* names[], T values[], 00593 const char* at) { 00594 int bytes = 0, i; 00595 bytes += print->print(F("{\"id\":\"")); 00596 bytes += print->print(_current_id); 00597 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); 00598 if (_path_prefix) { bytes += print->print(_path_prefix); } 00599 bytes += print->print(F("/v2/devices/")); 00600 bytes += print->print(deviceId); 00601 bytes += print->print(F("/update")); 00602 bytes += print->print(F("\",\"agent\":\"")); 00603 bytes += print->print(USER_AGENT); 00604 bytes += print->print(F("\",\"body\":")); 00605 bytes += print->print(F("{\"values\":{")); 00606 for (int i = 0; i < streamNum; i++) { 00607 bytes += print->print(F("\"")); 00608 bytes += print->print(names[i]); 00609 bytes += print->print(F("\": \"")); 00610 bytes += print->print(values[i]); 00611 bytes += print->print(F("\"")); 00612 if (i < streamNum - 1) { bytes += print->print(F(",")); } 00613 } 00614 bytes += print->print(F("}")); 00615 if (at != NULL) { 00616 bytes += print->print(F(",\"timestamp\":\"")); 00617 bytes += print->print(at); 00618 bytes += print->print(F("\"")); 00619 } 00620 bytes += print->print(F(("}"))); 00621 return bytes; 00622 } 00623 00624 template <class T> 00625 int M2XMQTTClient::updateLocation(const char* deviceId, const char* name, 00626 T latitude, T longitude, T elevation) { 00627 int length; 00628 if (!_connected) { 00629 if (connectToServer() != E_OK) { 00630 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00631 return E_NOCONNECTION; 00632 } 00633 } 00634 _current_id++; 00635 length = printUpdateLocationPayload(&_null_print, deviceId, name, 00636 latitude, longitude, elevation); 00637 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00638 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), 00639 length + _key_length + 15); 00640 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); 00641 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00642 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00643 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); 00644 printUpdateLocationPayload(&_mmqtt_print, deviceId, name, 00645 latitude, longitude, elevation); 00646 return readStatusCode(); 00647 } 00648 00649 template <class T> 00650 int M2XMQTTClient::printUpdateLocationPayload(Print* print, 00651 const char* deviceId, const char* name, 00652 T latitude, T longitude, T elevation) { 00653 int bytes = 0; 00654 bytes += print->print(F("{\"id\":\"")); 00655 bytes += print->print(_current_id); 00656 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); 00657 if (_path_prefix) { bytes += print->print(_path_prefix); } 00658 bytes += print->print(F("/v2/devices/")); 00659 bytes += print->print(deviceId); 00660 bytes += print->print(F("/location")); 00661 bytes += print->print(F("\",\"agent\":\"")); 00662 bytes += print->print(USER_AGENT); 00663 bytes += print->print(F("\",\"body\":{\"name\":\"")); 00664 bytes += print->print(name); 00665 bytes += print->print(F("\",\"latitude\":\"")); 00666 bytes += print->print(latitude); 00667 bytes += print->print(F("\",\"longitude\":\"")); 00668 bytes += print->print(longitude); 00669 bytes += print->print(F("\",\"elevation\":\"")); 00670 bytes += print->print(elevation); 00671 bytes += print->print(F(("\"}}"))); 00672 return bytes; 00673 } 00674 00675 int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName, 00676 const char* from, const char* end) { 00677 int length; 00678 if (!_connected) { 00679 if (connectToServer() != E_OK) { 00680 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00681 return E_NOCONNECTION; 00682 } 00683 } 00684 _current_id++; 00685 length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end); 00686 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, 00687 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), 00688 length + _key_length + 15); 00689 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); 00690 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); 00691 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); 00692 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); 00693 printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end); 00694 return readStatusCode(); 00695 } 00696 00697 int M2XMQTTClient::printDeleteValuesPayload(Print *print, 00698 const char* deviceId, const char* streamName, 00699 const char* from, const char* end) { 00700 int bytes = 0; 00701 bytes += print->print(F("{\"id\":\"")); 00702 bytes += print->print(_current_id); 00703 bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\"")); 00704 if (_path_prefix) { bytes += print->print(_path_prefix); } 00705 bytes += print->print(F("/v2/devices/")); 00706 bytes += print->print(deviceId); 00707 bytes += print->print(F("/streams/")); 00708 bytes += print->print(streamName); 00709 bytes += print->print(F("/values")); 00710 bytes += print->print(F("\",\"agent\":\"")); 00711 bytes += print->print(USER_AGENT); 00712 bytes += print->print(F("\",\"body\":{\"from\":\"")); 00713 bytes += print->print(from); 00714 bytes += print->print(F("\",\"end\":\"")); 00715 bytes += print->print(end); 00716 bytes += print->print(F(("\"}}"))); 00717 return bytes; 00718 } 00719 00720 int M2XMQTTClient::readStatusCode() { 00721 mmqtt_status_t status; 00722 uint32_t packet_length; 00723 uint8_t flag; 00724 int16_t parsed_id, response_status; 00725 struct mjson_ctx ctx; 00726 char buf[6]; 00727 size_t buf_length; 00728 00729 while (true) { 00730 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, 00731 &flag, &packet_length); 00732 if (status != MMQTT_STATUS_OK) { 00733 DBG("%s", F("Error decoding publish fixed header: ")); 00734 DBGLN("%d", status); 00735 close(); 00736 return E_DISCONNECTED; 00737 } 00738 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) { 00739 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); 00740 if (status != MMQTT_STATUS_OK) { 00741 DBG("%s", F("Error skipping non-publish packet: ")); 00742 DBGLN("%d", status); 00743 close(); 00744 return E_DISCONNECTED; 00745 } 00746 } else { 00747 /* 00748 * Since we only subscribe to one channel, there's no need to check channel name. 00749 */ 00750 status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0, 00751 NULL, NULL); 00752 if (status != MMQTT_STATUS_OK) { 00753 DBG("%s", F("Error skipping channel name string: ")); 00754 DBGLN("%d", status); 00755 close(); 00756 return E_DISCONNECTED; 00757 } 00758 /* 00759 * Parse JSON body for ID and status 00760 */ 00761 mjson_init(&ctx, &_connection, m2x_mjson_reader); 00762 parsed_id = -1; 00763 if (mjson_readcheck_object_start(&ctx) != MJSON_OK) { 00764 /* Oops we have an error */ 00765 DBG("%s", F("Publish Packet is not a JSON object!")); 00766 close(); 00767 return E_DISCONNECTED; 00768 } 00769 while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) { 00770 if (mjson_readcheck_string_start(&ctx) != MJSON_OK) { 00771 DBG("%s", F("Object key is not string!")); 00772 close(); 00773 return E_DISCONNECTED; 00774 } 00775 mjson_read_full_string(&ctx, buf, 6, &buf_length); 00776 mjson_read_object_key_separator(&ctx); 00777 if (strncmp(buf, F("status"), 6) == 0) { 00778 mjson_read_int16(&ctx, &response_status); 00779 } else if (strncmp(buf, F("id"), 2) == 0) { 00780 /* Hack since we know the ID passed is actually an integer*/ 00781 mjson_readcheck_string_start(&ctx); 00782 mjson_read_int16(&ctx, &parsed_id); 00783 mjson_read_string_end(&ctx); 00784 } else { 00785 mjson_skip_value(&ctx); 00786 } 00787 } 00788 if (parsed_id == _current_id) { return response_status; } 00789 } 00790 } 00791 return E_NOTREACHABLE; 00792 } 00793 00794 void M2XMQTTClient::close() { 00795 _client->stop(); 00796 _connected = false; 00797 } 00798 00799 #endif /* M2XMQTTCLIENT_H_ */
Generated on Sun Aug 7 2022 04:57:39 by
![doxygen](doxygen.png)