MQTT client for mbed Port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/

Committer:
dwijaybane
Date:
Thu Feb 04 06:46:15 2016 +0000
Revision:
1:05fe2e37b93d
Parent:
0:c492c0206878
keep alive timeout increased

Who changed what in which revision?

UserRevisionLine numberNew contents of line
dwijaybane 0:c492c0206878 1 /*
dwijaybane 0:c492c0206878 2 PubSubClient.cpp - A simple client for MQTT.
dwijaybane 0:c492c0206878 3 Nicholas O'Leary
dwijaybane 0:c492c0206878 4 http://knolleary.net
dwijaybane 0:c492c0206878 5
dwijaybane 0:c492c0206878 6 improved port for mbed
dwijaybane 0:c492c0206878 7 Edutech IoT Team
dwijaybane 0:c492c0206878 8 */
dwijaybane 0:c492c0206878 9
dwijaybane 0:c492c0206878 10 #include "PubSubClient.h"
dwijaybane 0:c492c0206878 11
dwijaybane 0:c492c0206878 12 //Serial pc1(USBTX, USBRX);
dwijaybane 0:c492c0206878 13
dwijaybane 0:c492c0206878 14
dwijaybane 0:c492c0206878 15 int PubSubClient::millis()
dwijaybane 0:c492c0206878 16 {
dwijaybane 0:c492c0206878 17 return t.read_ms();
dwijaybane 0:c492c0206878 18 }
dwijaybane 0:c492c0206878 19
dwijaybane 0:c492c0206878 20 PubSubClient::PubSubClient()
dwijaybane 0:c492c0206878 21 {
dwijaybane 0:c492c0206878 22 }
dwijaybane 0:c492c0206878 23
dwijaybane 0:c492c0206878 24 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
dwijaybane 0:c492c0206878 25 {
dwijaybane 0:c492c0206878 26 this->callback = callback;
dwijaybane 0:c492c0206878 27 this->ip = ip;
dwijaybane 0:c492c0206878 28 this->port = port;
dwijaybane 0:c492c0206878 29 t.start();
dwijaybane 0:c492c0206878 30 }
dwijaybane 0:c492c0206878 31
dwijaybane 0:c492c0206878 32
dwijaybane 0:c492c0206878 33 bool PubSubClient::connect(char *id)
dwijaybane 0:c492c0206878 34 {
dwijaybane 0:c492c0206878 35 return connect(id,NULL,NULL,0,0,0,0);
dwijaybane 0:c492c0206878 36 }
dwijaybane 0:c492c0206878 37
dwijaybane 0:c492c0206878 38 bool PubSubClient::connect(char *id, char *user, char *pass)
dwijaybane 0:c492c0206878 39 {
dwijaybane 0:c492c0206878 40 return connect(id,user,pass,0,0,0,0);
dwijaybane 0:c492c0206878 41 }
dwijaybane 0:c492c0206878 42
dwijaybane 0:c492c0206878 43 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
dwijaybane 0:c492c0206878 44 {
dwijaybane 0:c492c0206878 45 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
dwijaybane 0:c492c0206878 46 }
dwijaybane 0:c492c0206878 47
dwijaybane 0:c492c0206878 48 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
dwijaybane 0:c492c0206878 49 {
dwijaybane 0:c492c0206878 50 if (!connected()) {
dwijaybane 0:c492c0206878 51 int result = 0;
dwijaybane 0:c492c0206878 52 result = _client.connect(this->ip, this->port);
dwijaybane 0:c492c0206878 53 _client.set_blocking(false, 1);
dwijaybane 0:c492c0206878 54 //pc1.printf("IP: %s\r\n",this->ip);
dwijaybane 0:c492c0206878 55 //pc1.printf("Port: %i\r\n",this->port);
dwijaybane 0:c492c0206878 56 //pc1.printf("Result: %i \r\n", result);
dwijaybane 0:c492c0206878 57
dwijaybane 0:c492c0206878 58 if (result==0) {
dwijaybane 0:c492c0206878 59 nextMsgId = 1;
dwijaybane 0:c492c0206878 60 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
dwijaybane 0:c492c0206878 61 // Leave room in the buffer for header and variable length field
dwijaybane 0:c492c0206878 62 int length = 5;
dwijaybane 0:c492c0206878 63 unsigned int j;
dwijaybane 0:c492c0206878 64 for (j = 0; j<9; j++) {
dwijaybane 0:c492c0206878 65 buffer[length++] = d[j];
dwijaybane 0:c492c0206878 66 }
dwijaybane 0:c492c0206878 67
dwijaybane 0:c492c0206878 68 char v;
dwijaybane 0:c492c0206878 69 if (willTopic) {
dwijaybane 0:c492c0206878 70 v = 0x06|(willQos<<3)|(willRetain<<5);
dwijaybane 0:c492c0206878 71 } else {
dwijaybane 0:c492c0206878 72 v = 0x02;
dwijaybane 0:c492c0206878 73 }
dwijaybane 0:c492c0206878 74
dwijaybane 0:c492c0206878 75 if(user != NULL) {
dwijaybane 0:c492c0206878 76 v = v|0x80;
dwijaybane 0:c492c0206878 77
dwijaybane 0:c492c0206878 78 if(pass != NULL) {
dwijaybane 0:c492c0206878 79 v = v|(0x80>>1);
dwijaybane 0:c492c0206878 80 }
dwijaybane 0:c492c0206878 81 }
dwijaybane 0:c492c0206878 82
dwijaybane 0:c492c0206878 83 buffer[length++] = v;
dwijaybane 0:c492c0206878 84
dwijaybane 0:c492c0206878 85 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
dwijaybane 0:c492c0206878 86 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
dwijaybane 0:c492c0206878 87 length = writeString(id,buffer,length);
dwijaybane 0:c492c0206878 88 if (willTopic) {
dwijaybane 0:c492c0206878 89 length = writeString(willTopic,buffer,length);
dwijaybane 0:c492c0206878 90 length = writeString(willMessage,buffer,length);
dwijaybane 0:c492c0206878 91 }
dwijaybane 0:c492c0206878 92
dwijaybane 0:c492c0206878 93 if(user != NULL) {
dwijaybane 0:c492c0206878 94 length = writeString(user,buffer,length);
dwijaybane 0:c492c0206878 95 if(pass != NULL) {
dwijaybane 0:c492c0206878 96 length = writeString(pass,buffer,length);
dwijaybane 0:c492c0206878 97 }
dwijaybane 0:c492c0206878 98 }
dwijaybane 0:c492c0206878 99 //pc1.printf("Before MQTT Connect ... \r\n");
dwijaybane 0:c492c0206878 100 write(MQTTCONNECT,buffer,length-5);
dwijaybane 0:c492c0206878 101
dwijaybane 0:c492c0206878 102 lastInActivity = lastOutActivity = millis();
dwijaybane 0:c492c0206878 103
dwijaybane 0:c492c0206878 104 int llen=128;
dwijaybane 0:c492c0206878 105 int len =0;
dwijaybane 0:c492c0206878 106
dwijaybane 0:c492c0206878 107 while ((len=readPacket(llen))==0) {
dwijaybane 0:c492c0206878 108 unsigned long t = millis();
dwijaybane 0:c492c0206878 109 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
dwijaybane 0:c492c0206878 110 _client.close(true);
dwijaybane 0:c492c0206878 111 return false;
dwijaybane 0:c492c0206878 112 }
dwijaybane 0:c492c0206878 113 }
dwijaybane 0:c492c0206878 114 //pc1.printf("after MQTT Connect ... %i\r\n",len);
dwijaybane 0:c492c0206878 115 if (len == 4 && buffer[3] == 0) {
dwijaybane 0:c492c0206878 116 lastInActivity = millis();
dwijaybane 0:c492c0206878 117 pingOutstanding = false;
dwijaybane 0:c492c0206878 118 return true;
dwijaybane 0:c492c0206878 119 }
dwijaybane 0:c492c0206878 120 }
dwijaybane 0:c492c0206878 121 _client.close(true);
dwijaybane 0:c492c0206878 122 }
dwijaybane 0:c492c0206878 123 return false;
dwijaybane 0:c492c0206878 124 }
dwijaybane 0:c492c0206878 125
dwijaybane 0:c492c0206878 126
dwijaybane 0:c492c0206878 127 int PubSubClient::readPacket(int lengthLength)
dwijaybane 0:c492c0206878 128 {
dwijaybane 0:c492c0206878 129 int len = 0;
dwijaybane 0:c492c0206878 130 len = _client.receive_all(buffer,lengthLength);
dwijaybane 0:c492c0206878 131 return len;
dwijaybane 0:c492c0206878 132 }
dwijaybane 0:c492c0206878 133
dwijaybane 0:c492c0206878 134 bool PubSubClient::loop()
dwijaybane 0:c492c0206878 135 {
dwijaybane 0:c492c0206878 136 if (connected()) {
dwijaybane 0:c492c0206878 137 unsigned long t = millis();
dwijaybane 0:c492c0206878 138 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
dwijaybane 0:c492c0206878 139 if (pingOutstanding) {
dwijaybane 0:c492c0206878 140 _client.close(true);
dwijaybane 0:c492c0206878 141 return false;
dwijaybane 0:c492c0206878 142 } else {
dwijaybane 0:c492c0206878 143 buffer[0] = MQTTPINGREQ;
dwijaybane 0:c492c0206878 144 buffer[1] = 0;
dwijaybane 0:c492c0206878 145 _client.send(buffer,2);
dwijaybane 0:c492c0206878 146 lastOutActivity = t;
dwijaybane 0:c492c0206878 147 lastInActivity = t;
dwijaybane 0:c492c0206878 148 pingOutstanding = true;
dwijaybane 0:c492c0206878 149 }
dwijaybane 0:c492c0206878 150 }
dwijaybane 0:c492c0206878 151 int len;
dwijaybane 0:c492c0206878 152 int llen= 128;
dwijaybane 0:c492c0206878 153 if (!((len=readPacket(llen))==0)) {
dwijaybane 0:c492c0206878 154 if (len > 0) {
dwijaybane 0:c492c0206878 155 lastInActivity = t;
dwijaybane 0:c492c0206878 156 char type = buffer[0]&0xF0;
dwijaybane 0:c492c0206878 157 if (type == MQTTPUBLISH) {
dwijaybane 0:c492c0206878 158 if (callback) {
dwijaybane 0:c492c0206878 159 //pc1.printf("MQTTPUBLISH received ... %i\r\n",len);
dwijaybane 0:c492c0206878 160 int tl = (buffer[2]<<8)+buffer[3];
dwijaybane 0:c492c0206878 161 //pc1.printf("t1 ... %i\r\n",tl);
dwijaybane 0:c492c0206878 162 char topic[tl+1];
dwijaybane 0:c492c0206878 163 for (int i=0; i<tl; i++) {
dwijaybane 0:c492c0206878 164 topic[i] = buffer[4+i];
dwijaybane 0:c492c0206878 165 }
dwijaybane 0:c492c0206878 166 topic[tl] = 0;
dwijaybane 0:c492c0206878 167 //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic);
dwijaybane 0:c492c0206878 168 // ignore msgID - only support QoS 0 subs
dwijaybane 0:c492c0206878 169 int t2 = len-4-tl;
dwijaybane 0:c492c0206878 170 //pc1.printf("t2 ... %i\r\n",t2);
dwijaybane 0:c492c0206878 171 char payload[t2+1];
dwijaybane 0:c492c0206878 172 for (int i=0; i<t2; i++) {
dwijaybane 0:c492c0206878 173 payload[i] = buffer[4+i+tl];
dwijaybane 0:c492c0206878 174 }
dwijaybane 0:c492c0206878 175 payload[t2] = 0;
dwijaybane 0:c492c0206878 176 //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload);
dwijaybane 0:c492c0206878 177 callback(topic,payload,t2);
dwijaybane 0:c492c0206878 178 }
dwijaybane 0:c492c0206878 179 } else if (type == MQTTPINGREQ) {
dwijaybane 0:c492c0206878 180 buffer[0] = MQTTPINGRESP;
dwijaybane 0:c492c0206878 181 buffer[1] = 0;
dwijaybane 0:c492c0206878 182 _client.send(buffer,2);
dwijaybane 0:c492c0206878 183 } else if (type == MQTTPINGRESP) {
dwijaybane 0:c492c0206878 184 pingOutstanding = false;
dwijaybane 0:c492c0206878 185 }
dwijaybane 0:c492c0206878 186 }
dwijaybane 0:c492c0206878 187 }
dwijaybane 0:c492c0206878 188 return true;
dwijaybane 0:c492c0206878 189 }
dwijaybane 0:c492c0206878 190 return false;
dwijaybane 0:c492c0206878 191 }
dwijaybane 0:c492c0206878 192
dwijaybane 0:c492c0206878 193 bool PubSubClient::publish(char* topic, char* payload)
dwijaybane 0:c492c0206878 194 {
dwijaybane 0:c492c0206878 195 return publish(topic,payload,strlen(payload),false);
dwijaybane 0:c492c0206878 196 }
dwijaybane 0:c492c0206878 197
dwijaybane 0:c492c0206878 198 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
dwijaybane 0:c492c0206878 199 {
dwijaybane 0:c492c0206878 200 return publish(topic, payload, plength, false);
dwijaybane 0:c492c0206878 201 }
dwijaybane 0:c492c0206878 202
dwijaybane 0:c492c0206878 203 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
dwijaybane 0:c492c0206878 204 {
dwijaybane 0:c492c0206878 205 if (connected()) {
dwijaybane 0:c492c0206878 206 // Leave room in the buffer for header and variable length field
dwijaybane 0:c492c0206878 207 //pc1.printf("in publish ... %s\r\n",topic);
dwijaybane 0:c492c0206878 208 //pc1.printf("in publish ... %s\r\n",payload);
dwijaybane 0:c492c0206878 209 int length = 5;
dwijaybane 0:c492c0206878 210 length = writeString(topic,buffer,length);
dwijaybane 0:c492c0206878 211 int i;
dwijaybane 0:c492c0206878 212 for (i=0; i<plength; i++) {
dwijaybane 0:c492c0206878 213 buffer[length++] = payload[i];
dwijaybane 0:c492c0206878 214 }
dwijaybane 0:c492c0206878 215 short header = MQTTPUBLISH;
dwijaybane 0:c492c0206878 216 if (retained) {
dwijaybane 0:c492c0206878 217 header |= 1;
dwijaybane 0:c492c0206878 218 }
dwijaybane 0:c492c0206878 219 return write(header,buffer,length-5);
dwijaybane 0:c492c0206878 220 }
dwijaybane 0:c492c0206878 221 return false;
dwijaybane 0:c492c0206878 222 }
dwijaybane 0:c492c0206878 223
dwijaybane 0:c492c0206878 224
dwijaybane 0:c492c0206878 225
dwijaybane 0:c492c0206878 226 bool PubSubClient::write(short header, char* buf, int length)
dwijaybane 0:c492c0206878 227 {
dwijaybane 0:c492c0206878 228 short lenBuf[4];
dwijaybane 0:c492c0206878 229 short llen = 0;
dwijaybane 0:c492c0206878 230 short digit;
dwijaybane 0:c492c0206878 231 short pos = 0;
dwijaybane 0:c492c0206878 232 short rc;
dwijaybane 0:c492c0206878 233 short len = length;
dwijaybane 0:c492c0206878 234 //pc1.printf("in write ... %d\r\n",length);
dwijaybane 0:c492c0206878 235 //pc1.printf("in write ... %s\r\n",buf);
dwijaybane 0:c492c0206878 236 do {
dwijaybane 0:c492c0206878 237 digit = len % 128;
dwijaybane 0:c492c0206878 238 len = len / 128;
dwijaybane 0:c492c0206878 239 if (len > 0) {
dwijaybane 0:c492c0206878 240 digit |= 0x80;
dwijaybane 0:c492c0206878 241 }
dwijaybane 0:c492c0206878 242 lenBuf[pos++] = digit;
dwijaybane 0:c492c0206878 243 llen++;
dwijaybane 0:c492c0206878 244 } while(len>0);
dwijaybane 0:c492c0206878 245
dwijaybane 0:c492c0206878 246 buf[4-llen] = header;
dwijaybane 0:c492c0206878 247 for (int i=0; i<llen; i++) {
dwijaybane 0:c492c0206878 248 buf[5-llen+i] = lenBuf[i];
dwijaybane 0:c492c0206878 249 }
dwijaybane 0:c492c0206878 250 rc = _client.send(buf+(4-llen),length+1+llen);
dwijaybane 0:c492c0206878 251
dwijaybane 0:c492c0206878 252 lastOutActivity = millis();
dwijaybane 0:c492c0206878 253 return (rc == 1+llen+length);
dwijaybane 0:c492c0206878 254 }
dwijaybane 0:c492c0206878 255
dwijaybane 0:c492c0206878 256 bool PubSubClient::subscribe(char* topic)
dwijaybane 0:c492c0206878 257 {
dwijaybane 0:c492c0206878 258 //pc1.printf("in subscribe ... %s\r\n",topic);
dwijaybane 0:c492c0206878 259 if (connected()) {
dwijaybane 0:c492c0206878 260 // Leave room in the buffer for header and variable length field
dwijaybane 0:c492c0206878 261 int length = 5;
dwijaybane 0:c492c0206878 262 nextMsgId++;
dwijaybane 0:c492c0206878 263 if (nextMsgId == 0) {
dwijaybane 0:c492c0206878 264 nextMsgId = 1;
dwijaybane 0:c492c0206878 265 }
dwijaybane 0:c492c0206878 266 buffer[length++] = (nextMsgId >> 8);
dwijaybane 0:c492c0206878 267 buffer[length++] = (nextMsgId & 0xFF);
dwijaybane 0:c492c0206878 268 length = writeString(topic, buffer,length);
dwijaybane 0:c492c0206878 269 buffer[length++] = 0; // Only do QoS 0 subs
dwijaybane 0:c492c0206878 270 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
dwijaybane 0:c492c0206878 271 }
dwijaybane 0:c492c0206878 272 return false;
dwijaybane 0:c492c0206878 273 }
dwijaybane 0:c492c0206878 274
dwijaybane 0:c492c0206878 275 bool PubSubClient::unsubscribe(char* topic)
dwijaybane 0:c492c0206878 276 {
dwijaybane 0:c492c0206878 277 if (connected()) {
dwijaybane 0:c492c0206878 278 int length = 5;
dwijaybane 0:c492c0206878 279 nextMsgId++;
dwijaybane 0:c492c0206878 280 if (nextMsgId == 0) {
dwijaybane 0:c492c0206878 281 nextMsgId = 1;
dwijaybane 0:c492c0206878 282 }
dwijaybane 0:c492c0206878 283 buffer[length++] = (nextMsgId >> 8);
dwijaybane 0:c492c0206878 284 buffer[length++] = (nextMsgId & 0xFF);
dwijaybane 0:c492c0206878 285 length = writeString(topic, buffer,length);
dwijaybane 0:c492c0206878 286 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
dwijaybane 0:c492c0206878 287 }
dwijaybane 0:c492c0206878 288 return false;
dwijaybane 0:c492c0206878 289 }
dwijaybane 0:c492c0206878 290
dwijaybane 0:c492c0206878 291 void PubSubClient::disconnect()
dwijaybane 0:c492c0206878 292 {
dwijaybane 0:c492c0206878 293 buffer[0] = MQTTDISCONNECT;
dwijaybane 0:c492c0206878 294 buffer[1] = 0;
dwijaybane 0:c492c0206878 295 _client.send(buffer,2);
dwijaybane 0:c492c0206878 296 _client.close(true);
dwijaybane 0:c492c0206878 297 lastInActivity = lastOutActivity = millis();
dwijaybane 0:c492c0206878 298 }
dwijaybane 0:c492c0206878 299
dwijaybane 0:c492c0206878 300 int PubSubClient::writeString(char* string, char* buf, int pos)
dwijaybane 0:c492c0206878 301 {
dwijaybane 0:c492c0206878 302 char* idp = string;
dwijaybane 0:c492c0206878 303 int i = 0;
dwijaybane 0:c492c0206878 304 pos += 2;
dwijaybane 0:c492c0206878 305 while (*idp) {
dwijaybane 0:c492c0206878 306 buf[pos++] = *idp++;
dwijaybane 0:c492c0206878 307 i++;
dwijaybane 0:c492c0206878 308 }
dwijaybane 0:c492c0206878 309 buf[pos-i-2] = (i >> 8);
dwijaybane 0:c492c0206878 310 buf[pos-i-1] = (i & 0xFF);
dwijaybane 0:c492c0206878 311 return pos;
dwijaybane 0:c492c0206878 312 }
dwijaybane 0:c492c0206878 313
dwijaybane 0:c492c0206878 314
dwijaybane 0:c492c0206878 315 bool PubSubClient::connected()
dwijaybane 0:c492c0206878 316 {
dwijaybane 0:c492c0206878 317 bool rc;
dwijaybane 0:c492c0206878 318 rc = (int)_client.is_connected();
dwijaybane 0:c492c0206878 319 if (!rc) _client.close(true);
dwijaybane 0:c492c0206878 320 return rc;
dwijaybane 0:c492c0206878 321 }