Initial port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/
Dependents: MQTTTest 2lemetryNXP MyTempuratureMqtt MqttClientEthInt ... more
PubSubClient.cpp
00001 /* 00002 PubSubClient.cpp - A simple client for MQTT. 00003 Nicholas O'Leary 00004 http://knolleary.net 00005 00006 initial port for mbed 00007 Joerg Wende 00008 https://twitter.com/joerg_wende 00009 */ 00010 00011 #include "PubSubClient.h" 00012 00013 Timer t; 00014 //Serial pc1(USBTX, USBRX); 00015 00016 00017 int millis() 00018 { 00019 return t.read_ms(); 00020 } 00021 00022 PubSubClient::PubSubClient() 00023 { 00024 } 00025 00026 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) 00027 { 00028 this->callback = callback; 00029 this->ip = ip; 00030 this->port = port; 00031 t.start(); 00032 } 00033 00034 00035 bool PubSubClient::connect(char *id) 00036 { 00037 return connect(id,NULL,NULL,0,0,0,0); 00038 } 00039 00040 bool PubSubClient::connect(char *id, char *user, char *pass) 00041 { 00042 return connect(id,user,pass,0,0,0,0); 00043 } 00044 00045 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) 00046 { 00047 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); 00048 } 00049 00050 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) 00051 { 00052 if (!connected()) { 00053 int result = 0; 00054 result = _client.connect(this->ip, this->port); 00055 _client.set_blocking(false, 1); 00056 //pc1.printf("IP: %s\r\n",this->ip); 00057 //pc1.printf("Port: %i\r\n",this->port); 00058 //pc1.printf("Result: %i \r\n", result); 00059 00060 if (result==0) { 00061 nextMsgId = 1; 00062 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; 00063 // Leave room in the buffer for header and variable length field 00064 int length = 5; 00065 unsigned int j; 00066 for (j = 0; j<9; j++) { 00067 buffer[length++] = d[j]; 00068 } 00069 00070 char v; 00071 if (willTopic) { 00072 v = 0x06|(willQos<<3)|(willRetain<<5); 00073 } else { 00074 v = 0x02; 00075 } 00076 00077 if(user != NULL) { 00078 v = v|0x80; 00079 00080 if(pass != NULL) { 00081 v = v|(0x80>>1); 00082 } 00083 } 00084 00085 buffer[length++] = v; 00086 00087 buffer[length++] = ((MQTT_KEEPALIVE) >> 8); 00088 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); 00089 length = writeString(id,buffer,length); 00090 if (willTopic) { 00091 length = writeString(willTopic,buffer,length); 00092 length = writeString(willMessage,buffer,length); 00093 } 00094 00095 if(user != NULL) { 00096 length = writeString(user,buffer,length); 00097 if(pass != NULL) { 00098 length = writeString(pass,buffer,length); 00099 } 00100 } 00101 //pc1.printf("Before MQTT Connect ... \r\n"); 00102 write(MQTTCONNECT,buffer,length-5); 00103 00104 lastInActivity = lastOutActivity = millis(); 00105 00106 int llen=128; 00107 int len =0; 00108 00109 while ((len=readPacket(llen))==0) { 00110 unsigned long t = millis(); 00111 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { 00112 _client.close(true); 00113 return false; 00114 } 00115 } 00116 //pc1.printf("after MQTT Connect ... %i\r\n",len); 00117 if (len == 4 && buffer[3] == 0) { 00118 lastInActivity = millis(); 00119 pingOutstanding = false; 00120 return true; 00121 } 00122 } 00123 _client.close(true); 00124 } 00125 return false; 00126 } 00127 00128 00129 int PubSubClient::readPacket(int lengthLength) 00130 { 00131 int len = 0; 00132 len = _client.receive_all(buffer,lengthLength); 00133 return len; 00134 } 00135 00136 bool PubSubClient::loop() 00137 { 00138 if (connected()) { 00139 unsigned long t = millis(); 00140 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { 00141 if (pingOutstanding) { 00142 _client.close(true); 00143 return false; 00144 } else { 00145 buffer[0] = MQTTPINGREQ; 00146 buffer[1] = 0; 00147 _client.send(buffer,2); 00148 lastOutActivity = t; 00149 lastInActivity = t; 00150 pingOutstanding = true; 00151 } 00152 } 00153 int len; 00154 int llen= 128; 00155 if (!((len=readPacket(llen))==0)) { 00156 if (len > 0) { 00157 lastInActivity = t; 00158 char type = buffer[0]&0xF0; 00159 if (type == MQTTPUBLISH) { 00160 if (callback) { 00161 //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); 00162 int tl = (buffer[2]<<8)+buffer[3]; 00163 //pc1.printf("t1 ... %i\r\n",tl); 00164 char topic[tl+1]; 00165 for (int i=0; i<tl; i++) { 00166 topic[i] = buffer[4+i]; 00167 } 00168 topic[tl] = 0; 00169 //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic); 00170 // ignore msgID - only support QoS 0 subs 00171 int t2 = len-4-tl; 00172 //pc1.printf("t2 ... %i\r\n",t2); 00173 char payload[t2+1]; 00174 for (int i=0; i<t2; i++) { 00175 payload[i] = buffer[4+i+tl]; 00176 } 00177 payload[t2] = 0; 00178 //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload); 00179 callback(topic,payload,t2); 00180 } 00181 } else if (type == MQTTPINGREQ) { 00182 buffer[0] = MQTTPINGRESP; 00183 buffer[1] = 0; 00184 _client.send(buffer,2); 00185 } else if (type == MQTTPINGRESP) { 00186 pingOutstanding = false; 00187 } 00188 } 00189 } 00190 return true; 00191 } 00192 return false; 00193 } 00194 00195 bool PubSubClient::publish(char* topic, char* payload) 00196 { 00197 return publish(topic,payload,strlen(payload),false); 00198 } 00199 00200 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) 00201 { 00202 return publish(topic, payload, plength, false); 00203 } 00204 00205 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) 00206 { 00207 if (connected()) { 00208 // Leave room in the buffer for header and variable length field 00209 //pc1.printf("in publish ... %s\r\n",topic); 00210 //pc1.printf("in publish ... %s\r\n",payload); 00211 int length = 5; 00212 length = writeString(topic,buffer,length); 00213 int i; 00214 for (i=0; i<plength; i++) { 00215 buffer[length++] = payload[i]; 00216 } 00217 short header = MQTTPUBLISH; 00218 if (retained) { 00219 header |= 1; 00220 } 00221 return write(header,buffer,length-5); 00222 } 00223 return false; 00224 } 00225 00226 00227 00228 bool PubSubClient::write(short header, char* buf, int length) 00229 { 00230 short lenBuf[4]; 00231 short llen = 0; 00232 short digit; 00233 short pos = 0; 00234 short rc; 00235 short len = length; 00236 //pc1.printf("in write ... %d\r\n",length); 00237 //pc1.printf("in write ... %s\r\n",buf); 00238 do { 00239 digit = len % 128; 00240 len = len / 128; 00241 if (len > 0) { 00242 digit |= 0x80; 00243 } 00244 lenBuf[pos++] = digit; 00245 llen++; 00246 } while(len>0); 00247 00248 buf[4-llen] = header; 00249 for (int i=0; i<llen; i++) { 00250 buf[5-llen+i] = lenBuf[i]; 00251 } 00252 rc = _client.send(buf+(4-llen),length+1+llen); 00253 00254 lastOutActivity = millis(); 00255 return (rc == 1+llen+length); 00256 } 00257 00258 bool PubSubClient::subscribe(char* topic) 00259 { 00260 //pc1.printf("in subscribe ... %s\r\n",topic); 00261 if (connected()) { 00262 // Leave room in the buffer for header and variable length field 00263 int length = 5; 00264 nextMsgId++; 00265 if (nextMsgId == 0) { 00266 nextMsgId = 1; 00267 } 00268 buffer[length++] = (nextMsgId >> 8); 00269 buffer[length++] = (nextMsgId & 0xFF); 00270 length = writeString(topic, buffer,length); 00271 buffer[length++] = 0; // Only do QoS 0 subs 00272 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); 00273 } 00274 return false; 00275 } 00276 00277 bool PubSubClient::unsubscribe(char* topic) 00278 { 00279 if (connected()) { 00280 int length = 5; 00281 nextMsgId++; 00282 if (nextMsgId == 0) { 00283 nextMsgId = 1; 00284 } 00285 buffer[length++] = (nextMsgId >> 8); 00286 buffer[length++] = (nextMsgId & 0xFF); 00287 length = writeString(topic, buffer,length); 00288 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); 00289 } 00290 return false; 00291 } 00292 00293 void PubSubClient::disconnect() 00294 { 00295 buffer[0] = MQTTDISCONNECT; 00296 buffer[1] = 0; 00297 _client.send(buffer,2); 00298 _client.close(true); 00299 lastInActivity = lastOutActivity = millis(); 00300 } 00301 00302 int PubSubClient::writeString(char* string, char* buf, int pos) 00303 { 00304 char* idp = string; 00305 int i = 0; 00306 pos += 2; 00307 while (*idp) { 00308 buf[pos++] = *idp++; 00309 i++; 00310 } 00311 buf[pos-i-2] = (i >> 8); 00312 buf[pos-i-1] = (i & 0xFF); 00313 return pos; 00314 } 00315 00316 00317 bool PubSubClient::connected() 00318 { 00319 bool rc; 00320 rc = (int)_client.is_connected(); 00321 if (!rc) _client.close(true); 00322 return rc; 00323 }
Generated on Wed Jul 13 2022 23:16:09 by
