Yalgaar mBed SDK for real-time messaging

Dependents:   YalgaarSDK

Fork of MQTT by Joerg Wende

Committer:
nileshvora
Date:
Tue Mar 14 11:21:33 2017 +0000
Revision:
5:7dde43328c25
Parent:
4:013cbaaeda50
Yalgaar SDK V1.0.0

Who changed what in which revision?

UserRevisionLine numberNew contents of line
jwende 0:ca855d29545b 1 /*
jwende 0:ca855d29545b 2 PubSubClient.cpp - A simple client for MQTT.
jwende 0:ca855d29545b 3 Nicholas O'Leary
jwende 0:ca855d29545b 4 http://knolleary.net
jwende 0:ca855d29545b 5
nileshvora 1:c233cee7c15b 6 initial port for mbed
jwende 0:ca855d29545b 7 Joerg Wende
jwende 0:ca855d29545b 8 https://twitter.com/joerg_wende
jwende 0:ca855d29545b 9 */
jwende 0:ca855d29545b 10
jwende 0:ca855d29545b 11 #include "PubSubClient.h"
jwende 0:ca855d29545b 12
nileshvora 1:c233cee7c15b 13
jwende 0:ca855d29545b 14
jwende 0:ca855d29545b 15
nileshvora 3:511121ab8fef 16 Serial sls(USBTX, USBRX, 115200);
nileshvora 1:c233cee7c15b 17
nileshvora 1:c233cee7c15b 18 int pubsub_error_code;
nileshvora 1:c233cee7c15b 19
nileshvora 1:c233cee7c15b 20
nileshvora 1:c233cee7c15b 21 int PubSubClient::millis()
jwende 0:ca855d29545b 22 {
jwende 0:ca855d29545b 23 return t.read_ms();
jwende 0:ca855d29545b 24 }
jwende 0:ca855d29545b 25
jwende 0:ca855d29545b 26 PubSubClient::PubSubClient()
jwende 0:ca855d29545b 27 {
nileshvora 4:013cbaaeda50 28 this->_state = MQTT_DISCONNECTED;
nileshvora 4:013cbaaeda50 29 this->ip = "api.yalgaar.io";
nileshvora 4:013cbaaeda50 30 this->port = 1883;
nileshvora 4:013cbaaeda50 31 this->t.start();
jwende 0:ca855d29545b 32 }
jwende 0:ca855d29545b 33
nileshvora 1:c233cee7c15b 34 PubSubClient::PubSubClient(char *ip, int port)
jwende 0:ca855d29545b 35 {
nileshvora 1:c233cee7c15b 36 this->_state = MQTT_DISCONNECTED;
jwende 0:ca855d29545b 37 this->ip = ip;
jwende 0:ca855d29545b 38 this->port = port;
nileshvora 1:c233cee7c15b 39 this->t.start();
jwende 0:ca855d29545b 40 }
jwende 0:ca855d29545b 41
jwende 0:ca855d29545b 42
jwende 0:ca855d29545b 43 bool PubSubClient::connect(char *id)
jwende 0:ca855d29545b 44 {
jwende 0:ca855d29545b 45 return connect(id,NULL,NULL,0,0,0,0);
jwende 0:ca855d29545b 46 }
jwende 0:ca855d29545b 47
jwende 0:ca855d29545b 48 bool PubSubClient::connect(char *id, char *user, char *pass)
jwende 0:ca855d29545b 49 {
jwende 0:ca855d29545b 50 return connect(id,user,pass,0,0,0,0);
jwende 0:ca855d29545b 51 }
jwende 0:ca855d29545b 52
jwende 0:ca855d29545b 53 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
jwende 0:ca855d29545b 54 {
jwende 0:ca855d29545b 55 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
jwende 0:ca855d29545b 56 }
jwende 0:ca855d29545b 57
jwende 0:ca855d29545b 58 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
jwende 0:ca855d29545b 59 {
jwende 0:ca855d29545b 60 if (!connected()) {
jwende 0:ca855d29545b 61 int result = 0;
jwende 0:ca855d29545b 62 result = _client.connect(this->ip, this->port);
jwende 0:ca855d29545b 63 _client.set_blocking(false, 1);
jwende 0:ca855d29545b 64 //pc1.printf("IP: %s\r\n",this->ip);
jwende 0:ca855d29545b 65 //pc1.printf("Port: %i\r\n",this->port);
jwende 0:ca855d29545b 66 //pc1.printf("Result: %i \r\n", result);
jwende 0:ca855d29545b 67
jwende 0:ca855d29545b 68 if (result==0) {
jwende 0:ca855d29545b 69 nextMsgId = 1;
jwende 0:ca855d29545b 70 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
jwende 0:ca855d29545b 71 // Leave room in the buffer for header and variable length field
jwende 0:ca855d29545b 72 int length = 5;
jwende 0:ca855d29545b 73 unsigned int j;
jwende 0:ca855d29545b 74 for (j = 0; j<9; j++) {
jwende 0:ca855d29545b 75 buffer[length++] = d[j];
jwende 0:ca855d29545b 76 }
jwende 0:ca855d29545b 77
jwende 0:ca855d29545b 78 char v;
jwende 0:ca855d29545b 79 if (willTopic) {
jwende 0:ca855d29545b 80 v = 0x06|(willQos<<3)|(willRetain<<5);
jwende 0:ca855d29545b 81 } else {
jwende 0:ca855d29545b 82 v = 0x02;
jwende 0:ca855d29545b 83 }
jwende 0:ca855d29545b 84
jwende 0:ca855d29545b 85 if(user != NULL) {
jwende 0:ca855d29545b 86 v = v|0x80;
jwende 0:ca855d29545b 87
jwende 0:ca855d29545b 88 if(pass != NULL) {
jwende 0:ca855d29545b 89 v = v|(0x80>>1);
jwende 0:ca855d29545b 90 }
jwende 0:ca855d29545b 91 }
jwende 0:ca855d29545b 92
jwende 0:ca855d29545b 93 buffer[length++] = v;
jwende 0:ca855d29545b 94
jwende 0:ca855d29545b 95 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
jwende 0:ca855d29545b 96 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
jwende 0:ca855d29545b 97 length = writeString(id,buffer,length);
jwende 0:ca855d29545b 98 if (willTopic) {
jwende 0:ca855d29545b 99 length = writeString(willTopic,buffer,length);
jwende 0:ca855d29545b 100 length = writeString(willMessage,buffer,length);
jwende 0:ca855d29545b 101 }
jwende 0:ca855d29545b 102
jwende 0:ca855d29545b 103 if(user != NULL) {
jwende 0:ca855d29545b 104 length = writeString(user,buffer,length);
jwende 0:ca855d29545b 105 if(pass != NULL) {
jwende 0:ca855d29545b 106 length = writeString(pass,buffer,length);
jwende 0:ca855d29545b 107 }
jwende 0:ca855d29545b 108 }
jwende 0:ca855d29545b 109 //pc1.printf("Before MQTT Connect ... \r\n");
jwende 0:ca855d29545b 110 write(MQTTCONNECT,buffer,length-5);
jwende 0:ca855d29545b 111
jwende 0:ca855d29545b 112 lastInActivity = lastOutActivity = millis();
jwende 0:ca855d29545b 113
jwende 0:ca855d29545b 114 int llen=128;
jwende 0:ca855d29545b 115 int len =0;
nileshvora 1:c233cee7c15b 116
jwende 0:ca855d29545b 117 while ((len=readPacket(llen))==0) {
jwende 0:ca855d29545b 118 unsigned long t = millis();
jwende 0:ca855d29545b 119 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
nileshvora 1:c233cee7c15b 120 _state = MQTT_CONNECTION_TIMEOUT;
jwende 0:ca855d29545b 121 _client.close(true);
jwende 0:ca855d29545b 122 return false;
jwende 0:ca855d29545b 123 }
jwende 0:ca855d29545b 124 }
jwende 0:ca855d29545b 125 //pc1.printf("after MQTT Connect ... %i\r\n",len);
nileshvora 1:c233cee7c15b 126 if (len == 4) {
nileshvora 1:c233cee7c15b 127 if (buffer[3] == 0) {
nileshvora 1:c233cee7c15b 128 lastInActivity = millis();
nileshvora 1:c233cee7c15b 129 pingOutstanding = false;
nileshvora 1:c233cee7c15b 130 _state = MQTT_CONNECTED;
nileshvora 1:c233cee7c15b 131 return true;
nileshvora 1:c233cee7c15b 132 } else {
nileshvora 1:c233cee7c15b 133 _state = buffer[3];
nileshvora 1:c233cee7c15b 134 }
jwende 0:ca855d29545b 135 }
jwende 0:ca855d29545b 136 }
jwende 0:ca855d29545b 137 _client.close(true);
jwende 0:ca855d29545b 138 }
jwende 0:ca855d29545b 139 return false;
jwende 0:ca855d29545b 140 }
jwende 0:ca855d29545b 141
jwende 0:ca855d29545b 142
jwende 0:ca855d29545b 143 int PubSubClient::readPacket(int lengthLength)
jwende 0:ca855d29545b 144 {
jwende 0:ca855d29545b 145 int len = 0;
jwende 0:ca855d29545b 146 len = _client.receive_all(buffer,lengthLength);
jwende 0:ca855d29545b 147 return len;
jwende 0:ca855d29545b 148 }
jwende 0:ca855d29545b 149
jwende 0:ca855d29545b 150 bool PubSubClient::loop()
jwende 0:ca855d29545b 151 {
jwende 0:ca855d29545b 152 if (connected()) {
jwende 0:ca855d29545b 153 unsigned long t = millis();
jwende 0:ca855d29545b 154 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
jwende 0:ca855d29545b 155 if (pingOutstanding) {
nileshvora 1:c233cee7c15b 156 this->_state = MQTT_CONNECTION_TIMEOUT;
jwende 0:ca855d29545b 157 _client.close(true);
jwende 0:ca855d29545b 158 return false;
jwende 0:ca855d29545b 159 } else {
jwende 0:ca855d29545b 160 buffer[0] = MQTTPINGREQ;
jwende 0:ca855d29545b 161 buffer[1] = 0;
jwende 0:ca855d29545b 162 _client.send(buffer,2);
jwende 0:ca855d29545b 163 lastOutActivity = t;
jwende 0:ca855d29545b 164 lastInActivity = t;
jwende 0:ca855d29545b 165 pingOutstanding = true;
jwende 0:ca855d29545b 166 }
jwende 0:ca855d29545b 167 }
jwende 0:ca855d29545b 168 int len;
jwende 0:ca855d29545b 169 int llen= 128;
jwende 0:ca855d29545b 170 if (!((len=readPacket(llen))==0)) {
jwende 0:ca855d29545b 171 if (len > 0) {
jwende 0:ca855d29545b 172 lastInActivity = t;
jwende 0:ca855d29545b 173 char type = buffer[0]&0xF0;
nileshvora 1:c233cee7c15b 174 //pc1.printf("type :: %d\r\n",type);
nileshvora 1:c233cee7c15b 175 //pc1.printf("buffer[%d] :: %d\r\n",len-1,buffer[len-1]);
jwende 0:ca855d29545b 176 if (type == MQTTPUBLISH) {
jwende 0:ca855d29545b 177 if (callback) {
jwende 0:ca855d29545b 178 //pc1.printf("MQTTPUBLISH received ... %i\r\n",len);
jwende 0:ca855d29545b 179 int tl = (buffer[2]<<8)+buffer[3];
jwende 0:ca855d29545b 180 //pc1.printf("t1 ... %i\r\n",tl);
jwende 0:ca855d29545b 181 char topic[tl+1];
jwende 0:ca855d29545b 182 for (int i=0; i<tl; i++) {
jwende 0:ca855d29545b 183 topic[i] = buffer[4+i];
jwende 0:ca855d29545b 184 }
jwende 0:ca855d29545b 185 topic[tl] = 0;
jwende 0:ca855d29545b 186 //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic);
jwende 0:ca855d29545b 187 // ignore msgID - only support QoS 0 subs
jwende 0:ca855d29545b 188 int t2 = len-4-tl;
jwende 0:ca855d29545b 189 //pc1.printf("t2 ... %i\r\n",t2);
jwende 0:ca855d29545b 190 char payload[t2+1];
jwende 0:ca855d29545b 191 for (int i=0; i<t2; i++) {
jwende 0:ca855d29545b 192 payload[i] = buffer[4+i+tl];
jwende 0:ca855d29545b 193 }
jwende 0:ca855d29545b 194 payload[t2] = 0;
jwende 0:ca855d29545b 195 //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload);
jwende 0:ca855d29545b 196 callback(topic,payload,t2);
jwende 0:ca855d29545b 197 }
jwende 0:ca855d29545b 198 } else if (type == MQTTPINGREQ) {
jwende 0:ca855d29545b 199 buffer[0] = MQTTPINGRESP;
jwende 0:ca855d29545b 200 buffer[1] = 0;
jwende 0:ca855d29545b 201 _client.send(buffer,2);
jwende 0:ca855d29545b 202 } else if (type == MQTTPINGRESP) {
jwende 0:ca855d29545b 203 pingOutstanding = false;
nileshvora 1:c233cee7c15b 204 } else {
nileshvora 1:c233cee7c15b 205 //for(int i=0;i<=len;i++)
nileshvora 1:c233cee7c15b 206 //pc1.printf("buffer[%d] :: %d\r\n",i,buffer[i]);
nileshvora 1:c233cee7c15b 207 if (buffer[0] == 144 && buffer[len-1] == 0) {
nileshvora 3:511121ab8fef 208 sls.printf("Subscribed\r\n");
nileshvora 1:c233cee7c15b 209 }
nileshvora 1:c233cee7c15b 210 if (true) {
nileshvora 1:c233cee7c15b 211 //pc1.printf("In loop function buffer :: %d\r\n",buffer[len-1]);
nileshvora 1:c233cee7c15b 212 pubsub_error_code = buffer[len-1];
nileshvora 1:c233cee7c15b 213 }
jwende 0:ca855d29545b 214 }
jwende 0:ca855d29545b 215 }
jwende 0:ca855d29545b 216 }
jwende 0:ca855d29545b 217 return true;
jwende 0:ca855d29545b 218 }
jwende 0:ca855d29545b 219 return false;
jwende 0:ca855d29545b 220 }
jwende 0:ca855d29545b 221
jwende 0:ca855d29545b 222 bool PubSubClient::publish(char* topic, char* payload)
jwende 0:ca855d29545b 223 {
jwende 0:ca855d29545b 224 return publish(topic,payload,strlen(payload),false);
jwende 0:ca855d29545b 225 }
jwende 0:ca855d29545b 226
jwende 0:ca855d29545b 227 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
jwende 0:ca855d29545b 228 {
jwende 0:ca855d29545b 229 return publish(topic, payload, plength, false);
jwende 0:ca855d29545b 230 }
jwende 0:ca855d29545b 231
jwende 0:ca855d29545b 232 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
jwende 0:ca855d29545b 233 {
jwende 0:ca855d29545b 234 if (connected()) {
jwende 0:ca855d29545b 235 // Leave room in the buffer for header and variable length field
nileshvora 1:c233cee7c15b 236 //pc1.printf("in publish ... $$$$$$$$$$$:::%s\r\n",topic);
jwende 0:ca855d29545b 237 //pc1.printf("in publish ... %s\r\n",payload);
jwende 0:ca855d29545b 238 int length = 5;
jwende 0:ca855d29545b 239 length = writeString(topic,buffer,length);
jwende 0:ca855d29545b 240 int i;
jwende 0:ca855d29545b 241 for (i=0; i<plength; i++) {
jwende 0:ca855d29545b 242 buffer[length++] = payload[i];
jwende 0:ca855d29545b 243 }
jwende 0:ca855d29545b 244 short header = MQTTPUBLISH;
jwende 0:ca855d29545b 245 if (retained) {
jwende 0:ca855d29545b 246 header |= 1;
jwende 0:ca855d29545b 247 }
jwende 0:ca855d29545b 248 return write(header,buffer,length-5);
jwende 0:ca855d29545b 249 }
jwende 0:ca855d29545b 250 return false;
jwende 0:ca855d29545b 251 }
jwende 0:ca855d29545b 252
jwende 0:ca855d29545b 253
jwende 0:ca855d29545b 254
jwende 0:ca855d29545b 255 bool PubSubClient::write(short header, char* buf, int length)
jwende 0:ca855d29545b 256 {
jwende 0:ca855d29545b 257 short lenBuf[4];
jwende 0:ca855d29545b 258 short llen = 0;
jwende 0:ca855d29545b 259 short digit;
jwende 0:ca855d29545b 260 short pos = 0;
jwende 0:ca855d29545b 261 short rc;
jwende 0:ca855d29545b 262 short len = length;
jwende 0:ca855d29545b 263 //pc1.printf("in write ... %d\r\n",length);
nileshvora 1:c233cee7c15b 264 //pc1.printf("in write ...$$$$$$:: %s\r\n",buf);
jwende 0:ca855d29545b 265 do {
jwende 0:ca855d29545b 266 digit = len % 128;
jwende 0:ca855d29545b 267 len = len / 128;
jwende 0:ca855d29545b 268 if (len > 0) {
jwende 0:ca855d29545b 269 digit |= 0x80;
jwende 0:ca855d29545b 270 }
jwende 0:ca855d29545b 271 lenBuf[pos++] = digit;
jwende 0:ca855d29545b 272 llen++;
jwende 0:ca855d29545b 273 } while(len>0);
jwende 0:ca855d29545b 274
jwende 0:ca855d29545b 275 buf[4-llen] = header;
jwende 0:ca855d29545b 276 for (int i=0; i<llen; i++) {
jwende 0:ca855d29545b 277 buf[5-llen+i] = lenBuf[i];
jwende 0:ca855d29545b 278 }
jwende 0:ca855d29545b 279 rc = _client.send(buf+(4-llen),length+1+llen);
jwende 0:ca855d29545b 280
jwende 0:ca855d29545b 281 lastOutActivity = millis();
jwende 0:ca855d29545b 282 return (rc == 1+llen+length);
jwende 0:ca855d29545b 283 }
jwende 0:ca855d29545b 284
jwende 0:ca855d29545b 285 bool PubSubClient::subscribe(char* topic)
jwende 0:ca855d29545b 286 {
jwende 0:ca855d29545b 287 //pc1.printf("in subscribe ... %s\r\n",topic);
jwende 0:ca855d29545b 288 if (connected()) {
jwende 0:ca855d29545b 289 // Leave room in the buffer for header and variable length field
jwende 0:ca855d29545b 290 int length = 5;
jwende 0:ca855d29545b 291 nextMsgId++;
jwende 0:ca855d29545b 292 if (nextMsgId == 0) {
jwende 0:ca855d29545b 293 nextMsgId = 1;
jwende 0:ca855d29545b 294 }
jwende 0:ca855d29545b 295 buffer[length++] = (nextMsgId >> 8);
jwende 0:ca855d29545b 296 buffer[length++] = (nextMsgId & 0xFF);
jwende 0:ca855d29545b 297 length = writeString(topic, buffer,length);
jwende 0:ca855d29545b 298 buffer[length++] = 0; // Only do QoS 0 subs
jwende 0:ca855d29545b 299 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
jwende 0:ca855d29545b 300 }
jwende 0:ca855d29545b 301 return false;
jwende 0:ca855d29545b 302 }
jwende 0:ca855d29545b 303
jwende 0:ca855d29545b 304 bool PubSubClient::unsubscribe(char* topic)
jwende 0:ca855d29545b 305 {
jwende 0:ca855d29545b 306 if (connected()) {
jwende 0:ca855d29545b 307 int length = 5;
jwende 0:ca855d29545b 308 nextMsgId++;
jwende 0:ca855d29545b 309 if (nextMsgId == 0) {
jwende 0:ca855d29545b 310 nextMsgId = 1;
jwende 0:ca855d29545b 311 }
jwende 0:ca855d29545b 312 buffer[length++] = (nextMsgId >> 8);
jwende 0:ca855d29545b 313 buffer[length++] = (nextMsgId & 0xFF);
jwende 0:ca855d29545b 314 length = writeString(topic, buffer,length);
jwende 0:ca855d29545b 315 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
jwende 0:ca855d29545b 316 }
jwende 0:ca855d29545b 317 return false;
jwende 0:ca855d29545b 318 }
jwende 0:ca855d29545b 319
jwende 0:ca855d29545b 320 void PubSubClient::disconnect()
jwende 0:ca855d29545b 321 {
jwende 0:ca855d29545b 322 buffer[0] = MQTTDISCONNECT;
jwende 0:ca855d29545b 323 buffer[1] = 0;
jwende 0:ca855d29545b 324 _client.send(buffer,2);
nileshvora 1:c233cee7c15b 325 _state = MQTT_DISCONNECTED;
jwende 0:ca855d29545b 326 _client.close(true);
jwende 0:ca855d29545b 327 lastInActivity = lastOutActivity = millis();
jwende 0:ca855d29545b 328 }
jwende 0:ca855d29545b 329
jwende 0:ca855d29545b 330 int PubSubClient::writeString(char* string, char* buf, int pos)
jwende 0:ca855d29545b 331 {
jwende 0:ca855d29545b 332 char* idp = string;
jwende 0:ca855d29545b 333 int i = 0;
jwende 0:ca855d29545b 334 pos += 2;
jwende 0:ca855d29545b 335 while (*idp) {
jwende 0:ca855d29545b 336 buf[pos++] = *idp++;
jwende 0:ca855d29545b 337 i++;
jwende 0:ca855d29545b 338 }
jwende 0:ca855d29545b 339 buf[pos-i-2] = (i >> 8);
jwende 0:ca855d29545b 340 buf[pos-i-1] = (i & 0xFF);
jwende 0:ca855d29545b 341 return pos;
jwende 0:ca855d29545b 342 }
jwende 0:ca855d29545b 343
jwende 0:ca855d29545b 344
jwende 0:ca855d29545b 345 bool PubSubClient::connected()
jwende 0:ca855d29545b 346 {
jwende 0:ca855d29545b 347 bool rc;
jwende 0:ca855d29545b 348 rc = (int)_client.is_connected();
jwende 0:ca855d29545b 349 if (!rc) _client.close(true);
jwende 0:ca855d29545b 350 return rc;
nileshvora 1:c233cee7c15b 351 }
nileshvora 1:c233cee7c15b 352
nileshvora 1:c233cee7c15b 353 PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE)
nileshvora 1:c233cee7c15b 354 {
nileshvora 1:c233cee7c15b 355 this->callback = callback;
nileshvora 1:c233cee7c15b 356 return *this;
nileshvora 1:c233cee7c15b 357 }
nileshvora 1:c233cee7c15b 358
nileshvora 1:c233cee7c15b 359 int PubSubClient::mqtt_state()
nileshvora 1:c233cee7c15b 360 {
nileshvora 1:c233cee7c15b 361 return this->_state;
jwende 0:ca855d29545b 362 }