Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 46:e335fcc1a663
- Parent:
- 44:c299463ae853
--- a/MQTTClient.h Mon Aug 03 12:40:57 2015 +0000
+++ b/MQTTClient.h Tue Aug 18 09:57:19 2015 +0000
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
+ * Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
@@ -12,6 +12,10 @@
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - fix for bug 458512 - QoS 2 messages
+ * Ian Craggs - fix for bug 460389 - send loop uses wrong length
+ * Ian Craggs - fix for bug 464169 - clearing subscriptions
+ * Ian Craggs - fix for bug 464551 - enums and ints can be different size
*******************************************************************************/
#if !defined(MQTTCLIENT_H)
@@ -118,7 +122,7 @@
*/
int connect();
- /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+ /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this
* @param options - connect options
* @return success code -
@@ -190,6 +194,7 @@
private:
+ void cleanSession();
int cycle(Timer& timer);
int waitfor(int packet_type, Timer& timer);
int keepalive();
@@ -239,6 +244,7 @@
unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
bool isQoS2msgidFree(unsigned short id);
bool useQoS2msgid(unsigned short id);
+ void freeQoS2msgid(unsigned short id);
#endif
};
@@ -247,28 +253,35 @@
template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
+void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
+{
+ ping_outstanding = false;
+ for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+ messageHandlers[i].topicFilter = 0;
+ isconnected = false;
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+ inflightMsgid = 0;
+ inflightQoS = QOS0;
+#endif
+
+#if MQTTCLIENT_QOS2
+ pubrel = false;
+ for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+ incomingQoS2messages[i] = 0;
+#endif
+}
+
+
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
{
last_sent = Timer();
last_received = Timer();
- ping_outstanding = false;
- for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- messageHandlers[i].topicFilter = 0;
this->command_timeout_ms = command_timeout_ms;
- isconnected = false;
-
-#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
- inflightMsgid = 0;
- inflightQoS = QOS0;
-#endif
+ cleanSession();
+}
-
-#if MQTTCLIENT_QOS2
- pubrel = false;
- for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
- incomingQoS2messages[i] = 0;
-#endif
-}
#if MQTTCLIENT_QOS2
template<class Network, class Timer, int a, int b>
@@ -296,6 +309,20 @@
}
return false;
}
+
+
+template<class Network, class Timer, int a, int b>
+void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
+{
+ for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+ {
+ if (incomingQoS2messages[i] == id)
+ {
+ incomingQoS2messages[i] = 0;
+ return;
+ }
+ }
+}
#endif
@@ -307,7 +334,7 @@
while (sent < length && !timer.expired())
{
- rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
+ rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
if (rc < 0) // there was an error writing the data
break;
sent += rc;
@@ -322,8 +349,8 @@
rc = FAILURE;
#if defined(MQTT_DEBUG)
- char printbuf[50];
- DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
+ char printbuf[150];
+ DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
#endif
return rc;
}
@@ -364,8 +391,8 @@
* @param timeout the max time to wait for the packet read to complete, in milliseconds
* @return the MQTT packet type, or -1 if none
*/
-template<class Network, class Timer, int a, int b>
-int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
{
int rc = FAILURE;
MQTTHeader header = {0};
@@ -381,6 +408,12 @@
decodePacket(&rem_len, timer.left_ms());
len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
+ if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
goto exit;
@@ -392,8 +425,11 @@
exit:
#if defined(MQTT_DEBUG)
- char printbuf[50];
- DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
+ if (rc >= 0)
+ {
+ char printbuf[50];
+ DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
+ }
#endif
return rc;
}
@@ -473,7 +509,7 @@
timer.countdown_ms(timeout_ms);
while (!timer.expired())
{
- if (cycle(timer) == FAILURE)
+ if (cycle(timer) < 0)
{
rc = FAILURE;
break;
@@ -490,23 +526,30 @@
/* get one piece of work off the wire and one pass through */
// read the socket, see what work is due
- unsigned short packet_type = readPacket(timer);
+ int packet_type = readPacket(timer);
int len = 0,
rc = SUCCESS;
switch (packet_type)
{
+ case FAILURE:
+ case BUFFER_OVERFLOW:
+ rc = packet_type;
+ break;
case CONNACK:
case PUBACK:
case SUBACK:
break;
case PUBLISH:
- MQTTString topicName;
+ {
+ MQTTString topicName = MQTTString_initializer;
Message msg;
- if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
+ int intQoS;
+ if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
goto exit;
+ msg.qos = (enum QoS)intQoS;
#if MQTTCLIENT_QOS2
if (msg.qos != QOS2)
#endif
@@ -518,7 +561,7 @@
deliverMessage(topicName, msg);
else
WARN("Maximum number of incoming QoS2 messages exceeded");
- }
+ }
#endif
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
if (msg.qos != QOS0)
@@ -536,19 +579,25 @@
}
break;
#endif
+ }
#if MQTTCLIENT_QOS2
case PUBREC:
+ case PUBREL:
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
rc = FAILURE;
- else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
+ else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
+ (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
+ if (packet_type == PUBREL)
+ freeQoS2msgid(mypacketid);
break;
+
case PUBCOMP:
break;
#endif
@@ -579,7 +628,7 @@
{
if (!ping_outstanding)
{
- Timer timer = Timer(1000);
+ Timer timer(1000);
int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
ping_outstanding = true;
@@ -611,7 +660,7 @@
template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
{
- Timer connect_timer = Timer(command_timeout_ms);
+ Timer connect_timer(command_timeout_ms);
int rc = FAILURE;
int len = 0;
@@ -639,10 +688,10 @@
}
else
rc = FAILURE;
-
+
#if MQTTCLIENT_QOS2
- // resend an inflight publish
- if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
+ // resend any inflight publish
+ if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
{
if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
rc = FAILURE;
@@ -678,9 +727,9 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer(command_timeout_ms);
int len = 0;
- MQTTString topic = {(char*)topicFilter, 0, 0};
+ MQTTString topic = {(char*)topicFilter, {0, 0}};
if (!isconnected)
goto exit;
@@ -716,7 +765,7 @@
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -725,8 +774,8 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
- MQTTString topic = {(char*)topicFilter, 0, 0};
+ Timer timer(command_timeout_ms);
+ MQTTString topic = {(char*)topicFilter, {0, 0}};
int len = 0;
if (!isconnected)
@@ -741,14 +790,26 @@
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ {
rc = 0;
+
+ // remove the subscription message handler associated with this topic, if there is one
+ for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+ {
+ if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
+ {
+ messageHandlers[i].topicFilter = 0;
+ break;
+ }
+ }
+ }
}
else
rc = FAILURE;
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -757,11 +818,11 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
{
int rc;
-
+
if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
goto exit; // there was a problem
-#if MQTTCLIENT_QOS1
+#if MQTTCLIENT_QOS1
if (qos == QOS1)
{
if (waitfor(PUBACK, timer) == PUBACK)
@@ -795,7 +856,7 @@
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -805,13 +866,13 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer(command_timeout_ms);
MQTTString topicString = MQTTString_initializer;
int len = 0;
if (!isconnected)
goto exit;
-
+
topicString.cstring = (char*)topicName;
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
@@ -823,7 +884,7 @@
topicString, (unsigned char*)payload, payloadlen);
if (len <= 0)
goto exit;
-
+
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
if (!cleansession)
{
@@ -836,7 +897,7 @@
#endif
}
#endif
-
+
rc = publish(len, timer, qos);
exit:
return rc;
@@ -862,14 +923,17 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
+ Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0)
rc = sendPacket(len, timer); // send the disconnect packet
- isconnected = false;
+ if (cleansession)
+ cleanSession();
+ else
+ isconnected = false;
return rc;
}
-#endif
+#endif
\ No newline at end of file
