MQTT
Revision 54:ff9e5c4b52d0, committed 2017-09-30
- Comitter:
- icraggs
- Date:
- Sat Sep 30 16:46:21 2017 +0000
- Parent:
- 53:15b5a280d22d
- Child:
- 55:b74b9ef26f33
- Child:
- 56:71ae1a773b64
- Commit message:
- Fix ping processing
Changed in this revision
--- a/MQTTClient.h Mon Sep 25 12:06:28 2017 +0000 +++ b/MQTTClient.h Sat Sep 30 16:46:21 2017 +0000 @@ -400,7 +400,8 @@ #if defined(MQTT_DEBUG) char printbuf[150]; - DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); + DEBUG("Rc %d from sending packet %s\n", rc, + MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); #endif return rc; } @@ -479,7 +480,7 @@ if (rc >= 0) { char printbuf[50]; - DEBUG("Rc %d from receiving packet %s\n", rc, + DEBUG("Rc %d receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); } #endif @@ -676,13 +677,14 @@ int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() { int rc = SUCCESS; + static Timer ping_sent; if (keepAliveInterval == 0) goto exit; - - if (last_sent.expired() || last_received.expired()) + + if (ping_outstanding) { - if (ping_outstanding) + if (ping_sent.expired()) { rc = FAILURE; // session failure #if defined(MQTT_DEBUG) @@ -690,15 +692,17 @@ DEBUG("PINGRESP not received in keepalive interval\n"); #endif } - else + } + else if (last_sent.expired() || last_received.expired()) + { + Timer timer(1000); + int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); + if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet { - Timer timer(1000); - int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); - if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet - ping_outstanding = true; + ping_outstanding = true; + ping_sent.countdown(this->keepAliveInterval); } } - exit: return rc; }
--- a/MQTTEthernet.h Mon Sep 25 12:06:28 2017 +0000 +++ b/MQTTEthernet.h Sat Sep 30 16:46:21 2017 +0000 @@ -9,9 +9,8 @@ class MQTTEthernet : public MQTTSocket { public: - MQTTEthernet() + MQTTEthernet() : MQTTSocket(ð) { - eth.init(); // Use DHCP eth.connect(); } @@ -20,11 +19,6 @@ return eth; } - void reconnect() - { - eth.connect(); // nothing I've tried actually works to reconnect - } - private: EthernetInterface eth;
--- a/MQTTSocket.h Mon Sep 25 12:06:28 2017 +0000 +++ b/MQTTSocket.h Sat Sep 30 16:46:21 2017 +0000 @@ -2,42 +2,95 @@ #define MQTTSOCKET_H #include "MQTTmbed.h" -#include "TCPSocketConnection.h" +#include <EthernetInterface.h> +#include <Timer.h> class MQTTSocket { public: + MQTTSocket(EthernetInterface *anet) + { + net = anet; + open = false; + } + int connect(char* hostname, int port, int timeout=1000) { - mysock.set_blocking(false, timeout); // 1 second Timeout - return mysock.connect(hostname, port); + if (open) + disconnect(); + nsapi_error_t rc = mysock.open(net); + open = true; + mysock.set_blocking(true); + mysock.set_timeout((unsigned int)timeout); + rc = mysock.connect(hostname, port); + mysock.set_blocking(false); // blocking timeouts seem not to work + return rc; } + // common read/write routine, avoiding blocking timeouts + int common(unsigned char* buffer, int len, int timeout, bool read) + { + timer.start(); + mysock.set_blocking(false); // blocking timeouts seem not to work + int bytes = 0; + bool first = true; + do + { + if (first) + first = false; + else + wait_ms(timeout < 100 ? timeout : 100); + int rc; + if (read) + rc = mysock.recv((char*)buffer, len); + else + rc = mysock.send((char*)buffer, len); + if (rc < 0) + { + if (rc != NSAPI_ERROR_WOULD_BLOCK) + { + bytes = -1; + break; + } + } + else + bytes += rc; + } + while (bytes < len && timer.read_ms() < timeout); + timer.stop(); + return bytes; + } + + /* returns the number of bytes read, which could be 0. + -1 if there was an error on the socket + */ int read(unsigned char* buffer, int len, int timeout) { - mysock.set_blocking(false, timeout); - return mysock.receive((char*)buffer, len); + return common(buffer, len, timeout, true); } int write(unsigned char* buffer, int len, int timeout) { - mysock.set_blocking(false, timeout); - return mysock.send((char*)buffer, len); + return common(buffer, len, timeout, false); } int disconnect() { + open = false; return mysock.close(); } - bool is_connected() + /*bool is_connected() { return mysock.is_connected(); - } + }*/ private: - TCPSocketConnection mysock; + bool open; + TCPSocket mysock; + EthernetInterface *net; + Timer timer; };