MQTTPacket fixes

Fork of MQTTPacket by MQTT

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Tue Oct 03 17:02:36 2017 +0000
Parent:
22:1af97b41bc9d
Commit message:
Latest level of Paho code including PUBREL packet (QoS) fix.

Changed in this revision

MQTTFormat.c Show annotated file Show diff for this revision Revisions of this file
MQTTFormat.h Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.c Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.h Show annotated file Show diff for this revision Revisions of this file
MQTTSerializePublish.c Show annotated file Show diff for this revision Revisions of this file
diff -r 1af97b41bc9d -r 0cfb74c5a621 MQTTFormat.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTFormat.c	Tue Oct 03 17:02:36 2017 +0000
@@ -0,0 +1,262 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#include "StackTrace.h"
+#include "MQTTPacket.h"
+
+#include <string.h>
+
+
+const char* MQTTPacket_names[] =
+{
+    "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
+    "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
+    "PINGREQ", "PINGRESP", "DISCONNECT"
+};
+
+
+const char* MQTTPacket_getName(unsigned short packetid)
+{
+    return MQTTPacket_names[packetid];
+}
+
+
+int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data)
+{
+    int strindex = 0;
+
+    strindex = snprintf(strbuf, strbuflen,
+            "CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %d",
+            (int)data->MQTTVersion, data->clientID.lenstring.len, data->clientID.lenstring.data,
+            (int)data->cleansession, data->keepAliveInterval);
+    if (data->willFlag)
+        strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
+                ", will QoS %d, will retain %d, will topic %.*s, will message %.*s",
+                data->will.qos, data->will.retained,
+                data->will.topicName.lenstring.len, data->will.topicName.lenstring.data,
+                data->will.message.lenstring.len, data->will.message.lenstring.data);
+    if (data->username.lenstring.data && data->username.lenstring.len > 0)
+        strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
+                ", user name %.*s", data->username.lenstring.len, data->username.lenstring.data);
+    if (data->password.lenstring.data && data->password.lenstring.len > 0)
+        strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
+                ", password %.*s", data->password.lenstring.len, data->password.lenstring.data);
+    return strindex;
+}
+
+
+int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent)
+{
+    int strindex = snprintf(strbuf, strbuflen, "CONNACK session present %d, rc %d", sessionPresent, connack_rc);
+    return strindex;
+}
+
+
+int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained,
+        unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen)
+{
+    int strindex = snprintf(strbuf, strbuflen,
+                "PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s",
+                dup, qos, retained, packetid,
+                (topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data,
+                payloadlen, (payloadlen < 20) ? payloadlen : 20, payload);
+    return strindex;
+}
+
+
+int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid)
+{
+    int strindex = snprintf(strbuf, strbuflen, "%s, packet id %d", MQTTPacket_names[packettype], packetid);
+    if (dup)
+        strindex += snprintf(strbuf + strindex, strbuflen - strindex, ", dup %d", dup);
+    return strindex;
+}
+
+
+int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count,
+        MQTTString topicFilters[], int requestedQoSs[])
+{
+    return snprintf(strbuf, strbuflen,
+        "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d",
+        dup, packetid, count,
+        topicFilters[0].lenstring.len, topicFilters[0].lenstring.data,
+        requestedQoSs[0]);
+}
+
+
+int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs)
+{
+    return snprintf(strbuf, strbuflen,
+        "SUBACK packet id %d count %d granted qos %d", packetid, count, grantedQoSs[0]);
+}
+
+
+int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid,
+        int count, MQTTString topicFilters[])
+{
+    return snprintf(strbuf, strbuflen,
+                    "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s",
+                    dup, packetid, count,
+                    topicFilters[0].lenstring.len, topicFilters[0].lenstring.data);
+}
+
+
+#if defined(MQTT_CLIENT)
+char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen)
+{
+    int index = 0;
+    int rem_length = 0;
+    MQTTHeader header = {0};
+    int strindex = 0;
+
+    header.byte = buf[index++];
+    index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
+
+    switch (header.bits.type)
+    {
+
+    case CONNACK:
+    {
+        unsigned char sessionPresent, connack_rc;
+        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1)
+            strindex = MQTTStringFormat_connack(strbuf, strbuflen, connack_rc, sessionPresent);
+    }
+    break;
+    case PUBLISH:
+    {
+        unsigned char dup, retained, *payload;
+        unsigned short packetid;
+        int qos, payloadlen;
+        MQTTString topicName = MQTTString_initializer;
+        if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
+                &payload, &payloadlen, buf, buflen) == 1)
+            strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, packetid,
+                    topicName, payload, payloadlen);
+    }
+    break;
+    case PUBACK:
+    case PUBREC:
+    case PUBREL:
+    case PUBCOMP:
+    {
+        unsigned char packettype, dup;
+        unsigned short packetid;
+        if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
+            strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid);
+    }
+    break;
+    case SUBACK:
+    {
+        unsigned short packetid;
+        int maxcount = 1, count = 0;
+        int grantedQoSs[1];
+        if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1)
+            strindex = MQTTStringFormat_suback(strbuf, strbuflen, packetid, count, grantedQoSs);
+    }
+    break;
+    case UNSUBACK:
+    {
+        unsigned short packetid;
+        if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1)
+            strindex = MQTTStringFormat_ack(strbuf, strbuflen, UNSUBACK, 0, packetid);
+    }
+    break;
+    case PINGREQ:
+    case PINGRESP:
+    case DISCONNECT:
+        strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
+        break;
+    }
+    return strbuf;
+}
+#endif
+
+#if defined(MQTT_SERVER)
+char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen)
+{
+    int index = 0;
+    int rem_length = 0;
+    MQTTHeader header = {0};
+    int strindex = 0;
+
+    header.byte = buf[index++];
+    index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
+
+    switch (header.bits.type)
+    {
+    case CONNECT:
+    {
+        MQTTPacket_connectData data;
+        int rc;
+        if ((rc = MQTTDeserialize_connect(&data, buf, buflen)) == 1)
+            strindex = MQTTStringFormat_connect(strbuf, strbuflen, &data);
+    }
+    break;
+    case PUBLISH:
+    {
+        unsigned char dup, retained, *payload;
+        unsigned short packetid;
+        int qos, payloadlen;
+        MQTTString topicName = MQTTString_initializer;
+        if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
+                &payload, &payloadlen, buf, buflen) == 1)
+            strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, packetid,
+                    topicName, payload, payloadlen);
+    }
+    break;
+    case PUBACK:
+    case PUBREC:
+    case PUBREL:
+    case PUBCOMP:
+    {
+        unsigned char packettype, dup;
+        unsigned short packetid;
+        if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
+            strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid);
+    }
+    break;
+    case SUBSCRIBE:
+    {
+        unsigned char dup;
+        unsigned short packetid;
+        int maxcount = 1, count = 0;
+        MQTTString topicFilters[1];
+        int requestedQoSs[1];
+        if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count,
+                topicFilters, requestedQoSs, buf, buflen) == 1)
+            strindex = MQTTStringFormat_subscribe(strbuf, strbuflen, dup, packetid, count, topicFilters, requestedQoSs);;
+    }
+    break;
+    case UNSUBSCRIBE:
+    {
+        unsigned char dup;
+        unsigned short packetid;
+        int maxcount = 1, count = 0;
+        MQTTString topicFilters[1];
+        if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1)
+            strindex =  MQTTStringFormat_unsubscribe(strbuf, strbuflen, dup, packetid, count, topicFilters);
+    }
+    break;
+    case PINGREQ:
+    case PINGRESP:
+    case DISCONNECT:
+        strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
+        break;
+    }
+    strbuf[strbuflen] = '\0';
+    return strbuf;
+}
+#endif
diff -r 1af97b41bc9d -r 0cfb74c5a621 MQTTFormat.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTFormat.h	Tue Oct 03 17:02:36 2017 +0000
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(MQTTFORMAT_H)
+#define MQTTFORMAT_H
+
+#define MQTT_CLIENT
+#define MQTT_SERVER
+
+#include "StackTrace.h"
+#include "MQTTPacket.h"
+
+const char* MQTTPacket_getName(unsigned short packetid);
+int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data);
+int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent);
+int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained,
+        unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen);
+int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid);
+int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count,
+        MQTTString topicFilters[], int requestedQoSs[]);
+int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs);
+int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid,
+        int count, MQTTString topicFilters[]);
+char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
+char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
+
+#endif
diff -r 1af97b41bc9d -r 0cfb74c5a621 MQTTPacket.c
--- a/MQTTPacket.c	Mon Sep 25 12:03:27 2017 +0000
+++ b/MQTTPacket.c	Tue Oct 03 17:02:36 2017 +0000
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Sergio R. Caprile - non-blocking packet read functions for stream transport
  *******************************************************************************/
 
 #include "StackTrace.h"
