123

Committer:
hudakz
Date:
Tue Sep 03 13:41:16 2019 +0000
Revision:
3:5e31b4687aad
Parent:
2:ca17c5d846e7
Child:
4:8620de6d1696
Mbed OS 5 enabled.

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 2:ca17c5d846e7 5
hudakz 3:5e31b4687aad 6 Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8 #include "MQTTClient.h"
hudakz 0:83c732b10e95 9 #include <string.h>
hudakz 0:83c732b10e95 10 #include <time.h>
hudakz 0:83c732b10e95 11
hudakz 2:ca17c5d846e7 12 /**
hudakz 2:ca17c5d846e7 13 * @brief
hudakz 2:ca17c5d846e7 14 * @note
hudakz 2:ca17c5d846e7 15 * @param
hudakz 2:ca17c5d846e7 16 * @retval
hudakz 2:ca17c5d846e7 17 */
hudakz 3:5e31b4687aad 18 MQTTClient::MQTTClient(void) :
hudakz 3:5e31b4687aad 19 _client(NULL),
hudakz 3:5e31b4687aad 20 _stream(NULL)
hudakz 3:5e31b4687aad 21 { }
hudakz 0:83c732b10e95 22
hudakz 2:ca17c5d846e7 23 /**
hudakz 2:ca17c5d846e7 24 * @brief
hudakz 2:ca17c5d846e7 25 * @note
hudakz 2:ca17c5d846e7 26 * @param
hudakz 2:ca17c5d846e7 27 * @retval
hudakz 2:ca17c5d846e7 28 */
hudakz 2:ca17c5d846e7 29 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 30 (
hudakz 3:5e31b4687aad 31 IpAddress& ip,
hudakz 2:ca17c5d846e7 32 uint16_t port,
hudakz 3:5e31b4687aad 33 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 3:5e31b4687aad 34 TcpClient& client
hudakz 3:5e31b4687aad 35 ) :
hudakz 3:5e31b4687aad 36 _client(&client),
hudakz 3:5e31b4687aad 37 _ip(ip),
hudakz 3:5e31b4687aad 38 _domain(NULL),
hudakz 3:5e31b4687aad 39 _port(port),
hudakz 3:5e31b4687aad 40 _stream(NULL),
hudakz 3:5e31b4687aad 41 _onMessage(onMessage)
hudakz 3:5e31b4687aad 42 { }
hudakz 0:83c732b10e95 43
hudakz 2:ca17c5d846e7 44 /**
hudakz 2:ca17c5d846e7 45 * @brief
hudakz 2:ca17c5d846e7 46 * @note
hudakz 2:ca17c5d846e7 47 * @param
hudakz 2:ca17c5d846e7 48 * @retval
hudakz 2:ca17c5d846e7 49 */
hudakz 3:5e31b4687aad 50 MQTTClient::MQTTClient
hudakz 3:5e31b4687aad 51 (
hudakz 3:5e31b4687aad 52 const char* domain,
hudakz 3:5e31b4687aad 53 uint16_t port,
hudakz 3:5e31b4687aad 54 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 3:5e31b4687aad 55 TcpClient& client
hudakz 3:5e31b4687aad 56 ) :
hudakz 3:5e31b4687aad 57 _client(&client),
hudakz 3:5e31b4687aad 58 _domain((char*)domain),
hudakz 3:5e31b4687aad 59 _port(port),
hudakz 3:5e31b4687aad 60 _stream(NULL),
hudakz 3:5e31b4687aad 61 _onMessage(onMessage)
hudakz 3:5e31b4687aad 62 { }
hudakz 0:83c732b10e95 63
hudakz 2:ca17c5d846e7 64 /**
hudakz 2:ca17c5d846e7 65 * @brief
hudakz 2:ca17c5d846e7 66 * @note
hudakz 2:ca17c5d846e7 67 * @param
hudakz 2:ca17c5d846e7 68 * @retval
hudakz 2:ca17c5d846e7 69 */
hudakz 2:ca17c5d846e7 70 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 71 (
hudakz 3:5e31b4687aad 72 IpAddress& ip,
hudakz 2:ca17c5d846e7 73 uint16_t port,
hudakz 3:5e31b4687aad 74 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 3:5e31b4687aad 75 TcpClient& client,
hudakz 3:5e31b4687aad 76 Stream& stream
hudakz 3:5e31b4687aad 77 ) :
hudakz 3:5e31b4687aad 78 _client(&client),
hudakz 3:5e31b4687aad 79 _ip(ip),
hudakz 3:5e31b4687aad 80 _domain(NULL),
hudakz 3:5e31b4687aad 81 _port(port),
hudakz 3:5e31b4687aad 82 _stream(&stream),
hudakz 3:5e31b4687aad 83 _onMessage(onMessage)
hudakz 3:5e31b4687aad 84 { }
hudakz 0:83c732b10e95 85
hudakz 2:ca17c5d846e7 86 /**
hudakz 2:ca17c5d846e7 87 * @brief
hudakz 2:ca17c5d846e7 88 * @note
hudakz 2:ca17c5d846e7 89 * @param
hudakz 2:ca17c5d846e7 90 * @retval
hudakz 2:ca17c5d846e7 91 */
hudakz 2:ca17c5d846e7 92 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 93 (
hudakz 3:5e31b4687aad 94 const char* domain,
hudakz 2:ca17c5d846e7 95 uint16_t port,
hudakz 3:5e31b4687aad 96 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 3:5e31b4687aad 97 TcpClient& client,
hudakz 3:5e31b4687aad 98 Stream& stream
hudakz 3:5e31b4687aad 99 ) :
hudakz 3:5e31b4687aad 100 _client(&client),
hudakz 3:5e31b4687aad 101 _domain((char*)domain),
hudakz 3:5e31b4687aad 102 _port(port),
hudakz 3:5e31b4687aad 103 _stream(&stream),
hudakz 3:5e31b4687aad 104 _onMessage(onMessage)
hudakz 3:5e31b4687aad 105 { }
hudakz 0:83c732b10e95 106
hudakz 2:ca17c5d846e7 107 /**
hudakz 2:ca17c5d846e7 108 * @brief
hudakz 2:ca17c5d846e7 109 * @note
hudakz 2:ca17c5d846e7 110 * @param
hudakz 2:ca17c5d846e7 111 * @retval
hudakz 2:ca17c5d846e7 112 */
hudakz 3:5e31b4687aad 113 bool MQTTClient::connect(const char* id)
hudakz 3:5e31b4687aad 114 {
hudakz 2:ca17c5d846e7 115 return connect(id, NULL, NULL, 0, 0, 0, 0);
hudakz 0:83c732b10e95 116 }
hudakz 0:83c732b10e95 117
hudakz 2:ca17c5d846e7 118 /**
hudakz 2:ca17c5d846e7 119 * @brief
hudakz 2:ca17c5d846e7 120 * @note
hudakz 2:ca17c5d846e7 121 * @param
hudakz 2:ca17c5d846e7 122 * @retval
hudakz 2:ca17c5d846e7 123 */
hudakz 3:5e31b4687aad 124 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
hudakz 3:5e31b4687aad 125 {
hudakz 2:ca17c5d846e7 126 return connect(id, user, pass, 0, 0, 0, 0);
hudakz 0:83c732b10e95 127 }
hudakz 0:83c732b10e95 128
hudakz 2:ca17c5d846e7 129 /**
hudakz 2:ca17c5d846e7 130 * @brief
hudakz 2:ca17c5d846e7 131 * @note
hudakz 2:ca17c5d846e7 132 * @param
hudakz 2:ca17c5d846e7 133 * @retval
hudakz 2:ca17c5d846e7 134 */
hudakz 3:5e31b4687aad 135 bool MQTTClient::connect(const char* id, const char* willTopic, uint8_t willQos, uint8_t willRetain, const char* willMessage)
hudakz 3:5e31b4687aad 136 {
hudakz 2:ca17c5d846e7 137 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
hudakz 0:83c732b10e95 138 }
hudakz 0:83c732b10e95 139
hudakz 2:ca17c5d846e7 140 /**
hudakz 2:ca17c5d846e7 141 * @brief
hudakz 2:ca17c5d846e7 142 * @note
hudakz 2:ca17c5d846e7 143 * @param
hudakz 2:ca17c5d846e7 144 * @retval
hudakz 2:ca17c5d846e7 145 */
hudakz 2:ca17c5d846e7 146 bool MQTTClient::connect
hudakz 2:ca17c5d846e7 147 (
hudakz 3:5e31b4687aad 148 const char* id,
hudakz 3:5e31b4687aad 149 const char* user,
hudakz 3:5e31b4687aad 150 const char* pass,
hudakz 3:5e31b4687aad 151 const char* willTopic,
hudakz 3:5e31b4687aad 152 uint8_t willQos,
hudakz 3:5e31b4687aad 153 uint8_t willRetain,
hudakz 3:5e31b4687aad 154 const char* willMessage
hudakz 3:5e31b4687aad 155 )
hudakz 3:5e31b4687aad 156 {
hudakz 3:5e31b4687aad 157 if (!connected()) {
hudakz 0:83c732b10e95 158 int result = 0;
hudakz 0:83c732b10e95 159
hudakz 3:5e31b4687aad 160 if (_domain != NULL) {
hudakz 3:5e31b4687aad 161 result = !_client->connect(this->_domain, this->_port);
hudakz 2:ca17c5d846e7 162 }
hudakz 2:ca17c5d846e7 163 else {
hudakz 3:5e31b4687aad 164 result = _client->connect(this->_ip, this->_port);
hudakz 0:83c732b10e95 165 }
hudakz 0:83c732b10e95 166
hudakz 3:5e31b4687aad 167 if (result) {
hudakz 3:5e31b4687aad 168 _nextMsgId = 1;
hudakz 2:ca17c5d846e7 169
hudakz 2:ca17c5d846e7 170 uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
hudakz 2:ca17c5d846e7 171
hudakz 0:83c732b10e95 172 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 173 uint16_t length = 5;
hudakz 2:ca17c5d846e7 174 unsigned int j;
hudakz 3:5e31b4687aad 175 for (j = 0; j < 9; j++) {
hudakz 3:5e31b4687aad 176 _buffer[length++] = d[j];
hudakz 0:83c732b10e95 177 }
hudakz 0:83c732b10e95 178
hudakz 0:83c732b10e95 179 uint8_t v;
hudakz 3:5e31b4687aad 180 if (willTopic) {
hudakz 2:ca17c5d846e7 181 v = 0x06 | (willQos << 3) | (willRetain << 5);
hudakz 2:ca17c5d846e7 182 }
hudakz 2:ca17c5d846e7 183 else {
hudakz 0:83c732b10e95 184 v = 0x02;
hudakz 0:83c732b10e95 185 }
hudakz 0:83c732b10e95 186
hudakz 3:5e31b4687aad 187 if (user != NULL) {
hudakz 2:ca17c5d846e7 188 v = v | 0x80;
hudakz 0:83c732b10e95 189
hudakz 3:5e31b4687aad 190 if (pass != NULL) {
hudakz 2:ca17c5d846e7 191 v = v | (0x80 >> 1);
hudakz 0:83c732b10e95 192 }
hudakz 0:83c732b10e95 193 }
hudakz 0:83c732b10e95 194
hudakz 3:5e31b4687aad 195 _buffer[length++] = v;
hudakz 0:83c732b10e95 196
hudakz 3:5e31b4687aad 197 _buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 3:5e31b4687aad 198 _buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 3:5e31b4687aad 199 length = _writeString(id, _buffer, length);
hudakz 3:5e31b4687aad 200 if (willTopic) {
hudakz 3:5e31b4687aad 201 length = _writeString(willTopic, _buffer, length);
hudakz 3:5e31b4687aad 202 length = _writeString(willMessage, _buffer, length);
hudakz 0:83c732b10e95 203 }
hudakz 0:83c732b10e95 204
hudakz 3:5e31b4687aad 205 if (user != NULL) {
hudakz 3:5e31b4687aad 206 length = _writeString(user, _buffer, length);
hudakz 3:5e31b4687aad 207 if (pass != NULL) {
hudakz 3:5e31b4687aad 208 length = _writeString(pass, _buffer, length);
hudakz 0:83c732b10e95 209 }
hudakz 0:83c732b10e95 210 }
hudakz 0:83c732b10e95 211
hudakz 3:5e31b4687aad 212 _write(MQTTCONNECT, _buffer, length - 5);
hudakz 0:83c732b10e95 213
hudakz 3:5e31b4687aad 214 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 215
hudakz 3:5e31b4687aad 216 while (!_client->available()) {
hudakz 2:ca17c5d846e7 217 unsigned long t = time(NULL);
hudakz 3:5e31b4687aad 218 if (t - _lastInActivity > MQTT_KEEPALIVE) {
hudakz 0:83c732b10e95 219 _client->stop();
hudakz 0:83c732b10e95 220 return false;
hudakz 0:83c732b10e95 221 }
hudakz 0:83c732b10e95 222 }
hudakz 0:83c732b10e95 223
hudakz 2:ca17c5d846e7 224 uint8_t llen;
hudakz 3:5e31b4687aad 225 uint16_t len = _readPacket(&llen);
hudakz 2:ca17c5d846e7 226
hudakz 3:5e31b4687aad 227 if (len == 4 && _buffer[3] == 0) {
hudakz 3:5e31b4687aad 228 _lastInActivity = time(NULL);
hudakz 3:5e31b4687aad 229 _pingOutstanding = false;
hudakz 0:83c732b10e95 230 return true;
hudakz 0:83c732b10e95 231 }
hudakz 0:83c732b10e95 232 }
hudakz 2:ca17c5d846e7 233
hudakz 0:83c732b10e95 234 _client->stop();
hudakz 0:83c732b10e95 235 }
hudakz 2:ca17c5d846e7 236
hudakz 0:83c732b10e95 237 return false;
hudakz 0:83c732b10e95 238 }
hudakz 0:83c732b10e95 239
hudakz 2:ca17c5d846e7 240 /**
hudakz 2:ca17c5d846e7 241 * @brief
hudakz 2:ca17c5d846e7 242 * @note
hudakz 2:ca17c5d846e7 243 * @param
hudakz 2:ca17c5d846e7 244 * @retval
hudakz 2:ca17c5d846e7 245 */
hudakz 3:5e31b4687aad 246 uint8_t MQTTClient::_readByte(void)
hudakz 3:5e31b4687aad 247 {
hudakz 3:5e31b4687aad 248 while (!_client->available()) { }
hudakz 2:ca17c5d846e7 249
hudakz 3:5e31b4687aad 250 return _client->recv();
hudakz 0:83c732b10e95 251 }
hudakz 0:83c732b10e95 252
hudakz 2:ca17c5d846e7 253 /**
hudakz 2:ca17c5d846e7 254 * @brief
hudakz 2:ca17c5d846e7 255 * @note
hudakz 2:ca17c5d846e7 256 * @param
hudakz 2:ca17c5d846e7 257 * @retval
hudakz 2:ca17c5d846e7 258 */
hudakz 3:5e31b4687aad 259 uint16_t MQTTClient::_readPacket(uint8_t* lengthLength)
hudakz 3:5e31b4687aad 260 {
hudakz 2:ca17c5d846e7 261 uint16_t len = 0;
hudakz 3:5e31b4687aad 262 _buffer[len++] = _readByte();
hudakz 0:83c732b10e95 263
hudakz 3:5e31b4687aad 264 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
hudakz 2:ca17c5d846e7 265 uint32_t multiplier = 1;
hudakz 2:ca17c5d846e7 266 uint16_t length = 0;
hudakz 2:ca17c5d846e7 267 uint8_t digit = 0;
hudakz 2:ca17c5d846e7 268 uint16_t skip = 0;
hudakz 2:ca17c5d846e7 269 uint8_t start = 0;
hudakz 2:ca17c5d846e7 270
hudakz 3:5e31b4687aad 271 do {
hudakz 3:5e31b4687aad 272 digit = _readByte();
hudakz 3:5e31b4687aad 273 _buffer[len++] = digit;
hudakz 0:83c732b10e95 274 length += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 275 multiplier *= 128;
hudakz 3:5e31b4687aad 276 } while ((digit & 128) != 0);
hudakz 2:ca17c5d846e7 277 *lengthLength = len - 1;
hudakz 0:83c732b10e95 278
hudakz 3:5e31b4687aad 279 if (isPublish) {
hudakz 0:83c732b10e95 280 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 3:5e31b4687aad 281 _buffer[len++] = _readByte();
hudakz 3:5e31b4687aad 282 _buffer[len++] = _readByte();
hudakz 3:5e31b4687aad 283 skip = (_buffer[*lengthLength + 1] << 8) + _buffer[*lengthLength + 2];
hudakz 0:83c732b10e95 284 start = 2;
hudakz 3:5e31b4687aad 285 if (_buffer[0] & MQTTQOS1) {
hudakz 0:83c732b10e95 286 // skip message id
hudakz 0:83c732b10e95 287 skip += 2;
hudakz 0:83c732b10e95 288 }
hudakz 0:83c732b10e95 289 }
hudakz 0:83c732b10e95 290
hudakz 3:5e31b4687aad 291 for (uint16_t i = start; i < length; i++) {
hudakz 3:5e31b4687aad 292 digit = _readByte();
hudakz 3:5e31b4687aad 293 if (this->_stream) {
hudakz 3:5e31b4687aad 294 if (isPublish && len -*lengthLength - 2 > skip) {
hudakz 3:5e31b4687aad 295 this->_stream->putc(digit);
hudakz 0:83c732b10e95 296 }
hudakz 0:83c732b10e95 297 }
hudakz 2:ca17c5d846e7 298
hudakz 3:5e31b4687aad 299 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 3:5e31b4687aad 300 _buffer[len] = digit;
hudakz 0:83c732b10e95 301 }
hudakz 2:ca17c5d846e7 302
hudakz 0:83c732b10e95 303 len++;
hudakz 0:83c732b10e95 304 }
hudakz 0:83c732b10e95 305
hudakz 3:5e31b4687aad 306 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 2:ca17c5d846e7 307 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 308 }
hudakz 0:83c732b10e95 309
hudakz 0:83c732b10e95 310 return len;
hudakz 0:83c732b10e95 311 }
hudakz 0:83c732b10e95 312
hudakz 2:ca17c5d846e7 313 /**
hudakz 2:ca17c5d846e7 314 * @brief
hudakz 2:ca17c5d846e7 315 * @note
hudakz 2:ca17c5d846e7 316 * @param
hudakz 2:ca17c5d846e7 317 * @retval
hudakz 2:ca17c5d846e7 318 */
hudakz 3:5e31b4687aad 319 bool MQTTClient::loop(void)
hudakz 3:5e31b4687aad 320 {
hudakz 3:5e31b4687aad 321 if (connected()) {
hudakz 2:ca17c5d846e7 322 unsigned long t = time(NULL);
hudakz 3:5e31b4687aad 323 if ((t - _lastInActivity > MQTT_KEEPALIVE) || (t - _lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 3:5e31b4687aad 324 if (_pingOutstanding) {
hudakz 0:83c732b10e95 325 _client->stop();
hudakz 0:83c732b10e95 326 return false;
hudakz 2:ca17c5d846e7 327 }
hudakz 2:ca17c5d846e7 328 else {
hudakz 3:5e31b4687aad 329 _buffer[0] = MQTTPINGREQ;
hudakz 3:5e31b4687aad 330 _buffer[1] = 0;
hudakz 3:5e31b4687aad 331 _client->send(_buffer, 2);
hudakz 3:5e31b4687aad 332 _lastOutActivity = t;
hudakz 3:5e31b4687aad 333 _lastInActivity = t;
hudakz 3:5e31b4687aad 334 _pingOutstanding = true;
hudakz 0:83c732b10e95 335 }
hudakz 0:83c732b10e95 336 }
hudakz 2:ca17c5d846e7 337
hudakz 3:5e31b4687aad 338 if (_client->available()) {
hudakz 2:ca17c5d846e7 339 uint8_t llen;
hudakz 3:5e31b4687aad 340 uint16_t len = _readPacket(&llen);
hudakz 2:ca17c5d846e7 341 uint16_t msgId = 0;
hudakz 2:ca17c5d846e7 342 uint8_t* payload;
hudakz 3:5e31b4687aad 343 if (len > 0) {
hudakz 3:5e31b4687aad 344 _lastInActivity = t;
hudakz 2:ca17c5d846e7 345
hudakz 3:5e31b4687aad 346 //printf("Here I am\r\n");
hudakz 3:5e31b4687aad 347 uint8_t type = _buffer[0] & 0xF0;
hudakz 3:5e31b4687aad 348 if (type == MQTTPUBLISH) {
hudakz 3:5e31b4687aad 349 if (_onMessage) {
hudakz 3:5e31b4687aad 350 uint16_t tl = (_buffer[llen + 1] << 8) + _buffer[llen + 2];
hudakz 2:ca17c5d846e7 351 char topic[tl + 1];
hudakz 3:5e31b4687aad 352 for (uint16_t i = 0; i < tl; i++) {
hudakz 3:5e31b4687aad 353 topic[i] = _buffer[llen + 3 + i];
hudakz 0:83c732b10e95 354 }
hudakz 2:ca17c5d846e7 355
hudakz 0:83c732b10e95 356 topic[tl] = 0;
hudakz 2:ca17c5d846e7 357
hudakz 0:83c732b10e95 358 // msgId only present for QOS>0
hudakz 3:5e31b4687aad 359 if ((_buffer[0] & 0x06) == MQTTQOS1) {
hudakz 3:5e31b4687aad 360 msgId = (_buffer[llen + 3 + tl] << 8) + _buffer[llen + 3 + tl + 1];
hudakz 3:5e31b4687aad 361 payload = _buffer + llen + 3 + tl + 2;
hudakz 3:5e31b4687aad 362 _onMessage(topic, payload, len - llen - 3 - tl - 2);
hudakz 0:83c732b10e95 363
hudakz 3:5e31b4687aad 364 _buffer[0] = MQTTPUBACK;
hudakz 3:5e31b4687aad 365 _buffer[1] = 2;
hudakz 3:5e31b4687aad 366 _buffer[2] = (msgId >> 8);
hudakz 3:5e31b4687aad 367 _buffer[3] = (msgId & 0xFF);
hudakz 3:5e31b4687aad 368 _client->send(_buffer, 4);
hudakz 3:5e31b4687aad 369 _lastOutActivity = t;
hudakz 2:ca17c5d846e7 370 }
hudakz 2:ca17c5d846e7 371 else {
hudakz 3:5e31b4687aad 372 payload = _buffer + llen + 3 + tl;
hudakz 3:5e31b4687aad 373 _onMessage(topic, payload, len - llen - 3 - tl);
hudakz 0:83c732b10e95 374 }
hudakz 0:83c732b10e95 375 }
hudakz 2:ca17c5d846e7 376 }
hudakz 2:ca17c5d846e7 377 else
hudakz 3:5e31b4687aad 378 if (type == MQTTPINGREQ) {
hudakz 3:5e31b4687aad 379 _buffer[0] = MQTTPINGRESP;
hudakz 3:5e31b4687aad 380 _buffer[1] = 0;
hudakz 3:5e31b4687aad 381 _client->send(_buffer, 2);
hudakz 2:ca17c5d846e7 382 }
hudakz 2:ca17c5d846e7 383 else
hudakz 3:5e31b4687aad 384 if (type == MQTTPINGRESP) {
hudakz 3:5e31b4687aad 385 _pingOutstanding = false;
hudakz 0:83c732b10e95 386 }
hudakz 0:83c732b10e95 387 }
hudakz 0:83c732b10e95 388 }
hudakz 2:ca17c5d846e7 389
hudakz 0:83c732b10e95 390 return true;
hudakz 0:83c732b10e95 391 }
hudakz 2:ca17c5d846e7 392
hudakz 0:83c732b10e95 393 return false;
hudakz 0:83c732b10e95 394 }
hudakz 0:83c732b10e95 395
hudakz 2:ca17c5d846e7 396 /**
hudakz 2:ca17c5d846e7 397 * @brief
hudakz 2:ca17c5d846e7 398 * @note
hudakz 2:ca17c5d846e7 399 * @param
hudakz 2:ca17c5d846e7 400 * @retval
hudakz 2:ca17c5d846e7 401 */
hudakz 3:5e31b4687aad 402 bool MQTTClient::publish(const char* topic, const char* payload)
hudakz 3:5e31b4687aad 403 {
hudakz 2:ca17c5d846e7 404 return publish(topic, (uint8_t*)payload, strlen(payload), false);
hudakz 0:83c732b10e95 405 }
hudakz 0:83c732b10e95 406
hudakz 2:ca17c5d846e7 407 /**
hudakz 2:ca17c5d846e7 408 * @brief
hudakz 2:ca17c5d846e7 409 * @note
hudakz 2:ca17c5d846e7 410 * @param
hudakz 2:ca17c5d846e7 411 * @retval
hudakz 2:ca17c5d846e7 412 */
hudakz 3:5e31b4687aad 413 bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength)
hudakz 3:5e31b4687aad 414 {
hudakz 0:83c732b10e95 415 return publish(topic, payload, plength, false);
hudakz 0:83c732b10e95 416 }
hudakz 0:83c732b10e95 417
hudakz 2:ca17c5d846e7 418 /**
hudakz 2:ca17c5d846e7 419 * @brief
hudakz 2:ca17c5d846e7 420 * @note
hudakz 2:ca17c5d846e7 421 * @param
hudakz 2:ca17c5d846e7 422 * @retval
hudakz 2:ca17c5d846e7 423 */
hudakz 3:5e31b4687aad 424 bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength, bool retained)
hudakz 3:5e31b4687aad 425 {
hudakz 3:5e31b4687aad 426 if (connected()) {
hudakz 0:83c732b10e95 427 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 428 uint16_t length = 5;
hudakz 3:5e31b4687aad 429 length = _writeString(topic, _buffer, length);
hudakz 2:ca17c5d846e7 430
hudakz 2:ca17c5d846e7 431 uint16_t i;
hudakz 3:5e31b4687aad 432 for (i = 0; i < plength; i++) {
hudakz 3:5e31b4687aad 433 _buffer[length++] = payload[i];
hudakz 0:83c732b10e95 434 }
hudakz 2:ca17c5d846e7 435
hudakz 0:83c732b10e95 436 uint8_t header = MQTTPUBLISH;
hudakz 3:5e31b4687aad 437 if (retained) {
hudakz 0:83c732b10e95 438 header |= 1;
hudakz 0:83c732b10e95 439 }
hudakz 2:ca17c5d846e7 440
hudakz 3:5e31b4687aad 441 return _write(header, _buffer, length - 5);
hudakz 0:83c732b10e95 442 }
hudakz 2:ca17c5d846e7 443
hudakz 0:83c732b10e95 444 return false;
hudakz 0:83c732b10e95 445 }
hudakz 0:83c732b10e95 446
hudakz 2:ca17c5d846e7 447 /**
hudakz 2:ca17c5d846e7 448 * @brief
hudakz 2:ca17c5d846e7 449 * @note
hudakz 2:ca17c5d846e7 450 * @param
hudakz 2:ca17c5d846e7 451 * @retval
hudakz 2:ca17c5d846e7 452 */
hudakz 3:5e31b4687aad 453 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 3:5e31b4687aad 454 {
hudakz 0:83c732b10e95 455 uint8_t lenBuf[4];
hudakz 0:83c732b10e95 456 uint8_t llen = 0;
hudakz 0:83c732b10e95 457 uint8_t digit;
hudakz 0:83c732b10e95 458 uint8_t pos = 0;
hudakz 0:83c732b10e95 459 uint8_t rc;
hudakz 0:83c732b10e95 460 uint8_t len = length;
hudakz 3:5e31b4687aad 461 do {
hudakz 0:83c732b10e95 462 digit = len % 128;
hudakz 0:83c732b10e95 463 len = len / 128;
hudakz 3:5e31b4687aad 464 if (len > 0) {
hudakz 0:83c732b10e95 465 digit |= 0x80;
hudakz 0:83c732b10e95 466 }
hudakz 2:ca17c5d846e7 467
hudakz 0:83c732b10e95 468 lenBuf[pos++] = digit;
hudakz 0:83c732b10e95 469 llen++;
hudakz 3:5e31b4687aad 470 } while (len > 0);
hudakz 0:83c732b10e95 471
hudakz 2:ca17c5d846e7 472 buf[4 - llen] = header;
hudakz 3:5e31b4687aad 473 for (int i = 0; i < llen; i++) {
hudakz 2:ca17c5d846e7 474 buf[5 - llen + i] = lenBuf[i];
hudakz 0:83c732b10e95 475 }
hudakz 2:ca17c5d846e7 476
hudakz 3:5e31b4687aad 477 rc = _client->send(buf + (4 - llen), length + 1 + llen);
hudakz 0:83c732b10e95 478
hudakz 3:5e31b4687aad 479 _lastOutActivity = time(NULL);
hudakz 2:ca17c5d846e7 480 return(rc == 1 + llen + length);
hudakz 0:83c732b10e95 481 }
hudakz 0:83c732b10e95 482
hudakz 2:ca17c5d846e7 483 /**
hudakz 2:ca17c5d846e7 484 * @brief
hudakz 2:ca17c5d846e7 485 * @note
hudakz 2:ca17c5d846e7 486 * @param
hudakz 2:ca17c5d846e7 487 * @retval
hudakz 2:ca17c5d846e7 488 */
hudakz 3:5e31b4687aad 489 bool MQTTClient::subscribe(const char* topic)
hudakz 3:5e31b4687aad 490 {
hudakz 2:ca17c5d846e7 491 bool result = subscribe(topic, 0);
hudakz 2:ca17c5d846e7 492 wait_ms(50);
hudakz 2:ca17c5d846e7 493 return result;
hudakz 0:83c732b10e95 494 }
hudakz 0:83c732b10e95 495
hudakz 2:ca17c5d846e7 496 /**
hudakz 2:ca17c5d846e7 497 * @brief
hudakz 2:ca17c5d846e7 498 * @note
hudakz 2:ca17c5d846e7 499 * @param
hudakz 2:ca17c5d846e7 500 * @retval
hudakz 2:ca17c5d846e7 501 */
hudakz 3:5e31b4687aad 502 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
hudakz 3:5e31b4687aad 503 {
hudakz 3:5e31b4687aad 504 if (qos > 1)
hudakz 0:83c732b10e95 505 return false;
hudakz 0:83c732b10e95 506
hudakz 3:5e31b4687aad 507 if (connected()) {
hudakz 0:83c732b10e95 508 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 509 uint16_t length = 5;
hudakz 3:5e31b4687aad 510 _nextMsgId++;
hudakz 3:5e31b4687aad 511 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 512 _nextMsgId = 1;
hudakz 0:83c732b10e95 513 }
hudakz 2:ca17c5d846e7 514
hudakz 3:5e31b4687aad 515 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 516 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 517 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 518 _buffer[length++] = qos;
hudakz 3:5e31b4687aad 519 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 520 }
hudakz 2:ca17c5d846e7 521
hudakz 0:83c732b10e95 522 return false;
hudakz 0:83c732b10e95 523 }
hudakz 0:83c732b10e95 524
hudakz 2:ca17c5d846e7 525 /**
hudakz 2:ca17c5d846e7 526 * @brief
hudakz 2:ca17c5d846e7 527 * @note
hudakz 2:ca17c5d846e7 528 * @param
hudakz 2:ca17c5d846e7 529 * @retval
hudakz 2:ca17c5d846e7 530 */
hudakz 3:5e31b4687aad 531 bool MQTTClient::unsubscribe(const char* topic)
hudakz 3:5e31b4687aad 532 {
hudakz 3:5e31b4687aad 533 if (connected()) {
hudakz 2:ca17c5d846e7 534 uint16_t length = 5;
hudakz 3:5e31b4687aad 535 _nextMsgId++;
hudakz 3:5e31b4687aad 536 if (_nextMsgId == 0) {
hudakz 3:5e31b4687aad 537 _nextMsgId = 1;
hudakz 0:83c732b10e95 538 }
hudakz 2:ca17c5d846e7 539
hudakz 3:5e31b4687aad 540 _buffer[length++] = (_nextMsgId >> 8);
hudakz 3:5e31b4687aad 541 _buffer[length++] = (_nextMsgId & 0xFF);
hudakz 3:5e31b4687aad 542 length = _writeString(topic, _buffer, length);
hudakz 3:5e31b4687aad 543 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
hudakz 0:83c732b10e95 544 }
hudakz 2:ca17c5d846e7 545
hudakz 0:83c732b10e95 546 return false;
hudakz 0:83c732b10e95 547 }
hudakz 0:83c732b10e95 548
hudakz 2:ca17c5d846e7 549 /**
hudakz 2:ca17c5d846e7 550 * @brief
hudakz 2:ca17c5d846e7 551 * @note
hudakz 2:ca17c5d846e7 552 * @param
hudakz 2:ca17c5d846e7 553 * @retval
hudakz 2:ca17c5d846e7 554 */
hudakz 3:5e31b4687aad 555 void MQTTClient::disconnect(void)
hudakz 3:5e31b4687aad 556 {
hudakz 3:5e31b4687aad 557 _buffer[0] = MQTTDISCONNECT;
hudakz 3:5e31b4687aad 558 _buffer[1] = 0;
hudakz 3:5e31b4687aad 559 _client->send(_buffer, 2);
hudakz 0:83c732b10e95 560 _client->stop();
hudakz 3:5e31b4687aad 561 _lastInActivity = _lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 562 }
hudakz 0:83c732b10e95 563
hudakz 2:ca17c5d846e7 564 /**
hudakz 2:ca17c5d846e7 565 * @brief
hudakz 2:ca17c5d846e7 566 * @note
hudakz 2:ca17c5d846e7 567 * @param
hudakz 2:ca17c5d846e7 568 * @retval
hudakz 2:ca17c5d846e7 569 */
hudakz 3:5e31b4687aad 570 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t pos)
hudakz 3:5e31b4687aad 571 {
hudakz 3:5e31b4687aad 572 char* idp = (char*)string;
hudakz 2:ca17c5d846e7 573 uint16_t i = 0;
hudakz 3:5e31b4687aad 574
hudakz 0:83c732b10e95 575 pos += 2;
hudakz 3:5e31b4687aad 576 while (*idp) {
hudakz 0:83c732b10e95 577 buf[pos++] = *idp++;
hudakz 0:83c732b10e95 578 i++;
hudakz 0:83c732b10e95 579 }
hudakz 2:ca17c5d846e7 580
hudakz 2:ca17c5d846e7 581 buf[pos - i - 2] = (i >> 8);
hudakz 2:ca17c5d846e7 582 buf[pos - i - 1] = (i & 0xFF);
hudakz 0:83c732b10e95 583 return pos;
hudakz 0:83c732b10e95 584 }
hudakz 0:83c732b10e95 585
hudakz 2:ca17c5d846e7 586 /**
hudakz 2:ca17c5d846e7 587 * @brief
hudakz 2:ca17c5d846e7 588 * @note
hudakz 2:ca17c5d846e7 589 * @param
hudakz 2:ca17c5d846e7 590 * @retval
hudakz 2:ca17c5d846e7 591 */
hudakz 3:5e31b4687aad 592 bool MQTTClient::connected(void)
hudakz 3:5e31b4687aad 593 {
hudakz 2:ca17c5d846e7 594 bool rc;
hudakz 3:5e31b4687aad 595 if (_client == NULL) {
hudakz 2:ca17c5d846e7 596 rc = false;
hudakz 2:ca17c5d846e7 597 }
hudakz 2:ca17c5d846e7 598 else {
hudakz 2:ca17c5d846e7 599 rc = (int)_client->connected();
hudakz 3:5e31b4687aad 600 if (!rc)
hudakz 2:ca17c5d846e7 601 _client->stop();
hudakz 2:ca17c5d846e7 602 }
hudakz 0:83c732b10e95 603
hudakz 0:83c732b10e95 604 return rc;
hudakz 0:83c732b10e95 605 }