Add MQTTMDM.h
Fork of MQTT by
Revision 46:e335fcc1a663, committed 2015-08-18
- Comitter:
- icraggs
- Date:
- Tue Aug 18 09:57:19 2015 +0000
- Parent:
- 45:37f007d2a8ae
- Child:
- 47:f9841b2adaa2
- Commit message:
- Latest update from Paho
Changed in this revision
| MQTTClient.h | Show annotated file Show diff for this revision Revisions of this file |
--- a/MQTTClient.h Mon Aug 03 12:40:57 2015 +0000
+++ b/MQTTClient.h Tue Aug 18 09:57:19 2015 +0000
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
+ * Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
@@ -12,6 +12,10 @@
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
+ * Ian Craggs - fix for bug 458512 - QoS 2 messages
+ * Ian Craggs - fix for bug 460389 - send loop uses wrong length
+ * Ian Craggs - fix for bug 464169 - clearing subscriptions
+ * Ian Craggs - fix for bug 464551 - enums and ints can be different size
*******************************************************************************/
#if !defined(MQTTCLIENT_H)
@@ -118,7 +122,7 @@
*/
int connect();
- /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+ /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this
* @param options - connect options
* @return success code -
@@ -190,6 +194,7 @@
private:
+ void cleanSession();
int cycle(Timer& timer);
int waitfor(int packet_type, Timer& timer);
int keepalive();
@@ -239,6 +244,7 @@
unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
bool isQoS2msgidFree(unsigned short id);
bool useQoS2msgid(unsigned short id);
+ void freeQoS2msgid(unsigned short id);
#endif
};
@@ -247,28 +253,35 @@
template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
+void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
+{
+ ping_outstanding = false;
+ for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+ messageHandlers[i].topicFilter = 0;
+ isconnected = false;
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+ inflightMsgid = 0;
+ inflightQoS = QOS0;
+#endif
+
+#if MQTTCLIENT_QOS2
+ pubrel = false;
+ for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+ incomingQoS2messages[i] = 0;
+#endif
+}
+
+
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
{
last_sent = Timer();
last_received = Timer();
- ping_outstanding = false;
- for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- messageHandlers[i].topicFilter = 0;
this->command_timeout_ms = command_timeout_ms;
- isconnected = false;
-
-#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
- inflightMsgid = 0;
- inflightQoS = QOS0;
-#endif
+ cleanSession();
+}
-
-#if MQTTCLIENT_QOS2
- pubrel = false;
- for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
- incomingQoS2messages[i] = 0;
-#endif
-}
#if MQTTCLIENT_QOS2
template<class Network, class Timer, int a, int b>
@@ -296,6 +309,20 @@
}
return false;
}
+
+
+template<class Network, class Timer, int a, int b>
+void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
+{
+ for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+ {
+ if (incomingQoS2messages[i] == id)
+ {
+ incomingQoS2messages[i] = 0;
+ return;
+ }
+ }
+}
#endif
@@ -307,7 +334,7 @@
while (sent < length && !timer.expired())
{
- rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
+ rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
if (rc < 0) // there was an error writing the data
break;
sent += rc;
@@ -322,8 +349,8 @@
rc = FAILURE;
#if defined(MQTT_DEBUG)
- char printbuf[50];
- DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
+ char printbuf[150];
+ DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
#endif
return rc;
}
@@ -364,8 +391,8 @@
* @param timeout the max time to wait for the packet read to complete, in milliseconds
* @return the MQTT packet type, or -1 if none
*/
-template<class Network, class Timer, int a, int b>
-int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
{
int rc = FAILURE;
MQTTHeader header = {0};
@@ -381,6 +408,12 @@
decodePacket(&rem_len, timer.left_ms());
len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
+ if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
goto exit;
@@ -392,8 +425,11 @@
exit:
#if defined(MQTT_DEBUG)
- char printbuf[50];
- DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
+ if (rc >= 0)
+ {
+ char printbuf[50];
+ DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
+ }
#endif
return rc;
}
@@ -473,7 +509,7 @@
timer.countdown_ms(timeout_ms);
while (!timer.expired())
{
- if (cycle(timer) == FAILURE)
+ if (cycle(timer) < 0)
{
rc = FAILURE;
break;
@@ -490,23 +526,30 @@
/* get one piece of work off the wire and one pass through */
// read the socket, see what work is due
- unsigned short packet_type = readPacket(timer);
+ int packet_type = readPacket(timer);
int len = 0,
rc = SUCCESS;
switch (packet_type)
{
+ case FAILURE:
+ case BUFFER_OVERFLOW:
+ rc = packet_type;
+ break;
case CONNACK:
case PUBACK:
case SUBACK:
break;
case PUBLISH:
- MQTTString topicName;
+ {
+ MQTTString topicName = MQTTString_initializer;
Message msg;
- if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
+ int intQoS;
+ if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
goto exit;
+ msg.qos = (enum QoS)intQoS;
#if MQTTCLIENT_QOS2
if (msg.qos != QOS2)
#endif
@@ -518,7 +561,7 @@
deliverMessage(topicName, msg);
else
WARN("Maximum number of incoming QoS2 messages exceeded");
- }
+ }
#endif
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
if (msg.qos != QOS0)
@@ -536,19 +579,25 @@
}
break;
#endif
+ }
#if MQTTCLIENT_QOS2
case PUBREC:
+ case PUBREL:
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
rc = FAILURE;
- else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
+ else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
+ (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
+ if (packet_type == PUBREL)
+ freeQoS2msgid(mypacketid);
break;
+
case PUBCOMP:
break;
#endif
@@ -579,7 +628,7 @@
{
if (!ping_outstanding)
{
- Timer timer = Timer(1000);
+ Timer timer(1000);
int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
ping_outstanding = true;
@@ -611,7 +660,7 @@
template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
{
- Timer connect_timer = Timer(command_timeout_ms);
+ Timer connect_timer(command_timeout_ms);
int rc = FAILURE;
int len = 0;
@@ -639,10 +688,10 @@
}
else
rc = FAILURE;
-
+
#if MQTTCLIENT_QOS2
- // resend an inflight publish
- if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
+ // resend any inflight publish
+ if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
{
if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
rc = FAILURE;
@@ -678,9 +727,9 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer(command_timeout_ms);
int len = 0;
- MQTTString topic = {(char*)topicFilter, 0, 0};
+ MQTTString topic = {(char*)topicFilter, {0, 0}};
if (!isconnected)
goto exit;
@@ -716,7 +765,7 @@
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -725,8 +774,8 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
- MQTTString topic = {(char*)topicFilter, 0, 0};
+ Timer timer(command_timeout_ms);
+ MQTTString topic = {(char*)topicFilter, {0, 0}};
int len = 0;
if (!isconnected)
@@ -741,14 +790,26 @@
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ {
rc = 0;
+
+ // remove the subscription message handler associated with this topic, if there is one
+ for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+ {
+ if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
+ {
+ messageHandlers[i].topicFilter = 0;
+ break;
+ }
+ }
+ }
}
else
rc = FAILURE;
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -757,11 +818,11 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
{
int rc;
-
+
if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
goto exit; // there was a problem
-#if MQTTCLIENT_QOS1
+#if MQTTCLIENT_QOS1
if (qos == QOS1)
{
if (waitfor(PUBACK, timer) == PUBACK)
@@ -795,7 +856,7 @@
exit:
if (rc != SUCCESS)
- isconnected = false;
+ cleanSession();
return rc;
}
@@ -805,13 +866,13 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms);
+ Timer timer(command_timeout_ms);
MQTTString topicString = MQTTString_initializer;
int len = 0;
if (!isconnected)
goto exit;
-
+
topicString.cstring = (char*)topicName;
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
@@ -823,7 +884,7 @@
topicString, (unsigned char*)payload, payloadlen);
if (len <= 0)
goto exit;
-
+
#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
if (!cleansession)
{
@@ -836,7 +897,7 @@
#endif
}
#endif
-
+
rc = publish(len, timer, qos);
exit:
return rc;
@@ -862,14 +923,17 @@
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
{
int rc = FAILURE;
- Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
+ Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
if (len > 0)
rc = sendPacket(len, timer); // send the disconnect packet
- isconnected = false;
+ if (cleansession)
+ cleanSession();
+ else
+ isconnected = false;
return rc;
}
-#endif
+#endif
\ No newline at end of file
