Arch GPRS MQTT demo
Dependencies: GPRSInterface USBDevice mbed
Fork of Seeed_HTTPClient_GPRSInterface_HelloWorld by
PubSubClient.cpp
00001 /* 00002 PubSubClient.cpp - A simple client for MQTT. 00003 Nicholas O'Leary 00004 http://knolleary.net 00005 00006 initial port for mbed 00007 Joerg Wende 00008 https://twitter.com/joerg_wende 00009 */ 00010 00011 00012 00013 #include "PubSubClient.h" 00014 00015 #include "USBSerial.h" 00016 00017 extern USBSerial pc; 00018 00019 #define LOG(args...) pc.printf(args) 00020 00021 Timer t; 00022 00023 int millis() 00024 { 00025 return t.read_ms(); 00026 } 00027 00028 PubSubClient::PubSubClient() 00029 { 00030 } 00031 00032 PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) 00033 { 00034 this->callback = callback; 00035 this->ip = ip; 00036 this->port = port; 00037 t.start(); 00038 } 00039 00040 00041 bool PubSubClient::connect(char *id) 00042 { 00043 return connect(id,NULL,NULL,0,0,0,0); 00044 } 00045 00046 bool PubSubClient::connect(char *id, char *user, char *pass) 00047 { 00048 return connect(id,user,pass,0,0,0,0); 00049 } 00050 00051 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) 00052 { 00053 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); 00054 } 00055 00056 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) 00057 { 00058 if (!connected()) { 00059 int result = 0; 00060 result = _client.connect(this->ip, this->port); 00061 _client.set_blocking(false, 1); 00062 LOG("IP: %s\r\n",this->ip); 00063 LOG("Port: %i\r\n",this->port); 00064 LOG("Result: %i \r\n", result); 00065 00066 if (result==0) { 00067 nextMsgId = 1; 00068 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; 00069 // Leave room in the buffer for header and variable length field 00070 int length = 5; 00071 unsigned int j; 00072 for (j = 0; j<9; j++) { 00073 buffer[length++] = d[j]; 00074 } 00075 00076 char v; 00077 if (willTopic) { 00078 v = 0x06|(willQos<<3)|(willRetain<<5); 00079 } else { 00080 v = 0x02; 00081 } 00082 00083 if(user != NULL) { 00084 v = v|0x80; 00085 00086 if(pass != NULL) { 00087 v = v|(0x80>>1); 00088 } 00089 } 00090 00091 buffer[length++] = v; 00092 00093 buffer[length++] = ((MQTT_KEEPALIVE) >> 8); 00094 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); 00095 length = writeString(id,buffer,length); 00096 if (willTopic) { 00097 length = writeString(willTopic,buffer,length); 00098 length = writeString(willMessage,buffer,length); 00099 } 00100 00101 if(user != NULL) { 00102 length = writeString(user,buffer,length); 00103 if(pass != NULL) { 00104 length = writeString(pass,buffer,length); 00105 } 00106 } 00107 LOG("Before MQTT Connect ... \r\n"); 00108 write(MQTTCONNECT,buffer,length-5); 00109 00110 lastInActivity = lastOutActivity = millis(); 00111 00112 int llen=128; 00113 int len =0; 00114 00115 while ((len=readPacket(llen))==0) { 00116 unsigned long t = millis(); 00117 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { 00118 _client.close(); 00119 return false; 00120 } 00121 } 00122 LOG("after MQTT Connect ... %i\r\n", len); 00123 if (len == 4 && buffer[3] == 0) { 00124 lastInActivity = millis(); 00125 pingOutstanding = false; 00126 return true; 00127 } 00128 } 00129 _client.close(); 00130 } 00131 return false; 00132 } 00133 00134 00135 int PubSubClient::readPacket(int lengthLength) 00136 { 00137 int len = 0; 00138 len = _client.receive_all(buffer,lengthLength); 00139 return len; 00140 } 00141 00142 bool PubSubClient::loop() 00143 { 00144 if (connected()) { 00145 unsigned long t = millis(); 00146 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { 00147 if (pingOutstanding) { 00148 _client.close(); 00149 return false; 00150 } else { 00151 buffer[0] = MQTTPINGREQ; 00152 buffer[1] = 0; 00153 _client.send(buffer,2); 00154 lastOutActivity = t; 00155 lastInActivity = t; 00156 pingOutstanding = true; 00157 } 00158 } 00159 int len; 00160 int llen= 128; 00161 if (!((len=readPacket(llen))==0)) { 00162 if (len > 0) { 00163 lastInActivity = t; 00164 char type = buffer[0]&0xF0; 00165 if (type == MQTTPUBLISH) { 00166 if (callback) { 00167 LOG("MQTTPUBLISH - bytes received:%i\r\n", len); 00168 int tl = (buffer[2]<<8)+buffer[3]; 00169 LOG("t1:%i\r\n",tl); 00170 char topic[tl+1]; 00171 for (int i=0; i<tl; i++) { 00172 topic[i] = buffer[4+i]; 00173 } 00174 topic[tl] = 0; 00175 LOG("Topic:%s\r\n",topic); 00176 // ignore msgID - only support QoS 0 subs 00177 int t2 = len-4-tl; 00178 LOG("t2:%i\r\n",t2); 00179 char payload[t2+1]; 00180 for (int i=0; i<t2; i++) { 00181 payload[i] = buffer[4+i+tl]; 00182 } 00183 payload[t2] = 0; 00184 LOG("Payload:%s\r\n", payload); 00185 callback(topic,payload,t2); 00186 } 00187 } else if (type == MQTTPINGREQ) { 00188 buffer[0] = MQTTPINGRESP; 00189 buffer[1] = 0; 00190 _client.send(buffer,2); 00191 } else if (type == MQTTPINGRESP) { 00192 pingOutstanding = false; 00193 } 00194 } 00195 } 00196 return true; 00197 } 00198 return false; 00199 } 00200 00201 bool PubSubClient::publish(char* topic, char* payload) 00202 { 00203 return publish(topic,payload,strlen(payload),false); 00204 } 00205 00206 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) 00207 { 00208 return publish(topic, payload, plength, false); 00209 } 00210 00211 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) 00212 { 00213 if (connected()) { 00214 // Leave room in the buffer for header and variable length field 00215 LOG("publish - topic:%s, payload:%s\r\n", topic, payload); 00216 int length = 5; 00217 length = writeString(topic,buffer,length); 00218 int i; 00219 for (i=0; i<plength; i++) { 00220 buffer[length++] = payload[i]; 00221 } 00222 short header = MQTTPUBLISH; 00223 if (retained) { 00224 header |= 1; 00225 } 00226 return write(header,buffer,length-5); 00227 } 00228 return false; 00229 } 00230 00231 00232 00233 bool PubSubClient::write(short header, char* buf, int length) 00234 { 00235 short lenBuf[4]; 00236 short llen = 0; 00237 short digit; 00238 short pos = 0; 00239 short rc; 00240 short len = length; 00241 do { 00242 digit = len % 128; 00243 len = len / 128; 00244 if (len > 0) { 00245 digit |= 0x80; 00246 } 00247 lenBuf[pos++] = digit; 00248 llen++; 00249 } while(len>0); 00250 00251 buf[4-llen] = header; 00252 for (int i=0; i<llen; i++) { 00253 buf[5-llen+i] = lenBuf[i]; 00254 } 00255 rc = _client.send(buf+(4-llen),length+1+llen); 00256 00257 lastOutActivity = millis(); 00258 return (rc == 1+llen+length); 00259 } 00260 00261 bool PubSubClient::subscribe(char* topic) 00262 { 00263 LOG("subscribe - topic:%s\r\n",topic); 00264 if (connected()) { 00265 // Leave room in the buffer for header and variable length field 00266 int length = 5; 00267 nextMsgId++; 00268 if (nextMsgId == 0) { 00269 nextMsgId = 1; 00270 } 00271 buffer[length++] = (nextMsgId >> 8); 00272 buffer[length++] = (nextMsgId & 0xFF); 00273 length = writeString(topic, buffer,length); 00274 buffer[length++] = 0; // Only do QoS 0 subs 00275 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); 00276 } 00277 return false; 00278 } 00279 00280 bool PubSubClient::unsubscribe(char* topic) 00281 { 00282 if (connected()) { 00283 int length = 5; 00284 nextMsgId++; 00285 if (nextMsgId == 0) { 00286 nextMsgId = 1; 00287 } 00288 buffer[length++] = (nextMsgId >> 8); 00289 buffer[length++] = (nextMsgId & 0xFF); 00290 length = writeString(topic, buffer,length); 00291 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); 00292 } 00293 return false; 00294 } 00295 00296 void PubSubClient::disconnect() 00297 { 00298 buffer[0] = MQTTDISCONNECT; 00299 buffer[1] = 0; 00300 _client.send(buffer,2); 00301 _client.close(); 00302 lastInActivity = lastOutActivity = millis(); 00303 } 00304 00305 int PubSubClient::writeString(char* string, char* buf, int pos) 00306 { 00307 char* idp = string; 00308 int i = 0; 00309 pos += 2; 00310 while (*idp) { 00311 buf[pos++] = *idp++; 00312 i++; 00313 } 00314 buf[pos-i-2] = (i >> 8); 00315 buf[pos-i-1] = (i & 0xFF); 00316 return pos; 00317 } 00318 00319 00320 bool PubSubClient::connected() 00321 { 00322 bool rc; 00323 rc = (int)_client.is_connected(); 00324 // if (!rc) _client.close(); 00325 return rc; 00326 }
Generated on Tue Jul 19 2022 22:27:48 by 1.7.2