Local fixes

Committer:
ivo_n
Date:
Thu Sep 24 21:19:41 2020 +0000
Revision:
7:0f12a3d0bd10
Parent:
6:1ce536bf461b
Local Fixes

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 2:ca17c5d846e7 5
hudakz 3:5e31b4687aad 6 Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8 #include "MQTTClient.h"
hudakz 0:83c732b10e95 9 #include <string.h>
hudakz 0:83c732b10e95 10 #include <time.h>
hudakz 0:83c732b10e95 11
ivo_n 7:0f12a3d0bd10 12 #define UIPETHERNET_DEBUG 1
ivo_n 7:0f12a3d0bd10 13
hudakz 2:ca17c5d846e7 14 /**
hudakz 2:ca17c5d846e7 15 * @brief
hudakz 2:ca17c5d846e7 16 * @note
hudakz 2:ca17c5d846e7 17 * @param
hudakz 2:ca17c5d846e7 18 * @retval
hudakz 2:ca17c5d846e7 19 */
hudakz 4:8620de6d1696 20 MQTTClient::MQTTClient() :
hudakz 3:5e31b4687aad 21 _client(NULL),
hudakz 5:361a6987739b 22 _stream(NULL),
hudakz 5:361a6987739b 23 _onMessage(NULL)
hudakz 3:5e31b4687aad 24 { }
hudakz 0:83c732b10e95 25
hudakz 2:ca17c5d846e7 26 /**
hudakz 2:ca17c5d846e7 27 * @brief
hudakz 2:ca17c5d846e7 28 * @note
hudakz 2:ca17c5d846e7 29 * @param
hudakz 2:ca17c5d846e7 30 * @retval
hudakz 2:ca17c5d846e7 31 */
hudakz 5:361a6987739b 32 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) :
hudakz 3:5e31b4687aad 33 _ip(ip),
hudakz 3:5e31b4687aad 34 _domain(NULL),
hudakz 3:5e31b4687aad 35 _port(port),
hudakz 3:5e31b4687aad 36 _stream(NULL),
hudakz 5:361a6987739b 37 _onMessage(NULL)
hudakz 3:5e31b4687aad 38 { }
hudakz 0:83c732b10e95 39
hudakz 2:ca17c5d846e7 40 /**
hudakz 2:ca17c5d846e7 41 * @brief
hudakz 2:ca17c5d846e7 42 * @note
hudakz 2:ca17c5d846e7 43 * @param
hudakz 2:ca17c5d846e7 44 * @retval
hudakz 2:ca17c5d846e7 45 */
hudakz 5:361a6987739b 46 MQTTClient::MQTTClient(const char* domain, uint16_t port) :
hudakz 3:5e31b4687aad 47 _domain((char*)domain),
hudakz 3:5e31b4687aad 48 _port(port),
hudakz 3:5e31b4687aad 49 _stream(NULL),
hudakz 5:361a6987739b 50 _onMessage(NULL)
hudakz 3:5e31b4687aad 51 { }
hudakz 0:83c732b10e95 52
hudakz 2:ca17c5d846e7 53 /**
hudakz 2:ca17c5d846e7 54 * @brief
hudakz 2:ca17c5d846e7 55 * @note
hudakz 2:ca17c5d846e7 56 * @param
hudakz 2:ca17c5d846e7 57 * @retval
hudakz 2:ca17c5d846e7 58 */
hudakz 5:361a6987739b 59 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) :
hudakz 3:5e31b4687aad 60 _ip(ip),
hudakz 3:5e31b4687aad 61 _domain(NULL),
hudakz 3:5e31b4687aad 62 _port(port),
hudakz 3:5e31b4687aad 63 _stream(&stream),
hudakz 5:361a6987739b 64 _onMessage(NULL)
hudakz 3:5e31b4687aad 65 { }
hudakz 0:83c732b10e95 66
hudakz 2:ca17c5d846e7 67 /**
hudakz 2:ca17c5d846e7 68 * @brief
hudakz 2:ca17c5d846e7 69 * @note
hudakz 2:ca17c5d846e7 70 * @param
hudakz 2:ca17c5d846e7 71 * @retval
hudakz 2:ca17c5d846e7 72 */
hudakz 5:361a6987739b 73 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) :
hudakz 3:5e31b4687aad 74 _domain((char*)domain),
hudakz 3:5e31b4687aad 75 _port(port),
hudakz 3:5e31b4687aad 76 _stream(&stream),
hudakz 5:361a6987739b 77 _onMessage(NULL)
hudakz 3:5e31b4687aad 78 { }
hudakz 0:83c732b10e95 79
hudakz 2:ca17c5d846e7 80 /**
hudakz 2:ca17c5d846e7 81 * @brief
hudakz 2:ca17c5d846e7 82 * @note
hudakz 2:ca17c5d846e7 83 * @param
hudakz 2:ca17c5d846e7 84 * @retval
hudakz 2:ca17c5d846e7 85 */
hudakz 3:5e31b4687aad 86 bool MQTTClient::connect(const char* id)
hudakz 3:5e31b4687aad 87 {
hudakz 2:ca17c5d846e7 88 return connect(id, NULL, NULL, 0, 0, 0, 0);
hudakz 0:83c732b10e95 89 }
hudakz 0:83c732b10e95 90
hudakz 2:ca17c5d846e7 91 /**
hudakz 2:ca17c5d846e7 92 * @brief
hudakz 2:ca17c5d846e7 93 * @note
hudakz 2:ca17c5d846e7 94 * @param
hudakz 2:ca17c5d846e7 95 * @retval
hudakz 2:ca17c5d846e7 96 */
hudakz 3:5e31b4687aad 97 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
hudakz 3:5e31b4687aad 98 {
hudakz 2:ca17c5d846e7 99 return connect(id, user, pass, 0, 0, 0, 0);
hudakz 0:83c732b10e95 100 }
hudakz 0:83c732b10e95 101
hudakz 2:ca17c5d846e7 102 /**
hudakz 2:ca17c5d846e7 103 * @brief
hudakz 2:ca17c5d846e7 104 * @note
hudakz 2:ca17c5d846e7 105 * @param
hudakz 2:ca17c5d846e7 106 * @retval
hudakz 2:ca17c5d846e7 107 */
hudakz 4:8620de6d1696 108 bool MQTTClient::connect
hudakz 4:8620de6d1696 109 (
hudakz 4:8620de6d1696 110 const char* id,
hudakz 4:8620de6d1696 111 const char* willTopic,
hudakz 4:8620de6d1696 112 uint8_t willQos,
hudakz 4:8620de6d1696 113 uint8_t willRetain,
hudakz 4:8620de6d1696 114 const char* willMessage
hudakz 4:8620de6d1696 115 )
hudakz 3:5e31b4687aad 116 {
hudakz 2:ca17c5d846e7 117 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
hudakz 0:83c732b10e95 118 }
hudakz 0:83c732b10e95 119
hudakz 2:ca17c5d846e7 120 /**
hudakz 2:ca17c5d846e7 121 * @brief
hudakz 2:ca17c5d846e7 122 * @note
hudakz 2:ca17c5d846e7 123 * @param
hudakz 2:ca17c5d846e7 124 * @retval
hudakz 2:ca17c5d846e7 125 */
hudakz 2:ca17c5d846e7 126 bool MQTTClient::connect
hudakz 2:ca17c5d846e7 127 (
hudakz 4:8620de6d1696 128 const char* id,
hudakz 4:8620de6d1696 129 const char* user,
hudakz 4:8620de6d1696 130 const char* pass,
hudakz 4:8620de6d1696 131 const char* willTopic,
hudakz 4:8620de6d1696 132 uint8_t willQos,
hudakz 4:8620de6d1696 133 uint8_t willRetain,
hudakz 4:8620de6d1696 134 const char* willMessage
hudakz 3:5e31b4687aad 135 )
hudakz 3:5e31b4687aad 136 {
hudakz 3:5e31b4687aad 137 if (!connected()) {
ivo_n 7:0f12a3d0bd10 138 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 139 printf("MQTTClient::connect \r\n");
ivo_n 7:0f12a3d0bd10 140 #endif
hudakz 0:83c732b10e95 141 int result = 0;
hudakz 0:83c732b10e95 142
hudakz 3:5e31b4687aad 143 if (_domain != NULL) {
hudakz 4:8620de6d1696 144 result = !_client.connect(this->_domain, this->_port);
hudakz 2:ca17c5d846e7 145 }
hudakz 2:ca17c5d846e7 146 else {
hudakz 4:8620de6d1696 147 result = _client.connect(this->_ip, this->_port);
hudakz 0:83c732b10e95 148 }
hudakz 0:83c732b10e95 149
hudakz 3:5e31b4687aad 150 if (result) {
hudakz 3:5e31b4687aad 151 _nextMsgId = 1;
hudakz 2:ca17c5d846e7 152
hudakz 5:361a6987739b 153 uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
hudakz 2:ca17c5d846e7 154
hudakz 0:83c732b10e95 155 // Leave room in the buffer for header and variable length field
hudakz 4:8620de6d1696 156 uint16_t pos = 5;
hudakz 4:8620de6d1696 157 uint16_t j;
hudakz 4:8620de6d1696 158 for (j = 0; j < sizeof(d); j++) {
hudakz 4:8620de6d1696 159 _buffer[pos++] = d[j];
hudakz 0:83c732b10e95 160 }
hudakz 0:83c732b10e95 161
hudakz 0:83c732b10e95 162 uint8_t v;
hudakz 3:5e31b4687aad 163 if (willTopic) {
hudakz 2:ca17c5d846e7 164 v = 0x06 | (willQos << 3) | (willRetain << 5);
hudakz 2:ca17c5d846e7 165 }
hudakz 2:ca17c5d846e7 166 else {
hudakz 0:83c732b10e95 167 v = 0x02;
hudakz 0:83c732b10e95 168 }
hudakz 0:83c732b10e95 169
hudakz 3:5e31b4687aad 170 if (user != NULL) {
hudakz 2:ca17c5d846e7 171 v = v | 0x80;
hudakz 0:83c732b10e95 172
hudakz 3:5e31b4687aad 173 if (pass != NULL) {
hudakz 2:ca17c5d846e7 174 v = v | (0x80 >> 1);
hudakz 0:83c732b10e95 175 }
hudakz 0:83c732b10e95 176 }
hudakz 0:83c732b10e95 177
hudakz 4:8620de6d1696 178 _buffer[pos++] = v;
hudakz 0:83c732b10e95 179
hudakz 4:8620de6d1696 180 _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 4:8620de6d1696 181 _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 4:8620de6d1696 182 pos = _writeString(id, _buffer, pos);
hudakz 3:5e31b4687aad 183 if (willTopic) {
hudakz 4:8620de6d1696 184 pos = _writeString(willTopic, _buffer, pos);
hudakz 4:8620de6d1696 185 pos = _writeString(willMessage, _buffer, pos);
hudakz 0:83c732b10e95 186 }
hudakz 0:83c732b10e95 187
hudakz 3:5e31b4687aad 188 if (user != NULL) {
hudakz 4:8620de6d1696 189 pos = _writeString(user, _buffer, pos);
hudakz 3:5e31b4687aad 190 if (pass != NULL) {
hudakz 4:8620de6d1696 191 pos = _writeString(pass, _buffer, pos);
hudakz 0:83c732b10e95 192 }
hudakz 0:83c732b10e95 193 }
hudakz 0:83c732b10e95 194
hudakz 4:8620de6d1696 195 _write(MQTTCONNECT, _buffer, pos - 5);
hudakz 0:83c732b10e95 196
hudakz 3:5e31b4687aad 197 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 198
hudakz 4:8620de6d1696 199 while (!_client.available()) {
hudakz 2:ca17c5d846e7 200 unsigned long t = time(NULL);
hudakz 3:5e31b4687aad 201 if (t - _lastInActivity > MQTT_KEEPALIVE) {
hudakz 4:8620de6d1696 202 _client.stop();
ivo_n 7:0f12a3d0bd10 203 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 204 printf("MQTTClient::connect available \r\n");
ivo_n 7:0f12a3d0bd10 205 #endif
hudakz 0:83c732b10e95 206 return false;
hudakz 0:83c732b10e95 207 }
hudakz 0:83c732b10e95 208 }
hudakz 0:83c732b10e95 209
hudakz 4:8620de6d1696 210 uint8_t len;
hudakz 2:ca17c5d846e7 211
hudakz 4:8620de6d1696 212 if (_readPacket(&len) == 4 && _buffer[3] == 0) {
hudakz 3:5e31b4687aad 213 _lastInActivity = time(NULL);
hudakz 3:5e31b4687aad 214 _pingOutstanding = false;
ivo_n 7:0f12a3d0bd10 215 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 216 printf("MQTTClient::connect 4 \r\n");
ivo_n 7:0f12a3d0bd10 217 #endif
hudakz 0:83c732b10e95 218 return true;
hudakz 0:83c732b10e95 219 }
hudakz 0:83c732b10e95 220 }
hudakz 2:ca17c5d846e7 221
hudakz 4:8620de6d1696 222 _client.stop();
hudakz 0:83c732b10e95 223 }
ivo_n 7:0f12a3d0bd10 224 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 225 printf("MQTTClient::connect End \r\n");
ivo_n 7:0f12a3d0bd10 226 #endif
hudakz 2:ca17c5d846e7 227
hudakz 0:83c732b10e95 228 return false;
hudakz 0:83c732b10e95 229 }
hudakz 0:83c732b10e95 230
hudakz 2:ca17c5d846e7 231 /**
hudakz 2:ca17c5d846e7 232 * @brief
hudakz 2:ca17c5d846e7 233 * @note
hudakz 2:ca17c5d846e7 234 * @param
hudakz 2:ca17c5d846e7 235 * @retval
hudakz 2:ca17c5d846e7 236 */
hudakz 4:8620de6d1696 237 uint8_t MQTTClient::_readByte()
hudakz 3:5e31b4687aad 238 {
hudakz 4:8620de6d1696 239 while (!_client.available()) { }
hudakz 2:ca17c5d846e7 240
hudakz 4:8620de6d1696 241 return _client.recv();
hudakz 0:83c732b10e95 242 }
hudakz 0:83c732b10e95 243
hudakz 2:ca17c5d846e7 244 /**
hudakz 2:ca17c5d846e7 245 * @brief
hudakz 2:ca17c5d846e7 246 * @note
hudakz 2:ca17c5d846e7 247 * @param
hudakz 2:ca17c5d846e7 248 * @retval
hudakz 2:ca17c5d846e7 249 */
hudakz 4:8620de6d1696 250 uint16_t MQTTClient::_readPacket(uint8_t* length)
hudakz 3:5e31b4687aad 251 {
ivo_n 7:0f12a3d0bd10 252 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 253 printf("MQTTClient::_readPacket \r\n");
ivo_n 7:0f12a3d0bd10 254 #endif
ivo_n 7:0f12a3d0bd10 255
ivo_n 7:0f12a3d0bd10 256
hudakz 2:ca17c5d846e7 257 uint16_t len = 0;
hudakz 3:5e31b4687aad 258 _buffer[len++] = _readByte();
hudakz 0:83c732b10e95 259
hudakz 3:5e31b4687aad 260 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
hudakz 2:ca17c5d846e7 261 uint32_t multiplier = 1;
hudakz 4:8620de6d1696 262 uint16_t pos = 0;
hudakz 2:ca17c5d846e7 263 uint8_t digit = 0;
hudakz 2:ca17c5d846e7 264 uint16_t skip = 0;
hudakz 2:ca17c5d846e7 265 uint8_t start = 0;
hudakz 2:ca17c5d846e7 266
hudakz 3:5e31b4687aad 267 do {
hudakz 3:5e31b4687aad 268 digit = _readByte();
hudakz 3:5e31b4687aad 269 _buffer[len++] = digit;
hudakz 4:8620de6d1696 270 pos += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 271 multiplier *= 128;
hudakz 3:5e31b4687aad 272 } while ((digit & 128) != 0);
hudakz 4:8620de6d1696 273 *length = len - 1;
hudakz 0:83c732b10e95 274
hudakz 3:5e31b4687aad 275 if (isPublish) {
hudakz 0:83c732b10e95 276 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 3:5e31b4687aad 277 _buffer[len++] = _readByte();
hudakz 3:5e31b4687aad 278 _buffer[len++] = _readByte();
hudakz 4:8620de6d1696 279 skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
hudakz 0:83c732b10e95 280 start = 2;
hudakz 3:5e31b4687aad 281 if (_buffer[0] & MQTTQOS1) {
hudakz 0:83c732b10e95 282 // skip message id
hudakz 0:83c732b10e95 283 skip += 2;
hudakz 0:83c732b10e95 284 }
hudakz 0:83c732b10e95 285 }
hudakz 0:83c732b10e95 286
hudakz 4:8620de6d1696 287 for (uint16_t i = start; i < pos; i++) {
hudakz 3:5e31b4687aad 288 digit = _readByte();
hudakz 3:5e31b4687aad 289 if (this->_stream) {
hudakz 4:8620de6d1696 290 if (isPublish && len -*length - 2 > skip) {
hudakz 3:5e31b4687aad 291 this->_stream->putc(digit);
hudakz 0:83c732b10e95 292 }
hudakz 0:83c732b10e95 293 }
hudakz 2:ca17c5d846e7 294
hudakz 3:5e31b4687aad 295 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 3:5e31b4687aad 296 _buffer[len] = digit;
hudakz 0:83c732b10e95 297 }
hudakz 2:ca17c5d846e7 298
hudakz 0:83c732b10e95 299 len++;
hudakz 0:83c732b10e95 300 }
hudakz 0:83c732b10e95 301
hudakz 3:5e31b4687aad 302 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 2:ca17c5d846e7 303 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 304 }
hudakz 0:83c732b10e95 305
ivo_n 7:0f12a3d0bd10 306 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 307 printf("MQTTClient::_readPacket End \r\n");
ivo_n 7:0f12a3d0bd10 308 #endif
ivo_n 7:0f12a3d0bd10 309
hudakz 0:83c732b10e95 310 return len;
hudakz 0:83c732b10e95 311 }
hudakz 0:83c732b10e95 312
hudakz 2:ca17c5d846e7 313 /**
hudakz 2:ca17c5d846e7 314 * @brief
hudakz 2:ca17c5d846e7 315 * @note
hudakz 2:ca17c5d846e7 316 * @param
hudakz 2:ca17c5d846e7 317 * @retval
hudakz 2:ca17c5d846e7 318 */
hudakz 5:361a6987739b 319 bool MQTTClient::poll()
hudakz 3:5e31b4687aad 320 {
ivo_n 7:0f12a3d0bd10 321
ivo_n 7:0f12a3d0bd10 322 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 323 // printf("bool MQTTClient::poll() \r\n");
ivo_n 7:0f12a3d0bd10 324 #endif
ivo_n 7:0f12a3d0bd10 325
hudakz 3:5e31b4687aad 326 if (connected()) {
hudakz 4:8620de6d1696 327 time_t now = time(NULL);
hudakz 4:8620de6d1696 328 if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 3:5e31b4687aad 329 if (_pingOutstanding) {
hudakz 4:8620de6d1696 330 _client.stop();
hudakz 0:83c732b10e95 331 return false;
hudakz 2:ca17c5d846e7 332 }
hudakz 2:ca17c5d846e7 333 else {
hudakz 3:5e31b4687aad 334 _buffer[0] = MQTTPINGREQ;
hudakz 3:5e31b4687aad 335 _buffer[1] = 0;
hudakz 4:8620de6d1696 336 _client.send(_buffer, 2);
hudakz 4:8620de6d1696 337 _lastOutActivity = now;
hudakz 4:8620de6d1696 338 _lastInActivity = now;
hudakz 3:5e31b4687aad 339 _pingOutstanding = true;
hudakz 0:83c732b10e95 340 }
hudakz 0:83c732b10e95 341 }
hudakz 2:ca17c5d846e7 342
hudakz 4:8620de6d1696 343 if (_client.available()) {
hudakz 4:8620de6d1696 344 uint8_t len;
hudakz 4:8620de6d1696 345 uint16_t length = _readPacket(&len);
hudakz 2:ca17c5d846e7 346 uint16_t msgId = 0;
hudakz 2:ca17c5d846e7 347 uint8_t* payload;
hudakz 4:8620de6d1696 348 if (length > 0) {
hudakz 4:8620de6d1696 349 _lastInActivity = now;
hudakz 2:ca17c5d846e7 350
hudakz 3:5e31b4687aad 351 uint8_t type = _buffer[0] & 0xF0;
hudakz 3:5e31b4687aad 352 if (type == MQTTPUBLISH) {
hudakz 3:5e31b4687aad 353 if (_onMessage) {
hudakz 4:8620de6d1696 354 uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
hudakz 4:8620de6d1696 355 char topic[topicLen + 1];
hudakz 4:8620de6d1696 356 for (uint16_t i = 0; i < topicLen; i++) {
hudakz 4:8620de6d1696 357 topic[i] = _buffer[len + 3 + i];
hudakz 0:83c732b10e95 358 }
hudakz 2:ca17c5d846e7 359
hudakz 4:8620de6d1696 360 topic[topicLen] = '\0';
hudakz 2:ca17c5d846e7 361
hudakz 0:83c732b10e95 362 // msgId only present for QOS>0
hudakz 3:5e31b4687aad 363 if ((_buffer[0] & 0x06) == MQTTQOS1) {
hudakz 4:8620de6d1696 364 msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
hudakz 4:8620de6d1696 365 payload = _buffer + len + 3 + topicLen + 2;
hudakz 4:8620de6d1696 366 _onMessage(topic, payload, length - len - 3 - topicLen - 2);
hudakz 0:83c732b10e95 367
hudakz 3:5e31b4687aad 368 _buffer[0] = MQTTPUBACK;
hudakz 3:5e31b4687aad 369 _buffer[1] = 2;
hudakz 3:5e31b4687aad 370 _buffer[2] = (msgId >> 8);
hudakz 3:5e31b4687aad 371 _buffer[3] = (msgId & 0xFF);
hudakz 4:8620de6d1696 372 _client.send(_buffer, 4);
hudakz 4:8620de6d1696 373 _lastOutActivity = now;
hudakz 2:ca17c5d846e7 374 }
hudakz 2:ca17c5d846e7 375 else {
hudakz 4:8620de6d1696 376 payload = _buffer + len + 3 + topicLen;
hudakz 4:8620de6d1696 377 _onMessage(topic, payload, length - len - 3 - topicLen);
hudakz 0:83c732b10e95 378 }
hudakz 0:83c732b10e95 379 }
hudakz 2:ca17c5d846e7 380 }
hudakz 2:ca17c5d846e7 381 else
hudakz 3:5e31b4687aad 382 if (type == MQTTPINGREQ) {
hudakz 3:5e31b4687aad 383 _buffer[0] = MQTTPINGRESP;
hudakz 3:5e31b4687aad 384 _buffer[1] = 0;
hudakz 4:8620de6d1696 385 _client.send(_buffer, 2);
hudakz 2:ca17c5d846e7 386 }
hudakz 2:ca17c5d846e7 387 else
hudakz 3:5e31b4687aad 388 if (type == MQTTPINGRESP) {
hudakz 3:5e31b4687aad 389 _pingOutstanding = false;
hudakz 0:83c732b10e95 390 }
hudakz 0:83c732b10e95 391 }
hudakz 0:83c732b10e95 392 }
hudakz 2:ca17c5d846e7 393
hudakz 0:83c732b10e95 394 return true;
hudakz 0:83c732b10e95 395 }
hudakz 2:ca17c5d846e7 396
ivo_n 7:0f12a3d0bd10 397 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 398 printf("bool MQTTClient::poll() End \r\n");
ivo_n 7:0f12a3d0bd10 399 #endif
ivo_n 7:0f12a3d0bd10 400
hudakz 0:83c732b10e95 401 return false;
hudakz 0:83c732b10e95 402 }
hudakz 0:83c732b10e95 403
hudakz 2:ca17c5d846e7 404 /**
hudakz 2:ca17c5d846e7 405 * @brief
hudakz 2:ca17c5d846e7 406 * @note
hudakz 2:ca17c5d846e7 407 * @param
hudakz 2:ca17c5d846e7 408 * @retval
hudakz 2:ca17c5d846e7 409 */
hudakz 3:5e31b4687aad 410 bool MQTTClient::publish(const char* topic, const char* payload)
hudakz 3:5e31b4687aad 411 {
hudakz 2:ca17c5d846e7 412 return publish(topic, (uint8_t*)payload, strlen(payload), false);
hudakz 0:83c732b10e95 413 }
hudakz 0:83c732b10e95 414
hudakz 2:ca17c5d846e7 415 /**
hudakz 2:ca17c5d846e7 416 * @brief
hudakz 2:ca17c5d846e7 417 * @note
hudakz 2:ca17c5d846e7 418 * @param
hudakz 2:ca17c5d846e7 419 * @retval
hudakz 2:ca17c5d846e7 420 */
hudakz 4:8620de6d1696 421 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
hudakz 3:5e31b4687aad 422 {
hudakz 4:8620de6d1696 423 return publish(topic, payload, length, false);
hudakz 0:83c732b10e95 424 }
hudakz 0:83c732b10e95 425
hudakz 2:ca17c5d846e7 426 /**
hudakz 2:ca17c5d846e7 427 * @brief
hudakz 2:ca17c5d846e7 428 * @note
hudakz 2:ca17c5d846e7 429 * @param
hudakz 2:ca17c5d846e7 430 * @retval
hudakz 2:ca17c5d846e7 431 */
hudakz 4:8620de6d1696 432 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
hudakz 3:5e31b4687aad 433 {
ivo_n 7:0f12a3d0bd10 434
ivo_n 7:0f12a3d0bd10 435 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 436 printf("MQTTClient::publish \r\n");
ivo_n 7:0f12a3d0bd10 437 #endif
ivo_n 7:0f12a3d0bd10 438
hudakz 3:5e31b4687aad 439 if (connected()) {
hudakz 0:83c732b10e95 440 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 441 uint16_t length = 5;
hudakz 3:5e31b4687aad 442 length = _writeString(topic, _buffer, length);
hudakz 2:ca17c5d846e7 443
hudakz 2:ca17c5d846e7 444 uint16_t i;
hudakz 3:5e31b4687aad 445 for (i = 0; i < plength; i++) {
hudakz 3:5e31b4687aad 446 _buffer[length++] = payload[i];
hudakz 0:83c732b10e95 447 }
hudakz 2:ca17c5d846e7 448
hudakz 0:83c732b10e95 449 uint8_t header = MQTTPUBLISH;
hudakz 3:5e31b4687aad 450 if (retained) {
hudakz 0:83c732b10e95 451 header |= 1;
hudakz 0:83c732b10e95 452 }
hudakz 2:ca17c5d846e7 453
hudakz 3:5e31b4687aad 454 return _write(header, _buffer, length - 5);
hudakz 0:83c732b10e95 455 }
hudakz 2:ca17c5d846e7 456
ivo_n 7:0f12a3d0bd10 457 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 458 printf("MQTTClient::publish End \r\n");
ivo_n 7:0f12a3d0bd10 459 #endif
ivo_n 7:0f12a3d0bd10 460
hudakz 0:83c732b10e95 461 return false;
hudakz 0:83c732b10e95 462 }
hudakz 0:83c732b10e95 463
hudakz 2:ca17c5d846e7 464 /**
hudakz 2:ca17c5d846e7 465 * @brief
hudakz 2:ca17c5d846e7 466 * @note
hudakz 2:ca17c5d846e7 467 * @param
hudakz 2:ca17c5d846e7 468 * @retval
hudakz 2:ca17c5d846e7 469 */
hudakz 3:5e31b4687aad 470 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 471 {
hudakz 4:8620de6d1696 472 uint8_t digitBuf[4];
hudakz 4:8620de6d1696 473 uint8_t digitLen = 0;
hudakz 0:83c732b10e95 474 uint8_t digit;
hudakz 0:83c732b10e95 475 uint8_t pos = 0;
hudakz 0:83c732b10e95 476 uint8_t rc;
hudakz 0:83c732b10e95 477 uint8_t len = length;
hudakz 3:5e31b4687aad 478 do {
hudakz 0:83c732b10e95 479 digit = len % 128;
hudakz 0:83c732b10e95 480 len = len / 128;
hudakz 3:5e31b4687aad 481 if (len > 0) {
hudakz 0:83c732b10e95 482 digit |= 0x80;
hudakz 0:83c732b10e95 483 }
hudakz 2:ca17c5d846e7 484
hudakz 4:8620de6d1696 485 digitBuf[pos++] = digit;
hudakz 4:8620de6d1696 486 digitLen++;
hudakz 3:5e31b4687aad 487 } while (len > 0);
hudakz 0:83c732b10e95 488
hudakz 4:8620de6d1696 489 buf[4 - digitLen] = header;
hudakz 4:8620de6d1696 490 for (int i = 0; i < digitLen; i++) {
hudakz 4:8620de6d1696 491 buf[5 - digitLen + i] = digitBuf[i];
hudakz 0:83c732b10e95 492 }
hudakz 2:ca17c5d846e7 493
hudakz 4:8620de6d1696 494 rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
hudakz 0:83c732b10e95 495
hudakz 3:5e31b4687aad 496 _lastOutActivity = time(NULL);
hudakz 4:8620de6d1696 497 return(rc == 1 + digitLen + length);
hudakz 0:83c732b10e95 498 }
hudakz 0:83c732b10e95 499
hudakz 2:ca17c5d846e7 500 /**
hudakz 2:ca17c5d846e7 501 * @brief
hudakz 2:ca17c5d846e7 502 * @note
hudakz 2:ca17c5d846e7 503 * @param
hudakz 2:ca17c5d846e7 504 * @retval
hudakz 2:ca17c5d846e7 505 */
hudakz 3:5e31b4687aad 506 bool MQTTClient::subscribe(const char* topic)
hudakz 3:5e31b4687aad 507 {
hudakz 2:ca17c5d846e7 508 bool result = subscribe(topic, 0);
hudakz 6:1ce536bf461b 509 #if MBED_MAJOR_VERSION == 2
hudakz 2:ca17c5d846e7 510 wait_ms(50);
hudakz 6:1ce536bf461b 511 #else
hudakz 6:1ce536bf461b 512 thread_sleep_for(50);
hudakz 6:1ce536bf461b 513 #endif
hudakz 6:1ce536bf461b 514
hudakz 2:ca17c5d846e7 515 return result;
hudakz 0:83c732b10e95 516 }
hudakz 0:83c732b10e95 517
hudakz 2:ca17c5d846e7 518 /**
hudakz 2:ca17c5d846e7 519 * @brief
hudakz 2:ca17c5d846e7 520 * @note
hudakz 2:ca17c5d846e7 521 * @param
hudakz 2:ca17c5d846e7 522 * @retval
hudakz 2:ca17c5d846e7 523 */
hudakz 3:5e31b4687aad 524 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
hudakz 3:5e31b4687aad 525 {
ivo_n 7:0f12a3d0bd10 526 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 527 printf("MQTTClient::subscribe \r\n");
ivo_n 7:0f12a3d0bd10 528 #endif
ivo_n 7:0f12a3d0bd10 529
hudakz 3:5e31b4687aad 530 if (qos > 1)
hudakz 0:83c732b10e95 531 return false;
hudakz 0:83c732b10e95 532
hudakz 3:5e31b4687aad 533 if (connected()) {
hudakz 0:83c732b10e95 534 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 535 uint16_t length = 5;
hudakz 3:5e31b4687aad 536 _nextMsgId++;
hudakz 3:5e31b4687aad 537 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 538 _nextMsgId = 1;
hudakz 0:83c732b10e95 539 }
hudakz 2:ca17c5d846e7 540
hudakz 3:5e31b4687aad 541 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 542 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 543 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 544 _buffer[length++] = qos;
hudakz 3:5e31b4687aad 545 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 546 }
hudakz 2:ca17c5d846e7 547
ivo_n 7:0f12a3d0bd10 548 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 549 printf("MQTTClient::subscribe End \r\n");
ivo_n 7:0f12a3d0bd10 550 #endif
ivo_n 7:0f12a3d0bd10 551
hudakz 0:83c732b10e95 552 return false;
hudakz 0:83c732b10e95 553 }
hudakz 0:83c732b10e95 554
hudakz 2:ca17c5d846e7 555 /**
hudakz 2:ca17c5d846e7 556 * @brief
hudakz 2:ca17c5d846e7 557 * @note
hudakz 2:ca17c5d846e7 558 * @param
hudakz 2:ca17c5d846e7 559 * @retval
hudakz 2:ca17c5d846e7 560 */
hudakz 3:5e31b4687aad 561 bool MQTTClient::unsubscribe(const char* topic)
hudakz 3:5e31b4687aad 562 {
ivo_n 7:0f12a3d0bd10 563 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 564 printf("MQTTClient::unsubscribe \r\n");
ivo_n 7:0f12a3d0bd10 565 #endif
ivo_n 7:0f12a3d0bd10 566
hudakz 3:5e31b4687aad 567 if (connected()) {
hudakz 2:ca17c5d846e7 568 uint16_t length = 5;
hudakz 3:5e31b4687aad 569 _nextMsgId++;
hudakz 3:5e31b4687aad 570 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 571 _nextMsgId = 1;
hudakz 0:83c732b10e95 572 }
hudakz 2:ca17c5d846e7 573
hudakz 3:5e31b4687aad 574 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 575 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 576 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 577 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 578 }
hudakz 2:ca17c5d846e7 579
ivo_n 7:0f12a3d0bd10 580 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 581 printf("MQTTClient::unsubscribe End \r\n");
ivo_n 7:0f12a3d0bd10 582 #endif
ivo_n 7:0f12a3d0bd10 583
hudakz 0:83c732b10e95 584 return false;
hudakz 0:83c732b10e95 585 }
hudakz 0:83c732b10e95 586
hudakz 2:ca17c5d846e7 587 /**
hudakz 2:ca17c5d846e7 588 * @brief
hudakz 2:ca17c5d846e7 589 * @note
hudakz 2:ca17c5d846e7 590 * @param
hudakz 2:ca17c5d846e7 591 * @retval
hudakz 2:ca17c5d846e7 592 */
hudakz 4:8620de6d1696 593 void MQTTClient::disconnect()
hudakz 3:5e31b4687aad 594 {
ivo_n 7:0f12a3d0bd10 595 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 596 printf("MQTTClient::disconnect() \r\n");
ivo_n 7:0f12a3d0bd10 597 #endif
ivo_n 7:0f12a3d0bd10 598
hudakz 3:5e31b4687aad 599 _buffer[0] = MQTTDISCONNECT;
hudakz 3:5e31b4687aad 600 _buffer[1] = 0;
hudakz 4:8620de6d1696 601 _client.send(_buffer, 2);
ivo_n 7:0f12a3d0bd10 602 _client.close();
hudakz 4:8620de6d1696 603 _client.stop();
hudakz 3:5e31b4687aad 604 _lastInActivity = _lastOutActivity = time(NULL);
ivo_n 7:0f12a3d0bd10 605
ivo_n 7:0f12a3d0bd10 606 #ifdef UIPETHERNET_DEBUG
ivo_n 7:0f12a3d0bd10 607 printf("MQTTClient::disconnect() End \r\n");
ivo_n 7:0f12a3d0bd10 608 #endif
ivo_n 7:0f12a3d0bd10 609
hudakz 0:83c732b10e95 610 }
hudakz 0:83c732b10e95 611
hudakz 2:ca17c5d846e7 612 /**
hudakz 2:ca17c5d846e7 613 * @brief
hudakz 2:ca17c5d846e7 614 * @note
hudakz 2:ca17c5d846e7 615 * @param
hudakz 2:ca17c5d846e7 616 * @retval
hudakz 2:ca17c5d846e7 617 */
hudakz 4:8620de6d1696 618 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 619 {
hudakz 3:5e31b4687aad 620 char* idp = (char*)string;
hudakz 2:ca17c5d846e7 621 uint16_t i = 0;
hudakz 3:5e31b4687aad 622
hudakz 4:8620de6d1696 623 length += 2;
hudakz 3:5e31b4687aad 624 while (*idp) {
hudakz 4:8620de6d1696 625 buf[length++] = *idp++;
hudakz 0:83c732b10e95 626 i++;
hudakz 0:83c732b10e95 627 }
hudakz 2:ca17c5d846e7 628
hudakz 4:8620de6d1696 629 buf[length - i - 2] = (i >> 8);
hudakz 4:8620de6d1696 630 buf[length - i - 1] = (i & 0xFF);
hudakz 4:8620de6d1696 631 return length;
hudakz 0:83c732b10e95 632 }
hudakz 0:83c732b10e95 633
hudakz 2:ca17c5d846e7 634 /**
hudakz 2:ca17c5d846e7 635 * @brief
hudakz 2:ca17c5d846e7 636 * @note
hudakz 2:ca17c5d846e7 637 * @param
hudakz 2:ca17c5d846e7 638 * @retval
hudakz 2:ca17c5d846e7 639 */
hudakz 4:8620de6d1696 640 bool MQTTClient::connected()
hudakz 3:5e31b4687aad 641 {
hudakz 4:8620de6d1696 642 bool rc = (int)_client.connected();
hudakz 4:8620de6d1696 643
hudakz 4:8620de6d1696 644 if (!rc)
hudakz 4:8620de6d1696 645 _client.stop();
hudakz 0:83c732b10e95 646
hudakz 0:83c732b10e95 647 return rc;
hudakz 0:83c732b10e95 648 }
hudakz 5:361a6987739b 649
hudakz 5:361a6987739b 650 /**
hudakz 5:361a6987739b 651 * @brief
hudakz 5:361a6987739b 652 * @note
hudakz 5:361a6987739b 653 * @param
hudakz 5:361a6987739b 654 * @retval
hudakz 5:361a6987739b 655 */
hudakz 5:361a6987739b 656 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc)
hudakz 5:361a6987739b 657 {
hudakz 5:361a6987739b 658 _onMessage = fnc;
hudakz 5:361a6987739b 659 }