M2X MQTT Client for ARM MBED
Dependents: WNCInterface_M2XMQTTdemo
M2XMQTTClient.h@0:5a4798128c36, 2016-06-27 (annotated)
- Committer:
- citrusbyte
- Date:
- Mon Jun 27 13:25:45 2016 +0000
- Revision:
- 0:5a4798128c36
- Child:
- 1:c4d41ff3c58e
Initial Commit
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
citrusbyte | 0:5a4798128c36 | 1 | #ifndef M2XMQTTCLIENT_H_ |
citrusbyte | 0:5a4798128c36 | 2 | #define M2XMQTTCLIENT_H_ |
citrusbyte | 0:5a4798128c36 | 3 | |
citrusbyte | 0:5a4798128c36 | 4 | #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM)) |
citrusbyte | 0:5a4798128c36 | 5 | #error "Platform definition is missing!" |
citrusbyte | 0:5a4798128c36 | 6 | #endif |
citrusbyte | 0:5a4798128c36 | 7 | |
citrusbyte | 0:5a4798128c36 | 8 | #define M2X_VERSION "0.1.0" |
citrusbyte | 0:5a4798128c36 | 9 | |
citrusbyte | 0:5a4798128c36 | 10 | #ifdef MBED_PLATFORM |
citrusbyte | 0:5a4798128c36 | 11 | #include "m2x-mbed.h" |
citrusbyte | 0:5a4798128c36 | 12 | #endif /* MBED_PLATFORM */ |
citrusbyte | 0:5a4798128c36 | 13 | |
citrusbyte | 0:5a4798128c36 | 14 | #ifdef LINUX_PLATFORM |
citrusbyte | 0:5a4798128c36 | 15 | #include "m2x-linux.h" |
citrusbyte | 0:5a4798128c36 | 16 | #endif /* LINUX_PLATFORM */ |
citrusbyte | 0:5a4798128c36 | 17 | |
citrusbyte | 0:5a4798128c36 | 18 | /* If we don't have DBG defined, provide dummy (no-op) implementations */ |
citrusbyte | 0:5a4798128c36 | 19 | #ifndef DBG |
citrusbyte | 0:5a4798128c36 | 20 | #define DBG(fmt_, data_) |
citrusbyte | 0:5a4798128c36 | 21 | #define DBGLN(fmt_, data_) |
citrusbyte | 0:5a4798128c36 | 22 | #define DBGLNEND |
citrusbyte | 0:5a4798128c36 | 23 | #endif /* DBG */ |
citrusbyte | 0:5a4798128c36 | 24 | |
// Smaller of the two arguments. Each argument may be evaluated twice,
// so avoid passing expressions with side effects.
#define MIN(a, b) (((b) < (a)) ? (b) : (a))
// Converts a value in the range 0..15 to its uppercase hexadecimal digit.
#define TO_HEX(t_) ((char) (((t_) <= 9) ? ((t_) + '0') : ((t_) - 10 + 'A')))
// NOTE(review): presumably the digit count used when printing doubles;
// its use is not visible in this part of the file.
#define MAX_DOUBLE_DIGITS 7
citrusbyte | 0:5a4798128c36 | 28 | |
citrusbyte | 0:5a4798128c36 | 29 | /* For tolower */ |
citrusbyte | 0:5a4798128c36 | 30 | #include <ctype.h> |
citrusbyte | 0:5a4798128c36 | 31 | |
// Client-side result codes. E_OK is the only one of these accepted by
// m2x_status_is_success(); all failure codes are negative, so they never
// fall inside the 2xx/4xx/5xx ranges tested by the status helpers.
// NOTE(review): -7 is not assigned in this file.
static const int E_OK = 0;
static const int E_NOCONNECTION = -1;
static const int E_DISCONNECTED = -2;
static const int E_NOTREACHABLE = -3;
static const int E_INVALID = -4;
static const int E_JSON_INVALID = -5;
static const int E_BUFFER_TOO_SMALL = -6;
static const int E_TIMESTAMP_ERROR = -8;

