Ivo Noorhoff / MQTTClient
Committer:
hudakz
Date:
Sun Nov 29 18:12:22 2015 +0000
Revision:
2:ca17c5d846e7
Parent:
1:87da395325fc
Child:
3:5e31b4687aad
Modified to make it work with the latest mbed library.

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 2:ca17c5d846e7 5
hudakz 1:87da395325fc 6 Ported to mbed by Zoltan Hudak <hudakz@inbox.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8 #include "MQTTClient.h"
hudakz 0:83c732b10e95 9 #include <string.h>
hudakz 0:83c732b10e95 10 #include <time.h>
hudakz 0:83c732b10e95 11
hudakz 2:ca17c5d846e7 12 /**
hudakz 2:ca17c5d846e7 13 * @brief
hudakz 2:ca17c5d846e7 14 * @note
hudakz 2:ca17c5d846e7 15 * @param
hudakz 2:ca17c5d846e7 16 * @retval
hudakz 2:ca17c5d846e7 17 */
hudakz 2:ca17c5d846e7 18 MQTTClient::MQTTClient(void) {
hudakz 0:83c732b10e95 19 this->_client = NULL;
hudakz 0:83c732b10e95 20 this->stream = NULL;
hudakz 0:83c732b10e95 21 }
hudakz 0:83c732b10e95 22
hudakz 2:ca17c5d846e7 23 /**
hudakz 2:ca17c5d846e7 24 * @brief
hudakz 2:ca17c5d846e7 25 * @note
hudakz 2:ca17c5d846e7 26 * @param
hudakz 2:ca17c5d846e7 27 * @retval
hudakz 2:ca17c5d846e7 28 */
hudakz 2:ca17c5d846e7 29 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 30 (
hudakz 2:ca17c5d846e7 31 IPAddress& ip,
hudakz 2:ca17c5d846e7 32 uint16_t port,
hudakz 2:ca17c5d846e7 33 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 2:ca17c5d846e7 34 Client& client
hudakz 2:ca17c5d846e7 35 ) {
hudakz 0:83c732b10e95 36 this->_client = &client;
hudakz 0:83c732b10e95 37 this->onMessage = onMessage;
hudakz 0:83c732b10e95 38 this->ip = ip;
hudakz 0:83c732b10e95 39 this->port = port;
hudakz 0:83c732b10e95 40 this->domain = NULL;
hudakz 0:83c732b10e95 41 this->stream = NULL;
hudakz 0:83c732b10e95 42 }
hudakz 0:83c732b10e95 43
hudakz 2:ca17c5d846e7 44 /**
hudakz 2:ca17c5d846e7 45 * @brief
hudakz 2:ca17c5d846e7 46 * @note
hudakz 2:ca17c5d846e7 47 * @param
hudakz 2:ca17c5d846e7 48 * @retval
hudakz 2:ca17c5d846e7 49 */
hudakz 2:ca17c5d846e7 50 MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage) (char*, uint8_t*, unsigned int), Client& client) {
hudakz 0:83c732b10e95 51 this->_client = &client;
hudakz 0:83c732b10e95 52 this->onMessage = onMessage;
hudakz 0:83c732b10e95 53 this->domain = domain;
hudakz 0:83c732b10e95 54 this->port = port;
hudakz 0:83c732b10e95 55 this->stream = NULL;
hudakz 0:83c732b10e95 56 }
hudakz 0:83c732b10e95 57
hudakz 2:ca17c5d846e7 58 /**
hudakz 2:ca17c5d846e7 59 * @brief
hudakz 2:ca17c5d846e7 60 * @note
hudakz 2:ca17c5d846e7 61 * @param
hudakz 2:ca17c5d846e7 62 * @retval
hudakz 2:ca17c5d846e7 63 */
hudakz 2:ca17c5d846e7 64 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 65 (
hudakz 2:ca17c5d846e7 66 IPAddress& ip,
hudakz 2:ca17c5d846e7 67 uint16_t port,
hudakz 2:ca17c5d846e7 68 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 2:ca17c5d846e7 69 Client& client,
hudakz 2:ca17c5d846e7 70 Stream& stream
hudakz 2:ca17c5d846e7 71 ) {
hudakz 0:83c732b10e95 72 this->_client = &client;
hudakz 0:83c732b10e95 73 this->onMessage = onMessage;
hudakz 0:83c732b10e95 74 this->ip = ip;
hudakz 0:83c732b10e95 75 this->port = port;
hudakz 0:83c732b10e95 76 this->domain = NULL;
hudakz 0:83c732b10e95 77 this->stream = &stream;
hudakz 0:83c732b10e95 78 }
hudakz 0:83c732b10e95 79
hudakz 2:ca17c5d846e7 80 /**
hudakz 2:ca17c5d846e7 81 * @brief
hudakz 2:ca17c5d846e7 82 * @note
hudakz 2:ca17c5d846e7 83 * @param
hudakz 2:ca17c5d846e7 84 * @retval
hudakz 2:ca17c5d846e7 85 */
hudakz 2:ca17c5d846e7 86 MQTTClient::MQTTClient
hudakz 2:ca17c5d846e7 87 (
hudakz 2:ca17c5d846e7 88 char* domain,
hudakz 2:ca17c5d846e7 89 uint16_t port,
hudakz 2:ca17c5d846e7 90 void (*onMessage) (char*, uint8_t*, unsigned int),
hudakz 2:ca17c5d846e7 91 Client& client,
hudakz 2:ca17c5d846e7 92 Stream& stream
hudakz 2:ca17c5d846e7 93 ) {
hudakz 0:83c732b10e95 94 this->_client = &client;
hudakz 0:83c732b10e95 95 this->onMessage = onMessage;
hudakz 0:83c732b10e95 96 this->domain = domain;
hudakz 0:83c732b10e95 97 this->port = port;
hudakz 0:83c732b10e95 98 this->stream = &stream;
hudakz 0:83c732b10e95 99 }
hudakz 0:83c732b10e95 100
hudakz 2:ca17c5d846e7 101 /**
hudakz 2:ca17c5d846e7 102 * @brief
hudakz 2:ca17c5d846e7 103 * @note
hudakz 2:ca17c5d846e7 104 * @param
hudakz 2:ca17c5d846e7 105 * @retval
hudakz 2:ca17c5d846e7 106 */
hudakz 2:ca17c5d846e7 107 bool MQTTClient::connect(char* id) {
hudakz 2:ca17c5d846e7 108 return connect(id, NULL, NULL, 0, 0, 0, 0);
hudakz 0:83c732b10e95 109 }
hudakz 0:83c732b10e95 110
hudakz 2:ca17c5d846e7 111 /**
hudakz 2:ca17c5d846e7 112 * @brief
hudakz 2:ca17c5d846e7 113 * @note
hudakz 2:ca17c5d846e7 114 * @param
hudakz 2:ca17c5d846e7 115 * @retval
hudakz 2:ca17c5d846e7 116 */
hudakz 2:ca17c5d846e7 117 bool MQTTClient::connect(char* id, char* user, char* pass) {
hudakz 2:ca17c5d846e7 118 return connect(id, user, pass, 0, 0, 0, 0);
hudakz 0:83c732b10e95 119 }
hudakz 0:83c732b10e95 120
hudakz 2:ca17c5d846e7 121 /**
hudakz 2:ca17c5d846e7 122 * @brief
hudakz 2:ca17c5d846e7 123 * @note
hudakz 2:ca17c5d846e7 124 * @param
hudakz 2:ca17c5d846e7 125 * @retval
hudakz 2:ca17c5d846e7 126 */
hudakz 2:ca17c5d846e7 127 bool MQTTClient::connect(char* id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
hudakz 2:ca17c5d846e7 128 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
hudakz 0:83c732b10e95 129 }
hudakz 0:83c732b10e95 130
hudakz 2:ca17c5d846e7 131 /**
hudakz 2:ca17c5d846e7 132 * @brief
hudakz 2:ca17c5d846e7 133 * @note
hudakz 2:ca17c5d846e7 134 * @param
hudakz 2:ca17c5d846e7 135 * @retval
hudakz 2:ca17c5d846e7 136 */
hudakz 2:ca17c5d846e7 137 bool MQTTClient::connect
hudakz 2:ca17c5d846e7 138 (
hudakz 2:ca17c5d846e7 139 char* id,
hudakz 2:ca17c5d846e7 140 char* user,
hudakz 2:ca17c5d846e7 141 char* pass,
hudakz 2:ca17c5d846e7 142 char* willTopic,
hudakz 2:ca17c5d846e7 143 uint8_t willQos,
hudakz 2:ca17c5d846e7 144 uint8_t willRetain,
hudakz 2:ca17c5d846e7 145 char* willMessage
hudakz 2:ca17c5d846e7 146 ) {
hudakz 2:ca17c5d846e7 147 if(!connected()) {
hudakz 0:83c732b10e95 148 int result = 0;
hudakz 0:83c732b10e95 149
hudakz 2:ca17c5d846e7 150 if(domain != NULL) {
hudakz 0:83c732b10e95 151 result = _client->connect(this->domain, this->port);
hudakz 2:ca17c5d846e7 152 }
hudakz 2:ca17c5d846e7 153 else {
hudakz 0:83c732b10e95 154 result = _client->connect(this->ip, this->port);
hudakz 0:83c732b10e95 155 }
hudakz 0:83c732b10e95 156
hudakz 2:ca17c5d846e7 157 if(result) {
hudakz 0:83c732b10e95 158 nextMsgId = 1;
hudakz 2:ca17c5d846e7 159
hudakz 2:ca17c5d846e7 160 uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
hudakz 2:ca17c5d846e7 161
hudakz 0:83c732b10e95 162 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 163 uint16_t length = 5;
hudakz 2:ca17c5d846e7 164 unsigned int j;
hudakz 2:ca17c5d846e7 165 for(j = 0; j < 9; j++) {
hudakz 0:83c732b10e95 166 buffer[length++] = d[j];
hudakz 0:83c732b10e95 167 }
hudakz 0:83c732b10e95 168
hudakz 0:83c732b10e95 169 uint8_t v;
hudakz 2:ca17c5d846e7 170 if(willTopic) {
hudakz 2:ca17c5d846e7 171 v = 0x06 | (willQos << 3) | (willRetain << 5);
hudakz 2:ca17c5d846e7 172 }
hudakz 2:ca17c5d846e7 173 else {
hudakz 0:83c732b10e95 174 v = 0x02;
hudakz 0:83c732b10e95 175 }
hudakz 0:83c732b10e95 176
hudakz 0:83c732b10e95 177 if(user != NULL) {
hudakz 2:ca17c5d846e7 178 v = v | 0x80;
hudakz 0:83c732b10e95 179
hudakz 0:83c732b10e95 180 if(pass != NULL) {
hudakz 2:ca17c5d846e7 181 v = v | (0x80 >> 1);
hudakz 0:83c732b10e95 182 }
hudakz 0:83c732b10e95 183 }
hudakz 0:83c732b10e95 184
hudakz 0:83c732b10e95 185 buffer[length++] = v;
hudakz 0:83c732b10e95 186
hudakz 0:83c732b10e95 187 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 0:83c732b10e95 188 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 2:ca17c5d846e7 189 length = writeString(id, buffer, length);
hudakz 2:ca17c5d846e7 190 if(willTopic) {
hudakz 2:ca17c5d846e7 191 length = writeString(willTopic, buffer, length);
hudakz 2:ca17c5d846e7 192 length = writeString(willMessage, buffer, length);
hudakz 0:83c732b10e95 193 }
hudakz 0:83c732b10e95 194
hudakz 0:83c732b10e95 195 if(user != NULL) {
hudakz 2:ca17c5d846e7 196 length = writeString(user, buffer, length);
hudakz 0:83c732b10e95 197 if(pass != NULL) {
hudakz 2:ca17c5d846e7 198 length = writeString(pass, buffer, length);
hudakz 0:83c732b10e95 199 }
hudakz 0:83c732b10e95 200 }
hudakz 0:83c732b10e95 201
hudakz 2:ca17c5d846e7 202 write(MQTTCONNECT, buffer, length - 5);
hudakz 0:83c732b10e95 203
hudakz 0:83c732b10e95 204 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 205
hudakz 2:ca17c5d846e7 206 while(!_client->available()) {
hudakz 2:ca17c5d846e7 207 unsigned long t = time(NULL);
hudakz 2:ca17c5d846e7 208 if(t - lastInActivity > MQTT_KEEPALIVE) {
hudakz 0:83c732b10e95 209 _client->stop();
hudakz 0:83c732b10e95 210 return false;
hudakz 0:83c732b10e95 211 }
hudakz 0:83c732b10e95 212 }
hudakz 0:83c732b10e95 213
hudakz 2:ca17c5d846e7 214 uint8_t llen;
hudakz 2:ca17c5d846e7 215 uint16_t len = readPacket(&llen);
hudakz 2:ca17c5d846e7 216
hudakz 2:ca17c5d846e7 217 if(len == 4 && buffer[3] == 0) {
hudakz 0:83c732b10e95 218 lastInActivity = time(NULL);
hudakz 0:83c732b10e95 219 pingOutstanding = false;
hudakz 0:83c732b10e95 220 return true;
hudakz 0:83c732b10e95 221 }
hudakz 0:83c732b10e95 222 }
hudakz 2:ca17c5d846e7 223
hudakz 0:83c732b10e95 224 _client->stop();
hudakz 0:83c732b10e95 225 }
hudakz 2:ca17c5d846e7 226
hudakz 0:83c732b10e95 227 return false;
hudakz 0:83c732b10e95 228 }
hudakz 0:83c732b10e95 229
hudakz 2:ca17c5d846e7 230 /**
hudakz 2:ca17c5d846e7 231 * @brief
hudakz 2:ca17c5d846e7 232 * @note
hudakz 2:ca17c5d846e7 233 * @param
hudakz 2:ca17c5d846e7 234 * @retval
hudakz 2:ca17c5d846e7 235 */
hudakz 2:ca17c5d846e7 236 uint8_t MQTTClient::readByte(void) {
hudakz 2:ca17c5d846e7 237 while(!_client->available()) { }
hudakz 2:ca17c5d846e7 238
hudakz 0:83c732b10e95 239 return _client->read();
hudakz 0:83c732b10e95 240 }
hudakz 0:83c732b10e95 241
hudakz 2:ca17c5d846e7 242 /**
hudakz 2:ca17c5d846e7 243 * @brief
hudakz 2:ca17c5d846e7 244 * @note
hudakz 2:ca17c5d846e7 245 * @param
hudakz 2:ca17c5d846e7 246 * @retval
hudakz 2:ca17c5d846e7 247 */
hudakz 2:ca17c5d846e7 248 uint16_t MQTTClient::readPacket(uint8_t* lengthLength) {
hudakz 2:ca17c5d846e7 249 uint16_t len = 0;
hudakz 0:83c732b10e95 250 buffer[len++] = readByte();
hudakz 0:83c732b10e95 251
hudakz 2:ca17c5d846e7 252 bool isPublish = (buffer[0] & 0xF0) == MQTTPUBLISH;
hudakz 2:ca17c5d846e7 253 uint32_t multiplier = 1;
hudakz 2:ca17c5d846e7 254 uint16_t length = 0;
hudakz 2:ca17c5d846e7 255 uint8_t digit = 0;
hudakz 2:ca17c5d846e7 256 uint16_t skip = 0;
hudakz 2:ca17c5d846e7 257 uint8_t start = 0;
hudakz 2:ca17c5d846e7 258
hudakz 2:ca17c5d846e7 259 do
hudakz 2:ca17c5d846e7 260 {
hudakz 0:83c732b10e95 261 digit = readByte();
hudakz 0:83c732b10e95 262 buffer[len++] = digit;
hudakz 0:83c732b10e95 263 length += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 264 multiplier *= 128;
hudakz 2:ca17c5d846e7 265 } while((digit & 128) != 0);
hudakz 2:ca17c5d846e7 266 *lengthLength = len - 1;
hudakz 0:83c732b10e95 267
hudakz 2:ca17c5d846e7 268 if(isPublish) {
hudakz 2:ca17c5d846e7 269
hudakz 0:83c732b10e95 270 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 0:83c732b10e95 271 buffer[len++] = readByte();
hudakz 0:83c732b10e95 272 buffer[len++] = readByte();
hudakz 2:ca17c5d846e7 273 skip = (buffer[*lengthLength + 1] << 8) + buffer[*lengthLength + 2];
hudakz 0:83c732b10e95 274 start = 2;
hudakz 2:ca17c5d846e7 275 if(buffer[0] & MQTTQOS1) {
hudakz 2:ca17c5d846e7 276
hudakz 0:83c732b10e95 277 // skip message id
hudakz 0:83c732b10e95 278 skip += 2;
hudakz 0:83c732b10e95 279 }
hudakz 0:83c732b10e95 280 }
hudakz 0:83c732b10e95 281
hudakz 2:ca17c5d846e7 282 for(uint16_t i = start; i < length; i++) {
hudakz 0:83c732b10e95 283 digit = readByte();
hudakz 2:ca17c5d846e7 284 if(this->stream) {
hudakz 2:ca17c5d846e7 285 if(isPublish && len -*lengthLength - 2 > skip) {
hudakz 0:83c732b10e95 286 this->stream->putc(digit);
hudakz 0:83c732b10e95 287 }
hudakz 0:83c732b10e95 288 }
hudakz 2:ca17c5d846e7 289
hudakz 2:ca17c5d846e7 290 if(len < MQTT_MAX_PACKET_SIZE) {
hudakz 0:83c732b10e95 291 buffer[len] = digit;
hudakz 0:83c732b10e95 292 }
hudakz 2:ca17c5d846e7 293
hudakz 0:83c732b10e95 294 len++;
hudakz 0:83c732b10e95 295 }
hudakz 0:83c732b10e95 296
hudakz 2:ca17c5d846e7 297 if(!this->stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 2:ca17c5d846e7 298 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 299 }
hudakz 0:83c732b10e95 300
hudakz 0:83c732b10e95 301 return len;
hudakz 0:83c732b10e95 302 }
hudakz 0:83c732b10e95 303
hudakz 2:ca17c5d846e7 304 /**
hudakz 2:ca17c5d846e7 305 * @brief
hudakz 2:ca17c5d846e7 306 * @note
hudakz 2:ca17c5d846e7 307 * @param
hudakz 2:ca17c5d846e7 308 * @retval
hudakz 2:ca17c5d846e7 309 */
hudakz 2:ca17c5d846e7 310 bool MQTTClient::loop(void) {
hudakz 2:ca17c5d846e7 311 if(connected()) {
hudakz 2:ca17c5d846e7 312 unsigned long t = time(NULL);
hudakz 2:ca17c5d846e7 313 if((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 2:ca17c5d846e7 314 if(pingOutstanding) {
hudakz 0:83c732b10e95 315 _client->stop();
hudakz 0:83c732b10e95 316 return false;
hudakz 2:ca17c5d846e7 317 }
hudakz 2:ca17c5d846e7 318 else {
hudakz 0:83c732b10e95 319 buffer[0] = MQTTPINGREQ;
hudakz 0:83c732b10e95 320 buffer[1] = 0;
hudakz 2:ca17c5d846e7 321 _client->write(buffer, 2);
hudakz 0:83c732b10e95 322 lastOutActivity = t;
hudakz 0:83c732b10e95 323 lastInActivity = t;
hudakz 0:83c732b10e95 324 pingOutstanding = true;
hudakz 0:83c732b10e95 325 }
hudakz 0:83c732b10e95 326 }
hudakz 2:ca17c5d846e7 327
hudakz 2:ca17c5d846e7 328 if(_client->available()) {
hudakz 2:ca17c5d846e7 329 uint8_t llen;
hudakz 2:ca17c5d846e7 330 uint16_t len = readPacket(&llen);
hudakz 2:ca17c5d846e7 331 uint16_t msgId = 0;
hudakz 2:ca17c5d846e7 332 uint8_t* payload;
hudakz 2:ca17c5d846e7 333 if(len > 0) {
hudakz 0:83c732b10e95 334 lastInActivity = t;
hudakz 2:ca17c5d846e7 335
hudakz 2:ca17c5d846e7 336 uint8_t type = buffer[0] & 0xF0;
hudakz 2:ca17c5d846e7 337 if(type == MQTTPUBLISH) {
hudakz 2:ca17c5d846e7 338 if(onMessage) {
hudakz 2:ca17c5d846e7 339 uint16_t tl = (buffer[llen + 1] << 8) + buffer[llen + 2];
hudakz 2:ca17c5d846e7 340 char topic[tl + 1];
hudakz 2:ca17c5d846e7 341 for(uint16_t i = 0; i < tl; i++) {
hudakz 2:ca17c5d846e7 342 topic[i] = buffer[llen + 3 + i];
hudakz 0:83c732b10e95 343 }
hudakz 2:ca17c5d846e7 344
hudakz 0:83c732b10e95 345 topic[tl] = 0;
hudakz 2:ca17c5d846e7 346
hudakz 0:83c732b10e95 347 // msgId only present for QOS>0
hudakz 2:ca17c5d846e7 348 if((buffer[0] & 0x06) == MQTTQOS1) {
hudakz 2:ca17c5d846e7 349 msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1];
hudakz 2:ca17c5d846e7 350 payload = buffer + llen + 3 + tl + 2;
hudakz 2:ca17c5d846e7 351 onMessage(topic, payload, len - llen - 3 - tl - 2);
hudakz 0:83c732b10e95 352
hudakz 0:83c732b10e95 353 buffer[0] = MQTTPUBACK;
hudakz 0:83c732b10e95 354 buffer[1] = 2;
hudakz 0:83c732b10e95 355 buffer[2] = (msgId >> 8);
hudakz 0:83c732b10e95 356 buffer[3] = (msgId & 0xFF);
hudakz 2:ca17c5d846e7 357 _client->write(buffer, 4);
hudakz 0:83c732b10e95 358 lastOutActivity = t;
hudakz 2:ca17c5d846e7 359 }
hudakz 2:ca17c5d846e7 360 else {
hudakz 2:ca17c5d846e7 361 payload = buffer + llen + 3 + tl;
hudakz 2:ca17c5d846e7 362 onMessage(topic, payload, len - llen - 3 - tl);
hudakz 0:83c732b10e95 363 }
hudakz 0:83c732b10e95 364 }
hudakz 2:ca17c5d846e7 365 }
hudakz 2:ca17c5d846e7 366 else
hudakz 2:ca17c5d846e7 367 if(type == MQTTPINGREQ) {
hudakz 0:83c732b10e95 368 buffer[0] = MQTTPINGRESP;
hudakz 0:83c732b10e95 369 buffer[1] = 0;
hudakz 2:ca17c5d846e7 370 _client->write(buffer, 2);
hudakz 2:ca17c5d846e7 371 }
hudakz 2:ca17c5d846e7 372 else
hudakz 2:ca17c5d846e7 373 if(type == MQTTPINGRESP) {
hudakz 0:83c732b10e95 374 pingOutstanding = false;
hudakz 0:83c732b10e95 375 }
hudakz 0:83c732b10e95 376 }
hudakz 0:83c732b10e95 377 }
hudakz 2:ca17c5d846e7 378
hudakz 0:83c732b10e95 379 return true;
hudakz 0:83c732b10e95 380 }
hudakz 2:ca17c5d846e7 381
hudakz 0:83c732b10e95 382 return false;
hudakz 0:83c732b10e95 383 }
hudakz 0:83c732b10e95 384
hudakz 2:ca17c5d846e7 385 /**
hudakz 2:ca17c5d846e7 386 * @brief
hudakz 2:ca17c5d846e7 387 * @note
hudakz 2:ca17c5d846e7 388 * @param
hudakz 2:ca17c5d846e7 389 * @retval
hudakz 2:ca17c5d846e7 390 */
hudakz 2:ca17c5d846e7 391 bool MQTTClient::publish(char* topic, char* payload) {
hudakz 2:ca17c5d846e7 392 return publish(topic, (uint8_t*)payload, strlen(payload), false);
hudakz 0:83c732b10e95 393 }
hudakz 0:83c732b10e95 394
hudakz 2:ca17c5d846e7 395 /**
hudakz 2:ca17c5d846e7 396 * @brief
hudakz 2:ca17c5d846e7 397 * @note
hudakz 2:ca17c5d846e7 398 * @param
hudakz 2:ca17c5d846e7 399 * @retval
hudakz 2:ca17c5d846e7 400 */
hudakz 2:ca17c5d846e7 401 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) {
hudakz 0:83c732b10e95 402 return publish(topic, payload, plength, false);
hudakz 0:83c732b10e95 403 }
hudakz 0:83c732b10e95 404
hudakz 2:ca17c5d846e7 405 /**
hudakz 2:ca17c5d846e7 406 * @brief
hudakz 2:ca17c5d846e7 407 * @note
hudakz 2:ca17c5d846e7 408 * @param
hudakz 2:ca17c5d846e7 409 * @retval
hudakz 2:ca17c5d846e7 410 */
hudakz 2:ca17c5d846e7 411 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
hudakz 2:ca17c5d846e7 412 if(connected()) {
hudakz 2:ca17c5d846e7 413
hudakz 0:83c732b10e95 414 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 415 uint16_t length = 5;
hudakz 2:ca17c5d846e7 416 length = writeString(topic, buffer, length);
hudakz 2:ca17c5d846e7 417
hudakz 2:ca17c5d846e7 418 uint16_t i;
hudakz 2:ca17c5d846e7 419 for(i = 0; i < plength; i++) {
hudakz 0:83c732b10e95 420 buffer[length++] = payload[i];
hudakz 0:83c732b10e95 421 }
hudakz 2:ca17c5d846e7 422
hudakz 0:83c732b10e95 423 uint8_t header = MQTTPUBLISH;
hudakz 2:ca17c5d846e7 424 if(retained) {
hudakz 0:83c732b10e95 425 header |= 1;
hudakz 0:83c732b10e95 426 }
hudakz 2:ca17c5d846e7 427
hudakz 2:ca17c5d846e7 428 return write(header, buffer, length - 5);
hudakz 0:83c732b10e95 429 }
hudakz 2:ca17c5d846e7 430
hudakz 0:83c732b10e95 431 return false;
hudakz 0:83c732b10e95 432 }
hudakz 0:83c732b10e95 433
hudakz 2:ca17c5d846e7 434 /**
hudakz 2:ca17c5d846e7 435 * @brief
hudakz 2:ca17c5d846e7 436 * @note
hudakz 2:ca17c5d846e7 437 * @param
hudakz 2:ca17c5d846e7 438 * @retval
hudakz 2:ca17c5d846e7 439 */
hudakz 2:ca17c5d846e7 440 bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
hudakz 0:83c732b10e95 441 uint8_t lenBuf[4];
hudakz 0:83c732b10e95 442 uint8_t llen = 0;
hudakz 0:83c732b10e95 443 uint8_t digit;
hudakz 0:83c732b10e95 444 uint8_t pos = 0;
hudakz 0:83c732b10e95 445 uint8_t rc;
hudakz 0:83c732b10e95 446 uint8_t len = length;
hudakz 2:ca17c5d846e7 447 do
hudakz 2:ca17c5d846e7 448 {
hudakz 0:83c732b10e95 449 digit = len % 128;
hudakz 0:83c732b10e95 450 len = len / 128;
hudakz 2:ca17c5d846e7 451 if(len > 0) {
hudakz 0:83c732b10e95 452 digit |= 0x80;
hudakz 0:83c732b10e95 453 }
hudakz 2:ca17c5d846e7 454
hudakz 0:83c732b10e95 455 lenBuf[pos++] = digit;
hudakz 0:83c732b10e95 456 llen++;
hudakz 2:ca17c5d846e7 457 } while(len > 0);
hudakz 0:83c732b10e95 458
hudakz 2:ca17c5d846e7 459 buf[4 - llen] = header;
hudakz 2:ca17c5d846e7 460 for(int i = 0; i < llen; i++) {
hudakz 2:ca17c5d846e7 461 buf[5 - llen + i] = lenBuf[i];
hudakz 0:83c732b10e95 462 }
hudakz 2:ca17c5d846e7 463
hudakz 2:ca17c5d846e7 464 rc = _client->write(buf + (4 - llen), length + 1 + llen);
hudakz 0:83c732b10e95 465
hudakz 0:83c732b10e95 466 lastOutActivity = time(NULL);
hudakz 2:ca17c5d846e7 467 return(rc == 1 + llen + length);
hudakz 0:83c732b10e95 468 }
hudakz 0:83c732b10e95 469
hudakz 2:ca17c5d846e7 470 /**
hudakz 2:ca17c5d846e7 471 * @brief
hudakz 2:ca17c5d846e7 472 * @note
hudakz 2:ca17c5d846e7 473 * @param
hudakz 2:ca17c5d846e7 474 * @retval
hudakz 2:ca17c5d846e7 475 */
hudakz 2:ca17c5d846e7 476 bool MQTTClient::subscribe(char* topic) {
hudakz 2:ca17c5d846e7 477 bool result = subscribe(topic, 0);
hudakz 2:ca17c5d846e7 478 wait_ms(50);
hudakz 2:ca17c5d846e7 479 return result;
hudakz 0:83c732b10e95 480 }
hudakz 0:83c732b10e95 481
hudakz 2:ca17c5d846e7 482 /**
hudakz 2:ca17c5d846e7 483 * @brief
hudakz 2:ca17c5d846e7 484 * @note
hudakz 2:ca17c5d846e7 485 * @param
hudakz 2:ca17c5d846e7 486 * @retval
hudakz 2:ca17c5d846e7 487 */
hudakz 2:ca17c5d846e7 488 bool MQTTClient::subscribe(char* topic, uint8_t qos) {
hudakz 2:ca17c5d846e7 489 if(qos > 1)
hudakz 0:83c732b10e95 490 return false;
hudakz 0:83c732b10e95 491
hudakz 2:ca17c5d846e7 492 if(connected()) {
hudakz 2:ca17c5d846e7 493
hudakz 0:83c732b10e95 494 // Leave room in the buffer for header and variable length field
hudakz 2:ca17c5d846e7 495 uint16_t length = 5;
hudakz 0:83c732b10e95 496 nextMsgId++;
hudakz 2:ca17c5d846e7 497 if(nextMsgId == 0) {
hudakz 0:83c732b10e95 498 nextMsgId = 1;
hudakz 0:83c732b10e95 499 }
hudakz 2:ca17c5d846e7 500
hudakz 0:83c732b10e95 501 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 502 buffer[length++] = (nextMsgId & 0xFF);
hudakz 2:ca17c5d846e7 503 length = writeString(topic, buffer, length);
hudakz 0:83c732b10e95 504 buffer[length++] = qos;
hudakz 2:ca17c5d846e7 505 return write(MQTTSUBSCRIBE | MQTTQOS1, buffer, length - 5);
hudakz 0:83c732b10e95 506 }
hudakz 2:ca17c5d846e7 507
hudakz 0:83c732b10e95 508 return false;
hudakz 0:83c732b10e95 509 }
hudakz 0:83c732b10e95 510
hudakz 2:ca17c5d846e7 511 /**
hudakz 2:ca17c5d846e7 512 * @brief
hudakz 2:ca17c5d846e7 513 * @note
hudakz 2:ca17c5d846e7 514 * @param
hudakz 2:ca17c5d846e7 515 * @retval
hudakz 2:ca17c5d846e7 516 */
hudakz 2:ca17c5d846e7 517 bool MQTTClient::unsubscribe(char* topic) {
hudakz 2:ca17c5d846e7 518 if(connected()) {
hudakz 2:ca17c5d846e7 519 uint16_t length = 5;
hudakz 0:83c732b10e95 520 nextMsgId++;
hudakz 2:ca17c5d846e7 521 if(nextMsgId == 0) {
hudakz 0:83c732b10e95 522 nextMsgId = 1;
hudakz 0:83c732b10e95 523 }
hudakz 2:ca17c5d846e7 524
hudakz 0:83c732b10e95 525 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 526 buffer[length++] = (nextMsgId & 0xFF);
hudakz 2:ca17c5d846e7 527 length = writeString(topic, buffer, length);
hudakz 2:ca17c5d846e7 528 return write(MQTTUNSUBSCRIBE | MQTTQOS1, buffer, length - 5);
hudakz 0:83c732b10e95 529 }
hudakz 2:ca17c5d846e7 530
hudakz 0:83c732b10e95 531 return false;
hudakz 0:83c732b10e95 532 }
hudakz 0:83c732b10e95 533
hudakz 2:ca17c5d846e7 534 /**
hudakz 2:ca17c5d846e7 535 * @brief
hudakz 2:ca17c5d846e7 536 * @note
hudakz 2:ca17c5d846e7 537 * @param
hudakz 2:ca17c5d846e7 538 * @retval
hudakz 2:ca17c5d846e7 539 */
hudakz 2:ca17c5d846e7 540 void MQTTClient::disconnect(void) {
hudakz 0:83c732b10e95 541 buffer[0] = MQTTDISCONNECT;
hudakz 0:83c732b10e95 542 buffer[1] = 0;
hudakz 2:ca17c5d846e7 543 _client->write(buffer, 2);
hudakz 0:83c732b10e95 544 _client->stop();
hudakz 0:83c732b10e95 545 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 546 }
hudakz 0:83c732b10e95 547
hudakz 2:ca17c5d846e7 548 /**
hudakz 2:ca17c5d846e7 549 * @brief
hudakz 2:ca17c5d846e7 550 * @note
hudakz 2:ca17c5d846e7 551 * @param
hudakz 2:ca17c5d846e7 552 * @retval
hudakz 2:ca17c5d846e7 553 */
hudakz 2:ca17c5d846e7 554 uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
hudakz 2:ca17c5d846e7 555 char* idp = string;
hudakz 2:ca17c5d846e7 556 uint16_t i = 0;
hudakz 0:83c732b10e95 557 pos += 2;
hudakz 2:ca17c5d846e7 558 while(*idp) {
hudakz 0:83c732b10e95 559 buf[pos++] = *idp++;
hudakz 0:83c732b10e95 560 i++;
hudakz 0:83c732b10e95 561 }
hudakz 2:ca17c5d846e7 562
hudakz 2:ca17c5d846e7 563 buf[pos - i - 2] = (i >> 8);
hudakz 2:ca17c5d846e7 564 buf[pos - i - 1] = (i & 0xFF);
hudakz 0:83c732b10e95 565 return pos;
hudakz 0:83c732b10e95 566 }
hudakz 0:83c732b10e95 567
hudakz 2:ca17c5d846e7 568 /**
hudakz 2:ca17c5d846e7 569 * @brief
hudakz 2:ca17c5d846e7 570 * @note
hudakz 2:ca17c5d846e7 571 * @param
hudakz 2:ca17c5d846e7 572 * @retval
hudakz 2:ca17c5d846e7 573 */
hudakz 2:ca17c5d846e7 574 bool MQTTClient::connected(void) {
hudakz 2:ca17c5d846e7 575 bool rc;
hudakz 2:ca17c5d846e7 576 if(_client == NULL) {
hudakz 2:ca17c5d846e7 577 rc = false;
hudakz 2:ca17c5d846e7 578 }
hudakz 2:ca17c5d846e7 579 else {
hudakz 2:ca17c5d846e7 580 rc = (int)_client->connected();
hudakz 2:ca17c5d846e7 581 if(!rc)
hudakz 2:ca17c5d846e7 582 _client->stop();
hudakz 2:ca17c5d846e7 583 }
hudakz 0:83c732b10e95 584
hudakz 0:83c732b10e95 585 return rc;
hudakz 0:83c732b10e95 586 }
hudakz 0:83c732b10e95 587