Yalgaar mBed SDK for real-time messaging
Fork of MQTT by
Diff: PubSubClient.cpp
- Revision:
- 1:c233cee7c15b
- Parent:
- 0:ca855d29545b
- Child:
- 3:511121ab8fef
--- a/PubSubClient.cpp Sun May 26 16:52:01 2013 +0000 +++ b/PubSubClient.cpp Fri Mar 10 13:11:49 2017 +0000 @@ -3,18 +3,22 @@ Nicholas O'Leary http://knolleary.net -initial port for mbed +initial port for mbed Joerg Wende https://twitter.com/joerg_wende */ #include "PubSubClient.h" -Timer t; -//Serial pc1(USBTX, USBRX); + -int millis() +Serial pc1(USBTX, USBRX, 115200); + +int pubsub_error_code; + + +int PubSubClient::millis() { return t.read_ms(); } @@ -23,12 +27,12 @@ { } -PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) +PubSubClient::PubSubClient(char *ip, int port) { - this->callback = callback; + this->_state = MQTT_DISCONNECTED; this->ip = ip; this->port = port; - t.start(); + this->t.start(); } @@ -105,19 +109,25 @@ int llen=128; int len =0; - + while ((len=readPacket(llen))==0) { unsigned long t = millis(); if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _state = MQTT_CONNECTION_TIMEOUT; _client.close(true); return false; } } //pc1.printf("after MQTT Connect ... %i\r\n",len); - if (len == 4 && buffer[3] == 0) { - lastInActivity = millis(); - pingOutstanding = false; - return true; + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } } } _client.close(true); @@ -139,6 +149,7 @@ unsigned long t = millis(); if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; _client.close(true); return false; } else { @@ -156,6 +167,8 @@ if (len > 0) { lastInActivity = t; char type = buffer[0]&0xF0; + //pc1.printf("type :: %d\r\n",type); + //pc1.printf("buffer[%d] :: %d\r\n",len-1,buffer[len-1]); if (type == MQTTPUBLISH) { if (callback) { //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); @@ -184,6 +197,16 @@ _client.send(buffer,2); } else if (type == MQTTPINGRESP) { pingOutstanding = false; + } else { + //for(int i=0;i<=len;i++) + //pc1.printf("buffer[%d] :: %d\r\n",i,buffer[i]); + if (buffer[0] == 144 && buffer[len-1] == 0) { + pc1.printf("Subscribed\r\n"); + } + if (true) { + //pc1.printf("In loop function buffer :: %d\r\n",buffer[len-1]); + pubsub_error_code = buffer[len-1]; + } } } } @@ -206,7 +229,7 @@ { if (connected()) { // Leave room in the buffer for header and variable length field - //pc1.printf("in publish ... %s\r\n",topic); + //pc1.printf("in publish ... $$$$$$$$$$$:::%s\r\n",topic); //pc1.printf("in publish ... %s\r\n",payload); int length = 5; length = writeString(topic,buffer,length); @@ -234,7 +257,7 @@ short rc; short len = length; //pc1.printf("in write ... %d\r\n",length); - //pc1.printf("in write ... %s\r\n",buf); + //pc1.printf("in write ...$$$$$$:: %s\r\n",buf); do { digit = len % 128; len = len / 128; @@ -295,6 +318,7 @@ buffer[0] = MQTTDISCONNECT; buffer[1] = 0; _client.send(buffer,2); + _state = MQTT_DISCONNECTED; _client.close(true); lastInActivity = lastOutActivity = millis(); } @@ -320,4 +344,15 @@ rc = (int)_client.is_connected(); if (!rc) _client.close(true); return rc; +} + +PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) +{ + this->callback = callback; + return *this; +} + +int PubSubClient::mqtt_state() +{ + return this->_state; } \ No newline at end of file