// Default server endpoint used when the M2XMQTTClient constructor is called
// without explicit host/port arguments. The pointer itself is const so the
// default cannot be reassigned by accident from code including this header.
static const char* const DEFAULT_M2X_HOST = "api-m2x.att.com";
static const int DEFAULT_M2X_PORT = 1883;
citrusbyte | 0:5a4798128c36 | 43 | |
citrusbyte | 0:5a4798128c36 | 44 | static inline bool m2x_status_is_success(int status) { |
citrusbyte | 0:5a4798128c36 | 45 | return (status == E_OK) || (status >= 200 && status <= 299); |
citrusbyte | 0:5a4798128c36 | 46 | } |
citrusbyte | 0:5a4798128c36 | 47 | |
// Reports whether `status` lies in the 400..499 range.
static inline bool m2x_status_is_client_error(int status) {
  const bool below_range = status < 400;
  const bool above_range = status > 499;
  return !(below_range || above_range);
}
citrusbyte | 0:5a4798128c36 | 51 | |
// Reports whether `status` lies in the 500..599 range.
static inline bool m2x_status_is_server_error(int status) {
  if (status < 500) {
    return false;
  }
  return status < 600;
}
citrusbyte | 0:5a4798128c36 | 55 | |
citrusbyte | 0:5a4798128c36 | 56 | static inline bool m2x_status_is_error(int status) { |
citrusbyte | 0:5a4798128c36 | 57 | return m2x_status_is_client_error(status) || |
citrusbyte | 0:5a4798128c36 | 58 | m2x_status_is_server_error(status); |
citrusbyte | 0:5a4798128c36 | 59 | } |
citrusbyte | 0:5a4798128c36 | 60 | |
citrusbyte | 0:5a4798128c36 | 61 | // Null Print class used to calculate length to print |
citrusbyte | 0:5a4798128c36 | 62 | class NullPrint : public Print { |
citrusbyte | 0:5a4798128c36 | 63 | public: |
citrusbyte | 0:5a4798128c36 | 64 | size_t counter; |
citrusbyte | 0:5a4798128c36 | 65 | |
citrusbyte | 0:5a4798128c36 | 66 | virtual size_t write(uint8_t b) { |
citrusbyte | 0:5a4798128c36 | 67 | counter++; |
citrusbyte | 0:5a4798128c36 | 68 | return 1; |
citrusbyte | 0:5a4798128c36 | 69 | } |
citrusbyte | 0:5a4798128c36 | 70 | |
citrusbyte | 0:5a4798128c36 | 71 | virtual size_t write(const uint8_t* buf, size_t size) { |
citrusbyte | 0:5a4798128c36 | 72 | counter += size; |
citrusbyte | 0:5a4798128c36 | 73 | return size; |
citrusbyte | 0:5a4798128c36 | 74 | } |
citrusbyte | 0:5a4798128c36 | 75 | }; |
citrusbyte | 0:5a4798128c36 | 76 | |
citrusbyte | 0:5a4798128c36 | 77 | // Handy helper class for printing MQTT payload using a Print |
citrusbyte | 0:5a4798128c36 | 78 | class MMQTTPrint : public Print { |
citrusbyte | 0:5a4798128c36 | 79 | public: |
citrusbyte | 0:5a4798128c36 | 80 | mmqtt_connection *connection; |
citrusbyte | 0:5a4798128c36 | 81 | mmqtt_s_puller puller; |
citrusbyte | 0:5a4798128c36 | 82 | |
citrusbyte | 0:5a4798128c36 | 83 | virtual size_t write(uint8_t b) { |
citrusbyte | 0:5a4798128c36 | 84 | return write(&b, 1); |
citrusbyte | 0:5a4798128c36 | 85 | } |
citrusbyte | 0:5a4798128c36 | 86 | |
citrusbyte | 0:5a4798128c36 | 87 | virtual size_t write(const uint8_t* buf, size_t size) { |
citrusbyte | 0:5a4798128c36 | 88 | return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1; |
citrusbyte | 0:5a4798128c36 | 89 | } |
citrusbyte | 0:5a4798128c36 | 90 | }; |
citrusbyte | 0:5a4798128c36 | 91 | |
citrusbyte | 0:5a4798128c36 | 92 | class M2XMQTTClient { |
citrusbyte | 0:5a4798128c36 | 93 | public: |
citrusbyte | 0:5a4798128c36 | 94 | M2XMQTTClient(Client* client, |
citrusbyte | 0:5a4798128c36 | 95 | const char* key, |
citrusbyte | 0:5a4798128c36 | 96 | void (* idlefunc)(void) = NULL, |
citrusbyte | 0:5a4798128c36 | 97 | bool keepalive = true, |
citrusbyte | 0:5a4798128c36 | 98 | const char* host = DEFAULT_M2X_HOST, |
citrusbyte | 0:5a4798128c36 | 99 | int port = DEFAULT_M2X_PORT, |
citrusbyte | 0:5a4798128c36 | 100 | const char* path_prefix = NULL); |
citrusbyte | 0:5a4798128c36 | 101 | |
citrusbyte | 0:5a4798128c36 | 102 | // Push data stream value using PUT request, returns the HTTP status code |
citrusbyte | 0:5a4798128c36 | 103 | // NOTE: if you want to update by a serial, use "serial/<serial ID>" as |
citrusbyte | 0:5a4798128c36 | 104 | // the device ID here. |
citrusbyte | 0:5a4798128c36 | 105 | template <class T> |
citrusbyte | 0:5a4798128c36 | 106 | int updateStreamValue(const char* deviceId, const char* streamName, T value); |
citrusbyte | 0:5a4798128c36 | 107 | |
citrusbyte | 0:5a4798128c36 | 108 | // Post multiple values to M2X all at once. |
citrusbyte | 0:5a4798128c36 | 109 | // +deviceId+ - id of the device to post values |
citrusbyte | 0:5a4798128c36 | 110 | // +streamNum+ - Number of streams to post |
citrusbyte | 0:5a4798128c36 | 111 | // +names+ - Array of stream names, the length of the array should |
citrusbyte | 0:5a4798128c36 | 112 | // be exactly +streamNum+ |
citrusbyte | 0:5a4798128c36 | 113 | // +counts+ - Array of +streamNum+ length, each item in this array |
citrusbyte | 0:5a4798128c36 | 114 | // containing the number of values we want to post for each stream |
citrusbyte | 0:5a4798128c36 | 115 | // +ats+ - Timestamps for each value, the length of this array should |
citrusbyte | 0:5a4798128c36 | 116 | // be the some of all values in +counts+, for the first +counts[0]+ |
citrusbyte | 0:5a4798128c36 | 117 | // items, the values belong to the first stream, for the following |
citrusbyte | 0:5a4798128c36 | 118 | // +counts[1]+ number of items, the values belong to the second stream, |
citrusbyte | 0:5a4798128c36 | 119 | // etc. Notice that timestamps are required here: you must provide |
citrusbyte | 0:5a4798128c36 | 120 | // a timestamp for each value posted. |
citrusbyte | 0:5a4798128c36 | 121 | // +values+ - Values to post. This works the same way as +ats+, the |
citrusbyte | 0:5a4798128c36 | 122 | // first +counts[0]+ number of items contain values to post to the first |
citrusbyte | 0:5a4798128c36 | 123 | // stream, the succeeding +counts[1]+ number of items contain values |
citrusbyte | 0:5a4798128c36 | 124 | // for the second stream, etc. The length of this array should be |
citrusbyte | 0:5a4798128c36 | 125 | // the sum of all values in +counts+ array. |
citrusbyte | 0:5a4798128c36 | 126 | // NOTE: if you want to update by a serial, use "serial/<serial ID>" as |
citrusbyte | 0:5a4798128c36 | 127 | // the device ID here. |
citrusbyte | 0:5a4798128c36 | 128 | template <class T> |
citrusbyte | 0:5a4798128c36 | 129 | int postDeviceUpdates(const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 130 | const char* names[], const int counts[], |
citrusbyte | 0:5a4798128c36 | 131 | const char* ats[], T values[]); |
citrusbyte | 0:5a4798128c36 | 132 | |
citrusbyte | 0:5a4798128c36 | 133 | // Post multiple values of a single device at once. |
citrusbyte | 0:5a4798128c36 | 134 | // +deviceId+ - id of the device to post values |
citrusbyte | 0:5a4798128c36 | 135 | // +streamNum+ - Number of streams to post |
citrusbyte | 0:5a4798128c36 | 136 | // +names+ - Array of stream names, the length of the array should |
citrusbyte | 0:5a4798128c36 | 137 | // be exactly +streamNum+ |
citrusbyte | 0:5a4798128c36 | 138 | // +values+ - Array of values to post, the length of the array should |
citrusbyte | 0:5a4798128c36 | 139 | // be exactly +streamNum+. Notice that the array of +values+ should |
citrusbyte | 0:5a4798128c36 | 140 | // match the array of +names+, and that the ith value in +values+ is |
citrusbyte | 0:5a4798128c36 | 141 | // exactly the value to post for the ith stream name in +names+ |
citrusbyte | 0:5a4798128c36 | 142 | // NOTE: if you want to update by a serial, use "serial/<serial ID>" as |
citrusbyte | 0:5a4798128c36 | 143 | // the device ID here. |
citrusbyte | 0:5a4798128c36 | 144 | template <class T> |
citrusbyte | 0:5a4798128c36 | 145 | int postDeviceUpdate(const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 146 | const char* names[], T values[], |
citrusbyte | 0:5a4798128c36 | 147 | const char* at = NULL); |
citrusbyte | 0:5a4798128c36 | 148 | |
citrusbyte | 0:5a4798128c36 | 149 | // Update datasource location |
citrusbyte | 0:5a4798128c36 | 150 | // NOTE: On an Arduino Uno and other ATMEGA based boards, double has |
citrusbyte | 0:5a4798128c36 | 151 | // 4-byte (32 bits) precision, which is the same as float. So there's |
citrusbyte | 0:5a4798128c36 | 152 | // no natural double-precision floating number on these boards. With |
citrusbyte | 0:5a4798128c36 | 153 | // a float value, we have a precision of roughly 7 digits, that means |
citrusbyte | 0:5a4798128c36 | 154 | // either 5 or 6 digits after the floating point. According to wikipedia, |
citrusbyte | 0:5a4798128c36 | 155 | // a difference of 0.00001 will give us ~1.1132m distance. If this |
citrusbyte | 0:5a4798128c36 | 156 | // precision is good for you, you can use the double-version we provided |
citrusbyte | 0:5a4798128c36 | 157 | // here. Otherwise, you may need to use the string-version and do the |
citrusbyte | 0:5a4798128c36 | 158 | // actual conversion by yourselves. |
citrusbyte | 0:5a4798128c36 | 159 | // However, with an Arduino Due board, double has 8-bytes (64 bits) |
citrusbyte | 0:5a4798128c36 | 160 | // precision, which means you are free to use the double-version only |
citrusbyte | 0:5a4798128c36 | 161 | // without any precision problems. |
citrusbyte | 0:5a4798128c36 | 162 | // Returned value is the http status code. |
citrusbyte | 0:5a4798128c36 | 163 | // NOTE: if you want to update by a serial, use "serial/<serial ID>" as |
citrusbyte | 0:5a4798128c36 | 164 | // the device ID here. |
citrusbyte | 0:5a4798128c36 | 165 | template <class T> |
citrusbyte | 0:5a4798128c36 | 166 | int updateLocation(const char* deviceId, const char* name, |
citrusbyte | 0:5a4798128c36 | 167 | T latitude, T longitude, T elevation); |
citrusbyte | 0:5a4798128c36 | 168 | |
citrusbyte | 0:5a4798128c36 | 169 | // Following fields are public so mmqtt callback functions can access directly |
citrusbyte | 0:5a4798128c36 | 170 | Client* _client; |
citrusbyte | 0:5a4798128c36 | 171 | private: |
citrusbyte | 0:5a4798128c36 | 172 | struct mmqtt_connection _connection; |
citrusbyte | 0:5a4798128c36 | 173 | const char* _key; |
citrusbyte | 0:5a4798128c36 | 174 | uint16_t _key_length; |
citrusbyte | 0:5a4798128c36 | 175 | bool _connected; |
citrusbyte | 0:5a4798128c36 | 176 | bool _keepalive; |
citrusbyte | 0:5a4798128c36 | 177 | const char* _host; |
citrusbyte | 0:5a4798128c36 | 178 | int _port; |
citrusbyte | 0:5a4798128c36 | 179 | void (* _idlefunc)(void); |
citrusbyte | 0:5a4798128c36 | 180 | const char* _path_prefix; |
citrusbyte | 0:5a4798128c36 | 181 | NullPrint _null_print; |
citrusbyte | 0:5a4798128c36 | 182 | MMQTTPrint _mmqtt_print; |
citrusbyte | 0:5a4798128c36 | 183 | int16_t _current_id; |
citrusbyte | 0:5a4798128c36 | 184 | |
citrusbyte | 0:5a4798128c36 | 185 | int connectToServer(); |
citrusbyte | 0:5a4798128c36 | 186 | |
citrusbyte | 0:5a4798128c36 | 187 | template <class T> |
citrusbyte | 0:5a4798128c36 | 188 | int printUpdateStreamValuePayload(Print* print, const char* deviceId, |
citrusbyte | 0:5a4798128c36 | 189 | const char* streamName, T value); |
citrusbyte | 0:5a4798128c36 | 190 | |
citrusbyte | 0:5a4798128c36 | 191 | template <class T> |
citrusbyte | 0:5a4798128c36 | 192 | int printPostDeviceUpdatesPayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 193 | const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 194 | const char* names[], const int counts[], |
citrusbyte | 0:5a4798128c36 | 195 | const char* ats[], T values[]); |
citrusbyte | 0:5a4798128c36 | 196 | |
citrusbyte | 0:5a4798128c36 | 197 | template <class T> |
citrusbyte | 0:5a4798128c36 | 198 | int printPostDeviceUpdatePayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 199 | const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 200 | const char* names[], T values[], |
citrusbyte | 0:5a4798128c36 | 201 | const char* at = NULL); |
citrusbyte | 0:5a4798128c36 | 202 | |
citrusbyte | 0:5a4798128c36 | 203 | template <class T> |
citrusbyte | 0:5a4798128c36 | 204 | int printUpdateLocationPayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 205 | const char* deviceId, const char* name, |
citrusbyte | 0:5a4798128c36 | 206 | T latitude, T longitude, T elevation); |
citrusbyte | 0:5a4798128c36 | 207 | |
citrusbyte | 0:5a4798128c36 | 208 | int readStatusCode(); |
citrusbyte | 0:5a4798128c36 | 209 | void close(); |
citrusbyte | 0:5a4798128c36 | 210 | }; |
citrusbyte | 0:5a4798128c36 | 211 | |
citrusbyte | 0:5a4798128c36 | 212 | // Implementations |
citrusbyte | 0:5a4798128c36 | 213 | M2XMQTTClient::M2XMQTTClient(Client* client, |
citrusbyte | 0:5a4798128c36 | 214 | const char* key, |
citrusbyte | 0:5a4798128c36 | 215 | void (* idlefunc)(void), |
citrusbyte | 0:5a4798128c36 | 216 | bool keepalive, |
citrusbyte | 0:5a4798128c36 | 217 | const char* host, |
citrusbyte | 0:5a4798128c36 | 218 | int port, |
citrusbyte | 0:5a4798128c36 | 219 | const char* path_prefix) : _client(client), |
citrusbyte | 0:5a4798128c36 | 220 | _key(key), |
citrusbyte | 0:5a4798128c36 | 221 | _idlefunc(idlefunc), |
citrusbyte | 0:5a4798128c36 | 222 | _keepalive(keepalive), |
citrusbyte | 0:5a4798128c36 | 223 | _connected(false), |
citrusbyte | 0:5a4798128c36 | 224 | _host(host), |
citrusbyte | 0:5a4798128c36 | 225 | _port(port), |
citrusbyte | 0:5a4798128c36 | 226 | _path_prefix(path_prefix), |
citrusbyte | 0:5a4798128c36 | 227 | _null_print(), |
citrusbyte | 0:5a4798128c36 | 228 | _mmqtt_print(), |
citrusbyte | 0:5a4798128c36 | 229 | _current_id(0) { |
citrusbyte | 0:5a4798128c36 | 230 | _key_length = strlen(_key); |
citrusbyte | 0:5a4798128c36 | 231 | } |
citrusbyte | 0:5a4798128c36 | 232 | |
citrusbyte | 0:5a4798128c36 | 233 | mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) { |
citrusbyte | 0:5a4798128c36 | 234 | const uint8_t *data = NULL; |
citrusbyte | 0:5a4798128c36 | 235 | mmqtt_ssize_t length = 0; |
citrusbyte | 0:5a4798128c36 | 236 | mmqtt_status_t status; |
citrusbyte | 0:5a4798128c36 | 237 | M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; |
citrusbyte | 0:5a4798128c36 | 238 | Client *c = client->_client; |
citrusbyte | 0:5a4798128c36 | 239 | struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection); |
citrusbyte | 0:5a4798128c36 | 240 | if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; } |
citrusbyte | 0:5a4798128c36 | 241 | |
citrusbyte | 0:5a4798128c36 | 242 | status = mmqtt_stream_external_pullable(stream, &data, &length); |
citrusbyte | 0:5a4798128c36 | 243 | if (status != MMQTT_STATUS_OK) { return status; } |
citrusbyte | 0:5a4798128c36 | 244 | |
citrusbyte | 0:5a4798128c36 | 245 | length = c->write(data, length); |
citrusbyte | 0:5a4798128c36 | 246 | if (length < 0) { |
citrusbyte | 0:5a4798128c36 | 247 | c->stop(); |
citrusbyte | 0:5a4798128c36 | 248 | return MMQTT_STATUS_BROKEN_CONNECTION; |
citrusbyte | 0:5a4798128c36 | 249 | } |
citrusbyte | 0:5a4798128c36 | 250 | mmqtt_stream_external_pull(stream, length); |
citrusbyte | 0:5a4798128c36 | 251 | if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { |
citrusbyte | 0:5a4798128c36 | 252 | mmqtt_connection_release_write_stream(connection, stream); |
citrusbyte | 0:5a4798128c36 | 253 | } |
citrusbyte | 0:5a4798128c36 | 254 | return MMQTT_STATUS_OK; |
citrusbyte | 0:5a4798128c36 | 255 | } |
citrusbyte | 0:5a4798128c36 | 256 | |
citrusbyte | 0:5a4798128c36 | 257 | mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) { |
citrusbyte | 0:5a4798128c36 | 258 | uint8_t *data = NULL; |
citrusbyte | 0:5a4798128c36 | 259 | mmqtt_ssize_t length = 0, i = 0; |
citrusbyte | 0:5a4798128c36 | 260 | mmqtt_status_t status; |
citrusbyte | 0:5a4798128c36 | 261 | M2XMQTTClient *client = (M2XMQTTClient *) connection->connection; |
citrusbyte | 0:5a4798128c36 | 262 | Client *c = client->_client; |
citrusbyte | 0:5a4798128c36 | 263 | struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection); |
citrusbyte | 0:5a4798128c36 | 264 | if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; } |
citrusbyte | 0:5a4798128c36 | 265 | |
citrusbyte | 0:5a4798128c36 | 266 | status = mmqtt_stream_external_pushable(stream, &data, &length); |
citrusbyte | 0:5a4798128c36 | 267 | if (status != MMQTT_STATUS_OK) { return status; } |
citrusbyte | 0:5a4798128c36 | 268 | length = min(length, max_size); |
citrusbyte | 0:5a4798128c36 | 269 | |
citrusbyte | 0:5a4798128c36 | 270 | /* Maybe we need another field in signature documenting how much data we want, |
citrusbyte | 0:5a4798128c36 | 271 | * so we can handle end condition gracefully? Not 100% if `left` field in |
citrusbyte | 0:5a4798128c36 | 272 | * mmqtt_stream is enough |
citrusbyte | 0:5a4798128c36 | 273 | */ |
citrusbyte | 0:5a4798128c36 | 274 | while (i < length && c->available()) { |
citrusbyte | 0:5a4798128c36 | 275 | data[i++] = c->read(); |
citrusbyte | 0:5a4798128c36 | 276 | } |
citrusbyte | 0:5a4798128c36 | 277 | mmqtt_stream_external_push(stream, i); |
citrusbyte | 0:5a4798128c36 | 278 | return MMQTT_STATUS_OK; |
citrusbyte | 0:5a4798128c36 | 279 | } |
citrusbyte | 0:5a4798128c36 | 280 | |
citrusbyte | 0:5a4798128c36 | 281 | size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit) |
citrusbyte | 0:5a4798128c36 | 282 | { |
citrusbyte | 0:5a4798128c36 | 283 | mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata; |
citrusbyte | 0:5a4798128c36 | 284 | return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit, |
citrusbyte | 0:5a4798128c36 | 285 | limit) == MMQTT_STATUS_OK ? limit : 0; |
citrusbyte | 0:5a4798128c36 | 286 | } |
citrusbyte | 0:5a4798128c36 | 287 | |
citrusbyte | 0:5a4798128c36 | 288 | int M2XMQTTClient::connectToServer() { |
citrusbyte | 0:5a4798128c36 | 289 | mmqtt_status_t status; |
citrusbyte | 0:5a4798128c36 | 290 | struct mmqtt_p_connect_header connect_header; |
citrusbyte | 0:5a4798128c36 | 291 | struct mmqtt_p_connack_header connack_header; |
citrusbyte | 0:5a4798128c36 | 292 | uint32_t packet_length; |
citrusbyte | 0:5a4798128c36 | 293 | uint8_t flag; |
citrusbyte | 0:5a4798128c36 | 294 | uint16_t length; |
citrusbyte | 0:5a4798128c36 | 295 | uint8_t name[6]; |
citrusbyte | 0:5a4798128c36 | 296 | |
citrusbyte | 0:5a4798128c36 | 297 | if (_client->connect(_host, _port)) { |
citrusbyte | 0:5a4798128c36 | 298 | DBGLN("%s", F("Connected to M2X MQTT server!")); |
citrusbyte | 0:5a4798128c36 | 299 | mmqtt_connection_init(&_connection, this); |
citrusbyte | 0:5a4798128c36 | 300 | /* Send CONNECT packet first */ |
citrusbyte | 0:5a4798128c36 | 301 | connect_header.name = name; |
citrusbyte | 0:5a4798128c36 | 302 | strncpy((char *)name, F("MQIsdp"), 6); |
citrusbyte | 0:5a4798128c36 | 303 | connect_header.name_length = connect_header.name_max_length = 6; |
citrusbyte | 0:5a4798128c36 | 304 | connect_header.protocol_version = 3; |
citrusbyte | 0:5a4798128c36 | 305 | /* Clean session with username set */ |
citrusbyte | 0:5a4798128c36 | 306 | connect_header.flags = 0x82; |
citrusbyte | 0:5a4798128c36 | 307 | connect_header.keepalive = 60; |
citrusbyte | 0:5a4798128c36 | 308 | packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) + |
citrusbyte | 0:5a4798128c36 | 309 | mmqtt_s_string_encoded_length(_key_length); |
citrusbyte | 0:5a4798128c36 | 310 | status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 311 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT), |
citrusbyte | 0:5a4798128c36 | 312 | packet_length); |
citrusbyte | 0:5a4798128c36 | 313 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 314 | DBG("%s", F("Error sending connect packet fixed header: ")); |
citrusbyte | 0:5a4798128c36 | 315 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 316 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 317 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 318 | } |
citrusbyte | 0:5a4798128c36 | 319 | status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header); |
citrusbyte | 0:5a4798128c36 | 320 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 321 | DBG("%s", F("Error sending connect packet variable header: ")); |
citrusbyte | 0:5a4798128c36 | 322 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 323 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 324 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 325 | } |
citrusbyte | 0:5a4798128c36 | 326 | status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 327 | (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 328 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 329 | DBG("%s", F("Error sending connect packet payload: ")); |
citrusbyte | 0:5a4798128c36 | 330 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 331 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 332 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 333 | } |
citrusbyte | 0:5a4798128c36 | 334 | /* Check CONNACK packet */ |
citrusbyte | 0:5a4798128c36 | 335 | do { |
citrusbyte | 0:5a4798128c36 | 336 | status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, |
citrusbyte | 0:5a4798128c36 | 337 | &flag, &packet_length); |
citrusbyte | 0:5a4798128c36 | 338 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 339 | DBG("%s", F("Error decoding connack fixed header: ")); |
citrusbyte | 0:5a4798128c36 | 340 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 341 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 342 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 343 | } |
citrusbyte | 0:5a4798128c36 | 344 | if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) { |
citrusbyte | 0:5a4798128c36 | 345 | status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); |
citrusbyte | 0:5a4798128c36 | 346 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 347 | DBG("%s", F("Error skipping non-connack packet: ")); |
citrusbyte | 0:5a4798128c36 | 348 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 349 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 350 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 351 | } |
citrusbyte | 0:5a4798128c36 | 352 | } |
citrusbyte | 0:5a4798128c36 | 353 | } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK); |
citrusbyte | 0:5a4798128c36 | 354 | status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher, |
citrusbyte | 0:5a4798128c36 | 355 | &connack_header); |
citrusbyte | 0:5a4798128c36 | 356 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 357 | DBG("%s", F("Error decoding connack variable header: ")); |
citrusbyte | 0:5a4798128c36 | 358 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 359 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 360 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 361 | } |
citrusbyte | 0:5a4798128c36 | 362 | if (connack_header.return_code != 0x0) { |
citrusbyte | 0:5a4798128c36 | 363 | DBG("%s", F("CONNACK return code is not accepted: ")); |
citrusbyte | 0:5a4798128c36 | 364 | DBGLN("%d", connack_header.return_code); |
citrusbyte | 0:5a4798128c36 | 365 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 366 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 367 | } |
citrusbyte | 0:5a4798128c36 | 368 | /* Send SUBSCRIBE packet*/ |
citrusbyte | 0:5a4798128c36 | 369 | length = _key_length + 15 + 4; |
citrusbyte | 0:5a4798128c36 | 370 | status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 371 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2, |
citrusbyte | 0:5a4798128c36 | 372 | length); |
citrusbyte | 0:5a4798128c36 | 373 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 374 | DBG("%s", F("Error sending subscribe packet fixed header: ")); |
citrusbyte | 0:5a4798128c36 | 375 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 376 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 377 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 378 | } |
citrusbyte | 0:5a4798128c36 | 379 | // Subscribe packet must use QoS 1 |
citrusbyte | 0:5a4798128c36 | 380 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0); |
citrusbyte | 0:5a4798128c36 | 381 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14); |
citrusbyte | 0:5a4798128c36 | 382 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); |
citrusbyte | 0:5a4798128c36 | 383 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 384 | // The extra one is QoS, added here to save a function call |
citrusbyte | 0:5a4798128c36 | 385 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11); |
citrusbyte | 0:5a4798128c36 | 386 | /* Check SUBACK packet */ |
citrusbyte | 0:5a4798128c36 | 387 | do { |
citrusbyte | 0:5a4798128c36 | 388 | status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, |
citrusbyte | 0:5a4798128c36 | 389 | &flag, &packet_length); |
citrusbyte | 0:5a4798128c36 | 390 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 391 | DBG("%s", F("Error decoding suback fixed header: ")); |
citrusbyte | 0:5a4798128c36 | 392 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 393 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 394 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 395 | } |
citrusbyte | 0:5a4798128c36 | 396 | if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) { |
citrusbyte | 0:5a4798128c36 | 397 | status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); |
citrusbyte | 0:5a4798128c36 | 398 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 399 | DBG("%s", F("Error skipping packet: ")); |
citrusbyte | 0:5a4798128c36 | 400 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 401 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 402 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 403 | } |
citrusbyte | 0:5a4798128c36 | 404 | } |
citrusbyte | 0:5a4798128c36 | 405 | } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK); |
citrusbyte | 0:5a4798128c36 | 406 | status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); |
citrusbyte | 0:5a4798128c36 | 407 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 408 | DBG("%s", F("Error skipping suback packet: ")); |
citrusbyte | 0:5a4798128c36 | 409 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 410 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 411 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 412 | } |
citrusbyte | 0:5a4798128c36 | 413 | _mmqtt_print.connection = &_connection; |
citrusbyte | 0:5a4798128c36 | 414 | _mmqtt_print.puller = m2x_mmqtt_puller; |
citrusbyte | 0:5a4798128c36 | 415 | _connected = true; |
citrusbyte | 0:5a4798128c36 | 416 | return E_OK; |
citrusbyte | 0:5a4798128c36 | 417 | } else { |
citrusbyte | 0:5a4798128c36 | 418 | DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!")); |
citrusbyte | 0:5a4798128c36 | 419 | return E_NOCONNECTION; |
citrusbyte | 0:5a4798128c36 | 420 | } |
citrusbyte | 0:5a4798128c36 | 421 | } |
citrusbyte | 0:5a4798128c36 | 422 | |
citrusbyte | 0:5a4798128c36 | 423 | template <class T> |
citrusbyte | 0:5a4798128c36 | 424 | int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) { |
citrusbyte | 0:5a4798128c36 | 425 | int length; |
citrusbyte | 0:5a4798128c36 | 426 | if (!_connected) { |
citrusbyte | 0:5a4798128c36 | 427 | if (connectToServer() != E_OK) { |
citrusbyte | 0:5a4798128c36 | 428 | DBGLN("%s", "ERROR: Cannot connect to M2X server!"); |
citrusbyte | 0:5a4798128c36 | 429 | return E_NOCONNECTION; |
citrusbyte | 0:5a4798128c36 | 430 | } |
citrusbyte | 0:5a4798128c36 | 431 | } |
citrusbyte | 0:5a4798128c36 | 432 | _current_id++; |
citrusbyte | 0:5a4798128c36 | 433 | length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value); |
citrusbyte | 0:5a4798128c36 | 434 | mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 435 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), |
citrusbyte | 0:5a4798128c36 | 436 | length + _key_length + 15); |
citrusbyte | 0:5a4798128c36 | 437 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); |
citrusbyte | 0:5a4798128c36 | 438 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); |
citrusbyte | 0:5a4798128c36 | 439 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 440 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); |
citrusbyte | 0:5a4798128c36 | 441 | printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value); |
citrusbyte | 0:5a4798128c36 | 442 | return readStatusCode(); |
citrusbyte | 0:5a4798128c36 | 443 | } |
citrusbyte | 0:5a4798128c36 | 444 | |
citrusbyte | 0:5a4798128c36 | 445 | template <class T> |
citrusbyte | 0:5a4798128c36 | 446 | int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId, |
citrusbyte | 0:5a4798128c36 | 447 | const char* streamName, T value) { |
citrusbyte | 0:5a4798128c36 | 448 | int bytes = 0; |
citrusbyte | 0:5a4798128c36 | 449 | bytes += print->print(F("{\"id\":\"")); |
citrusbyte | 0:5a4798128c36 | 450 | bytes += print->print(_current_id); |
citrusbyte | 0:5a4798128c36 | 451 | bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); |
citrusbyte | 0:5a4798128c36 | 452 | if (_path_prefix) { bytes += print->print(_path_prefix); } |
citrusbyte | 0:5a4798128c36 | 453 | bytes += print->print(F("/v2/devices/")); |
citrusbyte | 0:5a4798128c36 | 454 | bytes += print->print(deviceId); |
citrusbyte | 0:5a4798128c36 | 455 | bytes += print->print(F("/streams/")); |
citrusbyte | 0:5a4798128c36 | 456 | bytes += print->print(streamName); |
citrusbyte | 0:5a4798128c36 | 457 | bytes += print->print(F("/value")); |
citrusbyte | 0:5a4798128c36 | 458 | bytes += print->print(F("\",\"agent\":\"")); |
citrusbyte | 0:5a4798128c36 | 459 | bytes += print->print(USER_AGENT); |
citrusbyte | 0:5a4798128c36 | 460 | bytes += print->print(F("\",\"body\":")); |
citrusbyte | 0:5a4798128c36 | 461 | bytes += print->print(F("{\"value\":\"")); |
citrusbyte | 0:5a4798128c36 | 462 | bytes += print->print(value); |
citrusbyte | 0:5a4798128c36 | 463 | bytes += print->print(F("\"}")); |
citrusbyte | 0:5a4798128c36 | 464 | bytes += print->print(F("}")); |
citrusbyte | 0:5a4798128c36 | 465 | } |
citrusbyte | 0:5a4798128c36 | 466 | |
citrusbyte | 0:5a4798128c36 | 467 | template <class T> |
citrusbyte | 0:5a4798128c36 | 468 | int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 469 | const char* names[], const int counts[], |
citrusbyte | 0:5a4798128c36 | 470 | const char* ats[], T values[]) { |
citrusbyte | 0:5a4798128c36 | 471 | int length; |
citrusbyte | 0:5a4798128c36 | 472 | if (!_connected) { |
citrusbyte | 0:5a4798128c36 | 473 | if (connectToServer() != E_OK) { |
citrusbyte | 0:5a4798128c36 | 474 | DBGLN("%s", "ERROR: Cannot connect to M2X server!"); |
citrusbyte | 0:5a4798128c36 | 475 | return E_NOCONNECTION; |
citrusbyte | 0:5a4798128c36 | 476 | } |
citrusbyte | 0:5a4798128c36 | 477 | } |
citrusbyte | 0:5a4798128c36 | 478 | _current_id++; |
citrusbyte | 0:5a4798128c36 | 479 | length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum, |
citrusbyte | 0:5a4798128c36 | 480 | names, counts, ats, values); |
citrusbyte | 0:5a4798128c36 | 481 | mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 482 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), |
citrusbyte | 0:5a4798128c36 | 483 | length + _key_length + 15); |
citrusbyte | 0:5a4798128c36 | 484 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); |
citrusbyte | 0:5a4798128c36 | 485 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); |
citrusbyte | 0:5a4798128c36 | 486 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 487 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); |
citrusbyte | 0:5a4798128c36 | 488 | printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum, |
citrusbyte | 0:5a4798128c36 | 489 | names, counts, ats, values); |
citrusbyte | 0:5a4798128c36 | 490 | return readStatusCode(); |
citrusbyte | 0:5a4798128c36 | 491 | } |
citrusbyte | 0:5a4798128c36 | 492 | |
citrusbyte | 0:5a4798128c36 | 493 | template <class T> |
citrusbyte | 0:5a4798128c36 | 494 | int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 495 | const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 496 | const char* names[], const int counts[], |
citrusbyte | 0:5a4798128c36 | 497 | const char* ats[], T values[]) { |
citrusbyte | 0:5a4798128c36 | 498 | int bytes = 0, value_index = 0, i, j; |
citrusbyte | 0:5a4798128c36 | 499 | bytes += print->print(F("{\"id\":\"")); |
citrusbyte | 0:5a4798128c36 | 500 | bytes += print->print(_current_id); |
citrusbyte | 0:5a4798128c36 | 501 | bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); |
citrusbyte | 0:5a4798128c36 | 502 | if (_path_prefix) { bytes += print->print(_path_prefix); } |
citrusbyte | 0:5a4798128c36 | 503 | bytes += print->print(F("/v2/devices/")); |
citrusbyte | 0:5a4798128c36 | 504 | bytes += print->print(deviceId); |
citrusbyte | 0:5a4798128c36 | 505 | bytes += print->print(F("/updates")); |
citrusbyte | 0:5a4798128c36 | 506 | bytes += print->print(F("\",\"agent\":\"")); |
citrusbyte | 0:5a4798128c36 | 507 | bytes += print->print(USER_AGENT); |
citrusbyte | 0:5a4798128c36 | 508 | bytes += print->print(F("\",\"body\":")); |
citrusbyte | 0:5a4798128c36 | 509 | bytes += print->print(F("{\"values\":{")); |
citrusbyte | 0:5a4798128c36 | 510 | for (i = 0; i < streamNum; i++) { |
citrusbyte | 0:5a4798128c36 | 511 | bytes += print->print(F("\"")); |
citrusbyte | 0:5a4798128c36 | 512 | bytes += print->print(names[i]); |
citrusbyte | 0:5a4798128c36 | 513 | bytes += print->print(F("\":[")); |
citrusbyte | 0:5a4798128c36 | 514 | for (j = 0; j < counts[i]; j++) { |
citrusbyte | 0:5a4798128c36 | 515 | bytes += print->print(F("{\"timestamp\": \"")); |
citrusbyte | 0:5a4798128c36 | 516 | bytes += print->print(ats[value_index]); |
citrusbyte | 0:5a4798128c36 | 517 | bytes += print->print(F("\",\"value\": \"")); |
citrusbyte | 0:5a4798128c36 | 518 | bytes += print->print(values[value_index]); |
citrusbyte | 0:5a4798128c36 | 519 | bytes += print->print(F("\"}")); |
citrusbyte | 0:5a4798128c36 | 520 | if (j < counts[i] - 1) { bytes += print->print(F(",")); } |
citrusbyte | 0:5a4798128c36 | 521 | value_index++; |
citrusbyte | 0:5a4798128c36 | 522 | } |
citrusbyte | 0:5a4798128c36 | 523 | bytes += print->print(F("]")); |
citrusbyte | 0:5a4798128c36 | 524 | if (i < streamNum - 1) { bytes += print->print(F(",")); } |
citrusbyte | 0:5a4798128c36 | 525 | } |
citrusbyte | 0:5a4798128c36 | 526 | bytes += print->print(F(("}}}"))); |
citrusbyte | 0:5a4798128c36 | 527 | return bytes; |
citrusbyte | 0:5a4798128c36 | 528 | } |
citrusbyte | 0:5a4798128c36 | 529 | |
citrusbyte | 0:5a4798128c36 | 530 | template <class T> |
citrusbyte | 0:5a4798128c36 | 531 | int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 532 | const char* names[], T values[], |
citrusbyte | 0:5a4798128c36 | 533 | const char* at) { |
citrusbyte | 0:5a4798128c36 | 534 | int length; |
citrusbyte | 0:5a4798128c36 | 535 | if (!_connected) { |
citrusbyte | 0:5a4798128c36 | 536 | if (connectToServer() != E_OK) { |
citrusbyte | 0:5a4798128c36 | 537 | DBGLN("%s", "ERROR: Cannot connect to M2X server!"); |
citrusbyte | 0:5a4798128c36 | 538 | return E_NOCONNECTION; |
citrusbyte | 0:5a4798128c36 | 539 | } |
citrusbyte | 0:5a4798128c36 | 540 | } |
citrusbyte | 0:5a4798128c36 | 541 | _current_id++; |
citrusbyte | 0:5a4798128c36 | 542 | length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum, |
citrusbyte | 0:5a4798128c36 | 543 | names, values, at); |
citrusbyte | 0:5a4798128c36 | 544 | mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 545 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), |
citrusbyte | 0:5a4798128c36 | 546 | length + _key_length + 15); |
citrusbyte | 0:5a4798128c36 | 547 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); |
citrusbyte | 0:5a4798128c36 | 548 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); |
citrusbyte | 0:5a4798128c36 | 549 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 550 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); |
citrusbyte | 0:5a4798128c36 | 551 | printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum, |
citrusbyte | 0:5a4798128c36 | 552 | names, values, at); |
citrusbyte | 0:5a4798128c36 | 553 | return readStatusCode(); |
citrusbyte | 0:5a4798128c36 | 554 | } |
citrusbyte | 0:5a4798128c36 | 555 | |
citrusbyte | 0:5a4798128c36 | 556 | template <class T> |
citrusbyte | 0:5a4798128c36 | 557 | int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 558 | const char* deviceId, int streamNum, |
citrusbyte | 0:5a4798128c36 | 559 | const char* names[], T values[], |
citrusbyte | 0:5a4798128c36 | 560 | const char* at) { |
citrusbyte | 0:5a4798128c36 | 561 | int bytes = 0, i; |
citrusbyte | 0:5a4798128c36 | 562 | bytes += print->print(F("{\"id\":\"")); |
citrusbyte | 0:5a4798128c36 | 563 | bytes += print->print(_current_id); |
citrusbyte | 0:5a4798128c36 | 564 | bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\"")); |
citrusbyte | 0:5a4798128c36 | 565 | if (_path_prefix) { bytes += print->print(_path_prefix); } |
citrusbyte | 0:5a4798128c36 | 566 | bytes += print->print(F("/v2/devices/")); |
citrusbyte | 0:5a4798128c36 | 567 | bytes += print->print(deviceId); |
citrusbyte | 0:5a4798128c36 | 568 | bytes += print->print(F("/update")); |
citrusbyte | 0:5a4798128c36 | 569 | bytes += print->print(F("\",\"agent\":\"")); |
citrusbyte | 0:5a4798128c36 | 570 | bytes += print->print(USER_AGENT); |
citrusbyte | 0:5a4798128c36 | 571 | bytes += print->print(F("\",\"body\":")); |
citrusbyte | 0:5a4798128c36 | 572 | bytes += print->print(F("{\"values\":{")); |
citrusbyte | 0:5a4798128c36 | 573 | for (int i = 0; i < streamNum; i++) { |
citrusbyte | 0:5a4798128c36 | 574 | bytes += print->print(F("\"")); |
citrusbyte | 0:5a4798128c36 | 575 | bytes += print->print(names[i]); |
citrusbyte | 0:5a4798128c36 | 576 | bytes += print->print(F("\": \"")); |
citrusbyte | 0:5a4798128c36 | 577 | bytes += print->print(values[i]); |
citrusbyte | 0:5a4798128c36 | 578 | bytes += print->print(F("\"")); |
citrusbyte | 0:5a4798128c36 | 579 | if (i < streamNum - 1) { bytes += print->print(F(",")); } |
citrusbyte | 0:5a4798128c36 | 580 | } |
citrusbyte | 0:5a4798128c36 | 581 | bytes += print->print(F("}")); |
citrusbyte | 0:5a4798128c36 | 582 | if (at != NULL) { |
citrusbyte | 0:5a4798128c36 | 583 | bytes += print->print(F(",\"timestamp\":\"")); |
citrusbyte | 0:5a4798128c36 | 584 | bytes += print->print(at); |
citrusbyte | 0:5a4798128c36 | 585 | bytes += print->print(F("\"")); |
citrusbyte | 0:5a4798128c36 | 586 | } |
citrusbyte | 0:5a4798128c36 | 587 | bytes += print->print(F(("}"))); |
citrusbyte | 0:5a4798128c36 | 588 | return bytes; |
citrusbyte | 0:5a4798128c36 | 589 | } |
citrusbyte | 0:5a4798128c36 | 590 | |
citrusbyte | 0:5a4798128c36 | 591 | template <class T> |
citrusbyte | 0:5a4798128c36 | 592 | int M2XMQTTClient::updateLocation(const char* deviceId, const char* name, |
citrusbyte | 0:5a4798128c36 | 593 | T latitude, T longitude, T elevation) { |
citrusbyte | 0:5a4798128c36 | 594 | int length; |
citrusbyte | 0:5a4798128c36 | 595 | if (!_connected) { |
citrusbyte | 0:5a4798128c36 | 596 | if (connectToServer() != E_OK) { |
citrusbyte | 0:5a4798128c36 | 597 | DBGLN("%s", "ERROR: Cannot connect to M2X server!"); |
citrusbyte | 0:5a4798128c36 | 598 | return E_NOCONNECTION; |
citrusbyte | 0:5a4798128c36 | 599 | } |
citrusbyte | 0:5a4798128c36 | 600 | } |
citrusbyte | 0:5a4798128c36 | 601 | _current_id++; |
citrusbyte | 0:5a4798128c36 | 602 | length = printUpdateLocationPayload(&_null_print, deviceId, name, |
citrusbyte | 0:5a4798128c36 | 603 | latitude, longitude, elevation); |
citrusbyte | 0:5a4798128c36 | 604 | mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller, |
citrusbyte | 0:5a4798128c36 | 605 | MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH), |
citrusbyte | 0:5a4798128c36 | 606 | length + _key_length + 15); |
citrusbyte | 0:5a4798128c36 | 607 | mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13); |
citrusbyte | 0:5a4798128c36 | 608 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4); |
citrusbyte | 0:5a4798128c36 | 609 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length); |
citrusbyte | 0:5a4798128c36 | 610 | mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9); |
citrusbyte | 0:5a4798128c36 | 611 | printUpdateLocationPayload(&_mmqtt_print, deviceId, name, |
citrusbyte | 0:5a4798128c36 | 612 | latitude, longitude, elevation); |
citrusbyte | 0:5a4798128c36 | 613 | return readStatusCode(); |
citrusbyte | 0:5a4798128c36 | 614 | } |
citrusbyte | 0:5a4798128c36 | 615 | |
citrusbyte | 0:5a4798128c36 | 616 | template <class T> |
citrusbyte | 0:5a4798128c36 | 617 | int M2XMQTTClient::printUpdateLocationPayload(Print* print, |
citrusbyte | 0:5a4798128c36 | 618 | const char* deviceId, const char* name, |
citrusbyte | 0:5a4798128c36 | 619 | T latitude, T longitude, T elevation) { |
citrusbyte | 0:5a4798128c36 | 620 | int bytes = 0; |
citrusbyte | 0:5a4798128c36 | 621 | bytes += print->print(F("{\"id\":\"")); |
citrusbyte | 0:5a4798128c36 | 622 | bytes += print->print(_current_id); |
citrusbyte | 0:5a4798128c36 | 623 | bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\"")); |
citrusbyte | 0:5a4798128c36 | 624 | if (_path_prefix) { bytes += print->print(_path_prefix); } |
citrusbyte | 0:5a4798128c36 | 625 | bytes += print->print(F("/v2/devices/")); |
citrusbyte | 0:5a4798128c36 | 626 | bytes += print->print(deviceId); |
citrusbyte | 0:5a4798128c36 | 627 | bytes += print->print(F("/location")); |
citrusbyte | 0:5a4798128c36 | 628 | bytes += print->print(F("\",\"agent\":\"")); |
citrusbyte | 0:5a4798128c36 | 629 | bytes += print->print(USER_AGENT); |
citrusbyte | 0:5a4798128c36 | 630 | bytes += print->print(F("\",\"body\":{\"name\":\"")); |
citrusbyte | 0:5a4798128c36 | 631 | bytes += print->print(name); |
citrusbyte | 0:5a4798128c36 | 632 | bytes += print->print(F("\",\"latitude\":\"")); |
citrusbyte | 0:5a4798128c36 | 633 | bytes += print->print(latitude); |
citrusbyte | 0:5a4798128c36 | 634 | bytes += print->print(F("\",\"longitude\":\"")); |
citrusbyte | 0:5a4798128c36 | 635 | bytes += print->print(longitude); |
citrusbyte | 0:5a4798128c36 | 636 | bytes += print->print(F("\",\"elevation\":\"")); |
citrusbyte | 0:5a4798128c36 | 637 | bytes += print->print(elevation); |
citrusbyte | 0:5a4798128c36 | 638 | bytes += print->print(F(("\"}}"))); |
citrusbyte | 0:5a4798128c36 | 639 | return bytes; |
citrusbyte | 0:5a4798128c36 | 640 | } |
citrusbyte | 0:5a4798128c36 | 641 | |
citrusbyte | 0:5a4798128c36 | 642 | int M2XMQTTClient::readStatusCode() { |
citrusbyte | 0:5a4798128c36 | 643 | mmqtt_status_t status; |
citrusbyte | 0:5a4798128c36 | 644 | uint32_t packet_length; |
citrusbyte | 0:5a4798128c36 | 645 | uint8_t flag; |
citrusbyte | 0:5a4798128c36 | 646 | int16_t parsed_id, response_status; |
citrusbyte | 0:5a4798128c36 | 647 | struct mjson_ctx ctx; |
citrusbyte | 0:5a4798128c36 | 648 | char buf[6]; |
citrusbyte | 0:5a4798128c36 | 649 | size_t buf_length; |
citrusbyte | 0:5a4798128c36 | 650 | |
citrusbyte | 0:5a4798128c36 | 651 | while (true) { |
citrusbyte | 0:5a4798128c36 | 652 | status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher, |
citrusbyte | 0:5a4798128c36 | 653 | &flag, &packet_length); |
citrusbyte | 0:5a4798128c36 | 654 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 655 | DBG("%s", F("Error decoding publish fixed header: ")); |
citrusbyte | 0:5a4798128c36 | 656 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 657 | close(); |
citrusbyte | 0:5a4798128c36 | 658 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 659 | } |
citrusbyte | 0:5a4798128c36 | 660 | if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) { |
citrusbyte | 0:5a4798128c36 | 661 | status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length); |
citrusbyte | 0:5a4798128c36 | 662 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 663 | DBG("%s", F("Error skipping non-publish packet: ")); |
citrusbyte | 0:5a4798128c36 | 664 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 665 | close(); |
citrusbyte | 0:5a4798128c36 | 666 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 667 | } |
citrusbyte | 0:5a4798128c36 | 668 | } else { |
citrusbyte | 0:5a4798128c36 | 669 | /* |
citrusbyte | 0:5a4798128c36 | 670 | * Since we only subscribe to one channel, there's no need to check channel name. |
citrusbyte | 0:5a4798128c36 | 671 | */ |
citrusbyte | 0:5a4798128c36 | 672 | status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0, |
citrusbyte | 0:5a4798128c36 | 673 | NULL, NULL); |
citrusbyte | 0:5a4798128c36 | 674 | if (status != MMQTT_STATUS_OK) { |
citrusbyte | 0:5a4798128c36 | 675 | DBG("%s", F("Error skipping channel name string: ")); |
citrusbyte | 0:5a4798128c36 | 676 | DBGLN("%d", status); |
citrusbyte | 0:5a4798128c36 | 677 | close(); |
citrusbyte | 0:5a4798128c36 | 678 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 679 | } |
citrusbyte | 0:5a4798128c36 | 680 | /* |
citrusbyte | 0:5a4798128c36 | 681 | * Parse JSON body for ID and status |
citrusbyte | 0:5a4798128c36 | 682 | */ |
citrusbyte | 0:5a4798128c36 | 683 | mjson_init(&ctx, &_connection, m2x_mjson_reader); |
citrusbyte | 0:5a4798128c36 | 684 | parsed_id = -1; |
citrusbyte | 0:5a4798128c36 | 685 | if (mjson_readcheck_object_start(&ctx) != MJSON_OK) { |
citrusbyte | 0:5a4798128c36 | 686 | /* Oops we have an error */ |
citrusbyte | 0:5a4798128c36 | 687 | DBG("%s", F("Publish Packet is not a JSON object!")); |
citrusbyte | 0:5a4798128c36 | 688 | close(); |
citrusbyte | 0:5a4798128c36 | 689 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 690 | } |
citrusbyte | 0:5a4798128c36 | 691 | while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) { |
citrusbyte | 0:5a4798128c36 | 692 | if (mjson_readcheck_string_start(&ctx) != MJSON_OK) { |
citrusbyte | 0:5a4798128c36 | 693 | DBG("%s", F("Object key is not string!")); |
citrusbyte | 0:5a4798128c36 | 694 | close(); |
citrusbyte | 0:5a4798128c36 | 695 | return E_DISCONNECTED; |
citrusbyte | 0:5a4798128c36 | 696 | } |
citrusbyte | 0:5a4798128c36 | 697 | mjson_read_full_string(&ctx, buf, 6, &buf_length); |
citrusbyte | 0:5a4798128c36 | 698 | mjson_read_object_key_separator(&ctx); |
citrusbyte | 0:5a4798128c36 | 699 | if (strncmp(buf, F("status"), 6) == 0) { |
citrusbyte | 0:5a4798128c36 | 700 | mjson_read_int16(&ctx, &response_status); |
citrusbyte | 0:5a4798128c36 | 701 | } else if (strncmp(buf, F("id"), 2) == 0) { |
citrusbyte | 0:5a4798128c36 | 702 | /* Hack since we know the ID passed is actually an integer*/ |
citrusbyte | 0:5a4798128c36 | 703 | mjson_readcheck_string_start(&ctx); |
citrusbyte | 0:5a4798128c36 | 704 | mjson_read_int16(&ctx, &parsed_id); |
citrusbyte | 0:5a4798128c36 | 705 | mjson_read_string_end(&ctx); |
citrusbyte | 0:5a4798128c36 | 706 | } else { |
citrusbyte | 0:5a4798128c36 | 707 | mjson_skip_value(&ctx); |
citrusbyte | 0:5a4798128c36 | 708 | } |
citrusbyte | 0:5a4798128c36 | 709 | } |
citrusbyte | 0:5a4798128c36 | 710 | if (parsed_id == _current_id) { return response_status; } |
citrusbyte | 0:5a4798128c36 | 711 | } |
citrusbyte | 0:5a4798128c36 | 712 | } |
citrusbyte | 0:5a4798128c36 | 713 | return E_NOTREACHABLE; |
citrusbyte | 0:5a4798128c36 | 714 | } |
citrusbyte | 0:5a4798128c36 | 715 | |
citrusbyte | 0:5a4798128c36 | 716 | void M2XMQTTClient::close() { |
citrusbyte | 0:5a4798128c36 | 717 | _client->stop(); |
citrusbyte | 0:5a4798128c36 | 718 | _connected = false; |
citrusbyte | 0:5a4798128c36 | 719 | } |
citrusbyte | 0:5a4798128c36 | 720 | |
citrusbyte | 0:5a4798128c36 | 721 | #endif /* M2XMQTTCLIENT_H_ */ |