@@ -284,6 +285,7 @@
  * @param buflen the length in bytes of the supplied buffer
  * @param getfn pointer to a function which will read any number of bytes from the needed source
  * @return integer MQTT packet type, or -1 on error
+ * @note  the whole message must fit into the caller's buffer
  */
 int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
 {
@@ -302,7 +304,9 @@
 	len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
 
 	/* 3. read the rest of the buffer using a callback to supply the rest of the data */
-	if ((*getfn)(buf + len, rem_len) != rem_len)
+	if((rem_len + len) > buflen)
+		goto exit;
+	if (rem_len && ((*getfn)(buf + len, rem_len) != rem_len))
 		goto exit;
 
 	header.byte = buf[0];
@@ -311,143 +315,97 @@
 	return rc;
 }
 
-
-const char* MQTTPacket_names[] =
+/**
+ * Decodes the message length according to the MQTT algorithm, non-blocking
+ * @param trp pointer to a transport structure holding what is needed to solve getting data from it
+ * @param value the decoded length returned
+ * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
+ */
+static int MQTTPacket_decodenb(MQTTTransport *trp)
 {
-	"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
-	"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
-	"PINGREQ", "PINGRESP", "DISCONNECT"
-};
-
-
-char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen)
-{
-	int index = 0;
-	int rem_length = 0;
-	MQTTHeader header = {0};
-	int strindex = 0;
-
-	header.byte = buf[index++];
-	index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
+	unsigned char c;
+	int rc = MQTTPACKET_READ_ERROR;
 
-	switch (header.bits.type)
-	{
-	case CONNECT:
-	{
-		MQTTPacket_connectData data;
-		if (MQTTDeserialize_connect(&data, buf, buflen) == 1)
-		{
-			strindex = snprintf(strbuf, strbuflen,
-				"CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %hd",
-				(int)data.MQTTVersion, data.clientID.lenstring.len, data.clientID.lenstring.data,
-				(int)data.cleansession, data.keepAliveInterval);
-			if (data.willFlag)
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", will QoS %d, will retain %d, will topic %.*s, will message %.*s",
-				data.will.qos, data.will.retained,
-				data.will.topicName.lenstring.len, data.will.topicName.lenstring.data,
-				data.will.message.lenstring.len, data.will.message.lenstring.data);
-			if (data.username.lenstring.data && data.username.lenstring.len > 0)
-			{
-				printf("user name\n");
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", user name %.*s", data.username.lenstring.len, data.username.lenstring.data);
-			}
-			if (data.password.lenstring.data && data.password.lenstring.len > 0)
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", password %.*s", data.password.lenstring.len, data.password.lenstring.data);
-		}
-	}
-	break;
-	case CONNACK:
-	{
-		unsigned char sessionPresent, connack_rc;
-		if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-			"CONNACK session present %d, rc %d", sessionPresent, connack_rc);
+	FUNC_ENTRY;
+	if(trp->len == 0){		/* initialize on first call */
+		trp->multiplier = 1;
+		trp->rem_len = 0;
 	}
