MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.

Dependents:   mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60

MQTT library for ENC28J60 Ethernet modules.

/media/uploads/hudakz/enc28j60_module01.jpg

Depends on the UIPEthernet library.

Example program:

Import program: MQTT_Hello_ENC28J60

MQTT Client example program. Ethernet connection is via an ENC28J60 module.

Committer:
hudakz
Date:
Mon Sep 15 12:43:54 2014 +0000
Revision:
0:83c732b10e95
Child:
1:87da395325fc
rev. 00

Who changed what in which revision?

User | Revision | Line number | New contents of line
hudakz 0:83c732b10e95 1 /*
hudakz 0:83c732b10e95 2 MQTTClient.cpp - A simple client for MQTT.
hudakz 0:83c732b10e95 3 Nicholas O'Leary
hudakz 0:83c732b10e95 4 http://knolleary.net
hudakz 0:83c732b10e95 5 */
hudakz 0:83c732b10e95 6
hudakz 0:83c732b10e95 7 #include "MQTTClient.h"
hudakz 0:83c732b10e95 8 #include <string.h>
hudakz 0:83c732b10e95 9 #include <time.h>
hudakz 0:83c732b10e95 10
hudakz 0:83c732b10e95 11 MQTTClient::MQTTClient()
hudakz 0:83c732b10e95 12 {
hudakz 0:83c732b10e95 13 this->_client = NULL;
hudakz 0:83c732b10e95 14 this->stream = NULL;
hudakz 0:83c732b10e95 15 }
hudakz 0:83c732b10e95 16
hudakz 0:83c732b10e95 17 MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
hudakz 0:83c732b10e95 18 {
hudakz 0:83c732b10e95 19 this->_client = &client;
hudakz 0:83c732b10e95 20 this->onMessage = onMessage;
hudakz 0:83c732b10e95 21 this->ip = ip;
hudakz 0:83c732b10e95 22 this->port = port;
hudakz 0:83c732b10e95 23 this->domain = NULL;
hudakz 0:83c732b10e95 24 this->stream = NULL;
hudakz 0:83c732b10e95 25 }
hudakz 0:83c732b10e95 26
hudakz 0:83c732b10e95 27 MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
hudakz 0:83c732b10e95 28 {
hudakz 0:83c732b10e95 29 this->_client = &client;
hudakz 0:83c732b10e95 30 this->onMessage = onMessage;
hudakz 0:83c732b10e95 31 this->domain = domain;
hudakz 0:83c732b10e95 32 this->port = port;
hudakz 0:83c732b10e95 33 this->stream = NULL;
hudakz 0:83c732b10e95 34 }
hudakz 0:83c732b10e95 35
hudakz 0:83c732b10e95 36 MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
hudakz 0:83c732b10e95 37 {
hudakz 0:83c732b10e95 38 this->_client = &client;
hudakz 0:83c732b10e95 39 this->onMessage = onMessage;
hudakz 0:83c732b10e95 40 this->ip = ip;
hudakz 0:83c732b10e95 41 this->port = port;
hudakz 0:83c732b10e95 42 this->domain = NULL;
hudakz 0:83c732b10e95 43 this->stream = &stream;
hudakz 0:83c732b10e95 44 }
hudakz 0:83c732b10e95 45
hudakz 0:83c732b10e95 46 MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
hudakz 0:83c732b10e95 47 {
hudakz 0:83c732b10e95 48 this->_client = &client;
hudakz 0:83c732b10e95 49 this->onMessage = onMessage;
hudakz 0:83c732b10e95 50 this->domain = domain;
hudakz 0:83c732b10e95 51 this->port = port;
hudakz 0:83c732b10e95 52 this->stream = &stream;
hudakz 0:83c732b10e95 53 }
hudakz 0:83c732b10e95 54
hudakz 0:83c732b10e95 55 bool MQTTClient::connect(char *id)
hudakz 0:83c732b10e95 56 {
hudakz 0:83c732b10e95 57 return connect(id,NULL,NULL,0,0,0,0);
hudakz 0:83c732b10e95 58 }
hudakz 0:83c732b10e95 59
hudakz 0:83c732b10e95 60 bool MQTTClient::connect(char *id, char *user, char *pass)
hudakz 0:83c732b10e95 61 {
hudakz 0:83c732b10e95 62 return connect(id,user,pass,0,0,0,0);
hudakz 0:83c732b10e95 63 }
hudakz 0:83c732b10e95 64
hudakz 0:83c732b10e95 65 bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
hudakz 0:83c732b10e95 66 {
hudakz 0:83c732b10e95 67 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
hudakz 0:83c732b10e95 68 }
hudakz 0:83c732b10e95 69
hudakz 0:83c732b10e95 70 bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
hudakz 0:83c732b10e95 71 {
hudakz 0:83c732b10e95 72 if (!connected()) {
hudakz 0:83c732b10e95 73 int result = 0;
hudakz 0:83c732b10e95 74
hudakz 0:83c732b10e95 75 if (domain != NULL) {
hudakz 0:83c732b10e95 76 result = _client->connect(this->domain, this->port);
hudakz 0:83c732b10e95 77 } else {
hudakz 0:83c732b10e95 78 result = _client->connect(this->ip, this->port);
hudakz 0:83c732b10e95 79 }
hudakz 0:83c732b10e95 80
hudakz 0:83c732b10e95 81 if (result) {
hudakz 0:83c732b10e95 82 nextMsgId = 1;
hudakz 0:83c732b10e95 83 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
hudakz 0:83c732b10e95 84 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 85 uint16_t length = 5;
hudakz 0:83c732b10e95 86 unsigned int j;
hudakz 0:83c732b10e95 87 for (j = 0; j<9; j++) {
hudakz 0:83c732b10e95 88 buffer[length++] = d[j];
hudakz 0:83c732b10e95 89 }
hudakz 0:83c732b10e95 90
hudakz 0:83c732b10e95 91 uint8_t v;
hudakz 0:83c732b10e95 92 if (willTopic) {
hudakz 0:83c732b10e95 93 v = 0x06|(willQos<<3)|(willRetain<<5);
hudakz 0:83c732b10e95 94 } else {
hudakz 0:83c732b10e95 95 v = 0x02;
hudakz 0:83c732b10e95 96 }
hudakz 0:83c732b10e95 97
hudakz 0:83c732b10e95 98 if(user != NULL) {
hudakz 0:83c732b10e95 99 v = v|0x80;
hudakz 0:83c732b10e95 100
hudakz 0:83c732b10e95 101 if(pass != NULL) {
hudakz 0:83c732b10e95 102 v = v|(0x80>>1);
hudakz 0:83c732b10e95 103 }
hudakz 0:83c732b10e95 104 }
hudakz 0:83c732b10e95 105
hudakz 0:83c732b10e95 106 buffer[length++] = v;
hudakz 0:83c732b10e95 107
hudakz 0:83c732b10e95 108 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
hudakz 0:83c732b10e95 109 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
hudakz 0:83c732b10e95 110 length = writeString(id,buffer,length);
hudakz 0:83c732b10e95 111 if (willTopic) {
hudakz 0:83c732b10e95 112 length = writeString(willTopic,buffer,length);
hudakz 0:83c732b10e95 113 length = writeString(willMessage,buffer,length);
hudakz 0:83c732b10e95 114 }
hudakz 0:83c732b10e95 115
hudakz 0:83c732b10e95 116 if(user != NULL) {
hudakz 0:83c732b10e95 117 length = writeString(user,buffer,length);
hudakz 0:83c732b10e95 118 if(pass != NULL) {
hudakz 0:83c732b10e95 119 length = writeString(pass,buffer,length);
hudakz 0:83c732b10e95 120 }
hudakz 0:83c732b10e95 121 }
hudakz 0:83c732b10e95 122
hudakz 0:83c732b10e95 123 write(MQTTCONNECT,buffer,length-5);
hudakz 0:83c732b10e95 124
hudakz 0:83c732b10e95 125 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 126
hudakz 0:83c732b10e95 127 while (!_client->available()) {
hudakz 0:83c732b10e95 128 time_t t = time(NULL);
hudakz 0:83c732b10e95 129 if (t-lastInActivity > MQTT_KEEPALIVE) {
hudakz 0:83c732b10e95 130 _client->stop();
hudakz 0:83c732b10e95 131 return false;
hudakz 0:83c732b10e95 132 }
hudakz 0:83c732b10e95 133 }
hudakz 0:83c732b10e95 134 uint8_t llen;
hudakz 0:83c732b10e95 135 uint16_t len = readPacket(&llen);
hudakz 0:83c732b10e95 136
hudakz 0:83c732b10e95 137 if (len == 4 && buffer[3] == 0) {
hudakz 0:83c732b10e95 138 lastInActivity = time(NULL);
hudakz 0:83c732b10e95 139 pingOutstanding = false;
hudakz 0:83c732b10e95 140 return true;
hudakz 0:83c732b10e95 141 }
hudakz 0:83c732b10e95 142 }
hudakz 0:83c732b10e95 143 _client->stop();
hudakz 0:83c732b10e95 144 }
hudakz 0:83c732b10e95 145 return false;
hudakz 0:83c732b10e95 146 }
hudakz 0:83c732b10e95 147
hudakz 0:83c732b10e95 148 uint8_t MQTTClient::readByte()
hudakz 0:83c732b10e95 149 {
hudakz 0:83c732b10e95 150 while(!_client->available()) {}
hudakz 0:83c732b10e95 151 return _client->read();
hudakz 0:83c732b10e95 152 }
hudakz 0:83c732b10e95 153
hudakz 0:83c732b10e95 154 uint16_t MQTTClient::readPacket(uint8_t* lengthLength)
hudakz 0:83c732b10e95 155 {
hudakz 0:83c732b10e95 156 uint16_t len = 0;
hudakz 0:83c732b10e95 157 buffer[len++] = readByte();
hudakz 0:83c732b10e95 158 bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
hudakz 0:83c732b10e95 159 uint32_t multiplier = 1;
hudakz 0:83c732b10e95 160 uint16_t length = 0;
hudakz 0:83c732b10e95 161 uint8_t digit = 0;
hudakz 0:83c732b10e95 162 uint16_t skip = 0;
hudakz 0:83c732b10e95 163 uint8_t start = 0;
hudakz 0:83c732b10e95 164
hudakz 0:83c732b10e95 165 do {
hudakz 0:83c732b10e95 166 digit = readByte();
hudakz 0:83c732b10e95 167 buffer[len++] = digit;
hudakz 0:83c732b10e95 168 length += (digit & 127) * multiplier;
hudakz 0:83c732b10e95 169 multiplier *= 128;
hudakz 0:83c732b10e95 170 } while ((digit & 128) != 0);
hudakz 0:83c732b10e95 171 *lengthLength = len-1;
hudakz 0:83c732b10e95 172
hudakz 0:83c732b10e95 173 if (isPublish) {
hudakz 0:83c732b10e95 174 // Read in topic length to calculate bytes to skip over for Stream writing
hudakz 0:83c732b10e95 175 buffer[len++] = readByte();
hudakz 0:83c732b10e95 176 buffer[len++] = readByte();
hudakz 0:83c732b10e95 177 skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
hudakz 0:83c732b10e95 178 start = 2;
hudakz 0:83c732b10e95 179 if (buffer[0]&MQTTQOS1) {
hudakz 0:83c732b10e95 180 // skip message id
hudakz 0:83c732b10e95 181 skip += 2;
hudakz 0:83c732b10e95 182 }
hudakz 0:83c732b10e95 183 }
hudakz 0:83c732b10e95 184
hudakz 0:83c732b10e95 185 for (uint16_t i = start; i<length; i++) {
hudakz 0:83c732b10e95 186 digit = readByte();
hudakz 0:83c732b10e95 187 if (this->stream) {
hudakz 0:83c732b10e95 188 if (isPublish && len-*lengthLength-2>skip) {
hudakz 0:83c732b10e95 189 this->stream->putc(digit);
hudakz 0:83c732b10e95 190 }
hudakz 0:83c732b10e95 191 }
hudakz 0:83c732b10e95 192 if (len < MQTT_MAX_PACKET_SIZE) {
hudakz 0:83c732b10e95 193 buffer[len] = digit;
hudakz 0:83c732b10e95 194 }
hudakz 0:83c732b10e95 195 len++;
hudakz 0:83c732b10e95 196 }
hudakz 0:83c732b10e95 197
hudakz 0:83c732b10e95 198 if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
hudakz 0:83c732b10e95 199 len = 0; // This will cause the packet to be ignored.
hudakz 0:83c732b10e95 200 }
hudakz 0:83c732b10e95 201
hudakz 0:83c732b10e95 202 return len;
hudakz 0:83c732b10e95 203 }
hudakz 0:83c732b10e95 204
hudakz 0:83c732b10e95 205 bool MQTTClient::loop()
hudakz 0:83c732b10e95 206 {
hudakz 0:83c732b10e95 207 if (connected()) {
hudakz 0:83c732b10e95 208 time_t t = time(NULL);
hudakz 0:83c732b10e95 209 if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
hudakz 0:83c732b10e95 210 if (pingOutstanding) {
hudakz 0:83c732b10e95 211 _client->stop();
hudakz 0:83c732b10e95 212 return false;
hudakz 0:83c732b10e95 213 } else {
hudakz 0:83c732b10e95 214 buffer[0] = MQTTPINGREQ;
hudakz 0:83c732b10e95 215 buffer[1] = 0;
hudakz 0:83c732b10e95 216 _client->write(buffer,2);
hudakz 0:83c732b10e95 217 lastOutActivity = t;
hudakz 0:83c732b10e95 218 lastInActivity = t;
hudakz 0:83c732b10e95 219 pingOutstanding = true;
hudakz 0:83c732b10e95 220 }
hudakz 0:83c732b10e95 221 }
hudakz 0:83c732b10e95 222 if (_client->available()) {
hudakz 0:83c732b10e95 223 uint8_t llen;
hudakz 0:83c732b10e95 224 uint16_t len = readPacket(&llen);
hudakz 0:83c732b10e95 225 uint16_t msgId = 0;
hudakz 0:83c732b10e95 226 uint8_t *payload;
hudakz 0:83c732b10e95 227 if (len > 0) {
hudakz 0:83c732b10e95 228 lastInActivity = t;
hudakz 0:83c732b10e95 229 uint8_t type = buffer[0]&0xF0;
hudakz 0:83c732b10e95 230 if (type == MQTTPUBLISH) {
hudakz 0:83c732b10e95 231 if (onMessage) {
hudakz 0:83c732b10e95 232 uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
hudakz 0:83c732b10e95 233 char topic[tl+1];
hudakz 0:83c732b10e95 234 for (uint16_t i=0; i<tl; i++) {
hudakz 0:83c732b10e95 235 topic[i] = buffer[llen+3+i];
hudakz 0:83c732b10e95 236 }
hudakz 0:83c732b10e95 237 topic[tl] = 0;
hudakz 0:83c732b10e95 238 // msgId only present for QOS>0
hudakz 0:83c732b10e95 239 if ((buffer[0]&0x06) == MQTTQOS1) {
hudakz 0:83c732b10e95 240 msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
hudakz 0:83c732b10e95 241 payload = buffer+llen+3+tl+2;
hudakz 0:83c732b10e95 242 onMessage(topic,payload,len-llen-3-tl-2);
hudakz 0:83c732b10e95 243
hudakz 0:83c732b10e95 244 buffer[0] = MQTTPUBACK;
hudakz 0:83c732b10e95 245 buffer[1] = 2;
hudakz 0:83c732b10e95 246 buffer[2] = (msgId >> 8);
hudakz 0:83c732b10e95 247 buffer[3] = (msgId & 0xFF);
hudakz 0:83c732b10e95 248 _client->write(buffer,4);
hudakz 0:83c732b10e95 249 lastOutActivity = t;
hudakz 0:83c732b10e95 250
hudakz 0:83c732b10e95 251 } else {
hudakz 0:83c732b10e95 252 payload = buffer+llen+3+tl;
hudakz 0:83c732b10e95 253 onMessage(topic,payload,len-llen-3-tl);
hudakz 0:83c732b10e95 254 }
hudakz 0:83c732b10e95 255 }
hudakz 0:83c732b10e95 256 } else if (type == MQTTPINGREQ) {
hudakz 0:83c732b10e95 257 buffer[0] = MQTTPINGRESP;
hudakz 0:83c732b10e95 258 buffer[1] = 0;
hudakz 0:83c732b10e95 259 _client->write(buffer,2);
hudakz 0:83c732b10e95 260 } else if (type == MQTTPINGRESP) {
hudakz 0:83c732b10e95 261 pingOutstanding = false;
hudakz 0:83c732b10e95 262 }
hudakz 0:83c732b10e95 263 }
hudakz 0:83c732b10e95 264 }
hudakz 0:83c732b10e95 265 return true;
hudakz 0:83c732b10e95 266 }
hudakz 0:83c732b10e95 267 return false;
hudakz 0:83c732b10e95 268 }
hudakz 0:83c732b10e95 269
hudakz 0:83c732b10e95 270 bool MQTTClient::publish(char* topic, char* payload)
hudakz 0:83c732b10e95 271 {
hudakz 0:83c732b10e95 272 return publish(topic,(uint8_t*)payload,strlen(payload),false);
hudakz 0:83c732b10e95 273 }
hudakz 0:83c732b10e95 274
hudakz 0:83c732b10e95 275 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength)
hudakz 0:83c732b10e95 276 {
hudakz 0:83c732b10e95 277 return publish(topic, payload, plength, false);
hudakz 0:83c732b10e95 278 }
hudakz 0:83c732b10e95 279
hudakz 0:83c732b10e95 280 bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained)
hudakz 0:83c732b10e95 281 {
hudakz 0:83c732b10e95 282 if (connected()) {
hudakz 0:83c732b10e95 283 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 284 uint16_t length = 5;
hudakz 0:83c732b10e95 285 length = writeString(topic,buffer,length);
hudakz 0:83c732b10e95 286 uint16_t i;
hudakz 0:83c732b10e95 287 for (i=0; i<plength; i++) {
hudakz 0:83c732b10e95 288 buffer[length++] = payload[i];
hudakz 0:83c732b10e95 289 }
hudakz 0:83c732b10e95 290 uint8_t header = MQTTPUBLISH;
hudakz 0:83c732b10e95 291 if (retained) {
hudakz 0:83c732b10e95 292 header |= 1;
hudakz 0:83c732b10e95 293 }
hudakz 0:83c732b10e95 294 return write(header,buffer,length-5);
hudakz 0:83c732b10e95 295 }
hudakz 0:83c732b10e95 296 return false;
hudakz 0:83c732b10e95 297 }
hudakz 0:83c732b10e95 298
hudakz 0:83c732b10e95 299 bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length)
hudakz 0:83c732b10e95 300 {
hudakz 0:83c732b10e95 301 uint8_t lenBuf[4];
hudakz 0:83c732b10e95 302 uint8_t llen = 0;
hudakz 0:83c732b10e95 303 uint8_t digit;
hudakz 0:83c732b10e95 304 uint8_t pos = 0;
hudakz 0:83c732b10e95 305 uint8_t rc;
hudakz 0:83c732b10e95 306 uint8_t len = length;
hudakz 0:83c732b10e95 307 do {
hudakz 0:83c732b10e95 308 digit = len % 128;
hudakz 0:83c732b10e95 309 len = len / 128;
hudakz 0:83c732b10e95 310 if (len > 0) {
hudakz 0:83c732b10e95 311 digit |= 0x80;
hudakz 0:83c732b10e95 312 }
hudakz 0:83c732b10e95 313 lenBuf[pos++] = digit;
hudakz 0:83c732b10e95 314 llen++;
hudakz 0:83c732b10e95 315 } while(len>0);
hudakz 0:83c732b10e95 316
hudakz 0:83c732b10e95 317 buf[4-llen] = header;
hudakz 0:83c732b10e95 318 for (int i=0; i<llen; i++) {
hudakz 0:83c732b10e95 319 buf[5-llen+i] = lenBuf[i];
hudakz 0:83c732b10e95 320 }
hudakz 0:83c732b10e95 321 rc = _client->write(buf+(4-llen),length+1+llen);
hudakz 0:83c732b10e95 322
hudakz 0:83c732b10e95 323 lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 324 return (rc == 1+llen+length);
hudakz 0:83c732b10e95 325 }
hudakz 0:83c732b10e95 326
hudakz 0:83c732b10e95 327 bool MQTTClient::subscribe(char* topic)
hudakz 0:83c732b10e95 328 {
hudakz 0:83c732b10e95 329 return subscribe(topic, 0);
hudakz 0:83c732b10e95 330 }
hudakz 0:83c732b10e95 331
hudakz 0:83c732b10e95 332 bool MQTTClient::subscribe(char* topic, uint8_t qos)
hudakz 0:83c732b10e95 333 {
hudakz 0:83c732b10e95 334 if (qos > 1)
hudakz 0:83c732b10e95 335 return false;
hudakz 0:83c732b10e95 336
hudakz 0:83c732b10e95 337 if (connected()) {
hudakz 0:83c732b10e95 338 // Leave room in the buffer for header and variable length field
hudakz 0:83c732b10e95 339 uint16_t length = 5;
hudakz 0:83c732b10e95 340 nextMsgId++;
hudakz 0:83c732b10e95 341 if (nextMsgId == 0) {
hudakz 0:83c732b10e95 342 nextMsgId = 1;
hudakz 0:83c732b10e95 343 }
hudakz 0:83c732b10e95 344 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 345 buffer[length++] = (nextMsgId & 0xFF);
hudakz 0:83c732b10e95 346 length = writeString(topic, buffer,length);
hudakz 0:83c732b10e95 347 buffer[length++] = qos;
hudakz 0:83c732b10e95 348 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
hudakz 0:83c732b10e95 349 }
hudakz 0:83c732b10e95 350 return false;
hudakz 0:83c732b10e95 351 }
hudakz 0:83c732b10e95 352
hudakz 0:83c732b10e95 353 bool MQTTClient::unsubscribe(char* topic)
hudakz 0:83c732b10e95 354 {
hudakz 0:83c732b10e95 355 if (connected()) {
hudakz 0:83c732b10e95 356 uint16_t length = 5;
hudakz 0:83c732b10e95 357 nextMsgId++;
hudakz 0:83c732b10e95 358 if (nextMsgId == 0) {
hudakz 0:83c732b10e95 359 nextMsgId = 1;
hudakz 0:83c732b10e95 360 }
hudakz 0:83c732b10e95 361 buffer[length++] = (nextMsgId >> 8);
hudakz 0:83c732b10e95 362 buffer[length++] = (nextMsgId & 0xFF);
hudakz 0:83c732b10e95 363 length = writeString(topic, buffer,length);
hudakz 0:83c732b10e95 364 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
hudakz 0:83c732b10e95 365 }
hudakz 0:83c732b10e95 366 return false;
hudakz 0:83c732b10e95 367 }
hudakz 0:83c732b10e95 368
hudakz 0:83c732b10e95 369 void MQTTClient::disconnect()
hudakz 0:83c732b10e95 370 {
hudakz 0:83c732b10e95 371 buffer[0] = MQTTDISCONNECT;
hudakz 0:83c732b10e95 372 buffer[1] = 0;
hudakz 0:83c732b10e95 373 _client->write(buffer,2);
hudakz 0:83c732b10e95 374 _client->stop();
hudakz 0:83c732b10e95 375 lastInActivity = lastOutActivity = time(NULL);
hudakz 0:83c732b10e95 376 }
hudakz 0:83c732b10e95 377
hudakz 0:83c732b10e95 378 uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos)
hudakz 0:83c732b10e95 379 {
hudakz 0:83c732b10e95 380 char* idp = string;
hudakz 0:83c732b10e95 381 uint16_t i = 0;
hudakz 0:83c732b10e95 382 pos += 2;
hudakz 0:83c732b10e95 383 while (*idp) {
hudakz 0:83c732b10e95 384 buf[pos++] = *idp++;
hudakz 0:83c732b10e95 385 i++;
hudakz 0:83c732b10e95 386 }
hudakz 0:83c732b10e95 387 buf[pos-i-2] = (i >> 8);
hudakz 0:83c732b10e95 388 buf[pos-i-1] = (i & 0xFF);
hudakz 0:83c732b10e95 389 return pos;
hudakz 0:83c732b10e95 390 }
hudakz 0:83c732b10e95 391
hudakz 0:83c732b10e95 392
hudakz 0:83c732b10e95 393 bool MQTTClient::connected()
hudakz 0:83c732b10e95 394 {
hudakz 0:83c732b10e95 395 bool rc;
hudakz 0:83c732b10e95 396 if (_client == NULL ) {
hudakz 0:83c732b10e95 397 rc = false;
hudakz 0:83c732b10e95 398 } else {
hudakz 0:83c732b10e95 399 rc = (int)_client->connected();
hudakz 0:83c732b10e95 400 if (!rc) _client->stop();
hudakz 0:83c732b10e95 401 }
hudakz 0:83c732b10e95 402 return rc;
hudakz 0:83c732b10e95 403 }
hudakz 0:83c732b10e95 404
hudakz 0:83c732b10e95 405
hudakz 0:83c732b10e95 406