Initial port of the Arduino MQTT client (http://knolleary.net/arduino-client-for-mqtt/), updated with larger timeout and buffer sizes.
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
Fork of MQTT by
PubSubClient.cpp@0:ca855d29545b, 2013-05-26 (annotated)
- Committer:
- jwende
- Date:
- Sun May 26 16:52:01 2013 +0000
- Revision:
- 0:ca855d29545b
- Child:
- 7:9e005abd6846
PubSubClient Library developed for Arduino - http://knolleary.net/arduino-client-for-mqtt/; Initial port to mbed using EthernetInterface
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
jwende | 0:ca855d29545b | 1 | /* |
jwende | 0:ca855d29545b | 2 | PubSubClient.cpp - A simple client for MQTT. |
jwende | 0:ca855d29545b | 3 | Nicholas O'Leary |
jwende | 0:ca855d29545b | 4 | http://knolleary.net |
jwende | 0:ca855d29545b | 5 | |
jwende | 0:ca855d29545b | 6 | initial port for mbed |
jwende | 0:ca855d29545b | 7 | Joerg Wende |
jwende | 0:ca855d29545b | 8 | https://twitter.com/joerg_wende |
jwende | 0:ca855d29545b | 9 | */ |
jwende | 0:ca855d29545b | 10 | |
jwende | 0:ca855d29545b | 11 | #include "PubSubClient.h" |
jwende | 0:ca855d29545b | 12 | |
jwende | 0:ca855d29545b | 13 | Timer t; |
jwende | 0:ca855d29545b | 14 | //Serial pc1(USBTX, USBRX); |
jwende | 0:ca855d29545b | 15 | |
jwende | 0:ca855d29545b | 16 | |
jwende | 0:ca855d29545b | 17 | int millis() |
jwende | 0:ca855d29545b | 18 | { |
jwende | 0:ca855d29545b | 19 | return t.read_ms(); |
jwende | 0:ca855d29545b | 20 | } |
jwende | 0:ca855d29545b | 21 | |
jwende | 0:ca855d29545b | 22 | PubSubClient::PubSubClient() |
jwende | 0:ca855d29545b | 23 | { |
jwende | 0:ca855d29545b | 24 | } |
jwende | 0:ca855d29545b | 25 | |
jwende | 0:ca855d29545b | 26 | PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) |
jwende | 0:ca855d29545b | 27 | { |
jwende | 0:ca855d29545b | 28 | this->callback = callback; |
jwende | 0:ca855d29545b | 29 | this->ip = ip; |
jwende | 0:ca855d29545b | 30 | this->port = port; |
jwende | 0:ca855d29545b | 31 | t.start(); |
jwende | 0:ca855d29545b | 32 | } |
jwende | 0:ca855d29545b | 33 | |
jwende | 0:ca855d29545b | 34 | |
jwende | 0:ca855d29545b | 35 | bool PubSubClient::connect(char *id) |
jwende | 0:ca855d29545b | 36 | { |
jwende | 0:ca855d29545b | 37 | return connect(id,NULL,NULL,0,0,0,0); |
jwende | 0:ca855d29545b | 38 | } |
jwende | 0:ca855d29545b | 39 | |
jwende | 0:ca855d29545b | 40 | bool PubSubClient::connect(char *id, char *user, char *pass) |
jwende | 0:ca855d29545b | 41 | { |
jwende | 0:ca855d29545b | 42 | return connect(id,user,pass,0,0,0,0); |
jwende | 0:ca855d29545b | 43 | } |
jwende | 0:ca855d29545b | 44 | |
jwende | 0:ca855d29545b | 45 | bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) |
jwende | 0:ca855d29545b | 46 | { |
jwende | 0:ca855d29545b | 47 | return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); |
jwende | 0:ca855d29545b | 48 | } |
jwende | 0:ca855d29545b | 49 | |
jwende | 0:ca855d29545b | 50 | bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) |
jwende | 0:ca855d29545b | 51 | { |
jwende | 0:ca855d29545b | 52 | if (!connected()) { |
jwende | 0:ca855d29545b | 53 | int result = 0; |
jwende | 0:ca855d29545b | 54 | result = _client.connect(this->ip, this->port); |
jwende | 0:ca855d29545b | 55 | _client.set_blocking(false, 1); |
jwende | 0:ca855d29545b | 56 | //pc1.printf("IP: %s\r\n",this->ip); |
jwende | 0:ca855d29545b | 57 | //pc1.printf("Port: %i\r\n",this->port); |
jwende | 0:ca855d29545b | 58 | //pc1.printf("Result: %i \r\n", result); |
jwende | 0:ca855d29545b | 59 | |
jwende | 0:ca855d29545b | 60 | if (result==0) { |
jwende | 0:ca855d29545b | 61 | nextMsgId = 1; |
jwende | 0:ca855d29545b | 62 | char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; |
jwende | 0:ca855d29545b | 63 | // Leave room in the buffer for header and variable length field |
jwende | 0:ca855d29545b | 64 | int length = 5; |
jwende | 0:ca855d29545b | 65 | unsigned int j; |
jwende | 0:ca855d29545b | 66 | for (j = 0; j<9; j++) { |
jwende | 0:ca855d29545b | 67 | buffer[length++] = d[j]; |
jwende | 0:ca855d29545b | 68 | } |
jwende | 0:ca855d29545b | 69 | |
jwende | 0:ca855d29545b | 70 | char v; |
jwende | 0:ca855d29545b | 71 | if (willTopic) { |
jwende | 0:ca855d29545b | 72 | v = 0x06|(willQos<<3)|(willRetain<<5); |
jwende | 0:ca855d29545b | 73 | } else { |
jwende | 0:ca855d29545b | 74 | v = 0x02; |
jwende | 0:ca855d29545b | 75 | } |
jwende | 0:ca855d29545b | 76 | |
jwende | 0:ca855d29545b | 77 | if(user != NULL) { |
jwende | 0:ca855d29545b | 78 | v = v|0x80; |
jwende | 0:ca855d29545b | 79 | |
jwende | 0:ca855d29545b | 80 | if(pass != NULL) { |
jwende | 0:ca855d29545b | 81 | v = v|(0x80>>1); |
jwende | 0:ca855d29545b | 82 | } |
jwende | 0:ca855d29545b | 83 | } |
jwende | 0:ca855d29545b | 84 | |
jwende | 0:ca855d29545b | 85 | buffer[length++] = v; |
jwende | 0:ca855d29545b | 86 | |
jwende | 0:ca855d29545b | 87 | buffer[length++] = ((MQTT_KEEPALIVE) >> 8); |
jwende | 0:ca855d29545b | 88 | buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); |
jwende | 0:ca855d29545b | 89 | length = writeString(id,buffer,length); |
jwende | 0:ca855d29545b | 90 | if (willTopic) { |
jwende | 0:ca855d29545b | 91 | length = writeString(willTopic,buffer,length); |
jwende | 0:ca855d29545b | 92 | length = writeString(willMessage,buffer,length); |
jwende | 0:ca855d29545b | 93 | } |
jwende | 0:ca855d29545b | 94 | |
jwende | 0:ca855d29545b | 95 | if(user != NULL) { |
jwende | 0:ca855d29545b | 96 | length = writeString(user,buffer,length); |
jwende | 0:ca855d29545b | 97 | if(pass != NULL) { |
jwende | 0:ca855d29545b | 98 | length = writeString(pass,buffer,length); |
jwende | 0:ca855d29545b | 99 | } |
jwende | 0:ca855d29545b | 100 | } |
jwende | 0:ca855d29545b | 101 | //pc1.printf("Before MQTT Connect ... \r\n"); |
jwende | 0:ca855d29545b | 102 | write(MQTTCONNECT,buffer,length-5); |
jwende | 0:ca855d29545b | 103 | |
jwende | 0:ca855d29545b | 104 | lastInActivity = lastOutActivity = millis(); |
jwende | 0:ca855d29545b | 105 | |
jwende | 0:ca855d29545b | 106 | int llen=128; |
jwende | 0:ca855d29545b | 107 | int len =0; |
jwende | 0:ca855d29545b | 108 | |
jwende | 0:ca855d29545b | 109 | while ((len=readPacket(llen))==0) { |
jwende | 0:ca855d29545b | 110 | unsigned long t = millis(); |
jwende | 0:ca855d29545b | 111 | if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { |
jwende | 0:ca855d29545b | 112 | _client.close(true); |
jwende | 0:ca855d29545b | 113 | return false; |
jwende | 0:ca855d29545b | 114 | } |
jwende | 0:ca855d29545b | 115 | } |
jwende | 0:ca855d29545b | 116 | //pc1.printf("after MQTT Connect ... %i\r\n",len); |
jwende | 0:ca855d29545b | 117 | if (len == 4 && buffer[3] == 0) { |
jwende | 0:ca855d29545b | 118 | lastInActivity = millis(); |
jwende | 0:ca855d29545b | 119 | pingOutstanding = false; |
jwende | 0:ca855d29545b | 120 | return true; |
jwende | 0:ca855d29545b | 121 | } |
jwende | 0:ca855d29545b | 122 | } |
jwende | 0:ca855d29545b | 123 | _client.close(true); |
jwende | 0:ca855d29545b | 124 | } |
jwende | 0:ca855d29545b | 125 | return false; |
jwende | 0:ca855d29545b | 126 | } |
jwende | 0:ca855d29545b | 127 | |
jwende | 0:ca855d29545b | 128 | |
jwende | 0:ca855d29545b | 129 | int PubSubClient::readPacket(int lengthLength) |
jwende | 0:ca855d29545b | 130 | { |
jwende | 0:ca855d29545b | 131 | int len = 0; |
jwende | 0:ca855d29545b | 132 | len = _client.receive_all(buffer,lengthLength); |
jwende | 0:ca855d29545b | 133 | return len; |
jwende | 0:ca855d29545b | 134 | } |
jwende | 0:ca855d29545b | 135 | |
jwende | 0:ca855d29545b | 136 | bool PubSubClient::loop() |
jwende | 0:ca855d29545b | 137 | { |
jwende | 0:ca855d29545b | 138 | if (connected()) { |
jwende | 0:ca855d29545b | 139 | unsigned long t = millis(); |
jwende | 0:ca855d29545b | 140 | if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { |
jwende | 0:ca855d29545b | 141 | if (pingOutstanding) { |
jwende | 0:ca855d29545b | 142 | _client.close(true); |
jwende | 0:ca855d29545b | 143 | return false; |
jwende | 0:ca855d29545b | 144 | } else { |
jwende | 0:ca855d29545b | 145 | buffer[0] = MQTTPINGREQ; |
jwende | 0:ca855d29545b | 146 | buffer[1] = 0; |
jwende | 0:ca855d29545b | 147 | _client.send(buffer,2); |
jwende | 0:ca855d29545b | 148 | lastOutActivity = t; |
jwende | 0:ca855d29545b | 149 | lastInActivity = t; |
jwende | 0:ca855d29545b | 150 | pingOutstanding = true; |
jwende | 0:ca855d29545b | 151 | } |
jwende | 0:ca855d29545b | 152 | } |
jwende | 0:ca855d29545b | 153 | int len; |
jwende | 0:ca855d29545b | 154 | int llen= 128; |
jwende | 0:ca855d29545b | 155 | if (!((len=readPacket(llen))==0)) { |
jwende | 0:ca855d29545b | 156 | if (len > 0) { |
jwende | 0:ca855d29545b | 157 | lastInActivity = t; |
jwende | 0:ca855d29545b | 158 | char type = buffer[0]&0xF0; |
jwende | 0:ca855d29545b | 159 | if (type == MQTTPUBLISH) { |
jwende | 0:ca855d29545b | 160 | if (callback) { |
jwende | 0:ca855d29545b | 161 | //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); |
jwende | 0:ca855d29545b | 162 | int tl = (buffer[2]<<8)+buffer[3]; |
jwende | 0:ca855d29545b | 163 | //pc1.printf("t1 ... %i\r\n",tl); |
jwende | 0:ca855d29545b | 164 | char topic[tl+1]; |
jwende | 0:ca855d29545b | 165 | for (int i=0; i<tl; i++) { |
jwende | 0:ca855d29545b | 166 | topic[i] = buffer[4+i]; |
jwende | 0:ca855d29545b | 167 | } |
jwende | 0:ca855d29545b | 168 | topic[tl] = 0; |
jwende | 0:ca855d29545b | 169 | //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic); |
jwende | 0:ca855d29545b | 170 | // ignore msgID - only support QoS 0 subs |
jwende | 0:ca855d29545b | 171 | int t2 = len-4-tl; |
jwende | 0:ca855d29545b | 172 | //pc1.printf("t2 ... %i\r\n",t2); |
jwende | 0:ca855d29545b | 173 | char payload[t2+1]; |
jwende | 0:ca855d29545b | 174 | for (int i=0; i<t2; i++) { |
jwende | 0:ca855d29545b | 175 | payload[i] = buffer[4+i+tl]; |
jwende | 0:ca855d29545b | 176 | } |
jwende | 0:ca855d29545b | 177 | payload[t2] = 0; |
jwende | 0:ca855d29545b | 178 | //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload); |
jwende | 0:ca855d29545b | 179 | callback(topic,payload,t2); |
jwende | 0:ca855d29545b | 180 | } |
jwende | 0:ca855d29545b | 181 | } else if (type == MQTTPINGREQ) { |
jwende | 0:ca855d29545b | 182 | buffer[0] = MQTTPINGRESP; |
jwende | 0:ca855d29545b | 183 | buffer[1] = 0; |
jwende | 0:ca855d29545b | 184 | _client.send(buffer,2); |
jwende | 0:ca855d29545b | 185 | } else if (type == MQTTPINGRESP) { |
jwende | 0:ca855d29545b | 186 | pingOutstanding = false; |
jwende | 0:ca855d29545b | 187 | } |
jwende | 0:ca855d29545b | 188 | } |
jwende | 0:ca855d29545b | 189 | } |
jwende | 0:ca855d29545b | 190 | return true; |
jwende | 0:ca855d29545b | 191 | } |
jwende | 0:ca855d29545b | 192 | return false; |
jwende | 0:ca855d29545b | 193 | } |
jwende | 0:ca855d29545b | 194 | |
jwende | 0:ca855d29545b | 195 | bool PubSubClient::publish(char* topic, char* payload) |
jwende | 0:ca855d29545b | 196 | { |
jwende | 0:ca855d29545b | 197 | return publish(topic,payload,strlen(payload),false); |
jwende | 0:ca855d29545b | 198 | } |
jwende | 0:ca855d29545b | 199 | |
jwende | 0:ca855d29545b | 200 | bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) |
jwende | 0:ca855d29545b | 201 | { |
jwende | 0:ca855d29545b | 202 | return publish(topic, payload, plength, false); |
jwende | 0:ca855d29545b | 203 | } |
jwende | 0:ca855d29545b | 204 | |
jwende | 0:ca855d29545b | 205 | bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) |
jwende | 0:ca855d29545b | 206 | { |
jwende | 0:ca855d29545b | 207 | if (connected()) { |
jwende | 0:ca855d29545b | 208 | // Leave room in the buffer for header and variable length field |
jwende | 0:ca855d29545b | 209 | //pc1.printf("in publish ... %s\r\n",topic); |
jwende | 0:ca855d29545b | 210 | //pc1.printf("in publish ... %s\r\n",payload); |
jwende | 0:ca855d29545b | 211 | int length = 5; |
jwende | 0:ca855d29545b | 212 | length = writeString(topic,buffer,length); |
jwende | 0:ca855d29545b | 213 | int i; |
jwende | 0:ca855d29545b | 214 | for (i=0; i<plength; i++) { |
jwende | 0:ca855d29545b | 215 | buffer[length++] = payload[i]; |
jwende | 0:ca855d29545b | 216 | } |
jwende | 0:ca855d29545b | 217 | short header = MQTTPUBLISH; |
jwende | 0:ca855d29545b | 218 | if (retained) { |
jwende | 0:ca855d29545b | 219 | header |= 1; |
jwende | 0:ca855d29545b | 220 | } |
jwende | 0:ca855d29545b | 221 | return write(header,buffer,length-5); |
jwende | 0:ca855d29545b | 222 | } |
jwende | 0:ca855d29545b | 223 | return false; |
jwende | 0:ca855d29545b | 224 | } |
jwende | 0:ca855d29545b | 225 | |
jwende | 0:ca855d29545b | 226 | |
jwende | 0:ca855d29545b | 227 | |
jwende | 0:ca855d29545b | 228 | bool PubSubClient::write(short header, char* buf, int length) |
jwende | 0:ca855d29545b | 229 | { |
jwende | 0:ca855d29545b | 230 | short lenBuf[4]; |
jwende | 0:ca855d29545b | 231 | short llen = 0; |
jwende | 0:ca855d29545b | 232 | short digit; |
jwende | 0:ca855d29545b | 233 | short pos = 0; |
jwende | 0:ca855d29545b | 234 | short rc; |
jwende | 0:ca855d29545b | 235 | short len = length; |
jwende | 0:ca855d29545b | 236 | //pc1.printf("in write ... %d\r\n",length); |
jwende | 0:ca855d29545b | 237 | //pc1.printf("in write ... %s\r\n",buf); |
jwende | 0:ca855d29545b | 238 | do { |
jwende | 0:ca855d29545b | 239 | digit = len % 128; |
jwende | 0:ca855d29545b | 240 | len = len / 128; |
jwende | 0:ca855d29545b | 241 | if (len > 0) { |
jwende | 0:ca855d29545b | 242 | digit |= 0x80; |
jwende | 0:ca855d29545b | 243 | } |
jwende | 0:ca855d29545b | 244 | lenBuf[pos++] = digit; |
jwende | 0:ca855d29545b | 245 | llen++; |
jwende | 0:ca855d29545b | 246 | } while(len>0); |
jwende | 0:ca855d29545b | 247 | |
jwende | 0:ca855d29545b | 248 | buf[4-llen] = header; |
jwende | 0:ca855d29545b | 249 | for (int i=0; i<llen; i++) { |
jwende | 0:ca855d29545b | 250 | buf[5-llen+i] = lenBuf[i]; |
jwende | 0:ca855d29545b | 251 | } |
jwende | 0:ca855d29545b | 252 | rc = _client.send(buf+(4-llen),length+1+llen); |
jwende | 0:ca855d29545b | 253 | |
jwende | 0:ca855d29545b | 254 | lastOutActivity = millis(); |
jwende | 0:ca855d29545b | 255 | return (rc == 1+llen+length); |
jwende | 0:ca855d29545b | 256 | } |
jwende | 0:ca855d29545b | 257 | |
jwende | 0:ca855d29545b | 258 | bool PubSubClient::subscribe(char* topic) |
jwende | 0:ca855d29545b | 259 | { |
jwende | 0:ca855d29545b | 260 | //pc1.printf("in subscribe ... %s\r\n",topic); |
jwende | 0:ca855d29545b | 261 | if (connected()) { |
jwende | 0:ca855d29545b | 262 | // Leave room in the buffer for header and variable length field |
jwende | 0:ca855d29545b | 263 | int length = 5; |
jwende | 0:ca855d29545b | 264 | nextMsgId++; |
jwende | 0:ca855d29545b | 265 | if (nextMsgId == 0) { |
jwende | 0:ca855d29545b | 266 | nextMsgId = 1; |
jwende | 0:ca855d29545b | 267 | } |
jwende | 0:ca855d29545b | 268 | buffer[length++] = (nextMsgId >> 8); |
jwende | 0:ca855d29545b | 269 | buffer[length++] = (nextMsgId & 0xFF); |
jwende | 0:ca855d29545b | 270 | length = writeString(topic, buffer,length); |
jwende | 0:ca855d29545b | 271 | buffer[length++] = 0; // Only do QoS 0 subs |
jwende | 0:ca855d29545b | 272 | return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); |
jwende | 0:ca855d29545b | 273 | } |
jwende | 0:ca855d29545b | 274 | return false; |
jwende | 0:ca855d29545b | 275 | } |
jwende | 0:ca855d29545b | 276 | |
jwende | 0:ca855d29545b | 277 | bool PubSubClient::unsubscribe(char* topic) |
jwende | 0:ca855d29545b | 278 | { |
jwende | 0:ca855d29545b | 279 | if (connected()) { |
jwende | 0:ca855d29545b | 280 | int length = 5; |
jwende | 0:ca855d29545b | 281 | nextMsgId++; |
jwende | 0:ca855d29545b | 282 | if (nextMsgId == 0) { |
jwende | 0:ca855d29545b | 283 | nextMsgId = 1; |
jwende | 0:ca855d29545b | 284 | } |
jwende | 0:ca855d29545b | 285 | buffer[length++] = (nextMsgId >> 8); |
jwende | 0:ca855d29545b | 286 | buffer[length++] = (nextMsgId & 0xFF); |
jwende | 0:ca855d29545b | 287 | length = writeString(topic, buffer,length); |
jwende | 0:ca855d29545b | 288 | return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); |
jwende | 0:ca855d29545b | 289 | } |
jwende | 0:ca855d29545b | 290 | return false; |
jwende | 0:ca855d29545b | 291 | } |
jwende | 0:ca855d29545b | 292 | |
jwende | 0:ca855d29545b | 293 | void PubSubClient::disconnect() |
jwende | 0:ca855d29545b | 294 | { |
jwende | 0:ca855d29545b | 295 | buffer[0] = MQTTDISCONNECT; |
jwende | 0:ca855d29545b | 296 | buffer[1] = 0; |
jwende | 0:ca855d29545b | 297 | _client.send(buffer,2); |
jwende | 0:ca855d29545b | 298 | _client.close(true); |
jwende | 0:ca855d29545b | 299 | lastInActivity = lastOutActivity = millis(); |
jwende | 0:ca855d29545b | 300 | } |
jwende | 0:ca855d29545b | 301 | |
jwende | 0:ca855d29545b | 302 | int PubSubClient::writeString(char* string, char* buf, int pos) |
jwende | 0:ca855d29545b | 303 | { |
jwende | 0:ca855d29545b | 304 | char* idp = string; |
jwende | 0:ca855d29545b | 305 | int i = 0; |
jwende | 0:ca855d29545b | 306 | pos += 2; |
jwende | 0:ca855d29545b | 307 | while (*idp) { |
jwende | 0:ca855d29545b | 308 | buf[pos++] = *idp++; |
jwende | 0:ca855d29545b | 309 | i++; |
jwende | 0:ca855d29545b | 310 | } |
jwende | 0:ca855d29545b | 311 | buf[pos-i-2] = (i >> 8); |
jwende | 0:ca855d29545b | 312 | buf[pos-i-1] = (i & 0xFF); |
jwende | 0:ca855d29545b | 313 | return pos; |
jwende | 0:ca855d29545b | 314 | } |
jwende | 0:ca855d29545b | 315 | |
jwende | 0:ca855d29545b | 316 | |
jwende | 0:ca855d29545b | 317 | bool PubSubClient::connected() |
jwende | 0:ca855d29545b | 318 | { |
jwende | 0:ca855d29545b | 319 | bool rc; |
jwende | 0:ca855d29545b | 320 | rc = (int)_client.is_connected(); |
jwende | 0:ca855d29545b | 321 | if (!rc) _client.close(true); |
jwende | 0:ca855d29545b | 322 | return rc; |
jwende | 0:ca855d29545b | 323 | } |