-	break;
-	case PUBLISH:
-	{
-		unsigned char dup, retained, *payload;
-		unsigned short packetid;
-		int qos, payloadlen;
-		MQTTString topicName = MQTTString_initializer;
-		if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
-				&payload, &payloadlen, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s",
-				dup, qos, retained, packetid,
-				(topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data,
-				payloadlen, (payloadlen < 20) ? payloadlen : 20, payload);
-	}
-	break;
-	case PUBACK:
-	case PUBREC:
-	case PUBREL:
-	case PUBCOMP:
-	{
-		unsigned char packettype, dup;
-		unsigned short packetid;
-		if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"%s dup %d, packet id %d",
-				MQTTPacket_names[packettype], dup, packetid);
-	}
-	break;
-	case SUBSCRIBE:
-	{
-		unsigned char dup;
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		MQTTString topicFilters[1];
-		int requestedQoSs[1];
-		if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count,
-				topicFilters, requestedQoSs, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d",
-				dup, packetid, count,
-				topicFilters[0].lenstring.len, topicFilters[0].lenstring.data,
-				requestedQoSs[0]);
-	}
-	break;
-	case SUBACK:
-	{
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		int grantedQoSs[1];
-		if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"SUBACK packet id %d count %d granted qos %d",
-				packetid, count, grantedQoSs[0]);
-	}
-	break;
-	case UNSUBSCRIBE:
-	{
-		unsigned char dup;
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		MQTTString topicFilters[1];
-		if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s",
-				dup, packetid, count,
-				topicFilters[0].lenstring.len, topicFilters[0].lenstring.data);
-	}
-	break;
-	case UNSUBACK:
-	{
-		unsigned short packetid;
-		if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"UNSUBACK packet id %d", packetid);
-	}
-	break;
-	case PINGREQ:
-	case PINGRESP:
-	case DISCONNECT:
-		strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
+	do {
+		int frc;
+		if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES)
+			goto exit;
+		if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1)
+			goto exit;
+		if (frc == 0){
+			rc = 0;
+			goto exit;
+		}
+		++(trp->len);
+		trp->rem_len += (c & 127) * trp->multiplier;
+		trp->multiplier *= 128;
+	} while ((c & 128) != 0);
+	rc = trp->len;
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+/**
+ * Helper function to read packet data from some source into a buffer, non-blocking
+ * @param buf the buffer into which the packet will be serialized
+ * @param buflen the length in bytes of the supplied buffer
+ * @param trp pointer to a transport structure holding what is needed to solve getting data from it
+ * @return integer MQTT packet type, 0 for call again, or -1 on error
+ * @note  the whole message must fit into the caller's buffer
+ */
+int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
+{
+	int rc = -1, frc;
+	MQTTHeader header = {0};
+
+	switch(trp->state){
+	default:
+		trp->state = 0;
+		/*FALLTHROUGH*/
+	case 0:
+		/* read the header byte.  This has the packet type in it */
+		if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
+			goto exit;
+		if (frc == 0)
+			return 0;
+		trp->len = 0;
+		++trp->state;
+		/*FALLTHROUGH*/
+		/* read the remaining length.  This is variable in itself */
+	case 1:
+		if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
+			goto exit;
+		if(frc == 0)
+			return 0;
+		trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
+		if((trp->rem_len + trp->len) > buflen)
+			goto exit;
+		++trp->state;
+		/*FALLTHROUGH*/
+	case 2:
+		if(trp->rem_len){
+			/* read the rest of the buffer using a callback to supply the rest of the data */
+			if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
+				goto exit;
+			if (frc == 0)
+				return 0;
+			trp->rem_len -= frc;
+			trp->len += frc;
+			if(trp->rem_len)
+				return 0;
+		}
+		header.byte = buf[0];
+		rc = header.bits.type;
 		break;
 	}
-	return strbuf;
+
+exit:
+	trp->state = 0;
+	return rc;
 }
