An API for using MQTT over multiple transports
Dependents: Water_Monitor_clone_v1 Cloud_IBM_MbedOS ble-star-mbed
Fork of MQTT by
Revision 54:ff9e5c4b52d0, committed 2017-09-30
- Comitter:
- icraggs
- Date:
- Sat Sep 30 16:46:21 2017 +0000
- Parent:
- 53:15b5a280d22d
- Child:
- 55:b74b9ef26f33
- Child:
- 56:71ae1a773b64
- Commit message:
- Fix ping processing
Changed in this revision
--- a/MQTTClient.h Mon Sep 25 12:06:28 2017 +0000
+++ b/MQTTClient.h Sat Sep 30 16:46:21 2017 +0000
@@ -400,7 +400,8 @@
#if defined(MQTT_DEBUG)
char printbuf[150];
- DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
+ DEBUG("Rc %d from sending packet %s\n", rc,
+ MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
#endif
return rc;
}
@@ -479,7 +480,7 @@
if (rc >= 0)
{
char printbuf[50];
- DEBUG("Rc %d from receiving packet %s\n", rc,
+ DEBUG("Rc %d receiving packet %s\n", rc,
MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
}
#endif
@@ -676,13 +677,14 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
{
int rc = SUCCESS;
+ static Timer ping_sent;
if (keepAliveInterval == 0)
goto exit;
-
- if (last_sent.expired() || last_received.expired())
+
+ if (ping_outstanding)
{
- if (ping_outstanding)
+ if (ping_sent.expired())
{
rc = FAILURE; // session failure
#if defined(MQTT_DEBUG)
@@ -690,15 +692,17 @@
DEBUG("PINGRESP not received in keepalive interval\n");
#endif
}
- else
+ }
+ else if (last_sent.expired() || last_received.expired())
+ {
+ Timer timer(1000);
+ int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
+ if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
{
- Timer timer(1000);
- int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
- if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
- ping_outstanding = true;
+ ping_outstanding = true;
+ ping_sent.countdown(this->keepAliveInterval);
}
}
-
exit:
return rc;
}
--- a/MQTTEthernet.h Mon Sep 25 12:06:28 2017 +0000
+++ b/MQTTEthernet.h Sat Sep 30 16:46:21 2017 +0000
@@ -9,9 +9,8 @@
class MQTTEthernet : public MQTTSocket
{
public:
- MQTTEthernet()
+ MQTTEthernet() : MQTTSocket(ð)
{
- eth.init(); // Use DHCP
eth.connect();
}
@@ -20,11 +19,6 @@
return eth;
}
- void reconnect()
- {
- eth.connect(); // nothing I've tried actually works to reconnect
- }
-
private:
EthernetInterface eth;
--- a/MQTTSocket.h Mon Sep 25 12:06:28 2017 +0000
+++ b/MQTTSocket.h Sat Sep 30 16:46:21 2017 +0000
@@ -2,42 +2,95 @@
#define MQTTSOCKET_H
#include "MQTTmbed.h"
-#include "TCPSocketConnection.h"
+#include <EthernetInterface.h>
+#include <Timer.h>
class MQTTSocket
{
public:
+ MQTTSocket(EthernetInterface *anet)
+ {
+ net = anet;
+ open = false;
+ }
+
int connect(char* hostname, int port, int timeout=1000)
{
- mysock.set_blocking(false, timeout); // 1 second Timeout
- return mysock.connect(hostname, port);
+ if (open)
+ disconnect();
+ nsapi_error_t rc = mysock.open(net);
+ open = true;
+ mysock.set_blocking(true);
+ mysock.set_timeout((unsigned int)timeout);
+ rc = mysock.connect(hostname, port);
+ mysock.set_blocking(false); // blocking timeouts seem not to work
+ return rc;
}
+ // common read/write routine, avoiding blocking timeouts
+ int common(unsigned char* buffer, int len, int timeout, bool read)
+ {
+ timer.start();
+ mysock.set_blocking(false); // blocking timeouts seem not to work
+ int bytes = 0;
+ bool first = true;
+ do
+ {
+ if (first)
+ first = false;
+ else
+ wait_ms(timeout < 100 ? timeout : 100);
+ int rc;
+ if (read)
+ rc = mysock.recv((char*)buffer, len);
+ else
+ rc = mysock.send((char*)buffer, len);
+ if (rc < 0)
+ {
+ if (rc != NSAPI_ERROR_WOULD_BLOCK)
+ {
+ bytes = -1;
+ break;
+ }
+ }
+ else
+ bytes += rc;
+ }
+ while (bytes < len && timer.read_ms() < timeout);
+ timer.stop();
+ return bytes;
+ }
+
+ /* returns the number of bytes read, which could be 0.
+ -1 if there was an error on the socket
+ */
int read(unsigned char* buffer, int len, int timeout)
{
- mysock.set_blocking(false, timeout);
- return mysock.receive((char*)buffer, len);
+ return common(buffer, len, timeout, true);
}
int write(unsigned char* buffer, int len, int timeout)
{
- mysock.set_blocking(false, timeout);
- return mysock.send((char*)buffer, len);
+ return common(buffer, len, timeout, false);
}
int disconnect()
{
+ open = false;
return mysock.close();
}
- bool is_connected()
+ /*bool is_connected()
{
return mysock.is_connected();
- }
+ }*/
private:
- TCPSocketConnection mysock;
+ bool open;
+ TCPSocket mysock;
+ EthernetInterface *net;
+ Timer timer;
};
