Initial port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/ Updated with larger timeout and buffer sizes

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Fork of MQTT by Joerg Wende

Committer:
ansond
Date:
Mon Jun 30 22:17:23 2014 +0000
Revision:
9:f28fd918d58c
Parent:
8:40326dd71c95
updates for ublox cellular modem enablement

Who changed what in which revision?

UserRevisionLine numberNew contents of line
jwende 0:ca855d29545b 1 /*
jwende 0:ca855d29545b 2 PubSubClient.cpp - A simple client for MQTT.
jwende 0:ca855d29545b 3 Nicholas O'Leary
jwende 0:ca855d29545b 4 http://knolleary.net
jwende 0:ca855d29545b 5
jwende 0:ca855d29545b 6 initial port for mbed
jwende 0:ca855d29545b 7 Joerg Wende
jwende 0:ca855d29545b 8 https://twitter.com/joerg_wende
jwende 0:ca855d29545b 9 */
jwende 0:ca855d29545b 10
jwende 0:ca855d29545b 11 #include "PubSubClient.h"
jwende 0:ca855d29545b 12
jwende 0:ca855d29545b 13 Timer t;
ansond 8:40326dd71c95 14 //Serial pc1(USBTX, USBRX);
jwende 0:ca855d29545b 15
jwende 0:ca855d29545b 16
jwende 0:ca855d29545b 17 int millis()
jwende 0:ca855d29545b 18 {
jwende 0:ca855d29545b 19 return t.read_ms();
jwende 0:ca855d29545b 20 }
jwende 0:ca855d29545b 21
jwende 0:ca855d29545b 22 PubSubClient::PubSubClient()
jwende 0:ca855d29545b 23 {
jwende 0:ca855d29545b 24 }
jwende 0:ca855d29545b 25
jwende 0:ca855d29545b 26 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
jwende 0:ca855d29545b 27 {
jwende 0:ca855d29545b 28 this->callback = callback;
jwende 0:ca855d29545b 29 this->ip = ip;
jwende 0:ca855d29545b 30 this->port = port;
jwende 0:ca855d29545b 31 t.start();
jwende 0:ca855d29545b 32 }
jwende 0:ca855d29545b 33
jwende 0:ca855d29545b 34
jwende 0:ca855d29545b 35 bool PubSubClient::connect(char *id)
jwende 0:ca855d29545b 36 {
jwende 0:ca855d29545b 37 return connect(id,NULL,NULL,0,0,0,0);
jwende 0:ca855d29545b 38 }
jwende 0:ca855d29545b 39
jwende 0:ca855d29545b 40 bool PubSubClient::connect(char *id, char *user, char *pass)
jwende 0:ca855d29545b 41 {
jwende 0:ca855d29545b 42 return connect(id,user,pass,0,0,0,0);
jwende 0:ca855d29545b 43 }
jwende 0:ca855d29545b 44
jwende 0:ca855d29545b 45 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
jwende 0:ca855d29545b 46 {
jwende 0:ca855d29545b 47 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
jwende 0:ca855d29545b 48 }
jwende 0:ca855d29545b 49
jwende 0:ca855d29545b 50 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
jwende 0:ca855d29545b 51 {
jwende 0:ca855d29545b 52 if (!connected()) {
jwende 0:ca855d29545b 53 int result = 0;
jwende 0:ca855d29545b 54 result = _client.connect(this->ip, this->port);
jwende 0:ca855d29545b 55 _client.set_blocking(false, 1);
ansond 8:40326dd71c95 56 //pc1.printf("IP: %s\r\n",this->ip);
ansond 8:40326dd71c95 57 //pc1.printf("Port: %i\r\n",this->port);
ansond 8:40326dd71c95 58 //pc1.printf("Result: %i \r\n", result);
jwende 0:ca855d29545b 59
jwende 0:ca855d29545b 60 if (result==0) {
jwende 0:ca855d29545b 61 nextMsgId = 1;
jwende 0:ca855d29545b 62 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
jwende 0:ca855d29545b 63 // Leave room in the buffer for header and variable length field
jwende 0:ca855d29545b 64 int length = 5;
jwende 0:ca855d29545b 65 unsigned int j;
jwende 0:ca855d29545b 66 for (j = 0; j<9; j++) {
jwende 0:ca855d29545b 67 buffer[length++] = d[j];
jwende 0:ca855d29545b 68 }
jwende 0:ca855d29545b 69
jwende 0:ca855d29545b 70 char v;
jwende 0:ca855d29545b 71 if (willTopic) {
jwende 0:ca855d29545b 72 v = 0x06|(willQos<<3)|(willRetain<<5);
jwende 0:ca855d29545b 73 } else {
jwende 0:ca855d29545b 74 v = 0x02;
jwende 0:ca855d29545b 75 }
jwende 0:ca855d29545b 76
jwende 0:ca855d29545b 77 if(user != NULL) {
jwende 0:ca855d29545b 78 v = v|0x80;
jwende 0:ca855d29545b 79
jwende 0:ca855d29545b 80 if(pass != NULL) {
jwende 0:ca855d29545b 81 v = v|(0x80>>1);
jwende 0:ca855d29545b 82 }
jwende 0:ca855d29545b 83 }
jwende 0:ca855d29545b 84
jwende 0:ca855d29545b 85 buffer[length++] = v;
jwende 0:ca855d29545b 86
jwende 0:ca855d29545b 87 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
jwende 0:ca855d29545b 88 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
jwende 0:ca855d29545b 89 length = writeString(id,buffer,length);
jwende 0:ca855d29545b 90 if (willTopic) {
jwende 0:ca855d29545b 91 length = writeString(willTopic,buffer,length);
jwende 0:ca855d29545b 92 length = writeString(willMessage,buffer,length);
jwende 0:ca855d29545b 93 }
jwende 0:ca855d29545b 94
jwende 0:ca855d29545b 95 if(user != NULL) {
jwende 0:ca855d29545b 96 length = writeString(user,buffer,length);
jwende 0:ca855d29545b 97 if(pass != NULL) {
jwende 0:ca855d29545b 98 length = writeString(pass,buffer,length);
jwende 0:ca855d29545b 99 }
jwende 0:ca855d29545b 100 }
ansond 8:40326dd71c95 101 //pc1.printf("Before MQTT Connect ... \r\n");
jwende 0:ca855d29545b 102 write(MQTTCONNECT,buffer,length-5);
jwende 0:ca855d29545b 103
jwende 0:ca855d29545b 104 lastInActivity = lastOutActivity = millis();
jwende 0:ca855d29545b 105
jwende 0:ca855d29545b 106 int llen=128;
jwende 0:ca855d29545b 107 int len =0;
jwende 0:ca855d29545b 108
jwende 0:ca855d29545b 109 while ((len=readPacket(llen))==0) {
jwende 0:ca855d29545b 110 unsigned long t = millis();
jwende 0:ca855d29545b 111 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
ansond 9:f28fd918d58c 112 #ifdef CELLULAR_NETWORK
ansond 9:f28fd918d58c 113 _client.close();
ansond 9:f28fd918d58c 114 #else
jwende 0:ca855d29545b 115 _client.close(true);
ansond 9:f28fd918d58c 116 #endif
jwende 0:ca855d29545b 117 return false;
jwende 0:ca855d29545b 118 }
jwende 0:ca855d29545b 119 }
ansond 8:40326dd71c95 120 //pc1.printf("after MQTT Connect ... %i\r\n",len);
jwende 0:ca855d29545b 121 if (len == 4 && buffer[3] == 0) {
jwende 0:ca855d29545b 122 lastInActivity = millis();
jwende 0:ca855d29545b 123 pingOutstanding = false;
jwende 0:ca855d29545b 124 return true;
jwende 0:ca855d29545b 125 }
jwende 0:ca855d29545b 126 }
ansond 9:f28fd918d58c 127 #ifdef CELLULAR_NETWORK
ansond 9:f28fd918d58c 128 _client.close();
ansond 9:f28fd918d58c 129 #else
jwende 0:ca855d29545b 130 _client.close(true);
ansond 9:f28fd918d58c 131 #endif
jwende 0:ca855d29545b 132 }
jwende 0:ca855d29545b 133 return false;
jwende 0:ca855d29545b 134 }
jwende 0:ca855d29545b 135
jwende 0:ca855d29545b 136
jwende 0:ca855d29545b 137 int PubSubClient::readPacket(int lengthLength)
jwende 0:ca855d29545b 138 {
jwende 0:ca855d29545b 139 int len = 0;
jwende 0:ca855d29545b 140 len = _client.receive_all(buffer,lengthLength);
jwende 0:ca855d29545b 141 return len;
jwende 0:ca855d29545b 142 }
jwende 0:ca855d29545b 143
jwende 0:ca855d29545b 144 bool PubSubClient::loop()
jwende 0:ca855d29545b 145 {
jwende 0:ca855d29545b 146 if (connected()) {
jwende 0:ca855d29545b 147 unsigned long t = millis();
jwende 0:ca855d29545b 148 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
jwende 0:ca855d29545b 149 if (pingOutstanding) {
ansond 9:f28fd918d58c 150 #ifdef CELLULAR_NETWORK
ansond 9:f28fd918d58c 151 _client.close();
ansond 9:f28fd918d58c 152 #else
jwende 0:ca855d29545b 153 _client.close(true);
ansond 9:f28fd918d58c 154 #endif
jwende 0:ca855d29545b 155 return false;
jwende 0:ca855d29545b 156 } else {
jwende 0:ca855d29545b 157 buffer[0] = MQTTPINGREQ;
jwende 0:ca855d29545b 158 buffer[1] = 0;
jwende 0:ca855d29545b 159 _client.send(buffer,2);
jwende 0:ca855d29545b 160 lastOutActivity = t;
jwende 0:ca855d29545b 161 lastInActivity = t;
jwende 0:ca855d29545b 162 pingOutstanding = true;
jwende 0:ca855d29545b 163 }
jwende 0:ca855d29545b 164 }
jwende 0:ca855d29545b 165 int len;
jwende 0:ca855d29545b 166 int llen= 128;
jwende 0:ca855d29545b 167 if (!((len=readPacket(llen))==0)) {
jwende 0:ca855d29545b 168 if (len > 0) {
jwende 0:ca855d29545b 169 lastInActivity = t;
jwende 0:ca855d29545b 170 char type = buffer[0]&0xF0;
jwende 0:ca855d29545b 171 if (type == MQTTPUBLISH) {
jwende 0:ca855d29545b 172 if (callback) {
jwende 0:ca855d29545b 173 //pc1.printf("MQTTPUBLISH received ... %i\r\n",len);
jwende 0:ca855d29545b 174 int tl = (buffer[2]<<8)+buffer[3];
jwende 0:ca855d29545b 175 //pc1.printf("t1 ... %i\r\n",tl);
jwende 0:ca855d29545b 176 char topic[tl+1];
jwende 0:ca855d29545b 177 for (int i=0; i<tl; i++) {
jwende 0:ca855d29545b 178 topic[i] = buffer[4+i];
jwende 0:ca855d29545b 179 }
jwende 0:ca855d29545b 180 topic[tl] = 0;
jwende 0:ca855d29545b 181 //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic);
jwende 0:ca855d29545b 182 // ignore msgID - only support QoS 0 subs
jwende 0:ca855d29545b 183 int t2 = len-4-tl;
jwende 0:ca855d29545b 184 //pc1.printf("t2 ... %i\r\n",t2);
jwende 0:ca855d29545b 185 char payload[t2+1];
jwende 0:ca855d29545b 186 for (int i=0; i<t2; i++) {
jwende 0:ca855d29545b 187 payload[i] = buffer[4+i+tl];
jwende 0:ca855d29545b 188 }
jwende 0:ca855d29545b 189 payload[t2] = 0;
jwende 0:ca855d29545b 190 //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload);
jwende 0:ca855d29545b 191 callback(topic,payload,t2);
jwende 0:ca855d29545b 192 }
jwende 0:ca855d29545b 193 } else if (type == MQTTPINGREQ) {
jwende 0:ca855d29545b 194 buffer[0] = MQTTPINGRESP;
jwende 0:ca855d29545b 195 buffer[1] = 0;
jwende 0:ca855d29545b 196 _client.send(buffer,2);
jwende 0:ca855d29545b 197 } else if (type == MQTTPINGRESP) {
jwende 0:ca855d29545b 198 pingOutstanding = false;
jwende 0:ca855d29545b 199 }
jwende 0:ca855d29545b 200 }
jwende 0:ca855d29545b 201 }
jwende 0:ca855d29545b 202 return true;
jwende 0:ca855d29545b 203 }
jwende 0:ca855d29545b 204 return false;
jwende 0:ca855d29545b 205 }
jwende 0:ca855d29545b 206
jwende 0:ca855d29545b 207 bool PubSubClient::publish(char* topic, char* payload)
jwende 0:ca855d29545b 208 {
jwende 0:ca855d29545b 209 return publish(topic,payload,strlen(payload),false);
jwende 0:ca855d29545b 210 }
jwende 0:ca855d29545b 211
jwende 0:ca855d29545b 212 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
jwende 0:ca855d29545b 213 {
jwende 0:ca855d29545b 214 return publish(topic, payload, plength, false);
jwende 0:ca855d29545b 215 }
jwende 0:ca855d29545b 216
jwende 0:ca855d29545b 217 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
jwende 0:ca855d29545b 218 {
jwende 0:ca855d29545b 219 if (connected()) {
jwende 0:ca855d29545b 220 // Leave room in the buffer for header and variable length field
jwende 0:ca855d29545b 221 //pc1.printf("in publish ... %s\r\n",topic);
jwende 0:ca855d29545b 222 //pc1.printf("in publish ... %s\r\n",payload);
jwende 0:ca855d29545b 223 int length = 5;
jwende 0:ca855d29545b 224 length = writeString(topic,buffer,length);
jwende 0:ca855d29545b 225 int i;
jwende 0:ca855d29545b 226 for (i=0; i<plength; i++) {
jwende 0:ca855d29545b 227 buffer[length++] = payload[i];
jwende 0:ca855d29545b 228 }
jwende 0:ca855d29545b 229 short header = MQTTPUBLISH;
jwende 0:ca855d29545b 230 if (retained) {
jwende 0:ca855d29545b 231 header |= 1;
jwende 0:ca855d29545b 232 }
jwende 0:ca855d29545b 233 return write(header,buffer,length-5);
jwende 0:ca855d29545b 234 }
jwende 0:ca855d29545b 235 return false;
jwende 0:ca855d29545b 236 }
jwende 0:ca855d29545b 237
jwende 0:ca855d29545b 238
jwende 0:ca855d29545b 239
jwende 0:ca855d29545b 240 bool PubSubClient::write(short header, char* buf, int length)
jwende 0:ca855d29545b 241 {
jwende 0:ca855d29545b 242 short lenBuf[4];
jwende 0:ca855d29545b 243 short llen = 0;
jwende 0:ca855d29545b 244 short digit;
jwende 0:ca855d29545b 245 short pos = 0;
jwende 0:ca855d29545b 246 short rc;
jwende 0:ca855d29545b 247 short len = length;
jwende 0:ca855d29545b 248 //pc1.printf("in write ... %d\r\n",length);
jwende 0:ca855d29545b 249 //pc1.printf("in write ... %s\r\n",buf);
jwende 0:ca855d29545b 250 do {
jwende 0:ca855d29545b 251 digit = len % 128;
jwende 0:ca855d29545b 252 len = len / 128;
jwende 0:ca855d29545b 253 if (len > 0) {
jwende 0:ca855d29545b 254 digit |= 0x80;
jwende 0:ca855d29545b 255 }
jwende 0:ca855d29545b 256 lenBuf[pos++] = digit;
jwende 0:ca855d29545b 257 llen++;
jwende 0:ca855d29545b 258 } while(len>0);
jwende 0:ca855d29545b 259
jwende 0:ca855d29545b 260 buf[4-llen] = header;
jwende 0:ca855d29545b 261 for (int i=0; i<llen; i++) {
jwende 0:ca855d29545b 262 buf[5-llen+i] = lenBuf[i];
jwende 0:ca855d29545b 263 }
jwende 0:ca855d29545b 264 rc = _client.send(buf+(4-llen),length+1+llen);
jwende 0:ca855d29545b 265
jwende 0:ca855d29545b 266 lastOutActivity = millis();
jwende 0:ca855d29545b 267 return (rc == 1+llen+length);
jwende 0:ca855d29545b 268 }
jwende 0:ca855d29545b 269
jwende 0:ca855d29545b 270 bool PubSubClient::subscribe(char* topic)
jwende 0:ca855d29545b 271 {
jwende 0:ca855d29545b 272 //pc1.printf("in subscribe ... %s\r\n",topic);
jwende 0:ca855d29545b 273 if (connected()) {
jwende 0:ca855d29545b 274 // Leave room in the buffer for header and variable length field
jwende 0:ca855d29545b 275 int length = 5;
jwende 0:ca855d29545b 276 nextMsgId++;
jwende 0:ca855d29545b 277 if (nextMsgId == 0) {
jwende 0:ca855d29545b 278 nextMsgId = 1;
jwende 0:ca855d29545b 279 }
jwende 0:ca855d29545b 280 buffer[length++] = (nextMsgId >> 8);
jwende 0:ca855d29545b 281 buffer[length++] = (nextMsgId & 0xFF);
jwende 0:ca855d29545b 282 length = writeString(topic, buffer,length);
jwende 0:ca855d29545b 283 buffer[length++] = 0; // Only do QoS 0 subs
jwende 0:ca855d29545b 284 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
jwende 0:ca855d29545b 285 }
jwende 0:ca855d29545b 286 return false;
jwende 0:ca855d29545b 287 }
jwende 0:ca855d29545b 288
jwende 0:ca855d29545b 289 bool PubSubClient::unsubscribe(char* topic)
jwende 0:ca855d29545b 290 {
jwende 0:ca855d29545b 291 if (connected()) {
jwende 0:ca855d29545b 292 int length = 5;
jwende 0:ca855d29545b 293 nextMsgId++;
jwende 0:ca855d29545b 294 if (nextMsgId == 0) {
jwende 0:ca855d29545b 295 nextMsgId = 1;
jwende 0:ca855d29545b 296 }
jwende 0:ca855d29545b 297 buffer[length++] = (nextMsgId >> 8);
jwende 0:ca855d29545b 298 buffer[length++] = (nextMsgId & 0xFF);
jwende 0:ca855d29545b 299 length = writeString(topic, buffer,length);
jwende 0:ca855d29545b 300 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
jwende 0:ca855d29545b 301 }
jwende 0:ca855d29545b 302 return false;
jwende 0:ca855d29545b 303 }
jwende 0:ca855d29545b 304
jwende 0:ca855d29545b 305 void PubSubClient::disconnect()
jwende 0:ca855d29545b 306 {
jwende 0:ca855d29545b 307 buffer[0] = MQTTDISCONNECT;
jwende 0:ca855d29545b 308 buffer[1] = 0;
jwende 0:ca855d29545b 309 _client.send(buffer,2);
ansond 9:f28fd918d58c 310 #ifdef CELLULAR_NETWORK
ansond 9:f28fd918d58c 311 _client.close();
ansond 9:f28fd918d58c 312 #else
jwende 0:ca855d29545b 313 _client.close(true);
ansond 9:f28fd918d58c 314 #endif
jwende 0:ca855d29545b 315 lastInActivity = lastOutActivity = millis();
jwende 0:ca855d29545b 316 }
jwende 0:ca855d29545b 317
jwende 0:ca855d29545b 318 int PubSubClient::writeString(char* string, char* buf, int pos)
jwende 0:ca855d29545b 319 {
jwende 0:ca855d29545b 320 char* idp = string;
jwende 0:ca855d29545b 321 int i = 0;
jwende 0:ca855d29545b 322 pos += 2;
jwende 0:ca855d29545b 323 while (*idp) {
jwende 0:ca855d29545b 324 buf[pos++] = *idp++;
jwende 0:ca855d29545b 325 i++;
jwende 0:ca855d29545b 326 }
jwende 0:ca855d29545b 327 buf[pos-i-2] = (i >> 8);
jwende 0:ca855d29545b 328 buf[pos-i-1] = (i & 0xFF);
jwende 0:ca855d29545b 329 return pos;
jwende 0:ca855d29545b 330 }
jwende 0:ca855d29545b 331
jwende 0:ca855d29545b 332
jwende 0:ca855d29545b 333 bool PubSubClient::connected()
jwende 0:ca855d29545b 334 {
jwende 0:ca855d29545b 335 bool rc;
jwende 0:ca855d29545b 336 rc = (int)_client.is_connected();
ansond 9:f28fd918d58c 337 #ifdef CELLULAR_NETWORK
ansond 9:f28fd918d58c 338 if (!rc) _client.close();
ansond 9:f28fd918d58c 339 #else
jwende 0:ca855d29545b 340 if (!rc) _client.close(true);
ansond 9:f28fd918d58c 341 #endif
jwende 0:ca855d29545b 342 return rc;
jwende 0:ca855d29545b 343 }