Initial port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/

Dependents:   MQTTTest 2lemetryNXP MyTempuratureMqtt MqttClientEthInt ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers PubSubClient.cpp Source File

PubSubClient.cpp

00001 /*
00002 PubSubClient.cpp - A simple client for MQTT.
00003 Nicholas O'Leary
00004 http://knolleary.net
00005 
00006 initial port for mbed 
00007 Joerg Wende
00008 https://twitter.com/joerg_wende
00009 */
00010 
00011 #include "PubSubClient.h"
00012 
00013 Timer t;
00014 //Serial pc1(USBTX, USBRX);
00015 
00016 
00017 int millis()
00018 {
00019     return t.read_ms();
00020 }
00021 
00022 PubSubClient::PubSubClient()
00023 {
00024 }
00025 
00026 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
00027 {
00028     this->callback = callback;
00029     this->ip = ip;
00030     this->port = port;
00031     t.start();
00032 }
00033 
00034 
00035 bool PubSubClient::connect(char *id)
00036 {
00037     return connect(id,NULL,NULL,0,0,0,0);
00038 }
00039 
00040 bool PubSubClient::connect(char *id, char *user, char *pass)
00041 {
00042     return connect(id,user,pass,0,0,0,0);
00043 }
00044 
00045 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
00046 {
00047     return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
00048 }
00049 
00050 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
00051 {
00052     if (!connected()) {
00053         int result = 0;
00054         result = _client.connect(this->ip, this->port);
00055         _client.set_blocking(false, 1);
00056         //pc1.printf("IP: %s\r\n",this->ip);
00057         //pc1.printf("Port: %i\r\n",this->port);
00058         //pc1.printf("Result: %i \r\n", result);
00059 
00060         if (result==0) {
00061             nextMsgId = 1;
00062             char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
00063             // Leave room in the buffer for header and variable length field
00064             int length = 5;
00065             unsigned int j;
00066             for (j = 0; j<9; j++) {
00067                 buffer[length++] = d[j];
00068             }
00069 
00070             char v;
00071             if (willTopic) {
00072                 v = 0x06|(willQos<<3)|(willRetain<<5);
00073             } else {
00074                 v = 0x02;
00075             }
00076 
00077             if(user != NULL) {
00078                 v = v|0x80;
00079 
00080                 if(pass != NULL) {
00081                     v = v|(0x80>>1);
00082                 }
00083             }
00084 
00085             buffer[length++] = v;
00086 
00087             buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
00088             buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
00089             length = writeString(id,buffer,length);
00090             if (willTopic) {
00091                 length = writeString(willTopic,buffer,length);
00092                 length = writeString(willMessage,buffer,length);
00093             }
00094 
00095             if(user != NULL) {
00096                 length = writeString(user,buffer,length);
00097                 if(pass != NULL) {
00098                     length = writeString(pass,buffer,length);
00099                 }
00100             }
00101             //pc1.printf("Before MQTT Connect ... \r\n");
00102             write(MQTTCONNECT,buffer,length-5);
00103 
00104             lastInActivity = lastOutActivity = millis();
00105 
00106             int llen=128;
00107             int len =0;
00108             
00109             while ((len=readPacket(llen))==0) {
00110                 unsigned long t = millis();
00111                 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
00112                     _client.close(true);
00113                     return false;
00114                 }
00115             }
00116             //pc1.printf("after MQTT Connect ... %i\r\n",len);
00117             if (len == 4 && buffer[3] == 0) {
00118                 lastInActivity = millis();
00119                 pingOutstanding = false;
00120                 return true;
00121             }
00122         }
00123         _client.close(true);
00124     }
00125     return false;
00126 }
00127 
00128 
00129 int PubSubClient::readPacket(int lengthLength)
00130 {
00131     int len = 0;
00132     len = _client.receive_all(buffer,lengthLength);
00133     return len;
00134 }
00135 
00136 bool PubSubClient::loop()
00137 {
00138     if (connected()) {
00139         unsigned long t = millis();
00140         if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
00141             if (pingOutstanding) {
00142                 _client.close(true);
00143                 return false;
00144             } else {
00145                 buffer[0] = MQTTPINGREQ;
00146                 buffer[1] = 0;
00147                 _client.send(buffer,2);
00148                 lastOutActivity = t;
00149                 lastInActivity = t;
00150                 pingOutstanding = true;
00151             }
00152         }
00153         int len;
00154         int llen= 128;
00155         if (!((len=readPacket(llen))==0)) {
00156             if (len > 0) {
00157                 lastInActivity = t;
00158                 char type = buffer[0]&0xF0;
00159                 if (type == MQTTPUBLISH) {
00160                     if (callback) {
00161                         //pc1.printf("MQTTPUBLISH received ... %i\r\n",len);
00162                         int tl = (buffer[2]<<8)+buffer[3];
00163                         //pc1.printf("t1 ... %i\r\n",tl);
00164                         char topic[tl+1];
00165                         for (int i=0; i<tl; i++) {
00166                             topic[i] = buffer[4+i];
00167                         }
00168                         topic[tl] = 0;
00169                         //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic);
00170                         // ignore msgID - only support QoS 0 subs
00171                         int t2 = len-4-tl;
00172                         //pc1.printf("t2 ... %i\r\n",t2);
00173                         char payload[t2+1];
00174                         for (int i=0; i<t2; i++) {
00175                             payload[i] = buffer[4+i+tl];
00176                         }
00177                         payload[t2] = 0;
00178                         //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload);
00179                         callback(topic,payload,t2);
00180                     }
00181                 } else if (type == MQTTPINGREQ) {
00182                     buffer[0] = MQTTPINGRESP;
00183                     buffer[1] = 0;
00184                     _client.send(buffer,2);
00185                 } else if (type == MQTTPINGRESP) {
00186                     pingOutstanding = false;
00187                 }
00188             }
00189         }
00190         return true;
00191     }
00192     return false;
00193 }
00194 
00195 bool PubSubClient::publish(char* topic, char* payload)
00196 {
00197     return publish(topic,payload,strlen(payload),false);
00198 }
00199 
00200 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
00201 {
00202     return publish(topic, payload, plength, false);
00203 }
00204 
00205 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
00206 {
00207     if (connected()) {
00208         // Leave room in the buffer for header and variable length field
00209         //pc1.printf("in publish ... %s\r\n",topic);
00210         //pc1.printf("in publish ... %s\r\n",payload);
00211         int length = 5;
00212         length = writeString(topic,buffer,length);
00213         int i;
00214         for (i=0; i<plength; i++) {
00215             buffer[length++] = payload[i];
00216         }
00217         short header = MQTTPUBLISH;
00218         if (retained) {
00219             header |= 1;
00220         }
00221         return write(header,buffer,length-5);
00222     }
00223     return false;
00224 }
00225 
00226 
00227 
00228 bool PubSubClient::write(short header, char* buf, int length)
00229 {
00230     short lenBuf[4];
00231     short llen = 0;
00232     short digit;
00233     short pos = 0;
00234     short rc;
00235     short len = length;
00236     //pc1.printf("in write ... %d\r\n",length);
00237     //pc1.printf("in write ... %s\r\n",buf);  
00238     do {
00239         digit = len % 128;
00240         len = len / 128;
00241         if (len > 0) {
00242             digit |= 0x80;
00243         }
00244         lenBuf[pos++] = digit;
00245         llen++;
00246     } while(len>0);
00247 
00248     buf[4-llen] = header;
00249     for (int i=0; i<llen; i++) {
00250         buf[5-llen+i] = lenBuf[i];
00251     }
00252     rc = _client.send(buf+(4-llen),length+1+llen);
00253 
00254     lastOutActivity = millis();
00255     return (rc == 1+llen+length);
00256 }
00257 
00258 bool PubSubClient::subscribe(char* topic)
00259 {
00260     //pc1.printf("in subscribe ... %s\r\n",topic);
00261     if (connected()) {
00262         // Leave room in the buffer for header and variable length field
00263         int length = 5;
00264         nextMsgId++;
00265         if (nextMsgId == 0) {
00266             nextMsgId = 1;
00267         }
00268         buffer[length++] = (nextMsgId >> 8);
00269         buffer[length++] = (nextMsgId & 0xFF);
00270         length = writeString(topic, buffer,length);
00271         buffer[length++] = 0; // Only do QoS 0 subs
00272         return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
00273     }
00274     return false;
00275 }
00276 
00277 bool PubSubClient::unsubscribe(char* topic)
00278 {
00279     if (connected()) {
00280         int length = 5;
00281         nextMsgId++;
00282         if (nextMsgId == 0) {
00283             nextMsgId = 1;
00284         }
00285         buffer[length++] = (nextMsgId >> 8);
00286         buffer[length++] = (nextMsgId & 0xFF);
00287         length = writeString(topic, buffer,length);
00288         return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
00289     }
00290     return false;
00291 }
00292 
00293 void PubSubClient::disconnect()
00294 {
00295     buffer[0] = MQTTDISCONNECT;
00296     buffer[1] = 0;
00297     _client.send(buffer,2);
00298     _client.close(true);
00299     lastInActivity = lastOutActivity = millis();
00300 }
00301 
00302 int PubSubClient::writeString(char* string, char* buf, int pos)
00303 {
00304     char* idp = string;
00305     int i = 0;
00306     pos += 2;
00307     while (*idp) {
00308         buf[pos++] = *idp++;
00309         i++;
00310     }
00311     buf[pos-i-2] = (i >> 8);
00312     buf[pos-i-1] = (i & 0xFF);
00313     return pos;
00314 }
00315 
00316 
00317 bool PubSubClient::connected()
00318 {
00319     bool rc;
00320     rc = (int)_client.is_connected();
00321     if (!rc) _client.close(true);
00322     return rc;
00323 }