diff -r 1af97b41bc9d -r 0cfb74c5a621 MQTTPacket.h
--- a/MQTTPacket.h	Mon Sep 25 12:03:27 2017 +0000
+++ b/MQTTPacket.h	Tue Oct 03 17:02:36 2017 +0000
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Xiang Rong - 442039 Add makefile to Embedded C client
  *******************************************************************************/
 
 #ifndef MQTTPACKET_H_
@@ -21,11 +22,22 @@
 extern "C" {
 #endif
 
+#if defined(WIN32_DLL) || defined(WIN64_DLL)
+  #define DLLImport __declspec(dllimport)
+  #define DLLExport __declspec(dllexport)
+#elif defined(LINUX_SO)
+  #define DLLImport extern
+  #define DLLExport  __attribute__ ((visibility ("default")))
+#else
+  #define DLLImport
+  #define DLLExport  
+#endif
+
 enum errors
 {
 	MQTTPACKET_BUFFER_TOO_SHORT = -2,
 	MQTTPACKET_READ_ERROR = -1,
-	MQTTPACKET_READ_COMPLETE,
+	MQTTPACKET_READ_COMPLETE
 };
 
 enum msgTypes
@@ -80,14 +92,15 @@
 #include "MQTTPublish.h"
 #include "MQTTSubscribe.h"
 #include "MQTTUnsubscribe.h"
+#include "MQTTFormat.h"
 
-int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid);
-int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen);
+DLLExport int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid);
+DLLExport int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen);
 
 int MQTTPacket_len(int rem_len);
