Arch GPRS MQTT demo

Dependencies:   GPRSInterface USBDevice mbed

Fork of Seeed_HTTPClient_GPRSInterface_HelloWorld by Seeed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers PubSubClient.cpp Source File

PubSubClient.cpp

00001 /*
00002 PubSubClient.cpp - A simple client for MQTT.
00003 Nicholas O'Leary
00004 http://knolleary.net
00005 
00006 initial port for mbed 
00007 Joerg Wende
00008 https://twitter.com/joerg_wende
00009 */
00010 
00011 
00012 
00013 #include "PubSubClient.h"
00014 
00015 #include "USBSerial.h"
00016 
00017 extern USBSerial pc;
00018 
// Debug logging helper: forwards a printf-style format string and its
// arguments to the `printf` member of the serial object `pc`.
// Written with the standard variadic-macro form (`...` / `__VA_ARGS__`)
// instead of the GNU-only named form (`args...`) so it builds on any
// conforming preprocessor.
#define LOG(...)    pc.printf(__VA_ARGS__)
00020 
00021 Timer t;
00022 
00023 int millis()
00024 {
00025     return t.read_ms();
00026 }
00027 
00028 PubSubClient::PubSubClient()
00029 {
00030 }
00031 
00032 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
00033 {
00034     this->callback = callback;
00035     this->ip = ip;
00036     this->port = port;
00037     t.start();
00038 }
00039 
00040 
00041 bool PubSubClient::connect(char *id)
00042 {
00043     return connect(id,NULL,NULL,0,0,0,0);
00044 }
00045 
00046 bool PubSubClient::connect(char *id, char *user, char *pass)
00047 {
00048     return connect(id,user,pass,0,0,0,0);
00049 }
00050 
00051 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
00052 {
00053     return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
00054 }
00055 
00056 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
00057 {
00058     if (!connected()) {
00059         int result = 0;
00060         result = _client.connect(this->ip, this->port);
00061         _client.set_blocking(false, 1);
00062         LOG("IP: %s\r\n",this->ip);
00063         LOG("Port: %i\r\n",this->port);
00064         LOG("Result: %i \r\n", result);
00065 
00066         if (result==0) {
00067             nextMsgId = 1;
00068             char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
00069             // Leave room in the buffer for header and variable length field
00070             int length = 5;
00071             unsigned int j;
00072             for (j = 0; j<9; j++) {
00073                 buffer[length++] = d[j];
00074             }
00075 
00076             char v;
00077             if (willTopic) {
00078                 v = 0x06|(willQos<<3)|(willRetain<<5);
00079             } else {
00080                 v = 0x02;
00081             }
00082 
00083             if(user != NULL) {
00084                 v = v|0x80;
00085 
00086                 if(pass != NULL) {
00087                     v = v|(0x80>>1);
00088                 }
00089             }
00090 
00091             buffer[length++] = v;
00092 
00093             buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
00094             buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
00095             length = writeString(id,buffer,length);
00096             if (willTopic) {
00097                 length = writeString(willTopic,buffer,length);
00098                 length = writeString(willMessage,buffer,length);
00099             }
00100 
00101             if(user != NULL) {
00102                 length = writeString(user,buffer,length);
00103                 if(pass != NULL) {
00104                     length = writeString(pass,buffer,length);
00105                 }
00106             }
00107             LOG("Before MQTT Connect ... \r\n");
00108             write(MQTTCONNECT,buffer,length-5);
00109 
00110             lastInActivity = lastOutActivity = millis();
00111 
00112             int llen=128;
00113             int len =0;
00114             
00115             while ((len=readPacket(llen))==0) {
00116                 unsigned long t = millis();
00117                 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
00118                     _client.close();
00119                     return false;
00120                 }
00121             }
00122             LOG("after MQTT Connect ... %i\r\n", len);
00123             if (len == 4 && buffer[3] == 0) {
00124                 lastInActivity = millis();
00125                 pingOutstanding = false;
00126                 return true;
00127             }
00128         }
00129         _client.close();
00130     }
00131     return false;
00132 }
00133 
00134 
00135 int PubSubClient::readPacket(int lengthLength)
00136 {
00137     int len = 0;
00138     len = _client.receive_all(buffer,lengthLength);
00139     return len;
00140 }
00141 
00142 bool PubSubClient::loop()
00143 {
00144     if (connected()) {
00145         unsigned long t = millis();
00146         if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
00147             if (pingOutstanding) {
00148                 _client.close();
00149                 return false;
00150             } else {
00151                 buffer[0] = MQTTPINGREQ;
00152                 buffer[1] = 0;
00153                 _client.send(buffer,2);
00154                 lastOutActivity = t;
00155                 lastInActivity = t;
00156                 pingOutstanding = true;
00157             }
00158         }
00159         int len;
00160         int llen= 128;
00161         if (!((len=readPacket(llen))==0)) {
00162             if (len > 0) {
00163                 lastInActivity = t;
00164                 char type = buffer[0]&0xF0;
00165                 if (type == MQTTPUBLISH) {
00166                     if (callback) {
00167                         LOG("MQTTPUBLISH - bytes received:%i\r\n", len);
00168                         int tl = (buffer[2]<<8)+buffer[3];
00169                         LOG("t1:%i\r\n",tl);
00170                         char topic[tl+1];
00171                         for (int i=0; i<tl; i++) {
00172                             topic[i] = buffer[4+i];
00173                         }
00174                         topic[tl] = 0;
00175                         LOG("Topic:%s\r\n",topic);
00176                         // ignore msgID - only support QoS 0 subs
00177                         int t2 = len-4-tl;
00178                         LOG("t2:%i\r\n",t2);
00179                         char payload[t2+1];
00180                         for (int i=0; i<t2; i++) {
00181                             payload[i] = buffer[4+i+tl];
00182                         }
00183                         payload[t2] = 0;
00184                         LOG("Payload:%s\r\n", payload);
00185                         callback(topic,payload,t2);
00186                     }
00187                 } else if (type == MQTTPINGREQ) {
00188                     buffer[0] = MQTTPINGRESP;
00189                     buffer[1] = 0;
00190                     _client.send(buffer,2);
00191                 } else if (type == MQTTPINGRESP) {
00192                     pingOutstanding = false;
00193                 }
00194             }
00195         }
00196         return true;
00197     }
00198     return false;
00199 }
00200 
00201 bool PubSubClient::publish(char* topic, char* payload)
00202 {
00203     return publish(topic,payload,strlen(payload),false);
00204 }
00205 
00206 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
00207 {
00208     return publish(topic, payload, plength, false);
00209 }
00210 
00211 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
00212 {
00213     if (connected()) {
00214         // Leave room in the buffer for header and variable length field
00215         LOG("publish - topic:%s, payload:%s\r\n", topic, payload);
00216         int length = 5;
00217         length = writeString(topic,buffer,length);
00218         int i;
00219         for (i=0; i<plength; i++) {
00220             buffer[length++] = payload[i];
00221         }
00222         short header = MQTTPUBLISH;
00223         if (retained) {
00224             header |= 1;
00225         }
00226         return write(header,buffer,length-5);
00227     }
00228     return false;
00229 }
00230 
00231 
00232 
00233 bool PubSubClient::write(short header, char* buf, int length)
00234 {
00235     short lenBuf[4];
00236     short llen = 0;
00237     short digit;
00238     short pos = 0;
00239     short rc;
00240     short len = length;
00241     do {
00242         digit = len % 128;
00243         len = len / 128;
00244         if (len > 0) {
00245             digit |= 0x80;
00246         }
00247         lenBuf[pos++] = digit;
00248         llen++;
00249     } while(len>0);
00250 
00251     buf[4-llen] = header;
00252     for (int i=0; i<llen; i++) {
00253         buf[5-llen+i] = lenBuf[i];
00254     }
00255     rc = _client.send(buf+(4-llen),length+1+llen);
00256 
00257     lastOutActivity = millis();
00258     return (rc == 1+llen+length);
00259 }
00260 
00261 bool PubSubClient::subscribe(char* topic)
00262 {
00263     LOG("subscribe - topic:%s\r\n",topic);
00264     if (connected()) {
00265         // Leave room in the buffer for header and variable length field
00266         int length = 5;
00267         nextMsgId++;
00268         if (nextMsgId == 0) {
00269             nextMsgId = 1;
00270         }
00271         buffer[length++] = (nextMsgId >> 8);
00272         buffer[length++] = (nextMsgId & 0xFF);
00273         length = writeString(topic, buffer,length);
00274         buffer[length++] = 0; // Only do QoS 0 subs
00275         return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
00276     }
00277     return false;
00278 }
00279 
00280 bool PubSubClient::unsubscribe(char* topic)
00281 {
00282     if (connected()) {
00283         int length = 5;
00284         nextMsgId++;
00285         if (nextMsgId == 0) {
00286             nextMsgId = 1;
00287         }
00288         buffer[length++] = (nextMsgId >> 8);
00289         buffer[length++] = (nextMsgId & 0xFF);
00290         length = writeString(topic, buffer,length);
00291         return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
00292     }
00293     return false;
00294 }
00295 
00296 void PubSubClient::disconnect()
00297 {
00298     buffer[0] = MQTTDISCONNECT;
00299     buffer[1] = 0;
00300     _client.send(buffer,2);
00301     _client.close();
00302     lastInActivity = lastOutActivity = millis();
00303 }
00304 
00305 int PubSubClient::writeString(char* string, char* buf, int pos)
00306 {
00307     char* idp = string;
00308     int i = 0;
00309     pos += 2;
00310     while (*idp) {
00311         buf[pos++] = *idp++;
00312         i++;
00313     }
00314     buf[pos-i-2] = (i >> 8);
00315     buf[pos-i-1] = (i & 0xFF);
00316     return pos;
00317 }
00318 
00319 
00320 bool PubSubClient::connected()
00321 {
00322     bool rc;
00323     rc = (int)_client.is_connected();
00324 //     if (!rc) _client.close();
00325     return rc;
00326 }