123

Committer:
advxolltm
Date:
Mon Jun 06 16:36:45 2022 +0000
Revision:
7:9571227876ee
Parent:
6:1ce536bf461b
123;

Who changed what in which revision?

UserRevisionLine numberNew contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 2:ca17c5d846e7 5
hudakz 3:5e31b4687aad 6 Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8 #include "MQTTClient.h"
hudakz 0:83c732b10e95 9 #include <string.h>
hudakz 0:83c732b10e95 10 #include <time.h>
hudakz 0:83c732b10e95 11
hudakz 2:ca17c5d846e7 12 /**
hudakz 2:ca17c5d846e7 13 * @brief
hudakz 2:ca17c5d846e7 14 * @note
hudakz 2:ca17c5d846e7 15 * @param
hudakz 2:ca17c5d846e7 16 * @retval
hudakz 2:ca17c5d846e7 17 */
hudakz 4:8620de6d1696 18 MQTTClient::MQTTClient() :
hudakz 3:5e31b4687aad 19 _client(NULL),
hudakz 5:361a6987739b 20 _stream(NULL),
hudakz 5:361a6987739b 21 _onMessage(NULL)
hudakz 3:5e31b4687aad 22 { }
hudakz 0:83c732b10e95 23
hudakz 2:ca17c5d846e7 24 /**
hudakz 2:ca17c5d846e7 25 * @brief
hudakz 2:ca17c5d846e7 26 * @note
hudakz 2:ca17c5d846e7 27 * @param
hudakz 2:ca17c5d846e7 28 * @retval
hudakz 2:ca17c5d846e7 29 */
hudakz 5:361a6987739b 30 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) :
hudakz 3:5e31b4687aad 31 _ip(ip),
hudakz 3:5e31b4687aad 32 _domain(NULL),
hudakz 3:5e31b4687aad 33 _port(port),
hudakz 3:5e31b4687aad 34 _stream(NULL),
hudakz 5:361a6987739b 35 _onMessage(NULL)
hudakz 3:5e31b4687aad 36 { }
hudakz 0:83c732b10e95 37
hudakz 2:ca17c5d846e7 38 /**
hudakz 2:ca17c5d846e7 39 * @brief
hudakz 2:ca17c5d846e7 40 * @note
hudakz 2:ca17c5d846e7 41 * @param
hudakz 2:ca17c5d846e7 42 * @retval
hudakz 2:ca17c5d846e7 43 */
hudakz 5:361a6987739b 44 MQTTClient::MQTTClient(const char* domain, uint16_t port) :
hudakz 3:5e31b4687aad 45 _domain((char*)domain),
hudakz 3:5e31b4687aad 46 _port(port),
hudakz 3:5e31b4687aad 47 _stream(NULL),
hudakz 5:361a6987739b 48 _onMessage(NULL)
hudakz 3:5e31b4687aad 49 { }
hudakz 0:83c732b10e95 50
hudakz 2:ca17c5d846e7 51 /**
hudakz 2:ca17c5d846e7 52 * @brief
hudakz 2:ca17c5d846e7 53 * @note
hudakz 2:ca17c5d846e7 54 * @param
hudakz 2:ca17c5d846e7 55 * @retval
hudakz 2:ca17c5d846e7 56 */
hudakz 5:361a6987739b 57 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) :
hudakz 3:5e31b4687aad 58 _ip(ip),
hudakz 3:5e31b4687aad 59 _domain(NULL),
hudakz 3:5e31b4687aad 60 _port(port),
hudakz 3:5e31b4687aad 61 _stream(&stream),
hudakz 5:361a6987739b 62 _onMessage(NULL)
hudakz 3:5e31b4687aad 63 { }
hudakz 0:83c732b10e95 64
hudakz 2:ca17c5d846e7 65 /**
hudakz 2:ca17c5d846e7 66 * @brief
hudakz 2:ca17c5d846e7 67 * @note
hudakz 2:ca17c5d846e7 68 * @param
hudakz 2:ca17c5d846e7 69 * @retval
hudakz 2:ca17c5d846e7 70 */
hudakz 5:361a6987739b 71 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) :
hudakz 3:5e31b4687aad 72 _domain((char*)domain),
hudakz 3:5e31b4687aad 73 _port(port),
hudakz 3:5e31b4687aad 74 _stream(&stream),
hudakz 5:361a6987739b 75 _onMessage(NULL)
hudakz 3:5e31b4687aad 76 { }
hudakz 0:83c732b10e95 77
hudakz 2:ca17c5d846e7 78 /**
hudakz 2:ca17c5d846e7 79 * @brief
hudakz 2:ca17c5d846e7 80 * @note
hudakz 2:ca17c5d846e7 81 * @param
hudakz 2:ca17c5d846e7 82 * @retval
hudakz 2:ca17c5d846e7 83 */
hudakz 3:5e31b4687aad 84 bool MQTTClient::connect(const char* id)
hudakz 3:5e31b4687aad 85 {
hudakz 2:ca17c5d846e7 86 return connect(id, NULL, NULL, 0, 0, 0, 0);
hudakz 0:83c732b10e95 87 }
hudakz 0:83c732b10e95 88
hudakz 2:ca17c5d846e7 89 /**
hudakz 2:ca17c5d846e7 90 * @brief
hudakz 2:ca17c5d846e7 91 * @note
hudakz 2:ca17c5d846e7 92 * @param
hudakz 2:ca17c5d846e7 93 * @retval
hudakz 2:ca17c5d846e7 94 */
hudakz 3:5e31b4687aad 95 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
hudakz 3:5e31b4687aad 96 {
hudakz 2:ca17c5d846e7 97 return connect(id, user, pass, 0, 0, 0, 0);
hudakz 0:83c732b10e95 98 }
hudakz 0:83c732b10e95 99
hudakz 2:ca17c5d846e7 100 /**
hudakz 2:ca17c5d846e7 101 * @brief
hudakz 2:ca17c5d846e7 102 * @note
hudakz 2:ca17c5d846e7 103 * @param
hudakz 2:ca17c5d846e7 104 * @retval
hudakz 2:ca17c5d846e7 105 */
hudakz 4:8620de6d1696 106 bool MQTTClient::connect
hudakz 4:8620de6d1696 107 (
hudakz 4:8620de6d1696 108 const char* id,
hudakz 4:8620de6d1696 109 const char* willTopic,
hudakz 4:8620de6d1696 110 uint8_t willQos,
hudakz 4:8620de6d1696 111 uint8_t willRetain,
hudakz 4:8620de6d1696 112 const char* willMessage
hudakz 4:8620de6d1696 113 )
hudakz 3:5e31b4687aad 114 {
hudakz 2:ca17c5d846e7 115 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
hudakz 0:83c732b10e95 116 }
hudakz 0:83c732b10e95 117
hudakz 2:ca17c5d846e7 118 /**
hudakz 2:ca17c5d846e7 119 * @brief
hudakz 2:ca17c5d846e7 120 * @note
hudakz 2:ca17c5d846e7 121 * @param
hudakz 2:ca17c5d846e7 122 * @retval
hudakz 2:ca17c5d846e7 123 */
hudakz 2:ca17c5d846e7 124 bool MQTTClient::connect
hudakz 2:ca17c5d846e7 125 (
hudakz 4:8620de6d1696 126 const char* id,
hudakz 4:8620de6d1696 127 const char* user,
hudakz 4:8620de6d1696 128 const char* pass,
hudakz 4:8620de6d1696 129 const char* willTopic,
hudakz 4:8620de6d1696 130 uint8_t willQos,
hudakz 4:8620de6d1696 131 uint8_t willRetain,
hudakz 4:8620de6d1696 132 const char* willMessage
hudakz 3:5e31b4687aad 133 )
hudakz 3:5e31b4687aad 134 {
hudakz 3:5e31b4687aad 135 if (!connected()) {
hudakz 0:83c732b10e95 136 int result = 0;
hudakz 0:83c732b10e95 137
hudakz 3:5e31b4687aad 138 if (_domain != NULL) {
hudakz 4:8620de6d1696 139 result = !_client.connect(this->_domain, this->_port);
hudakz 2:ca17c5d846e7 140 }
hudakz 2:ca17c5d846e7 141 else {
hudakz 4:8620de6d1696 142 result = _client.connect(this->_ip, this->_port);
hudakz 0:83c732b10e95 143 }
hudakz 0:83c732b10e95 144
hudakz 3:5e31b4687aad 145 if (result) {
hudakz 3:5e31b4687aad 146 _nextMsgId = 1;
hudakz 2:ca17c5d846e7 147
hudakz 5:361a6987739b 148 uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
hudakz 2:ca17c5d846e7 149
hudakz 0:83c732b10e95 150 // Leave room in the buffer for header and variable length field
hudakz 4:8620de6d1696 151 uint16_t pos = 5;
hudakz 4:8620de6d1696 152 uint16_t j;
hudakz 4:8620de6d1696 153 for (j = 0; j < sizeof(d); j++) {
hudakz 4:8620de6d1696 154 _buffer[pos++] = d[j];
hudakz 0:83c732b10e95 155 }
hudakz 0:83c732b10e95 156
hudakz 0:83c732b10e95 157 uint8_t v;
hudakz 3:5e31b4687aad 158 if (willTopic) {
hudakz 2:ca17c5d846e7 159 v = 0x06 | (willQos << 3) | (willRetain << 5);
hudakz 2:ca17c5d846e7 160 }
hudakz 2:ca17c5d846e7 161 else {
hudakz 0:83c732b10e95 162 v = 0x02;
hudakz 0:83c732b10e95 163 }
hudakz 0:83c732b10e95 164
hudakz 3:5e31b4687aad 165 if (user != NULL) {
hudakz 2:ca17c5d846e7 166 v = v | 0x80;
hudakz 0:83c732b10e95 167
hudakz 3:5e31b4687aad 168 if (pass != NULL) {
hudakz 2:ca17c5d846e7 169 v = v | (0x80 >> 1);
hudakz 0:83c732b10e95 170 }
hudakz 0:83c732b10e95 171 }
hudakz 0:83c732b10e95 172
hudakz 4:8620de6d1696 173 _buffer[pos++] = v;
hudakz 0:83c732b10e95 174
hudakz 4:8620de6d1696 175 _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 4:8620de6d1696 176 _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 4:8620de6d1696 177 pos = _writeString(id, _buffer, pos);
hudakz 3:5e31b4687aad 178 if (willTopic) {
hudakz 4:8620de6d1696 179 pos = _writeString(willTopic, _buffer, pos);
hudakz 4:8620de6d1696 180 pos = _writeString(willMessage, _buffer, pos);
hudakz 0:83c732b10e95 181 }
hudakz 0:83c732b10e95 182
hudakz 3:5e31b4687aad 183 if (user != NULL) {
hudakz 4:8620de6d1696 184 pos = _writeString(user, _buffer, pos);
hudakz 3:5e31b4687aad 185 if (pass != NULL) {
hudakz 4:8620de6d1696 186 pos = _writeString(pass, _buffer, pos);
hudakz 0:83c732b10e95 187 }
hudakz 0:83c732b10e95 188 }
hudakz 0:83c732b10e95 189
hudakz 4:8620de6d1696 190 _write(MQTTCONNECT, _buffer, pos - 5);
hudakz 0:83c732b10e95 191
hudakz 3:5e31b4687aad 192 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 193
hudakz 4:8620de6d1696 194 while (!_client.available()) {
hudakz 2:ca17c5d846e7 195 unsigned long t = time(NULL);
hudakz 3:5e31b4687aad 196 if (t - _lastInActivity > MQTT_KEEPALIVE) {
hudakz 4:8620de6d1696 197 _client.stop();
hudakz 0:83c732b10e95 198 return false;
hudakz 0:83c732b10e95 199 }
hudakz 0:83c732b10e95 200 }
hudakz 0:83c732b10e95 201
hudakz 4:8620de6d1696 202 uint8_t len;
hudakz 2:ca17c5d846e7 203
hudakz 4:8620de6d1696 204 if (_readPacket(&len) == 4 && _buffer[3] == 0) {
hudakz 3:5e31b4687aad 205 _lastInActivity = time(NULL);
hudakz 3:5e31b4687aad 206 _pingOutstanding = false;
hudakz 0:83c732b10e95 207 return true;
hudakz 0:83c732b10e95 208 }
hudakz 0:83c732b10e95 209 }
hudakz 2:ca17c5d846e7 210
hudakz 4:8620de6d1696 211 _client.stop();
hudakz 0:83c732b10e95 212 }
hudakz 2:ca17c5d846e7 213
hudakz 0:83c732b10e95 214 return false;
hudakz 0:83c732b10e95 215 }
hudakz 0:83c732b10e95 216
hudakz 2:ca17c5d846e7 217 /**
hudakz 2:ca17c5d846e7 218 * @brief
hudakz 2:ca17c5d846e7 219 * @note
hudakz 2:ca17c5d846e7 220 * @param
hudakz 2:ca17c5d846e7 221 * @retval
hudakz 2:ca17c5d846e7 222 */
hudakz 4:8620de6d1696 223 uint8_t MQTTClient::_readByte()
hudakz 3:5e31b4687aad 224 {
hudakz 4:8620de6d1696 225 while (!_client.available()) { }
hudakz 2:ca17c5d846e7 226
hudakz 4:8620de6d1696 227 return _client.recv();
hudakz 0:83c732b10e95 228 }
hudakz 0:83c732b10e95 229
hudakz 2:ca17c5d846e7 230 /**
hudakz 2:ca17c5d846e7 231 * @brief
hudakz 2:ca17c5d846e7 232 * @note
hudakz 2:ca17c5d846e7 233 * @param
hudakz 2:ca17c5d846e7 234 * @retval
hudakz 2:ca17c5d846e7 235 */
hudakz 4:8620de6d1696 236 uint16_t MQTTClient::_readPacket(uint8_t* length)
hudakz 3:5e31b4687aad 237 {
hudakz 2:ca17c5d846e7 238 uint16_t len = 0;
hudakz 3:5e31b4687aad 239 _buffer[len++] = _readByte();
hudakz 0:83c732b10e95 240
hudakz 3:5e31b4687aad 241 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
hudakz 2:ca17c5d846e7 242 uint32_t multiplier = 1;
hudakz 4:8620de6d1696 243 uint16_t pos = 0;
hudakz 2:ca17c5d846e7 244 uint8_t digit = 0;
hudakz 2:ca17c5d846e7 245 uint16_t skip = 0;
hudakz 2:ca17c5d846e7 246 uint8_t start = 0;
hudakz 2:ca17c5d846e7 247
hudakz 3:5e31b4687aad 248 do {
hudakz 3:5e31b4687aad 249 digit = _readByte();
hudakz 3:5e31b4687aad 250 _buffer[len++] = digit;
hudakz 4:8620de6d1696 251 pos += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 252 multiplier *= 128;
hudakz 3:5e31b4687aad 253 } while ((digit & 128) != 0);
hudakz 4:8620de6d1696 254 *length = len - 1;
hudakz 0:83c732b10e95 255
hudakz 3:5e31b4687aad 256 if (isPublish) {
hudakz 0:83c732b10e95 257 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 3:5e31b4687aad 258 _buffer[len++] = _readByte();
hudakz 3:5e31b4687aad 259 _buffer[len++] = _readByte();
hudakz 4:8620de6d1696 260 skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
hudakz 0:83c732b10e95 261 start = 2;
hudakz 3:5e31b4687aad 262 if (_buffer[0] & MQTTQOS1) {
hudakz 0:83c732b10e95 263 // skip message id
hudakz 0:83c732b10e95 264 skip += 2;
hudakz 0:83c732b10e95 265 }
hudakz 0:83c732b10e95 266 }
hudakz 0:83c732b10e95 267
hudakz 4:8620de6d1696 268 for (uint16_t i = start; i < pos; i++) {
hudakz 3:5e31b4687aad 269 digit = _readByte();
hudakz 3:5e31b4687aad 270 if (this->_stream) {
hudakz 4:8620de6d1696 271 if (isPublish && len -*length - 2 > skip) {
hudakz 3:5e31b4687aad 272 this->_stream->putc(digit);
hudakz 0:83c732b10e95 273 }
hudakz 0:83c732b10e95 274 }
hudakz 2:ca17c5d846e7 275
hudakz 3:5e31b4687aad 276 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 3:5e31b4687aad 277 _buffer[len] = digit;
hudakz 0:83c732b10e95 278 }
hudakz 2:ca17c5d846e7 279
hudakz 0:83c732b10e95 280 len++;
hudakz 0:83c732b10e95 281 }
hudakz 0:83c732b10e95 282
hudakz 3:5e31b4687aad 283 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 2:ca17c5d846e7 284 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 285 }
hudakz 0:83c732b10e95 286
hudakz 0:83c732b10e95 287 return len;
hudakz 0:83c732b10e95 288 }
hudakz 0:83c732b10e95 289
hudakz 2:ca17c5d846e7 290 /**
hudakz 2:ca17c5d846e7 291 * @brief
hudakz 2:ca17c5d846e7 292 * @note
hudakz 2:ca17c5d846e7 293 * @param
hudakz 2:ca17c5d846e7 294 * @retval
hudakz 2:ca17c5d846e7 295 */
hudakz 5:361a6987739b 296 bool MQTTClient::poll()
hudakz 3:5e31b4687aad 297 {
hudakz 3:5e31b4687aad 298 if (connected()) {
hudakz 4:8620de6d1696 299 time_t now = time(NULL);
hudakz 4:8620de6d1696 300 if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 3:5e31b4687aad 301 if (_pingOutstanding) {
hudakz 4:8620de6d1696 302 _client.stop();
hudakz 0:83c732b10e95 303 return false;
hudakz 2:ca17c5d846e7 304 }
hudakz 2:ca17c5d846e7 305 else {
hudakz 3:5e31b4687aad 306 _buffer[0] = MQTTPINGREQ;
hudakz 3:5e31b4687aad 307 _buffer[1] = 0;
hudakz 4:8620de6d1696 308 _client.send(_buffer, 2);
hudakz 4:8620de6d1696 309 _lastOutActivity = now;
hudakz 4:8620de6d1696 310 _lastInActivity = now;
hudakz 3:5e31b4687aad 311 _pingOutstanding = true;
hudakz 0:83c732b10e95 312 }
hudakz 0:83c732b10e95 313 }
hudakz 2:ca17c5d846e7 314
hudakz 4:8620de6d1696 315 if (_client.available()) {
hudakz 4:8620de6d1696 316 uint8_t len;
hudakz 4:8620de6d1696 317 uint16_t length = _readPacket(&len);
hudakz 2:ca17c5d846e7 318 uint16_t msgId = 0;
hudakz 2:ca17c5d846e7 319 uint8_t* payload;
hudakz 4:8620de6d1696 320 if (length > 0) {
hudakz 4:8620de6d1696 321 _lastInActivity = now;
hudakz 2:ca17c5d846e7 322
hudakz 3:5e31b4687aad 323 uint8_t type = _buffer[0] & 0xF0;
hudakz 3:5e31b4687aad 324 if (type == MQTTPUBLISH) {
hudakz 3:5e31b4687aad 325 if (_onMessage) {
hudakz 4:8620de6d1696 326 uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
hudakz 4:8620de6d1696 327 char topic[topicLen + 1];
hudakz 4:8620de6d1696 328 for (uint16_t i = 0; i < topicLen; i++) {
hudakz 4:8620de6d1696 329 topic[i] = _buffer[len + 3 + i];
hudakz 0:83c732b10e95 330 }
hudakz 2:ca17c5d846e7 331
hudakz 4:8620de6d1696 332 topic[topicLen] = '\0';
hudakz 2:ca17c5d846e7 333
hudakz 0:83c732b10e95 334 // msgId only present for QOS>0
hudakz 3:5e31b4687aad 335 if ((_buffer[0] & 0x06) == MQTTQOS1) {
hudakz 4:8620de6d1696 336 msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
hudakz 4:8620de6d1696 337 payload = _buffer + len + 3 + topicLen + 2;
hudakz 4:8620de6d1696 338 _onMessage(topic, payload, length - len - 3 - topicLen - 2);
hudakz 0:83c732b10e95 339
hudakz 3:5e31b4687aad 340 _buffer[0] = MQTTPUBACK;
hudakz 3:5e31b4687aad 341 _buffer[1] = 2;
hudakz 3:5e31b4687aad 342 _buffer[2] = (msgId >> 8);
hudakz 3:5e31b4687aad 343 _buffer[3] = (msgId & 0xFF);
hudakz 4:8620de6d1696 344 _client.send(_buffer, 4);
hudakz 4:8620de6d1696 345 _lastOutActivity = now;
hudakz 2:ca17c5d846e7 346 }
hudakz 2:ca17c5d846e7 347 else {
hudakz 4:8620de6d1696 348 payload = _buffer + len + 3 + topicLen;
hudakz 4:8620de6d1696 349 _onMessage(topic, payload, length - len - 3 - topicLen);
hudakz 0:83c732b10e95 350 }
hudakz 0:83c732b10e95 351 }
hudakz 2:ca17c5d846e7 352 }
hudakz 2:ca17c5d846e7 353 else
hudakz 3:5e31b4687aad 354 if (type == MQTTPINGREQ) {
hudakz 3:5e31b4687aad 355 _buffer[0] = MQTTPINGRESP;
hudakz 3:5e31b4687aad 356 _buffer[1] = 0;
hudakz 4:8620de6d1696 357 _client.send(_buffer, 2);
hudakz 2:ca17c5d846e7 358 }
hudakz 2:ca17c5d846e7 359 else
hudakz 3:5e31b4687aad 360 if (type == MQTTPINGRESP) {
hudakz 3:5e31b4687aad 361 _pingOutstanding = false;
hudakz 0:83c732b10e95 362 }
hudakz 0:83c732b10e95 363 }
hudakz 0:83c732b10e95 364 }
hudakz 2:ca17c5d846e7 365
hudakz 0:83c732b10e95 366 return true;
hudakz 0:83c732b10e95 367 }
hudakz 2:ca17c5d846e7 368
hudakz 0:83c732b10e95 369 return false;
hudakz 0:83c732b10e95 370 }
hudakz 0:83c732b10e95 371
hudakz 2:ca17c5d846e7 372 /**
hudakz 2:ca17c5d846e7 373 * @brief
hudakz 2:ca17c5d846e7 374 * @note
hudakz 2:ca17c5d846e7 375 * @param
hudakz 2:ca17c5d846e7 376 * @retval
hudakz 2:ca17c5d846e7 377 */
hudakz 3:5e31b4687aad 378 bool MQTTClient::publish(const char* topic, const char* payload)
hudakz 3:5e31b4687aad 379 {
hudakz 2:ca17c5d846e7 380 return publish(topic, (uint8_t*)payload, strlen(payload), false);
hudakz 0:83c732b10e95 381 }
hudakz 0:83c732b10e95 382
hudakz 2:ca17c5d846e7 383 /**
hudakz 2:ca17c5d846e7 384 * @brief
hudakz 2:ca17c5d846e7 385 * @note
hudakz 2:ca17c5d846e7 386 * @param
hudakz 2:ca17c5d846e7 387 * @retval
hudakz 2:ca17c5d846e7 388 */
hudakz 4:8620de6d1696 389 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
hudakz 3:5e31b4687aad 390 {
hudakz 4:8620de6d1696 391 return publish(topic, payload, length, false);
hudakz 0:83c732b10e95 392 }
hudakz 0:83c732b10e95 393
hudakz 2:ca17c5d846e7 394 /**
hudakz 2:ca17c5d846e7 395 * @brief
hudakz 2:ca17c5d846e7 396 * @note
hudakz 2:ca17c5d846e7 397 * @param
hudakz 2:ca17c5d846e7 398 * @retval
hudakz 2:ca17c5d846e7 399 */
hudakz 4:8620de6d1696 400 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
hudakz 3:5e31b4687aad 401 {
hudakz 3:5e31b4687aad 402 if (connected()) {
hudakz 0:83c732b10e95 403 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 404 uint16_t length = 5;
hudakz 3:5e31b4687aad 405 length = _writeString(topic, _buffer, length);
hudakz 2:ca17c5d846e7 406
hudakz 2:ca17c5d846e7 407 uint16_t i;
hudakz 3:5e31b4687aad 408 for (i = 0; i < plength; i++) {
hudakz 3:5e31b4687aad 409 _buffer[length++] = payload[i];
hudakz 0:83c732b10e95 410 }
hudakz 2:ca17c5d846e7 411
hudakz 0:83c732b10e95 412 uint8_t header = MQTTPUBLISH;
hudakz 3:5e31b4687aad 413 if (retained) {
hudakz 0:83c732b10e95 414 header |= 1;
hudakz 0:83c732b10e95 415 }
hudakz 2:ca17c5d846e7 416
hudakz 3:5e31b4687aad 417 return _write(header, _buffer, length - 5);
hudakz 0:83c732b10e95 418 }
hudakz 2:ca17c5d846e7 419
hudakz 0:83c732b10e95 420 return false;
hudakz 0:83c732b10e95 421 }
hudakz 0:83c732b10e95 422
hudakz 2:ca17c5d846e7 423 /**
hudakz 2:ca17c5d846e7 424 * @brief
hudakz 2:ca17c5d846e7 425 * @note
hudakz 2:ca17c5d846e7 426 * @param
hudakz 2:ca17c5d846e7 427 * @retval
hudakz 2:ca17c5d846e7 428 */
hudakz 3:5e31b4687aad 429 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 430 {
hudakz 4:8620de6d1696 431 uint8_t digitBuf[4];
hudakz 4:8620de6d1696 432 uint8_t digitLen = 0;
hudakz 0:83c732b10e95 433 uint8_t digit;
hudakz 0:83c732b10e95 434 uint8_t pos = 0;
hudakz 0:83c732b10e95 435 uint8_t rc;
hudakz 0:83c732b10e95 436 uint8_t len = length;
hudakz 3:5e31b4687aad 437 do {
hudakz 0:83c732b10e95 438 digit = len % 128;
hudakz 0:83c732b10e95 439 len = len / 128;
hudakz 3:5e31b4687aad 440 if (len > 0) {
hudakz 0:83c732b10e95 441 digit |= 0x80;
hudakz 0:83c732b10e95 442 }
hudakz 2:ca17c5d846e7 443
hudakz 4:8620de6d1696 444 digitBuf[pos++] = digit;
hudakz 4:8620de6d1696 445 digitLen++;
hudakz 3:5e31b4687aad 446 } while (len > 0);
hudakz 0:83c732b10e95 447
hudakz 4:8620de6d1696 448 buf[4 - digitLen] = header;
hudakz 4:8620de6d1696 449 for (int i = 0; i < digitLen; i++) {
hudakz 4:8620de6d1696 450 buf[5 - digitLen + i] = digitBuf[i];
hudakz 0:83c732b10e95 451 }
hudakz 2:ca17c5d846e7 452
hudakz 4:8620de6d1696 453 rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
hudakz 0:83c732b10e95 454
hudakz 3:5e31b4687aad 455 _lastOutActivity = time(NULL);
hudakz 4:8620de6d1696 456 return(rc == 1 + digitLen + length);
hudakz 0:83c732b10e95 457 }
hudakz 0:83c732b10e95 458
hudakz 2:ca17c5d846e7 459 /**
hudakz 2:ca17c5d846e7 460 * @brief
hudakz 2:ca17c5d846e7 461 * @note
hudakz 2:ca17c5d846e7 462 * @param
hudakz 2:ca17c5d846e7 463 * @retval
hudakz 2:ca17c5d846e7 464 */
hudakz 3:5e31b4687aad 465 bool MQTTClient::subscribe(const char* topic)
hudakz 3:5e31b4687aad 466 {
hudakz 2:ca17c5d846e7 467 bool result = subscribe(topic, 0);
hudakz 6:1ce536bf461b 468 #if MBED_MAJOR_VERSION == 2
hudakz 2:ca17c5d846e7 469 wait_ms(50);
hudakz 6:1ce536bf461b 470 #else
advxolltm 7:9571227876ee 471 wait_us(50000);
hudakz 6:1ce536bf461b 472 #endif
hudakz 6:1ce536bf461b 473
hudakz 2:ca17c5d846e7 474 return result;
hudakz 0:83c732b10e95 475 }
hudakz 0:83c732b10e95 476
hudakz 2:ca17c5d846e7 477 /**
hudakz 2:ca17c5d846e7 478 * @brief
hudakz 2:ca17c5d846e7 479 * @note
hudakz 2:ca17c5d846e7 480 * @param
hudakz 2:ca17c5d846e7 481 * @retval
hudakz 2:ca17c5d846e7 482 */
hudakz 3:5e31b4687aad 483 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
hudakz 3:5e31b4687aad 484 {
hudakz 3:5e31b4687aad 485 if (qos > 1)
hudakz 0:83c732b10e95 486 return false;
hudakz 0:83c732b10e95 487
hudakz 3:5e31b4687aad 488 if (connected()) {
hudakz 0:83c732b10e95 489 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 490 uint16_t length = 5;
hudakz 3:5e31b4687aad 491 _nextMsgId++;
hudakz 3:5e31b4687aad 492 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 493 _nextMsgId = 1;
hudakz 0:83c732b10e95 494 }
hudakz 2:ca17c5d846e7 495
hudakz 3:5e31b4687aad 496 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 497 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 498 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 499 _buffer[length++] = qos;
hudakz 3:5e31b4687aad 500 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 501 }
hudakz 2:ca17c5d846e7 502
hudakz 0:83c732b10e95 503 return false;
hudakz 0:83c732b10e95 504 }
hudakz 0:83c732b10e95 505
hudakz 2:ca17c5d846e7 506 /**
hudakz 2:ca17c5d846e7 507 * @brief
hudakz 2:ca17c5d846e7 508 * @note
hudakz 2:ca17c5d846e7 509 * @param
hudakz 2:ca17c5d846e7 510 * @retval
hudakz 2:ca17c5d846e7 511 */
hudakz 3:5e31b4687aad 512 bool MQTTClient::unsubscribe(const char* topic)
hudakz 3:5e31b4687aad 513 {
hudakz 3:5e31b4687aad 514 if (connected()) {
hudakz 2:ca17c5d846e7 515 uint16_t length = 5;
hudakz 3:5e31b4687aad 516 _nextMsgId++;
hudakz 3:5e31b4687aad 517 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 518 _nextMsgId = 1;
hudakz 0:83c732b10e95 519 }
hudakz 2:ca17c5d846e7 520
hudakz 3:5e31b4687aad 521 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 522 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 523 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 524 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 525 }
hudakz 2:ca17c5d846e7 526
hudakz 0:83c732b10e95 527 return false;
hudakz 0:83c732b10e95 528 }
hudakz 0:83c732b10e95 529
hudakz 2:ca17c5d846e7 530 /**
hudakz 2:ca17c5d846e7 531 * @brief
hudakz 2:ca17c5d846e7 532 * @note
hudakz 2:ca17c5d846e7 533 * @param
hudakz 2:ca17c5d846e7 534 * @retval
hudakz 2:ca17c5d846e7 535 */
hudakz 4:8620de6d1696 536 void MQTTClient::disconnect()
hudakz 3:5e31b4687aad 537 {
hudakz 3:5e31b4687aad 538 _buffer[0] = MQTTDISCONNECT;
hudakz 3:5e31b4687aad 539 _buffer[1] = 0;
hudakz 4:8620de6d1696 540 _client.send(_buffer, 2);
hudakz 4:8620de6d1696 541 _client.stop();
hudakz 3:5e31b4687aad 542 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 543 }
hudakz 0:83c732b10e95 544
hudakz 2:ca17c5d846e7 545 /**
hudakz 2:ca17c5d846e7 546 * @brief
hudakz 2:ca17c5d846e7 547 * @note
hudakz 2:ca17c5d846e7 548 * @param
hudakz 2:ca17c5d846e7 549 * @retval
hudakz 2:ca17c5d846e7 550 */
hudakz 4:8620de6d1696 551 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 552 {
hudakz 3:5e31b4687aad 553 char* idp = (char*)string;
hudakz 2:ca17c5d846e7 554 uint16_t i = 0;
hudakz 3:5e31b4687aad 555
hudakz 4:8620de6d1696 556 length += 2;
hudakz 3:5e31b4687aad 557 while (*idp) {
hudakz 4:8620de6d1696 558 buf[length++] = *idp++;
hudakz 0:83c732b10e95 559 i++;
hudakz 0:83c732b10e95 560 }
hudakz 2:ca17c5d846e7 561
hudakz 4:8620de6d1696 562 buf[length - i - 2] = (i >> 8);
hudakz 4:8620de6d1696 563 buf[length - i - 1] = (i & 0xFF);
hudakz 4:8620de6d1696 564 return length;
hudakz 0:83c732b10e95 565 }
hudakz 0:83c732b10e95 566
hudakz 2:ca17c5d846e7 567 /**
hudakz 2:ca17c5d846e7 568 * @brief
hudakz 2:ca17c5d846e7 569 * @note
hudakz 2:ca17c5d846e7 570 * @param
hudakz 2:ca17c5d846e7 571 * @retval
hudakz 2:ca17c5d846e7 572 */
hudakz 4:8620de6d1696 573 bool MQTTClient::connected()
hudakz 3:5e31b4687aad 574 {
hudakz 4:8620de6d1696 575 bool rc = (int)_client.connected();
hudakz 4:8620de6d1696 576
hudakz 4:8620de6d1696 577 if (!rc)
hudakz 4:8620de6d1696 578 _client.stop();
hudakz 0:83c732b10e95 579
hudakz 0:83c732b10e95 580 return rc;
hudakz 0:83c732b10e95 581 }
hudakz 5:361a6987739b 582
hudakz 5:361a6987739b 583 /**
hudakz 5:361a6987739b 584 * @brief
hudakz 5:361a6987739b 585 * @note
hudakz 5:361a6987739b 586 * @param
hudakz 5:361a6987739b 587 * @retval
hudakz 5:361a6987739b 588 */
hudakz 5:361a6987739b 589 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc)
hudakz 5:361a6987739b 590 {
hudakz 5:361a6987739b 591 _onMessage = fnc;
hudakz 5:361a6987739b 592 }