Yalgaar mBed SDK for real-time messaging
Fork of MQTT by
Embed:
(wiki syntax)
Show/hide line numbers
PubSubClient.cpp
00001 /* 00002 PubSubClient.cpp - A simple client for MQTT. 00003 Nicholas O'Leary 00004 http://knolleary.net 00005 00006 initial port for mbed 00007 Joerg Wende 00008 https://twitter.com/joerg_wende 00009 */ 00010 00011 #include "PubSubClient.h" 00012 00013 00014 00015 00016 Serial sls(USBTX, USBRX, 115200); 00017 00018 int pubsub_error_code; 00019 00020 00021 int PubSubClient::millis() 00022 { 00023 return t.read_ms(); 00024 } 00025 00026 PubSubClient::PubSubClient() 00027 { 00028 this->_state = MQTT_DISCONNECTED; 00029 this->ip = "api.yalgaar.io"; 00030 this->port = 1883; 00031 this->t.start(); 00032 } 00033 00034 PubSubClient::PubSubClient(char *ip, int port) 00035 { 00036 this->_state = MQTT_DISCONNECTED; 00037 this->ip = ip; 00038 this->port = port; 00039 this->t.start(); 00040 } 00041 00042 00043 bool PubSubClient::connect(char *id) 00044 { 00045 return connect(id,NULL,NULL,0,0,0,0); 00046 } 00047 00048 bool PubSubClient::connect(char *id, char *user, char *pass) 00049 { 00050 return connect(id,user,pass,0,0,0,0); 00051 } 00052 00053 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) 00054 { 00055 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); 00056 } 00057 00058 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) 00059 { 00060 if (!connected()) { 00061 int result = 0; 00062 result = _client.connect(this->ip, this->port); 00063 _client.set_blocking(false, 1); 00064 //pc1.printf("IP: %s\r\n",this->ip); 00065 //pc1.printf("Port: %i\r\n",this->port); 00066 //pc1.printf("Result: %i \r\n", result); 00067 00068 if (result==0) { 00069 nextMsgId = 1; 00070 char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; 00071 // Leave room in the buffer for header and variable length field 00072 int length = 5; 00073 unsigned int j; 00074 for (j = 0; j<9; j++) { 00075 buffer[length++] = d[j]; 00076 } 00077 
00078 char v; 00079 if (willTopic) { 00080 v = 0x06|(willQos<<3)|(willRetain<<5); 00081 } else { 00082 v = 0x02; 00083 } 00084 00085 if(user != NULL) { 00086 v = v|0x80; 00087 00088 if(pass != NULL) { 00089 v = v|(0x80>>1); 00090 } 00091 } 00092 00093 buffer[length++] = v; 00094 00095 buffer[length++] = ((MQTT_KEEPALIVE) >> 8); 00096 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); 00097 length = writeString(id,buffer,length); 00098 if (willTopic) { 00099 length = writeString(willTopic,buffer,length); 00100 length = writeString(willMessage,buffer,length); 00101 } 00102 00103 if(user != NULL) { 00104 length = writeString(user,buffer,length); 00105 if(pass != NULL) { 00106 length = writeString(pass,buffer,length); 00107 } 00108 } 00109 //pc1.printf("Before MQTT Connect ... \r\n"); 00110 write(MQTTCONNECT,buffer,length-5); 00111 00112 lastInActivity = lastOutActivity = millis(); 00113 00114 int llen=128; 00115 int len =0; 00116 00117 while ((len=readPacket(llen))==0) { 00118 unsigned long t = millis(); 00119 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { 00120 _state = MQTT_CONNECTION_TIMEOUT; 00121 _client.close(true); 00122 return false; 00123 } 00124 } 00125 //pc1.printf("after MQTT Connect ... 
%i\r\n",len); 00126 if (len == 4) { 00127 if (buffer[3] == 0) { 00128 lastInActivity = millis(); 00129 pingOutstanding = false; 00130 _state = MQTT_CONNECTED; 00131 return true; 00132 } else { 00133 _state = buffer[3]; 00134 } 00135 } 00136 } 00137 _client.close(true); 00138 } 00139 return false; 00140 } 00141 00142 00143 int PubSubClient::readPacket(int lengthLength) 00144 { 00145 int len = 0; 00146 len = _client.receive_all(buffer,lengthLength); 00147 return len; 00148 } 00149 00150 bool PubSubClient::loop() 00151 { 00152 if (connected()) { 00153 unsigned long t = millis(); 00154 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { 00155 if (pingOutstanding) { 00156 this->_state = MQTT_CONNECTION_TIMEOUT; 00157 _client.close(true); 00158 return false; 00159 } else { 00160 buffer[0] = MQTTPINGREQ; 00161 buffer[1] = 0; 00162 _client.send(buffer,2); 00163 lastOutActivity = t; 00164 lastInActivity = t; 00165 pingOutstanding = true; 00166 } 00167 } 00168 int len; 00169 int llen= 128; 00170 if (!((len=readPacket(llen))==0)) { 00171 if (len > 0) { 00172 lastInActivity = t; 00173 char type = buffer[0]&0xF0; 00174 //pc1.printf("type :: %d\r\n",type); 00175 //pc1.printf("buffer[%d] :: %d\r\n",len-1,buffer[len-1]); 00176 if (type == MQTTPUBLISH) { 00177 if (callback) { 00178 //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); 00179 int tl = (buffer[2]<<8)+buffer[3]; 00180 //pc1.printf("t1 ... %i\r\n",tl); 00181 char topic[tl+1]; 00182 for (int i=0; i<tl; i++) { 00183 topic[i] = buffer[4+i]; 00184 } 00185 topic[tl] = 0; 00186 //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic); 00187 // ignore msgID - only support QoS 0 subs 00188 int t2 = len-4-tl; 00189 //pc1.printf("t2 ... %i\r\n",t2); 00190 char payload[t2+1]; 00191 for (int i=0; i<t2; i++) { 00192 payload[i] = buffer[4+i+tl]; 00193 } 00194 payload[t2] = 0; 00195 //pc1.printf("MQTTPUBLISH Payload ... 
%s\r\n",payload); 00196 callback(topic,payload,t2); 00197 } 00198 } else if (type == MQTTPINGREQ) { 00199 buffer[0] = MQTTPINGRESP; 00200 buffer[1] = 0; 00201 _client.send(buffer,2); 00202 } else if (type == MQTTPINGRESP) { 00203 pingOutstanding = false; 00204 } else { 00205 //for(int i=0;i<=len;i++) 00206 //pc1.printf("buffer[%d] :: %d\r\n",i,buffer[i]); 00207 if (buffer[0] == 144 && buffer[len-1] == 0) { 00208 sls.printf("Subscribed\r\n"); 00209 } 00210 if (true) { 00211 //pc1.printf("In loop function buffer :: %d\r\n",buffer[len-1]); 00212 pubsub_error_code = buffer[len-1]; 00213 } 00214 } 00215 } 00216 } 00217 return true; 00218 } 00219 return false; 00220 } 00221 00222 bool PubSubClient::publish(char* topic, char* payload) 00223 { 00224 return publish(topic,payload,strlen(payload),false); 00225 } 00226 00227 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) 00228 { 00229 return publish(topic, payload, plength, false); 00230 } 00231 00232 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) 00233 { 00234 if (connected()) { 00235 // Leave room in the buffer for header and variable length field 00236 //pc1.printf("in publish ... $$$$$$$$$$$:::%s\r\n",topic); 00237 //pc1.printf("in publish ... %s\r\n",payload); 00238 int length = 5; 00239 length = writeString(topic,buffer,length); 00240 int i; 00241 for (i=0; i<plength; i++) { 00242 buffer[length++] = payload[i]; 00243 } 00244 short header = MQTTPUBLISH; 00245 if (retained) { 00246 header |= 1; 00247 } 00248 return write(header,buffer,length-5); 00249 } 00250 return false; 00251 } 00252 00253 00254 00255 bool PubSubClient::write(short header, char* buf, int length) 00256 { 00257 short lenBuf[4]; 00258 short llen = 0; 00259 short digit; 00260 short pos = 0; 00261 short rc; 00262 short len = length; 00263 //pc1.printf("in write ... 
%d\r\n",length); 00264 //pc1.printf("in write ...$$$$$$:: %s\r\n",buf); 00265 do { 00266 digit = len % 128; 00267 len = len / 128; 00268 if (len > 0) { 00269 digit |= 0x80; 00270 } 00271 lenBuf[pos++] = digit; 00272 llen++; 00273 } while(len>0); 00274 00275 buf[4-llen] = header; 00276 for (int i=0; i<llen; i++) { 00277 buf[5-llen+i] = lenBuf[i]; 00278 } 00279 rc = _client.send(buf+(4-llen),length+1+llen); 00280 00281 lastOutActivity = millis(); 00282 return (rc == 1+llen+length); 00283 } 00284 00285 bool PubSubClient::subscribe(char* topic) 00286 { 00287 //pc1.printf("in subscribe ... %s\r\n",topic); 00288 if (connected()) { 00289 // Leave room in the buffer for header and variable length field 00290 int length = 5; 00291 nextMsgId++; 00292 if (nextMsgId == 0) { 00293 nextMsgId = 1; 00294 } 00295 buffer[length++] = (nextMsgId >> 8); 00296 buffer[length++] = (nextMsgId & 0xFF); 00297 length = writeString(topic, buffer,length); 00298 buffer[length++] = 0; // Only do QoS 0 subs 00299 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); 00300 } 00301 return false; 00302 } 00303 00304 bool PubSubClient::unsubscribe(char* topic) 00305 { 00306 if (connected()) { 00307 int length = 5; 00308 nextMsgId++; 00309 if (nextMsgId == 0) { 00310 nextMsgId = 1; 00311 } 00312 buffer[length++] = (nextMsgId >> 8); 00313 buffer[length++] = (nextMsgId & 0xFF); 00314 length = writeString(topic, buffer,length); 00315 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); 00316 } 00317 return false; 00318 } 00319 00320 void PubSubClient::disconnect() 00321 { 00322 buffer[0] = MQTTDISCONNECT; 00323 buffer[1] = 0; 00324 _client.send(buffer,2); 00325 _state = MQTT_DISCONNECTED; 00326 _client.close(true); 00327 lastInActivity = lastOutActivity = millis(); 00328 } 00329 00330 int PubSubClient::writeString(char* string, char* buf, int pos) 00331 { 00332 char* idp = string; 00333 int i = 0; 00334 pos += 2; 00335 while (*idp) { 00336 buf[pos++] = *idp++; 00337 i++; 00338 } 00339 buf[pos-i-2] = (i 
>> 8); 00340 buf[pos-i-1] = (i & 0xFF); 00341 return pos; 00342 } 00343 00344 00345 bool PubSubClient::connected() 00346 { 00347 bool rc; 00348 rc = (int)_client.is_connected(); 00349 if (!rc) _client.close(true); 00350 return rc; 00351 } 00352 00353 PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) 00354 { 00355 this->callback = callback; 00356 return *this; 00357 } 00358 00359 int PubSubClient::mqtt_state() 00360 { 00361 return this->_state; 00362 }
Generated on Wed Jul 13 2022 09:17:26 by Doxygen 1.7.2