-int MQTTPacket_equals(MQTTString* a, char* b);
+DLLExport int MQTTPacket_equals(MQTTString* a, char* b);
 
-int MQTTPacket_encode(unsigned char* buf, int length);
+DLLExport int MQTTPacket_encode(unsigned char* buf, int length);
 int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value);
 int MQTTPacket_decodeBuf(unsigned char* buf, int* value);
 
@@ -99,9 +112,18 @@
 void writeCString(unsigned char** pptr, const char* string);
 void writeMQTTString(unsigned char** pptr, MQTTString mqttstring);
 
-int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int));
+DLLExport int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int));
 
-char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
+typedef struct {
+	int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */
+	void *sck;	/* pointer to whatever the system may use to identify the transport */
+	int multiplier;
+	int rem_len;
+	int len;
+	char state;
+}MQTTTransport;
+
+int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp);
 
 #ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
 }
diff -r 1af97b41bc9d -r 0cfb74c5a621 MQTTSerializePublish.c
--- a/MQTTSerializePublish.c	Mon Sep 25 12:03:27 2017 +0000
+++ b/MQTTSerializePublish.c	Tue Oct 03 17:02:36 2017 +0000
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144
  *******************************************************************************/
 
 #include "MQTTPacket.h"
@@ -114,7 +115,7 @@
 	}
 	header.bits.type = packettype;
 	header.bits.dup = dup;
-	header.bits.qos = 0;
+	header.bits.qos = (packettype == PUBREL) ? 1 : 0;
 	writeChar(&ptr, header.byte); /* write header */
 
 	ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
@@ -135,7 +136,7 @@
   */
 int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid)
 {
-	return MQTTSerialize_ack(buf, buflen, PUBACK, packetid, 0);
+	return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid);
 }
 
 
@@ -149,7 +150,7 @@
   */
 int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid)
 {
-	return MQTTSerialize_ack(buf, buflen, PUBREL, packetid, dup);
+	return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid);
 }
 
 
@@ -162,7 +163,6 @@
   */
 int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid)
 {
-	return MQTTSerialize_ack(buf, buflen, PUBCOMP, packetid, 0);
+	return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid);
 }
 
-