Local fixes

Committer:
hudakz
Date:
Thu Sep 12 20:26:40 2019 +0000
Revision:
4:8620de6d1696
Parent:
3:5e31b4687aad
Child:
5:361a6987739b
Modified.

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 2:ca17c5d846e7 5
hudakz 3:5e31b4687aad 6 Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8 #include "MQTTClient.h"
hudakz 0:83c732b10e95 9 #include <string.h>
hudakz 0:83c732b10e95 10 #include <time.h>
hudakz 0:83c732b10e95 11
hudakz 2:ca17c5d846e7 12 /**
hudakz 2:ca17c5d846e7 13 * @brief
hudakz 2:ca17c5d846e7 14 * @note
hudakz 2:ca17c5d846e7 15 * @param
hudakz 2:ca17c5d846e7 16 * @retval
hudakz 2:ca17c5d846e7 17 */
hudakz 4:8620de6d1696 18 MQTTClient::MQTTClient() :
hudakz 3:5e31b4687aad 19 _client(NULL),
hudakz 3:5e31b4687aad 20 _stream(NULL)
hudakz 3:5e31b4687aad 21 { }
hudakz 0:83c732b10e95 22
hudakz 2:ca17c5d846e7 23 /**
hudakz 2:ca17c5d846e7 24 * @brief
hudakz 2:ca17c5d846e7 25 * @note
hudakz 2:ca17c5d846e7 26 * @param
hudakz 2:ca17c5d846e7 27 * @retval
hudakz 2:ca17c5d846e7 28 */
hudakz 2:ca17c5d846e7 29 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 30 (
hudakz 3:5e31b4687aad 31 IpAddress& ip,
hudakz 2:ca17c5d846e7 32 uint16_t port,
hudakz 4:8620de6d1696 33 Callback<void (char*, uint8_t*, uint16_t)> onMessage
hudakz 3:5e31b4687aad 34 ) :
hudakz 3:5e31b4687aad 35 _ip(ip),
hudakz 3:5e31b4687aad 36 _domain(NULL),
hudakz 3:5e31b4687aad 37 _port(port),
hudakz 3:5e31b4687aad 38 _stream(NULL),
hudakz 3:5e31b4687aad 39 _onMessage(onMessage)
hudakz 3:5e31b4687aad 40 { }
hudakz 0:83c732b10e95 41
hudakz 2:ca17c5d846e7 42 /**
hudakz 2:ca17c5d846e7 43 * @brief
hudakz 2:ca17c5d846e7 44 * @note
hudakz 2:ca17c5d846e7 45 * @param
hudakz 2:ca17c5d846e7 46 * @retval
hudakz 2:ca17c5d846e7 47 */
hudakz 3:5e31b4687aad 48 MQTTClient::MQTTClient
hudakz 3:5e31b4687aad 49 (
hudakz 4:8620de6d1696 50 const char* domain,
hudakz 3:5e31b4687aad 51 uint16_t port,
hudakz 4:8620de6d1696 52 Callback<void (char*, uint8_t*, uint16_t)> onMessage
hudakz 3:5e31b4687aad 53 ) :
hudakz 3:5e31b4687aad 54 _domain((char*)domain),
hudakz 3:5e31b4687aad 55 _port(port),
hudakz 3:5e31b4687aad 56 _stream(NULL),
hudakz 3:5e31b4687aad 57 _onMessage(onMessage)
hudakz 3:5e31b4687aad 58 { }
hudakz 0:83c732b10e95 59
hudakz 2:ca17c5d846e7 60 /**
hudakz 2:ca17c5d846e7 61 * @brief
hudakz 2:ca17c5d846e7 62 * @note
hudakz 2:ca17c5d846e7 63 * @param
hudakz 2:ca17c5d846e7 64 * @retval
hudakz 2:ca17c5d846e7 65 */
hudakz 2:ca17c5d846e7 66 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 67 (
hudakz 3:5e31b4687aad 68 IpAddress& ip,
hudakz 2:ca17c5d846e7 69 uint16_t port,
hudakz 4:8620de6d1696 70 Callback<void (char*, uint8_t*, uint16_t)> onMessage,
hudakz 3:5e31b4687aad 71 Stream& stream
hudakz 3:5e31b4687aad 72 ) :
hudakz 3:5e31b4687aad 73 _ip(ip),
hudakz 3:5e31b4687aad 74 _domain(NULL),
hudakz 3:5e31b4687aad 75 _port(port),
hudakz 3:5e31b4687aad 76 _stream(&stream),
hudakz 3:5e31b4687aad 77 _onMessage(onMessage)
hudakz 3:5e31b4687aad 78 { }
hudakz 0:83c732b10e95 79
hudakz 2:ca17c5d846e7 80 /**
hudakz 2:ca17c5d846e7 81 * @brief
hudakz 2:ca17c5d846e7 82 * @note
hudakz 2:ca17c5d846e7 83 * @param
hudakz 2:ca17c5d846e7 84 * @retval
hudakz 2:ca17c5d846e7 85 */
hudakz 2:ca17c5d846e7 86 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 87 (
hudakz 3:5e31b4687aad 88 const char* domain,
hudakz 2:ca17c5d846e7 89 uint16_t port,
hudakz 4:8620de6d1696 90 Callback<void (char*, uint8_t*, uint16_t)> onMessage,
hudakz 3:5e31b4687aad 91 Stream& stream
hudakz 3:5e31b4687aad 92 ) :
hudakz 3:5e31b4687aad 93 _domain((char*)domain),
hudakz 3:5e31b4687aad 94 _port(port),
hudakz 3:5e31b4687aad 95 _stream(&stream),
hudakz 3:5e31b4687aad 96 _onMessage(onMessage)
hudakz 3:5e31b4687aad 97 { }
hudakz 0:83c732b10e95 98
hudakz 2:ca17c5d846e7 99 /**
hudakz 2:ca17c5d846e7 100 * @brief
hudakz 2:ca17c5d846e7 101 * @note
hudakz 2:ca17c5d846e7 102 * @param
hudakz 2:ca17c5d846e7 103 * @retval
hudakz 2:ca17c5d846e7 104 */
hudakz 3:5e31b4687aad 105 bool MQTTClient::connect(const char* id)
hudakz 3:5e31b4687aad 106 {
hudakz 2:ca17c5d846e7 107 return connect(id, NULL, NULL, 0, 0, 0, 0);
hudakz 0:83c732b10e95 108 }
hudakz 0:83c732b10e95 109
hudakz 2:ca17c5d846e7 110 /**
hudakz 2:ca17c5d846e7 111 * @brief
hudakz 2:ca17c5d846e7 112 * @note
hudakz 2:ca17c5d846e7 113 * @param
hudakz 2:ca17c5d846e7 114 * @retval
hudakz 2:ca17c5d846e7 115 */
hudakz 3:5e31b4687aad 116 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
hudakz 3:5e31b4687aad 117 {
hudakz 2:ca17c5d846e7 118 return connect(id, user, pass, 0, 0, 0, 0);
hudakz 0:83c732b10e95 119 }
hudakz 0:83c732b10e95 120
hudakz 2:ca17c5d846e7 121 /**
hudakz 2:ca17c5d846e7 122 * @brief
hudakz 2:ca17c5d846e7 123 * @note
hudakz 2:ca17c5d846e7 124 * @param
hudakz 2:ca17c5d846e7 125 * @retval
hudakz 2:ca17c5d846e7 126 */
hudakz 4:8620de6d1696 127 bool MQTTClient::connect
hudakz 4:8620de6d1696 128 (
hudakz 4:8620de6d1696 129 const char* id,
hudakz 4:8620de6d1696 130 const char* willTopic,
hudakz 4:8620de6d1696 131 uint8_t willQos,
hudakz 4:8620de6d1696 132 uint8_t willRetain,
hudakz 4:8620de6d1696 133 const char* willMessage
hudakz 4:8620de6d1696 134 )
hudakz 3:5e31b4687aad 135 {
hudakz 2:ca17c5d846e7 136 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
hudakz 0:83c732b10e95 137 }
hudakz 0:83c732b10e95 138
hudakz 2:ca17c5d846e7 139 /**
hudakz 2:ca17c5d846e7 140 * @brief
hudakz 2:ca17c5d846e7 141 * @note
hudakz 2:ca17c5d846e7 142 * @param
hudakz 2:ca17c5d846e7 143 * @retval
hudakz 2:ca17c5d846e7 144 */
hudakz 2:ca17c5d846e7 145 bool MQTTClient::connect
hudakz 2:ca17c5d846e7 146 (
hudakz 4:8620de6d1696 147 const char* id,
hudakz 4:8620de6d1696 148 const char* user,
hudakz 4:8620de6d1696 149 const char* pass,
hudakz 4:8620de6d1696 150 const char* willTopic,
hudakz 4:8620de6d1696 151 uint8_t willQos,
hudakz 4:8620de6d1696 152 uint8_t willRetain,
hudakz 4:8620de6d1696 153 const char* willMessage
hudakz 3:5e31b4687aad 154 )
hudakz 3:5e31b4687aad 155 {
hudakz 3:5e31b4687aad 156 if (!connected()) {
hudakz 0:83c732b10e95 157 int result = 0;
hudakz 0:83c732b10e95 158
hudakz 3:5e31b4687aad 159 if (_domain != NULL) {
hudakz 4:8620de6d1696 160 result = !_client.connect(this->_domain, this->_port);
hudakz 2:ca17c5d846e7 161 }
hudakz 2:ca17c5d846e7 162 else {
hudakz 4:8620de6d1696 163 result = _client.connect(this->_ip, this->_port);
hudakz 0:83c732b10e95 164 }
hudakz 0:83c732b10e95 165
hudakz 3:5e31b4687aad 166 if (result) {
hudakz 3:5e31b4687aad 167 _nextMsgId = 1;
hudakz 2:ca17c5d846e7 168
hudakz 4:8620de6d1696 169 uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
hudakz 2:ca17c5d846e7 170
hudakz 0:83c732b10e95 171 // Leave room in the buffer for header and variable length field
hudakz 4:8620de6d1696 172 uint16_t pos = 5;
hudakz 4:8620de6d1696 173 uint16_t j;
hudakz 4:8620de6d1696 174 for (j = 0; j < sizeof(d); j++) {
hudakz 4:8620de6d1696 175 _buffer[pos++] = d[j];
hudakz 0:83c732b10e95 176 }
hudakz 0:83c732b10e95 177
hudakz 0:83c732b10e95 178 uint8_t v;
hudakz 3:5e31b4687aad 179 if (willTopic) {
hudakz 2:ca17c5d846e7 180 v = 0x06 | (willQos << 3) | (willRetain << 5);
hudakz 2:ca17c5d846e7 181 }
hudakz 2:ca17c5d846e7 182 else {
hudakz 0:83c732b10e95 183 v = 0x02;
hudakz 0:83c732b10e95 184 }
hudakz 0:83c732b10e95 185
hudakz 3:5e31b4687aad 186 if (user != NULL) {
hudakz 2:ca17c5d846e7 187 v = v | 0x80;
hudakz 0:83c732b10e95 188
hudakz 3:5e31b4687aad 189 if (pass != NULL) {
hudakz 2:ca17c5d846e7 190 v = v | (0x80 >> 1);
hudakz 0:83c732b10e95 191 }
hudakz 0:83c732b10e95 192 }
hudakz 0:83c732b10e95 193
hudakz 4:8620de6d1696 194 _buffer[pos++] = v;
hudakz 0:83c732b10e95 195
hudakz 4:8620de6d1696 196 _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 4:8620de6d1696 197 _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 4:8620de6d1696 198 pos = _writeString(id, _buffer, pos);
hudakz 3:5e31b4687aad 199 if (willTopic) {
hudakz 4:8620de6d1696 200 pos = _writeString(willTopic, _buffer, pos);
hudakz 4:8620de6d1696 201 pos = _writeString(willMessage, _buffer, pos);
hudakz 0:83c732b10e95 202 }
hudakz 0:83c732b10e95 203
hudakz 3:5e31b4687aad 204 if (user != NULL) {
hudakz 4:8620de6d1696 205 pos = _writeString(user, _buffer, pos);
hudakz 3:5e31b4687aad 206 if (pass != NULL) {
hudakz 4:8620de6d1696 207 pos = _writeString(pass, _buffer, pos);
hudakz 0:83c732b10e95 208 }
hudakz 0:83c732b10e95 209 }
hudakz 0:83c732b10e95 210
hudakz 4:8620de6d1696 211 _write(MQTTCONNECT, _buffer, pos - 5);
hudakz 0:83c732b10e95 212
hudakz 3:5e31b4687aad 213 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 214
hudakz 4:8620de6d1696 215 while (!_client.available()) {
hudakz 2:ca17c5d846e7 216 unsigned long t = time(NULL);
hudakz 3:5e31b4687aad 217 if (t - _lastInActivity > MQTT_KEEPALIVE) {
hudakz 4:8620de6d1696 218 _client.stop();
hudakz 0:83c732b10e95 219 return false;
hudakz 0:83c732b10e95 220 }
hudakz 0:83c732b10e95 221 }
hudakz 0:83c732b10e95 222
hudakz 4:8620de6d1696 223 uint8_t len;
hudakz 2:ca17c5d846e7 224
hudakz 4:8620de6d1696 225 if (_readPacket(&len) == 4 && _buffer[3] == 0) {
hudakz 3:5e31b4687aad 226 _lastInActivity = time(NULL);
hudakz 3:5e31b4687aad 227 _pingOutstanding = false;
hudakz 0:83c732b10e95 228 return true;
hudakz 0:83c732b10e95 229 }
hudakz 0:83c732b10e95 230 }
hudakz 2:ca17c5d846e7 231
hudakz 4:8620de6d1696 232 _client.stop();
hudakz 0:83c732b10e95 233 }
hudakz 2:ca17c5d846e7 234
hudakz 0:83c732b10e95 235 return false;
hudakz 0:83c732b10e95 236 }
hudakz 0:83c732b10e95 237
hudakz 2:ca17c5d846e7 238 /**
hudakz 2:ca17c5d846e7 239 * @brief
hudakz 2:ca17c5d846e7 240 * @note
hudakz 2:ca17c5d846e7 241 * @param
hudakz 2:ca17c5d846e7 242 * @retval
hudakz 2:ca17c5d846e7 243 */
hudakz 4:8620de6d1696 244 uint8_t MQTTClient::_readByte()
hudakz 3:5e31b4687aad 245 {
hudakz 4:8620de6d1696 246 while (!_client.available()) { }
hudakz 2:ca17c5d846e7 247
hudakz 4:8620de6d1696 248 return _client.recv();
hudakz 0:83c732b10e95 249 }
hudakz 0:83c732b10e95 250
hudakz 2:ca17c5d846e7 251 /**
hudakz 2:ca17c5d846e7 252 * @brief
hudakz 2:ca17c5d846e7 253 * @note
hudakz 2:ca17c5d846e7 254 * @param
hudakz 2:ca17c5d846e7 255 * @retval
hudakz 2:ca17c5d846e7 256 */
hudakz 4:8620de6d1696 257 uint16_t MQTTClient::_readPacket(uint8_t* length)
hudakz 3:5e31b4687aad 258 {
hudakz 2:ca17c5d846e7 259 uint16_t len = 0;
hudakz 3:5e31b4687aad 260 _buffer[len++] = _readByte();
hudakz 0:83c732b10e95 261
hudakz 3:5e31b4687aad 262 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
hudakz 2:ca17c5d846e7 263 uint32_t multiplier = 1;
hudakz 4:8620de6d1696 264 uint16_t pos = 0;
hudakz 2:ca17c5d846e7 265 uint8_t digit = 0;
hudakz 2:ca17c5d846e7 266 uint16_t skip = 0;
hudakz 2:ca17c5d846e7 267 uint8_t start = 0;
hudakz 2:ca17c5d846e7 268
hudakz 3:5e31b4687aad 269 do {
hudakz 3:5e31b4687aad 270 digit = _readByte();
hudakz 3:5e31b4687aad 271 _buffer[len++] = digit;
hudakz 4:8620de6d1696 272 pos += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 273 multiplier *= 128;
hudakz 3:5e31b4687aad 274 } while ((digit & 128) != 0);
hudakz 4:8620de6d1696 275 *length = len - 1;
hudakz 0:83c732b10e95 276
hudakz 3:5e31b4687aad 277 if (isPublish) {
hudakz 0:83c732b10e95 278 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 3:5e31b4687aad 279 _buffer[len++] = _readByte();
hudakz 3:5e31b4687aad 280 _buffer[len++] = _readByte();
hudakz 4:8620de6d1696 281 skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
hudakz 0:83c732b10e95 282 start = 2;
hudakz 3:5e31b4687aad 283 if (_buffer[0] & MQTTQOS1) {
hudakz 0:83c732b10e95 284 // skip message id
hudakz 0:83c732b10e95 285 skip += 2;
hudakz 0:83c732b10e95 286 }
hudakz 0:83c732b10e95 287 }
hudakz 0:83c732b10e95 288
hudakz 4:8620de6d1696 289 for (uint16_t i = start; i < pos; i++) {
hudakz 3:5e31b4687aad 290 digit = _readByte();
hudakz 3:5e31b4687aad 291 if (this->_stream) {
hudakz 4:8620de6d1696 292 if (isPublish && len -*length - 2 > skip) {
hudakz 3:5e31b4687aad 293 this->_stream->putc(digit);
hudakz 0:83c732b10e95 294 }
hudakz 0:83c732b10e95 295 }
hudakz 2:ca17c5d846e7 296
hudakz 3:5e31b4687aad 297 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 3:5e31b4687aad 298 _buffer[len] = digit;
hudakz 0:83c732b10e95 299 }
hudakz 2:ca17c5d846e7 300
hudakz 0:83c732b10e95 301 len++;
hudakz 0:83c732b10e95 302 }
hudakz 0:83c732b10e95 303
hudakz 3:5e31b4687aad 304 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 2:ca17c5d846e7 305 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 306 }
hudakz 0:83c732b10e95 307
hudakz 0:83c732b10e95 308 return len;
hudakz 0:83c732b10e95 309 }
hudakz 0:83c732b10e95 310
hudakz 2:ca17c5d846e7 311 /**
hudakz 2:ca17c5d846e7 312 * @brief
hudakz 2:ca17c5d846e7 313 * @note
hudakz 2:ca17c5d846e7 314 * @param
hudakz 2:ca17c5d846e7 315 * @retval
hudakz 2:ca17c5d846e7 316 */
hudakz 4:8620de6d1696 317 bool MQTTClient::process()
hudakz 3:5e31b4687aad 318 {
hudakz 3:5e31b4687aad 319 if (connected()) {
hudakz 4:8620de6d1696 320 time_t now = time(NULL);
hudakz 4:8620de6d1696 321 if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 3:5e31b4687aad 322 if (_pingOutstanding) {
hudakz 4:8620de6d1696 323 _client.stop();
hudakz 0:83c732b10e95 324 return false;
hudakz 2:ca17c5d846e7 325 }
hudakz 2:ca17c5d846e7 326 else {
hudakz 3:5e31b4687aad 327 _buffer[0] = MQTTPINGREQ;
hudakz 3:5e31b4687aad 328 _buffer[1] = 0;
hudakz 4:8620de6d1696 329 _client.send(_buffer, 2);
hudakz 4:8620de6d1696 330 _lastOutActivity = now;
hudakz 4:8620de6d1696 331 _lastInActivity = now;
hudakz 3:5e31b4687aad 332 _pingOutstanding = true;
hudakz 0:83c732b10e95 333 }
hudakz 0:83c732b10e95 334 }
hudakz 2:ca17c5d846e7 335
hudakz 4:8620de6d1696 336 if (_client.available()) {
hudakz 4:8620de6d1696 337 uint8_t len;
hudakz 4:8620de6d1696 338 uint16_t length = _readPacket(&len);
hudakz 2:ca17c5d846e7 339 uint16_t msgId = 0;
hudakz 2:ca17c5d846e7 340 uint8_t* payload;
hudakz 4:8620de6d1696 341 if (length > 0) {
hudakz 4:8620de6d1696 342 _lastInActivity = now;
hudakz 2:ca17c5d846e7 343
hudakz 3:5e31b4687aad 344 //printf("Here I am\r\n");
hudakz 3:5e31b4687aad 345 uint8_t type = _buffer[0] & 0xF0;
hudakz 3:5e31b4687aad 346 if (type == MQTTPUBLISH) {
hudakz 3:5e31b4687aad 347 if (_onMessage) {
hudakz 4:8620de6d1696 348 uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
hudakz 4:8620de6d1696 349 char topic[topicLen + 1];
hudakz 4:8620de6d1696 350 for (uint16_t i = 0; i < topicLen; i++) {
hudakz 4:8620de6d1696 351 topic[i] = _buffer[len + 3 + i];
hudakz 0:83c732b10e95 352 }
hudakz 2:ca17c5d846e7 353
hudakz 4:8620de6d1696 354 topic[topicLen] = '\0';
hudakz 2:ca17c5d846e7 355
hudakz 0:83c732b10e95 356 // msgId only present for QOS>0
hudakz 3:5e31b4687aad 357 if ((_buffer[0] & 0x06) == MQTTQOS1) {
hudakz 4:8620de6d1696 358 msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
hudakz 4:8620de6d1696 359 payload = _buffer + len + 3 + topicLen + 2;
hudakz 4:8620de6d1696 360 _onMessage(topic, payload, length - len - 3 - topicLen - 2);
hudakz 0:83c732b10e95 361
hudakz 3:5e31b4687aad 362 _buffer[0] = MQTTPUBACK;
hudakz 3:5e31b4687aad 363 _buffer[1] = 2;
hudakz 3:5e31b4687aad 364 _buffer[2] = (msgId >> 8);
hudakz 3:5e31b4687aad 365 _buffer[3] = (msgId & 0xFF);
hudakz 4:8620de6d1696 366 _client.send(_buffer, 4);
hudakz 4:8620de6d1696 367 _lastOutActivity = now;
hudakz 2:ca17c5d846e7 368 }
hudakz 2:ca17c5d846e7 369 else {
hudakz 4:8620de6d1696 370 payload = _buffer + len + 3 + topicLen;
hudakz 4:8620de6d1696 371 _onMessage(topic, payload, length - len - 3 - topicLen);
hudakz 0:83c732b10e95 372 }
hudakz 0:83c732b10e95 373 }
hudakz 2:ca17c5d846e7 374 }
hudakz 2:ca17c5d846e7 375 else
hudakz 3:5e31b4687aad 376 if (type == MQTTPINGREQ) {
hudakz 3:5e31b4687aad 377 _buffer[0] = MQTTPINGRESP;
hudakz 3:5e31b4687aad 378 _buffer[1] = 0;
hudakz 4:8620de6d1696 379 _client.send(_buffer, 2);
hudakz 2:ca17c5d846e7 380 }
hudakz 2:ca17c5d846e7 381 else
hudakz 3:5e31b4687aad 382 if (type == MQTTPINGRESP) {
hudakz 3:5e31b4687aad 383 _pingOutstanding = false;
hudakz 0:83c732b10e95 384 }
hudakz 0:83c732b10e95 385 }
hudakz 0:83c732b10e95 386 }
hudakz 2:ca17c5d846e7 387
hudakz 0:83c732b10e95 388 return true;
hudakz 0:83c732b10e95 389 }
hudakz 2:ca17c5d846e7 390
hudakz 0:83c732b10e95 391 return false;
hudakz 0:83c732b10e95 392 }
hudakz 0:83c732b10e95 393
hudakz 2:ca17c5d846e7 394 /**
hudakz 2:ca17c5d846e7 395 * @brief
hudakz 2:ca17c5d846e7 396 * @note
hudakz 2:ca17c5d846e7 397 * @param
hudakz 2:ca17c5d846e7 398 * @retval
hudakz 2:ca17c5d846e7 399 */
hudakz 3:5e31b4687aad 400 bool MQTTClient::publish(const char* topic, const char* payload)
hudakz 3:5e31b4687aad 401 {
hudakz 2:ca17c5d846e7 402 return publish(topic, (uint8_t*)payload, strlen(payload), false);
hudakz 0:83c732b10e95 403 }
hudakz 0:83c732b10e95 404
hudakz 2:ca17c5d846e7 405 /**
hudakz 2:ca17c5d846e7 406 * @brief
hudakz 2:ca17c5d846e7 407 * @note
hudakz 2:ca17c5d846e7 408 * @param
hudakz 2:ca17c5d846e7 409 * @retval
hudakz 2:ca17c5d846e7 410 */
hudakz 4:8620de6d1696 411 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
hudakz 3:5e31b4687aad 412 {
hudakz 4:8620de6d1696 413 return publish(topic, payload, length, false);
hudakz 0:83c732b10e95 414 }
hudakz 0:83c732b10e95 415
hudakz 2:ca17c5d846e7 416 /**
hudakz 2:ca17c5d846e7 417 * @brief
hudakz 2:ca17c5d846e7 418 * @note
hudakz 2:ca17c5d846e7 419 * @param
hudakz 2:ca17c5d846e7 420 * @retval
hudakz 2:ca17c5d846e7 421 */
hudakz 4:8620de6d1696 422 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
hudakz 3:5e31b4687aad 423 {
hudakz 3:5e31b4687aad 424 if (connected()) {
hudakz 0:83c732b10e95 425 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 426 uint16_t length = 5;
hudakz 3:5e31b4687aad 427 length = _writeString(topic, _buffer, length);
hudakz 2:ca17c5d846e7 428
hudakz 2:ca17c5d846e7 429 uint16_t i;
hudakz 3:5e31b4687aad 430 for (i = 0; i < plength; i++) {
hudakz 3:5e31b4687aad 431 _buffer[length++] = payload[i];
hudakz 0:83c732b10e95 432 }
hudakz 2:ca17c5d846e7 433
hudakz 0:83c732b10e95 434 uint8_t header = MQTTPUBLISH;
hudakz 3:5e31b4687aad 435 if (retained) {
hudakz 0:83c732b10e95 436 header |= 1;
hudakz 0:83c732b10e95 437 }
hudakz 2:ca17c5d846e7 438
hudakz 3:5e31b4687aad 439 return _write(header, _buffer, length - 5);
hudakz 0:83c732b10e95 440 }
hudakz 2:ca17c5d846e7 441
hudakz 0:83c732b10e95 442 return false;
hudakz 0:83c732b10e95 443 }
hudakz 0:83c732b10e95 444
hudakz 2:ca17c5d846e7 445 /**
hudakz 2:ca17c5d846e7 446 * @brief
hudakz 2:ca17c5d846e7 447 * @note
hudakz 2:ca17c5d846e7 448 * @param
hudakz 2:ca17c5d846e7 449 * @retval
hudakz 2:ca17c5d846e7 450 */
hudakz 3:5e31b4687aad 451 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 452 {
hudakz 4:8620de6d1696 453 uint8_t digitBuf[4];
hudakz 4:8620de6d1696 454 uint8_t digitLen = 0;
hudakz 0:83c732b10e95 455 uint8_t digit;
hudakz 0:83c732b10e95 456 uint8_t pos = 0;
hudakz 0:83c732b10e95 457 uint8_t rc;
hudakz 0:83c732b10e95 458 uint8_t len = length;
hudakz 3:5e31b4687aad 459 do {
hudakz 0:83c732b10e95 460 digit = len % 128;
hudakz 0:83c732b10e95 461 len = len / 128;
hudakz 3:5e31b4687aad 462 if (len > 0) {
hudakz 0:83c732b10e95 463 digit |= 0x80;
hudakz 0:83c732b10e95 464 }
hudakz 2:ca17c5d846e7 465
hudakz 4:8620de6d1696 466 digitBuf[pos++] = digit;
hudakz 4:8620de6d1696 467 digitLen++;
hudakz 3:5e31b4687aad 468 } while (len > 0);
hudakz 0:83c732b10e95 469
hudakz 4:8620de6d1696 470 buf[4 - digitLen] = header;
hudakz 4:8620de6d1696 471 for (int i = 0; i < digitLen; i++) {
hudakz 4:8620de6d1696 472 buf[5 - digitLen + i] = digitBuf[i];
hudakz 0:83c732b10e95 473 }
hudakz 2:ca17c5d846e7 474
hudakz 4:8620de6d1696 475 rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
hudakz 0:83c732b10e95 476
hudakz 3:5e31b4687aad 477 _lastOutActivity = time(NULL);
hudakz 4:8620de6d1696 478 return(rc == 1 + digitLen + length);
hudakz 0:83c732b10e95 479 }
hudakz 0:83c732b10e95 480
hudakz 2:ca17c5d846e7 481 /**
hudakz 2:ca17c5d846e7 482 * @brief
hudakz 2:ca17c5d846e7 483 * @note
hudakz 2:ca17c5d846e7 484 * @param
hudakz 2:ca17c5d846e7 485 * @retval
hudakz 2:ca17c5d846e7 486 */
hudakz 3:5e31b4687aad 487 bool MQTTClient::subscribe(const char* topic)
hudakz 3:5e31b4687aad 488 {
hudakz 2:ca17c5d846e7 489 bool result = subscribe(topic, 0);
hudakz 2:ca17c5d846e7 490 wait_ms(50);
hudakz 2:ca17c5d846e7 491 return result;
hudakz 0:83c732b10e95 492 }
hudakz 0:83c732b10e95 493
hudakz 2:ca17c5d846e7 494 /**
hudakz 2:ca17c5d846e7 495 * @brief
hudakz 2:ca17c5d846e7 496 * @note
hudakz 2:ca17c5d846e7 497 * @param
hudakz 2:ca17c5d846e7 498 * @retval
hudakz 2:ca17c5d846e7 499 */
hudakz 3:5e31b4687aad 500 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
hudakz 3:5e31b4687aad 501 {
hudakz 3:5e31b4687aad 502 if (qos > 1)
hudakz 0:83c732b10e95 503 return false;
hudakz 0:83c732b10e95 504
hudakz 3:5e31b4687aad 505 if (connected()) {
hudakz 0:83c732b10e95 506 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 507 uint16_t length = 5;
hudakz 3:5e31b4687aad 508 _nextMsgId++;
hudakz 3:5e31b4687aad 509 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 510 _nextMsgId = 1;
hudakz 0:83c732b10e95 511 }
hudakz 2:ca17c5d846e7 512
hudakz 3:5e31b4687aad 513 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 514 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 515 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 516 _buffer[length++] = qos;
hudakz 3:5e31b4687aad 517 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 518 }
hudakz 2:ca17c5d846e7 519
hudakz 0:83c732b10e95 520 return false;
hudakz 0:83c732b10e95 521 }
hudakz 0:83c732b10e95 522
hudakz 2:ca17c5d846e7 523 /**
hudakz 2:ca17c5d846e7 524 * @brief
hudakz 2:ca17c5d846e7 525 * @note
hudakz 2:ca17c5d846e7 526 * @param
hudakz 2:ca17c5d846e7 527 * @retval
hudakz 2:ca17c5d846e7 528 */
hudakz 3:5e31b4687aad 529 bool MQTTClient::unsubscribe(const char* topic)
hudakz 3:5e31b4687aad 530 {
hudakz 3:5e31b4687aad 531 if (connected()) {
hudakz 2:ca17c5d846e7 532 uint16_t length = 5;
hudakz 3:5e31b4687aad 533 _nextMsgId++;
hudakz 3:5e31b4687aad 534 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 535 _nextMsgId = 1;
hudakz 0:83c732b10e95 536 }
hudakz 2:ca17c5d846e7 537
hudakz 3:5e31b4687aad 538 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 539 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 540 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 541 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 542 }
hudakz 2:ca17c5d846e7 543
hudakz 0:83c732b10e95 544 return false;
hudakz 0:83c732b10e95 545 }
hudakz 0:83c732b10e95 546
hudakz 2:ca17c5d846e7 547 /**
hudakz 2:ca17c5d846e7 548 * @brief
hudakz 2:ca17c5d846e7 549 * @note
hudakz 2:ca17c5d846e7 550 * @param
hudakz 2:ca17c5d846e7 551 * @retval
hudakz 2:ca17c5d846e7 552 */
hudakz 4:8620de6d1696 553 void MQTTClient::disconnect()
hudakz 3:5e31b4687aad 554 {
hudakz 3:5e31b4687aad 555 _buffer[0] = MQTTDISCONNECT;
hudakz 3:5e31b4687aad 556 _buffer[1] = 0;
hudakz 4:8620de6d1696 557 _client.send(_buffer, 2);
hudakz 4:8620de6d1696 558 _client.stop();
hudakz 3:5e31b4687aad 559 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 560 }
hudakz 0:83c732b10e95 561
hudakz 2:ca17c5d846e7 562 /**
hudakz 2:ca17c5d846e7 563 * @brief
hudakz 2:ca17c5d846e7 564 * @note
hudakz 2:ca17c5d846e7 565 * @param
hudakz 2:ca17c5d846e7 566 * @retval
hudakz 2:ca17c5d846e7 567 */
hudakz 4:8620de6d1696 568 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 569 {
hudakz 3:5e31b4687aad 570 char* idp = (char*)string;
hudakz 2:ca17c5d846e7 571 uint16_t i = 0;
hudakz 3:5e31b4687aad 572
hudakz 4:8620de6d1696 573 length += 2;
hudakz 3:5e31b4687aad 574 while (*idp) {
hudakz 4:8620de6d1696 575 buf[length++] = *idp++;
hudakz 0:83c732b10e95 576 i++;
hudakz 0:83c732b10e95 577 }
hudakz 2:ca17c5d846e7 578
hudakz 4:8620de6d1696 579 buf[length - i - 2] = (i >> 8);
hudakz 4:8620de6d1696 580 buf[length - i - 1] = (i & 0xFF);
hudakz 4:8620de6d1696 581 return length;
hudakz 0:83c732b10e95 582 }
hudakz 0:83c732b10e95 583
hudakz 2:ca17c5d846e7 584 /**
hudakz 2:ca17c5d846e7 585 * @brief
hudakz 2:ca17c5d846e7 586 * @note
hudakz 2:ca17c5d846e7 587 * @param
hudakz 2:ca17c5d846e7 588 * @retval
hudakz 2:ca17c5d846e7 589 */
hudakz 4:8620de6d1696 590 bool MQTTClient::connected()
hudakz 3:5e31b4687aad 591 {
hudakz 4:8620de6d1696 592 bool rc = (int)_client.connected();
hudakz 4:8620de6d1696 593
hudakz 4:8620de6d1696 594 if (!rc)
hudakz 4:8620de6d1696 595 _client.stop();
hudakz 0:83c732b10e95 596
hudakz 0:83c732b10e95 597 return rc;
hudakz 0:83c732b10e95 598 }