Arch GPRS MQTT demo

Dependencies:   GPRSInterface USBDevice mbed

Fork of Seeed_HTTPClient_GPRSInterface_HelloWorld by Seeed

Committer:
yihui
Date:
Fri Apr 03 08:29:43 2015 +0000
Revision:
3:9dc67659d945
Arch GPRS MQTT demo

Who changed what in which revision?

UserRevisionLine numberNew contents of line
yihui 3:9dc67659d945 1 /*
yihui 3:9dc67659d945 2 PubSubClient.cpp - A simple client for MQTT.
yihui 3:9dc67659d945 3 Nicholas O'Leary
yihui 3:9dc67659d945 4 http://knolleary.net
yihui 3:9dc67659d945 5
yihui 3:9dc67659d945 6 initial port for mbed
yihui 3:9dc67659d945 7 Joerg Wende
yihui 3:9dc67659d945 8 https://twitter.com/joerg_wende
yihui 3:9dc67659d945 9 */
yihui 3:9dc67659d945 10
yihui 3:9dc67659d945 11
yihui 3:9dc67659d945 12
yihui 3:9dc67659d945 13 #include "PubSubClient.h"
yihui 3:9dc67659d945 14
yihui 3:9dc67659d945 15 #include "USBSerial.h"
yihui 3:9dc67659d945 16
yihui 3:9dc67659d945 17 extern USBSerial pc;
yihui 3:9dc67659d945 18
yihui 3:9dc67659d945 19 #define LOG(args...) pc.printf(args)
yihui 3:9dc67659d945 20
yihui 3:9dc67659d945 21 Timer t;
yihui 3:9dc67659d945 22
yihui 3:9dc67659d945 23 int millis()
yihui 3:9dc67659d945 24 {
yihui 3:9dc67659d945 25 return t.read_ms();
yihui 3:9dc67659d945 26 }
yihui 3:9dc67659d945 27
yihui 3:9dc67659d945 28 PubSubClient::PubSubClient()
yihui 3:9dc67659d945 29 {
yihui 3:9dc67659d945 30 }
yihui 3:9dc67659d945 31
yihui 3:9dc67659d945 32 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
yihui 3:9dc67659d945 33 {
yihui 3:9dc67659d945 34 this->callback = callback;
yihui 3:9dc67659d945 35 this->ip = ip;
yihui 3:9dc67659d945 36 this->port = port;
yihui 3:9dc67659d945 37 t.start();
yihui 3:9dc67659d945 38 }
yihui 3:9dc67659d945 39
yihui 3:9dc67659d945 40
yihui 3:9dc67659d945 41 bool PubSubClient::connect(char *id)
yihui 3:9dc67659d945 42 {
yihui 3:9dc67659d945 43 return connect(id,NULL,NULL,0,0,0,0);
yihui 3:9dc67659d945 44 }
yihui 3:9dc67659d945 45
yihui 3:9dc67659d945 46 bool PubSubClient::connect(char *id, char *user, char *pass)
yihui 3:9dc67659d945 47 {
yihui 3:9dc67659d945 48 return connect(id,user,pass,0,0,0,0);
yihui 3:9dc67659d945 49 }
yihui 3:9dc67659d945 50
yihui 3:9dc67659d945 51 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
yihui 3:9dc67659d945 52 {
yihui 3:9dc67659d945 53 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
yihui 3:9dc67659d945 54 }
yihui 3:9dc67659d945 55
yihui 3:9dc67659d945 56 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
yihui 3:9dc67659d945 57 {
yihui 3:9dc67659d945 58 if (!connected()) {
yihui 3:9dc67659d945 59 int result = 0;
yihui 3:9dc67659d945 60 result = _client.connect(this->ip, this->port);
yihui 3:9dc67659d945 61 _client.set_blocking(false, 1);
yihui 3:9dc67659d945 62 LOG("IP: %s\r\n",this->ip);
yihui 3:9dc67659d945 63 LOG("Port: %i\r\n",this->port);
yihui 3:9dc67659d945 64 LOG("Result: %i \r\n", result);
yihui 3:9dc67659d945 65
yihui 3:9dc67659d945 66 if (result==0) {
yihui 3:9dc67659d945 67 nextMsgId = 1;
yihui 3:9dc67659d945 68 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
yihui 3:9dc67659d945 69 // Leave room in the buffer for header and variable length field
yihui 3:9dc67659d945 70 int length = 5;
yihui 3:9dc67659d945 71 unsigned int j;
yihui 3:9dc67659d945 72 for (j = 0; j<9; j++) {
yihui 3:9dc67659d945 73 buffer[length++] = d[j];
yihui 3:9dc67659d945 74 }
yihui 3:9dc67659d945 75
yihui 3:9dc67659d945 76 char v;
yihui 3:9dc67659d945 77 if (willTopic) {
yihui 3:9dc67659d945 78 v = 0x06|(willQos<<3)|(willRetain<<5);
yihui 3:9dc67659d945 79 } else {
yihui 3:9dc67659d945 80 v = 0x02;
yihui 3:9dc67659d945 81 }
yihui 3:9dc67659d945 82
yihui 3:9dc67659d945 83 if(user != NULL) {
yihui 3:9dc67659d945 84 v = v|0x80;
yihui 3:9dc67659d945 85
yihui 3:9dc67659d945 86 if(pass != NULL) {
yihui 3:9dc67659d945 87 v = v|(0x80>>1);
yihui 3:9dc67659d945 88 }
yihui 3:9dc67659d945 89 }
yihui 3:9dc67659d945 90
yihui 3:9dc67659d945 91 buffer[length++] = v;
yihui 3:9dc67659d945 92
yihui 3:9dc67659d945 93 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
yihui 3:9dc67659d945 94 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
yihui 3:9dc67659d945 95 length = writeString(id,buffer,length);
yihui 3:9dc67659d945 96 if (willTopic) {
yihui 3:9dc67659d945 97 length = writeString(willTopic,buffer,length);
yihui 3:9dc67659d945 98 length = writeString(willMessage,buffer,length);
yihui 3:9dc67659d945 99 }
yihui 3:9dc67659d945 100
yihui 3:9dc67659d945 101 if(user != NULL) {
yihui 3:9dc67659d945 102 length = writeString(user,buffer,length);
yihui 3:9dc67659d945 103 if(pass != NULL) {
yihui 3:9dc67659d945 104 length = writeString(pass,buffer,length);
yihui 3:9dc67659d945 105 }
yihui 3:9dc67659d945 106 }
yihui 3:9dc67659d945 107 LOG("Before MQTT Connect ... \r\n");
yihui 3:9dc67659d945 108 write(MQTTCONNECT,buffer,length-5);
yihui 3:9dc67659d945 109
yihui 3:9dc67659d945 110 lastInActivity = lastOutActivity = millis();
yihui 3:9dc67659d945 111
yihui 3:9dc67659d945 112 int llen=128;
yihui 3:9dc67659d945 113 int len =0;
yihui 3:9dc67659d945 114
yihui 3:9dc67659d945 115 while ((len=readPacket(llen))==0) {
yihui 3:9dc67659d945 116 unsigned long t = millis();
yihui 3:9dc67659d945 117 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
yihui 3:9dc67659d945 118 _client.close();
yihui 3:9dc67659d945 119 return false;
yihui 3:9dc67659d945 120 }
yihui 3:9dc67659d945 121 }
yihui 3:9dc67659d945 122 LOG("after MQTT Connect ... %i\r\n", len);
yihui 3:9dc67659d945 123 if (len == 4 && buffer[3] == 0) {
yihui 3:9dc67659d945 124 lastInActivity = millis();
yihui 3:9dc67659d945 125 pingOutstanding = false;
yihui 3:9dc67659d945 126 return true;
yihui 3:9dc67659d945 127 }
yihui 3:9dc67659d945 128 }
yihui 3:9dc67659d945 129 _client.close();
yihui 3:9dc67659d945 130 }
yihui 3:9dc67659d945 131 return false;
yihui 3:9dc67659d945 132 }
yihui 3:9dc67659d945 133
yihui 3:9dc67659d945 134
yihui 3:9dc67659d945 135 int PubSubClient::readPacket(int lengthLength)
yihui 3:9dc67659d945 136 {
yihui 3:9dc67659d945 137 int len = 0;
yihui 3:9dc67659d945 138 len = _client.receive_all(buffer,lengthLength);
yihui 3:9dc67659d945 139 return len;
yihui 3:9dc67659d945 140 }
yihui 3:9dc67659d945 141
yihui 3:9dc67659d945 142 bool PubSubClient::loop()
yihui 3:9dc67659d945 143 {
yihui 3:9dc67659d945 144 if (connected()) {
yihui 3:9dc67659d945 145 unsigned long t = millis();
yihui 3:9dc67659d945 146 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
yihui 3:9dc67659d945 147 if (pingOutstanding) {
yihui 3:9dc67659d945 148 _client.close();
yihui 3:9dc67659d945 149 return false;
yihui 3:9dc67659d945 150 } else {
yihui 3:9dc67659d945 151 buffer[0] = MQTTPINGREQ;
yihui 3:9dc67659d945 152 buffer[1] = 0;
yihui 3:9dc67659d945 153 _client.send(buffer,2);
yihui 3:9dc67659d945 154 lastOutActivity = t;
yihui 3:9dc67659d945 155 lastInActivity = t;
yihui 3:9dc67659d945 156 pingOutstanding = true;
yihui 3:9dc67659d945 157 }
yihui 3:9dc67659d945 158 }
yihui 3:9dc67659d945 159 int len;
yihui 3:9dc67659d945 160 int llen= 128;
yihui 3:9dc67659d945 161 if (!((len=readPacket(llen))==0)) {
yihui 3:9dc67659d945 162 if (len > 0) {
yihui 3:9dc67659d945 163 lastInActivity = t;
yihui 3:9dc67659d945 164 char type = buffer[0]&0xF0;
yihui 3:9dc67659d945 165 if (type == MQTTPUBLISH) {
yihui 3:9dc67659d945 166 if (callback) {
yihui 3:9dc67659d945 167 LOG("MQTTPUBLISH - bytes received:%i\r\n", len);
yihui 3:9dc67659d945 168 int tl = (buffer[2]<<8)+buffer[3];
yihui 3:9dc67659d945 169 LOG("t1:%i\r\n",tl);
yihui 3:9dc67659d945 170 char topic[tl+1];
yihui 3:9dc67659d945 171 for (int i=0; i<tl; i++) {
yihui 3:9dc67659d945 172 topic[i] = buffer[4+i];
yihui 3:9dc67659d945 173 }
yihui 3:9dc67659d945 174 topic[tl] = 0;
yihui 3:9dc67659d945 175 LOG("Topic:%s\r\n",topic);
yihui 3:9dc67659d945 176 // ignore msgID - only support QoS 0 subs
yihui 3:9dc67659d945 177 int t2 = len-4-tl;
yihui 3:9dc67659d945 178 LOG("t2:%i\r\n",t2);
yihui 3:9dc67659d945 179 char payload[t2+1];
yihui 3:9dc67659d945 180 for (int i=0; i<t2; i++) {
yihui 3:9dc67659d945 181 payload[i] = buffer[4+i+tl];
yihui 3:9dc67659d945 182 }
yihui 3:9dc67659d945 183 payload[t2] = 0;
yihui 3:9dc67659d945 184 LOG("Payload:%s\r\n", payload);
yihui 3:9dc67659d945 185 callback(topic,payload,t2);
yihui 3:9dc67659d945 186 }
yihui 3:9dc67659d945 187 } else if (type == MQTTPINGREQ) {
yihui 3:9dc67659d945 188 buffer[0] = MQTTPINGRESP;
yihui 3:9dc67659d945 189 buffer[1] = 0;
yihui 3:9dc67659d945 190 _client.send(buffer,2);
yihui 3:9dc67659d945 191 } else if (type == MQTTPINGRESP) {
yihui 3:9dc67659d945 192 pingOutstanding = false;
yihui 3:9dc67659d945 193 }
yihui 3:9dc67659d945 194 }
yihui 3:9dc67659d945 195 }
yihui 3:9dc67659d945 196 return true;
yihui 3:9dc67659d945 197 }
yihui 3:9dc67659d945 198 return false;
yihui 3:9dc67659d945 199 }
yihui 3:9dc67659d945 200
yihui 3:9dc67659d945 201 bool PubSubClient::publish(char* topic, char* payload)
yihui 3:9dc67659d945 202 {
yihui 3:9dc67659d945 203 return publish(topic,payload,strlen(payload),false);
yihui 3:9dc67659d945 204 }
yihui 3:9dc67659d945 205
yihui 3:9dc67659d945 206 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
yihui 3:9dc67659d945 207 {
yihui 3:9dc67659d945 208 return publish(topic, payload, plength, false);
yihui 3:9dc67659d945 209 }
yihui 3:9dc67659d945 210
yihui 3:9dc67659d945 211 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
yihui 3:9dc67659d945 212 {
yihui 3:9dc67659d945 213 if (connected()) {
yihui 3:9dc67659d945 214 // Leave room in the buffer for header and variable length field
yihui 3:9dc67659d945 215 LOG("publish - topic:%s, payload:%s\r\n", topic, payload);
yihui 3:9dc67659d945 216 int length = 5;
yihui 3:9dc67659d945 217 length = writeString(topic,buffer,length);
yihui 3:9dc67659d945 218 int i;
yihui 3:9dc67659d945 219 for (i=0; i<plength; i++) {
yihui 3:9dc67659d945 220 buffer[length++] = payload[i];
yihui 3:9dc67659d945 221 }
yihui 3:9dc67659d945 222 short header = MQTTPUBLISH;
yihui 3:9dc67659d945 223 if (retained) {
yihui 3:9dc67659d945 224 header |= 1;
yihui 3:9dc67659d945 225 }
yihui 3:9dc67659d945 226 return write(header,buffer,length-5);
yihui 3:9dc67659d945 227 }
yihui 3:9dc67659d945 228 return false;
yihui 3:9dc67659d945 229 }
yihui 3:9dc67659d945 230
yihui 3:9dc67659d945 231
yihui 3:9dc67659d945 232
yihui 3:9dc67659d945 233 bool PubSubClient::write(short header, char* buf, int length)
yihui 3:9dc67659d945 234 {
yihui 3:9dc67659d945 235 short lenBuf[4];
yihui 3:9dc67659d945 236 short llen = 0;
yihui 3:9dc67659d945 237 short digit;
yihui 3:9dc67659d945 238 short pos = 0;
yihui 3:9dc67659d945 239 short rc;
yihui 3:9dc67659d945 240 short len = length;
yihui 3:9dc67659d945 241 do {
yihui 3:9dc67659d945 242 digit = len % 128;
yihui 3:9dc67659d945 243 len = len / 128;
yihui 3:9dc67659d945 244 if (len > 0) {
yihui 3:9dc67659d945 245 digit |= 0x80;
yihui 3:9dc67659d945 246 }
yihui 3:9dc67659d945 247 lenBuf[pos++] = digit;
yihui 3:9dc67659d945 248 llen++;
yihui 3:9dc67659d945 249 } while(len>0);
yihui 3:9dc67659d945 250
yihui 3:9dc67659d945 251 buf[4-llen] = header;
yihui 3:9dc67659d945 252 for (int i=0; i<llen; i++) {
yihui 3:9dc67659d945 253 buf[5-llen+i] = lenBuf[i];
yihui 3:9dc67659d945 254 }
yihui 3:9dc67659d945 255 rc = _client.send(buf+(4-llen),length+1+llen);
yihui 3:9dc67659d945 256
yihui 3:9dc67659d945 257 lastOutActivity = millis();
yihui 3:9dc67659d945 258 return (rc == 1+llen+length);
yihui 3:9dc67659d945 259 }
yihui 3:9dc67659d945 260
yihui 3:9dc67659d945 261 bool PubSubClient::subscribe(char* topic)
yihui 3:9dc67659d945 262 {
yihui 3:9dc67659d945 263 LOG("subscribe - topic:%s\r\n",topic);
yihui 3:9dc67659d945 264 if (connected()) {
yihui 3:9dc67659d945 265 // Leave room in the buffer for header and variable length field
yihui 3:9dc67659d945 266 int length = 5;
yihui 3:9dc67659d945 267 nextMsgId++;
yihui 3:9dc67659d945 268 if (nextMsgId == 0) {
yihui 3:9dc67659d945 269 nextMsgId = 1;
yihui 3:9dc67659d945 270 }
yihui 3:9dc67659d945 271 buffer[length++] = (nextMsgId >> 8);
yihui 3:9dc67659d945 272 buffer[length++] = (nextMsgId & 0xFF);
yihui 3:9dc67659d945 273 length = writeString(topic, buffer,length);
yihui 3:9dc67659d945 274 buffer[length++] = 0; // Only do QoS 0 subs
yihui 3:9dc67659d945 275 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
yihui 3:9dc67659d945 276 }
yihui 3:9dc67659d945 277 return false;
yihui 3:9dc67659d945 278 }
yihui 3:9dc67659d945 279
yihui 3:9dc67659d945 280 bool PubSubClient::unsubscribe(char* topic)
yihui 3:9dc67659d945 281 {
yihui 3:9dc67659d945 282 if (connected()) {
yihui 3:9dc67659d945 283 int length = 5;
yihui 3:9dc67659d945 284 nextMsgId++;
yihui 3:9dc67659d945 285 if (nextMsgId == 0) {
yihui 3:9dc67659d945 286 nextMsgId = 1;
yihui 3:9dc67659d945 287 }
yihui 3:9dc67659d945 288 buffer[length++] = (nextMsgId >> 8);
yihui 3:9dc67659d945 289 buffer[length++] = (nextMsgId & 0xFF);
yihui 3:9dc67659d945 290 length = writeString(topic, buffer,length);
yihui 3:9dc67659d945 291 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
yihui 3:9dc67659d945 292 }
yihui 3:9dc67659d945 293 return false;
yihui 3:9dc67659d945 294 }
yihui 3:9dc67659d945 295
yihui 3:9dc67659d945 296 void PubSubClient::disconnect()
yihui 3:9dc67659d945 297 {
yihui 3:9dc67659d945 298 buffer[0] = MQTTDISCONNECT;
yihui 3:9dc67659d945 299 buffer[1] = 0;
yihui 3:9dc67659d945 300 _client.send(buffer,2);
yihui 3:9dc67659d945 301 _client.close();
yihui 3:9dc67659d945 302 lastInActivity = lastOutActivity = millis();
yihui 3:9dc67659d945 303 }
yihui 3:9dc67659d945 304
yihui 3:9dc67659d945 305 int PubSubClient::writeString(char* string, char* buf, int pos)
yihui 3:9dc67659d945 306 {
yihui 3:9dc67659d945 307 char* idp = string;
yihui 3:9dc67659d945 308 int i = 0;
yihui 3:9dc67659d945 309 pos += 2;
yihui 3:9dc67659d945 310 while (*idp) {
yihui 3:9dc67659d945 311 buf[pos++] = *idp++;
yihui 3:9dc67659d945 312 i++;
yihui 3:9dc67659d945 313 }
yihui 3:9dc67659d945 314 buf[pos-i-2] = (i >> 8);
yihui 3:9dc67659d945 315 buf[pos-i-1] = (i & 0xFF);
yihui 3:9dc67659d945 316 return pos;
yihui 3:9dc67659d945 317 }
yihui 3:9dc67659d945 318
yihui 3:9dc67659d945 319
yihui 3:9dc67659d945 320 bool PubSubClient::connected()
yihui 3:9dc67659d945 321 {
yihui 3:9dc67659d945 322 bool rc;
yihui 3:9dc67659d945 323 rc = (int)_client.is_connected();
yihui 3:9dc67659d945 324 // if (!rc) _client.close();
yihui 3:9dc67659d945 325 return rc;
yihui 3:9dc67659d945 326 }