Local fixes

Committer:
hudakz
Date:
Mon Sep 15 12:48:55 2014 +0000
Revision:
1:87da395325fc
Parent:
0:83c732b10e95
Child:
2:ca17c5d846e7
rev. 00

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 1:87da395325fc 5
hudakz 1:87da395325fc 6 Ported to mbed by Zoltan Hudak <hudakz@inbox.com>
hudakz 0:83c732b10e95 7 */
hudakz 0:83c732b10e95 8
hudakz 0:83c732b10e95 9 #include "MQTTClient.h"
hudakz 0:83c732b10e95 10 #include <string.h>
hudakz 0:83c732b10e95 11 #include <time.h>
hudakz 0:83c732b10e95 12
hudakz 0:83c732b10e95 13 MQTTClient::MQTTClient()
hudakz 0:83c732b10e95 14 {
hudakz 0:83c732b10e95 15 this->_client = NULL;
hudakz 0:83c732b10e95 16 this->stream = NULL;
hudakz 0:83c732b10e95 17 }
hudakz 0:83c732b10e95 18
hudakz 0:83c732b10e95 19 MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
hudakz 0:83c732b10e95 20 {
hudakz 0:83c732b10e95 21 this->_client = &client;
hudakz 0:83c732b10e95 22 this->onMessage = onMessage;
hudakz 0:83c732b10e95 23 this->ip = ip;
hudakz 0:83c732b10e95 24 this->port = port;
hudakz 0:83c732b10e95 25 this->domain = NULL;
hudakz 0:83c732b10e95 26 this->stream = NULL;
hudakz 0:83c732b10e95 27 }
hudakz 0:83c732b10e95 28
hudakz 0:83c732b10e95 29 MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
hudakz 0:83c732b10e95 30 {
hudakz 0:83c732b10e95 31 this->_client = &client;
hudakz 0:83c732b10e95 32 this->onMessage = onMessage;
hudakz 0:83c732b10e95 33 this->domain = domain;
hudakz 0:83c732b10e95 34 this->port = port;
hudakz 0:83c732b10e95 35 this->stream = NULL;
hudakz 0:83c732b10e95 36 }
hudakz 0:83c732b10e95 37
hudakz 0:83c732b10e95 38 MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
hudakz 0:83c732b10e95 39 {
hudakz 0:83c732b10e95 40 this->_client = &client;
hudakz 0:83c732b10e95 41 this->onMessage = onMessage;
hudakz 0:83c732b10e95 42 this->ip = ip;
hudakz 0:83c732b10e95 43 this->port = port;
hudakz 0:83c732b10e95 44 this->domain = NULL;
hudakz 0:83c732b10e95 45 this->stream = &stream;
hudakz 0:83c732b10e95 46 }
hudakz 0:83c732b10e95 47
hudakz 0:83c732b10e95 48 MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
hudakz 0:83c732b10e95 49 {
hudakz 0:83c732b10e95 50 this->_client = &client;
hudakz 0:83c732b10e95 51 this->onMessage = onMessage;
hudakz 0:83c732b10e95 52 this->domain = domain;
hudakz 0:83c732b10e95 53 this->port = port;
hudakz 0:83c732b10e95 54 this->stream = &stream;
hudakz 0:83c732b10e95 55 }
hudakz 0:83c732b10e95 56
hudakz 0:83c732b10e95 57 bool MQTTClient::connect(char *id)
hudakz 0:83c732b10e95 58 {
hudakz 0:83c732b10e95 59 return connect(id,NULL,NULL,0,0,0,0);
hudakz 0:83c732b10e95 60 }
hudakz 0:83c732b10e95 61
hudakz 0:83c732b10e95 62 bool MQTTClient::connect(char *id, char *user, char *pass)
hudakz 0:83c732b10e95 63 {
hudakz 0:83c732b10e95 64 return connect(id,user,pass,0,0,0,0);
hudakz 0:83c732b10e95 65 }
hudakz 0:83c732b10e95 66
hudakz 0:83c732b10e95 67 bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
hudakz 0:83c732b10e95 68 {
hudakz 0:83c732b10e95 69 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
hudakz 0:83c732b10e95 70 }
hudakz 0:83c732b10e95 71
hudakz 0:83c732b10e95 72 bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
hudakz 0:83c732b10e95 73 {
hudakz 0:83c732b10e95 74 if (!connected()) {
hudakz 0:83c732b10e95 75 int result = 0;
hudakz 0:83c732b10e95 76
hudakz 0:83c732b10e95 77 if (domain != NULL) {
hudakz 0:83c732b10e95 78 result = _client->connect(this->domain, this->port);
hudakz 0:83c732b10e95 79 } else {
hudakz 0:83c732b10e95 80 result = _client->connect(this->ip, this->port);
hudakz 0:83c732b10e95 81 }
hudakz 0:83c732b10e95 82
hudakz 0:83c732b10e95 83 if (result) {
hudakz 0:83c732b10e95 84 nextMsgId = 1;
hudakz 0:83c732b10e95 85 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
hudakz 0:83c732b10e95 86 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 87 uint16_t length = 5;
hudakz 0:83c732b10e95 88 unsigned int j;
hudakz 0:83c732b10e95 89 for (j = 0; j<9; j++) {
hudakz 0:83c732b10e95 90 buffer[length++] = d[j];
hudakz 0:83c732b10e95 91 }
hudakz 0:83c732b10e95 92
hudakz 0:83c732b10e95 93 uint8_t v;
hudakz 0:83c732b10e95 94 if (willTopic) {
hudakz 0:83c732b10e95 95 v = 0x06|(willQos<<3)|(willRetain<<5);
hudakz 0:83c732b10e95 96 } else {
hudakz 0:83c732b10e95 97 v = 0x02;
hudakz 0:83c732b10e95 98 }
hudakz 0:83c732b10e95 99
hudakz 0:83c732b10e95 100 if(user != NULL) {
hudakz 0:83c732b10e95 101 v = v|0x80;
hudakz 0:83c732b10e95 102
hudakz 0:83c732b10e95 103 if(pass != NULL) {
hudakz 0:83c732b10e95 104 v = v|(0x80>>1);
hudakz 0:83c732b10e95 105 }
hudakz 0:83c732b10e95 106 }
hudakz 0:83c732b10e95 107
hudakz 0:83c732b10e95 108 buffer[length++] = v;
hudakz 0:83c732b10e95 109
hudakz 0:83c732b10e95 110 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 0:83c732b10e95 111 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 0:83c732b10e95 112 length = writeString(id,buffer,length);
hudakz 0:83c732b10e95 113 if (willTopic) {
hudakz 0:83c732b10e95 114 length = writeString(willTopic,buffer,length);
hudakz 0:83c732b10e95 115 length = writeString(willMessage,buffer,length);
hudakz 0:83c732b10e95 116 }
hudakz 0:83c732b10e95 117
hudakz 0:83c732b10e95 118 if(user != NULL) {
hudakz 0:83c732b10e95 119 length = writeString(user,buffer,length);
hudakz 0:83c732b10e95 120 if(pass != NULL) {
hudakz 0:83c732b10e95 121 length = writeString(pass,buffer,length);
hudakz 0:83c732b10e95 122 }
hudakz 0:83c732b10e95 123 }
hudakz 0:83c732b10e95 124
hudakz 0:83c732b10e95 125 write(MQTTCONNECT,buffer,length-5);
hudakz 0:83c732b10e95 126
hudakz 0:83c732b10e95 127 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 128
hudakz 0:83c732b10e95 129 while (!_client->available()) {
hudakz 0:83c732b10e95 130 time_t t = time(NULL);
hudakz 0:83c732b10e95 131 if (t-lastInActivity > MQTT_KEEPALIVE) {
hudakz 0:83c732b10e95 132 _client->stop();
hudakz 0:83c732b10e95 133 return false;
hudakz 0:83c732b10e95 134 }
hudakz 0:83c732b10e95 135 }
hudakz 0:83c732b10e95 136 uint8_t llen;
hudakz 0:83c732b10e95 137 uint16_t len = readPacket(&llen);
hudakz 0:83c732b10e95 138
hudakz 0:83c732b10e95 139 if (len == 4 && buffer[3] == 0) {
hudakz 0:83c732b10e95 140 lastInActivity = time(NULL);
hudakz 0:83c732b10e95 141 pingOutstanding = false;
hudakz 0:83c732b10e95 142 return true;
hudakz 0:83c732b10e95 143 }
hudakz 0:83c732b10e95 144 }
hudakz 0:83c732b10e95 145 _client->stop();
hudakz 0:83c732b10e95 146 }
hudakz 0:83c732b10e95 147 return false;
hudakz 0:83c732b10e95 148 }
hudakz 0:83c732b10e95 149
hudakz 0:83c732b10e95 150 uint8_t MQTTClient::readByte()
hudakz 0:83c732b10e95 151 {
hudakz 0:83c732b10e95 152 while(!_client->available()) {}
hudakz 0:83c732b10e95 153 return _client->read();
hudakz 0:83c732b10e95 154 }
hudakz 0:83c732b10e95 155
hudakz 0:83c732b10e95 156 uint16_t MQTTClient::readPacket(uint8_t* lengthLength)
hudakz 0:83c732b10e95 157 {
hudakz 0:83c732b10e95 158 uint16_t len = 0;
hudakz 0:83c732b10e95 159 buffer[len++] = readByte();
hudakz 0:83c732b10e95 160 bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
hudakz 0:83c732b10e95 161 uint32_t multiplier = 1;
hudakz 0:83c732b10e95 162 uint16_t length = 0;
hudakz 0:83c732b10e95 163 uint8_t digit = 0;
hudakz 0:83c732b10e95 164 uint16_t skip = 0;
hudakz 0:83c732b10e95 165 uint8_t start = 0;
hudakz 0:83c732b10e95 166
hudakz 0:83c732b10e95 167 do {
hudakz 0:83c732b10e95 168 digit = readByte();
hudakz 0:83c732b10e95 169 buffer[len++] = digit;
hudakz 0:83c732b10e95 170 length += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 171 multiplier *= 128;
hudakz 0:83c732b10e95 172 } while ((digit & 128) != 0);
hudakz 0:83c732b10e95 173 *lengthLength = len-1;
hudakz 0:83c732b10e95 174
hudakz 0:83c732b10e95 175 if (isPublish) {
hudakz 0:83c732b10e95 176 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 0:83c732b10e95 177 buffer[len++] = readByte();
hudakz 0:83c732b10e95 178 buffer[len++] = readByte();
hudakz 0:83c732b10e95 179 skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
hudakz 0:83c732b10e95 180 start = 2;
hudakz 0:83c732b10e95 181 if (buffer[0]&MQTTQOS1) {
hudakz 0:83c732b10e95 182 // skip message id
hudakz 0:83c732b10e95 183 skip += 2;
hudakz 0:83c732b10e95 184 }
hudakz 0:83c732b10e95 185 }
hudakz 0:83c732b10e95 186
hudakz 0:83c732b10e95 187 for (uint16_t i = start; i<length; i++) {
hudakz 0:83c732b10e95 188 digit = readByte();
hudakz 0:83c732b10e95 189 if (this->stream) {
hudakz 0:83c732b10e95 190 if (isPublish && len-*lengthLength-2>skip) {
hudakz 0:83c732b10e95 191 this->stream->putc(digit);
hudakz 0:83c732b10e95 192 }
hudakz 0:83c732b10e95 193 }
hudakz 0:83c732b10e95 194 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 0:83c732b10e95 195 buffer[len] = digit;
hudakz 0:83c732b10e95 196 }
hudakz 0:83c732b10e95 197 len++;
hudakz 0:83c732b10e95 198 }
hudakz 0:83c732b10e95 199
hudakz 0:83c732b10e95 200 if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 0:83c732b10e95 201 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 202 }
hudakz 0:83c732b10e95 203
hudakz 0:83c732b10e95 204 return len;
hudakz 0:83c732b10e95 205 }
hudakz 0:83c732b10e95 206
hudakz 0:83c732b10e95 207 bool MQTTClient::loop()
hudakz 0:83c732b10e95 208 {
hudakz 0:83c732b10e95 209 if (connected()) {
hudakz 0:83c732b10e95 210 time_t t = time(NULL);
hudakz 0:83c732b10e95 211 if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 0:83c732b10e95 212 if (pingOutstanding) {
hudakz 0:83c732b10e95 213 _client->stop();
hudakz 0:83c732b10e95 214 return false;
hudakz 0:83c732b10e95 215 } else {
hudakz 0:83c732b10e95 216 buffer[0] = MQTTPINGREQ;
hudakz 0:83c732b10e95 217 buffer[1] = 0;
hudakz 0:83c732b10e95 218 _client->write(buffer,2);
hudakz 0:83c732b10e95 219 lastOutActivity = t;
hudakz 0:83c732b10e95 220 lastInActivity = t;
hudakz 0:83c732b10e95 221 pingOutstanding = true;
hudakz 0:83c732b10e95 222 }
hudakz 0:83c732b10e95 223 }
hudakz 0:83c732b10e95 224 if (_client->available()) {
hudakz 0:83c732b10e95 225 uint8_t llen;
hudakz 0:83c732b10e95 226 uint16_t len = readPacket(&llen);
hudakz 0:83c732b10e95 227 uint16_t msgId = 0;
hudakz 0:83c732b10e95 228 uint8_t *payload;
hudakz 0:83c732b10e95 229 if (len > 0) {
hudakz 0:83c732b10e95 230 lastInActivity = t;
hudakz 0:83c732b10e95 231 uint8_t type = buffer[0]&0xF0;
hudakz 0:83c732b10e95 232 if (type == MQTTPUBLISH) {
hudakz 0:83c732b10e95 233 if (onMessage) {
hudakz 0:83c732b10e95 234 uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
hudakz 0:83c732b10e95 235 char topic[tl+1];
hudakz 0:83c732b10e95 236 for (uint16_t i=0; i<tl; i++) {
hudakz 0:83c732b10e95 237 topic[i] = buffer[llen+3+i];
hudakz 0:83c732b10e95 238 }
hudakz 0:83c732b10e95 239 topic[tl] = 0;
hudakz 0:83c732b10e95 240 // msgId only present for QOS>0
hudakz 0:83c732b10e95 241 if ((buffer[0]&0x06) == MQTTQOS1) {
hudakz 0:83c732b10e95 242 msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
hudakz 0:83c732b10e95 243 payload = buffer+llen+3+tl+2;
hudakz 0:83c732b10e95 244 onMessage(topic,payload,len-llen-3-tl-2);
hudakz 0:83c732b10e95 245
hudakz 0:83c732b10e95 246 buffer[0] = MQTTPUBACK;
hudakz 0:83c732b10e95 247 buffer[1] = 2;
hudakz 0:83c732b10e95 248 buffer[2] = (msgId >> 8);
hudakz 0:83c732b10e95 249 buffer[3] = (msgId & 0xFF);
hudakz 0:83c732b10e95 250 _client->write(buffer,4);
hudakz 0:83c732b10e95 251 lastOutActivity = t;
hudakz 0:83c732b10e95 252
hudakz 0:83c732b10e95 253 } else {
hudakz 0:83c732b10e95 254 payload = buffer+llen+3+tl;
hudakz 0:83c732b10e95 255 onMessage(topic,payload,len-llen-3-tl);
hudakz 0:83c732b10e95 256 }
hudakz 0:83c732b10e95 257 }
hudakz 0:83c732b10e95 258 } else if (type == MQTTPINGREQ) {
hudakz 0:83c732b10e95 259 buffer[0] = MQTTPINGRESP;
hudakz 0:83c732b10e95 260 buffer[1] = 0;
hudakz 0:83c732b10e95 261 _client->write(buffer,2);
hudakz 0:83c732b10e95 262 } else if (type == MQTTPINGRESP) {
hudakz 0:83c732b10e95 263 pingOutstanding = false;
hudakz 0:83c732b10e95 264 }
hudakz 0:83c732b10e95 265 }
hudakz 0:83c732b10e95 266 }
hudakz 0:83c732b10e95 267 return true;
hudakz 0:83c732b10e95 268 }
hudakz 0:83c732b10e95 269 return false;
hudakz 0:83c732b10e95 270 }
hudakz 0:83c732b10e95 271
hudakz 0:83c732b10e95 272 bool MQTTClient::publish(char* topic, char* payload)
hudakz 0:83c732b10e95 273 {
hudakz 0:83c732b10e95 274 return publish(topic,(uint8_t*)payload,strlen(payload),false);
hudakz 0:83c732b10e95 275 }
hudakz 0:83c732b10e95 276
hudakz 0:83c732b10e95 277 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength)
hudakz 0:83c732b10e95 278 {
hudakz 0:83c732b10e95 279 return publish(topic, payload, plength, false);
hudakz 0:83c732b10e95 280 }
hudakz 0:83c732b10e95 281
hudakz 0:83c732b10e95 282 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained)
hudakz 0:83c732b10e95 283 {
hudakz 0:83c732b10e95 284 if (connected()) {
hudakz 0:83c732b10e95 285 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 286 uint16_t length = 5;
hudakz 0:83c732b10e95 287 length = writeString(topic,buffer,length);
hudakz 0:83c732b10e95 288 uint16_t i;
hudakz 0:83c732b10e95 289 for (i=0; i<plength; i++) {
hudakz 0:83c732b10e95 290 buffer[length++] = payload[i];
hudakz 0:83c732b10e95 291 }
hudakz 0:83c732b10e95 292 uint8_t header = MQTTPUBLISH;
hudakz 0:83c732b10e95 293 if (retained) {
hudakz 0:83c732b10e95 294 header |= 1;
hudakz 0:83c732b10e95 295 }
hudakz 0:83c732b10e95 296 return write(header,buffer,length-5);
hudakz 0:83c732b10e95 297 }
hudakz 0:83c732b10e95 298 return false;
hudakz 0:83c732b10e95 299 }
hudakz 0:83c732b10e95 300
hudakz 0:83c732b10e95 301 bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 0:83c732b10e95 302 {
hudakz 0:83c732b10e95 303 uint8_t lenBuf[4];
hudakz 0:83c732b10e95 304 uint8_t llen = 0;
hudakz 0:83c732b10e95 305 uint8_t digit;
hudakz 0:83c732b10e95 306 uint8_t pos = 0;
hudakz 0:83c732b10e95 307 uint8_t rc;
hudakz 0:83c732b10e95 308 uint8_t len = length;
hudakz 0:83c732b10e95 309 do {
hudakz 0:83c732b10e95 310 digit = len % 128;
hudakz 0:83c732b10e95 311 len = len / 128;
hudakz 0:83c732b10e95 312 if (len > 0) {
hudakz 0:83c732b10e95 313 digit |= 0x80;
hudakz 0:83c732b10e95 314 }
hudakz 0:83c732b10e95 315 lenBuf[pos++] = digit;
hudakz 0:83c732b10e95 316 llen++;
hudakz 0:83c732b10e95 317 } while(len>0);
hudakz 0:83c732b10e95 318
hudakz 0:83c732b10e95 319 buf[4-llen] = header;
hudakz 0:83c732b10e95 320 for (int i=0; i<llen; i++) {
hudakz 0:83c732b10e95 321 buf[5-llen+i] = lenBuf[i];
hudakz 0:83c732b10e95 322 }
hudakz 0:83c732b10e95 323 rc = _client->write(buf+(4-llen),length+1+llen);
hudakz 0:83c732b10e95 324
hudakz 0:83c732b10e95 325 lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 326 return (rc == 1+llen+length);
hudakz 0:83c732b10e95 327 }
hudakz 0:83c732b10e95 328
hudakz 0:83c732b10e95 329 bool MQTTClient::subscribe(char* topic)
hudakz 0:83c732b10e95 330 {
hudakz 0:83c732b10e95 331 return subscribe(topic, 0);
hudakz 0:83c732b10e95 332 }
hudakz 0:83c732b10e95 333
hudakz 0:83c732b10e95 334 bool MQTTClient::subscribe(char* topic, uint8_t qos)
hudakz 0:83c732b10e95 335 {
hudakz 0:83c732b10e95 336 if (qos > 1)
hudakz 0:83c732b10e95 337 return false;
hudakz 0:83c732b10e95 338
hudakz 0:83c732b10e95 339 if (connected()) {
hudakz 0:83c732b10e95 340 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 341 uint16_t length = 5;
hudakz 0:83c732b10e95 342 nextMsgId++;
hudakz 0:83c732b10e95 343 if (nextMsgId == 0) {
hudakz 0:83c732b10e95 344 nextMsgId = 1;
hudakz 0:83c732b10e95 345 }
hudakz 0:83c732b10e95 346 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 347 buffer[length++] = (nextMsgId & 0xFF);
hudakz 0:83c732b10e95 348 length = writeString(topic, buffer,length);
hudakz 0:83c732b10e95 349 buffer[length++] = qos;
hudakz 0:83c732b10e95 350 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
hudakz 0:83c732b10e95 351 }
hudakz 0:83c732b10e95 352 return false;
hudakz 0:83c732b10e95 353 }
hudakz 0:83c732b10e95 354
hudakz 0:83c732b10e95 355 bool MQTTClient::unsubscribe(char* topic)
hudakz 0:83c732b10e95 356 {
hudakz 0:83c732b10e95 357 if (connected()) {
hudakz 0:83c732b10e95 358 uint16_t length = 5;
hudakz 0:83c732b10e95 359 nextMsgId++;
hudakz 0:83c732b10e95 360 if (nextMsgId == 0) {
hudakz 0:83c732b10e95 361 nextMsgId = 1;
hudakz 0:83c732b10e95 362 }
hudakz 0:83c732b10e95 363 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 364 buffer[length++] = (nextMsgId & 0xFF);
hudakz 0:83c732b10e95 365 length = writeString(topic, buffer,length);
hudakz 0:83c732b10e95 366 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
hudakz 0:83c732b10e95 367 }
hudakz 0:83c732b10e95 368 return false;
hudakz 0:83c732b10e95 369 }
hudakz 0:83c732b10e95 370
hudakz 0:83c732b10e95 371 void MQTTClient::disconnect()
hudakz 0:83c732b10e95 372 {
hudakz 0:83c732b10e95 373 buffer[0] = MQTTDISCONNECT;
hudakz 0:83c732b10e95 374 buffer[1] = 0;
hudakz 0:83c732b10e95 375 _client->write(buffer,2);
hudakz 0:83c732b10e95 376 _client->stop();
hudakz 0:83c732b10e95 377 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 378 }
hudakz 0:83c732b10e95 379
hudakz 0:83c732b10e95 380 uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos)
hudakz 0:83c732b10e95 381 {
hudakz 0:83c732b10e95 382 char* idp = string;
hudakz 0:83c732b10e95 383 uint16_t i = 0;
hudakz 0:83c732b10e95 384 pos += 2;
hudakz 0:83c732b10e95 385 while (*idp) {
hudakz 0:83c732b10e95 386 buf[pos++] = *idp++;
hudakz 0:83c732b10e95 387 i++;
hudakz 0:83c732b10e95 388 }
hudakz 0:83c732b10e95 389 buf[pos-i-2] = (i >> 8);
hudakz 0:83c732b10e95 390 buf[pos-i-1] = (i & 0xFF);
hudakz 0:83c732b10e95 391 return pos;
hudakz 0:83c732b10e95 392 }
hudakz 0:83c732b10e95 393
hudakz 0:83c732b10e95 394
hudakz 0:83c732b10e95 395 bool MQTTClient::connected()
hudakz 0:83c732b10e95 396 {
hudakz 0:83c732b10e95 397 bool rc;
hudakz 0:83c732b10e95 398 if (_client == NULL ) {
hudakz 0:83c732b10e95 399 rc = false;
hudakz 0:83c732b10e95 400 } else {
hudakz 0:83c732b10e95 401 rc = (int)_client->connected();
hudakz 0:83c732b10e95 402 if (!rc) _client->stop();
hudakz 0:83c732b10e95 403 }
hudakz 0:83c732b10e95 404 return rc;
hudakz 0:83c732b10e95 405 }
hudakz 0:83c732b10e95 406
hudakz 0:83c732b10e95 407
hudakz 0:83c732b10e95 408
hudakz 1:87da395325fc 409