Basic C library for MQTT packet serialization and deserialization

Dependents:   MQTT MQTT MQTT MQTT ... more

Fork of MQTTPacket by MQTT

This library is part of the EclipseTM Paho project; specifically the embedded client.

A basic MQTT library in C for packet serialization and deserialization

Revision:
23:7a52009beba1
Parent:
19:99773f597e90
--- a/MQTTPacket.c	Mon Sep 25 12:03:27 2017 +0000
+++ b/MQTTPacket.c	Fri Oct 06 13:27:35 2017 +0900
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Sergio R. Caprile - non-blocking packet read functions for stream transport
  *******************************************************************************/
 
 #include "StackTrace.h"
@@ -284,6 +285,7 @@
  * @param buflen the length in bytes of the supplied buffer
  * @param getfn pointer to a function which will read any number of bytes from the needed source
  * @return integer MQTT packet type, or -1 on error
+ * @note  the whole message must fit into the caller's buffer
  */
 int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
 {
@@ -302,7 +304,9 @@
 	len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
 
 	/* 3. read the rest of the buffer using a callback to supply the rest of the data */
-	if ((*getfn)(buf + len, rem_len) != rem_len)
+	if((rem_len + len) > buflen)
+		goto exit;
+	if (rem_len && ((*getfn)(buf + len, rem_len) != rem_len))
 		goto exit;
 
 	header.byte = buf[0];
@@ -311,143 +315,97 @@
 	return rc;
 }
 
-
-const char* MQTTPacket_names[] =
+/**
+ * Decodes the message length according to the MQTT algorithm, non-blocking
+ * @param trp pointer to a transport structure holding what is needed to solve getting data from it
+ * @param value the decoded length returned
+ * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
+ */
+static int MQTTPacket_decodenb(MQTTTransport *trp)
 {
-	"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
-	"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
-	"PINGREQ", "PINGRESP", "DISCONNECT"
-};
-
-
-char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen)
-{
-	int index = 0;
-	int rem_length = 0;
-	MQTTHeader header = {0};
-	int strindex = 0;
-
-	header.byte = buf[index++];
-	index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
+	unsigned char c;
+	int rc = MQTTPACKET_READ_ERROR;
 
-	switch (header.bits.type)
-	{
-	case CONNECT:
-	{
-		MQTTPacket_connectData data;
-		if (MQTTDeserialize_connect(&data, buf, buflen) == 1)
-		{
-			strindex = snprintf(strbuf, strbuflen,
-				"CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %hd",
-				(int)data.MQTTVersion, data.clientID.lenstring.len, data.clientID.lenstring.data,
-				(int)data.cleansession, data.keepAliveInterval);
-			if (data.willFlag)
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", will QoS %d, will retain %d, will topic %.*s, will message %.*s",
-				data.will.qos, data.will.retained,
-				data.will.topicName.lenstring.len, data.will.topicName.lenstring.data,
-				data.will.message.lenstring.len, data.will.message.lenstring.data);
-			if (data.username.lenstring.data && data.username.lenstring.len > 0)
-			{
-				printf("user name\n");
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", user name %.*s", data.username.lenstring.len, data.username.lenstring.data);
-			}
-			if (data.password.lenstring.data && data.password.lenstring.len > 0)
-				strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
-				", password %.*s", data.password.lenstring.len, data.password.lenstring.data);
-		}
-	}
-	break;
-	case CONNACK:
-	{
-		unsigned char sessionPresent, connack_rc;
-		if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-			"CONNACK session present %d, rc %d", sessionPresent, connack_rc);
+	FUNC_ENTRY;
+	if(trp->len == 0){		/* initialize on first call */
+		trp->multiplier = 1;
+		trp->rem_len = 0;
 	}
-	break;
-	case PUBLISH:
-	{
-		unsigned char dup, retained, *payload;
-		unsigned short packetid;
-		int qos, payloadlen;
-		MQTTString topicName = MQTTString_initializer;
-		if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
-				&payload, &payloadlen, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s",
-				dup, qos, retained, packetid,
-				(topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data,
-				payloadlen, (payloadlen < 20) ? payloadlen : 20, payload);
-	}
-	break;
-	case PUBACK:
-	case PUBREC:
-	case PUBREL:
-	case PUBCOMP:
-	{
-		unsigned char packettype, dup;
-		unsigned short packetid;
-		if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"%s dup %d, packet id %d",
-				MQTTPacket_names[packettype], dup, packetid);
-	}
-	break;
-	case SUBSCRIBE:
-	{
-		unsigned char dup;
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		MQTTString topicFilters[1];
-		int requestedQoSs[1];
-		if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count,
-				topicFilters, requestedQoSs, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d",
-				dup, packetid, count,
-				topicFilters[0].lenstring.len, topicFilters[0].lenstring.data,
-				requestedQoSs[0]);
-	}
-	break;
-	case SUBACK:
-	{
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		int grantedQoSs[1];
-		if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"SUBACK packet id %d count %d granted qos %d",
-				packetid, count, grantedQoSs[0]);
-	}
-	break;
-	case UNSUBSCRIBE:
-	{
-		unsigned char dup;
-		unsigned short packetid;
-		int maxcount = 1, count = 0;
-		MQTTString topicFilters[1];
-		if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s",
-				dup, packetid, count,
-				topicFilters[0].lenstring.len, topicFilters[0].lenstring.data);
-	}
-	break;
-	case UNSUBACK:
-	{
-		unsigned short packetid;
-		if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1)
-			strindex = snprintf(strbuf, strbuflen,
-				"UNSUBACK packet id %d", packetid);
-	}
-	break;
-	case PINGREQ:
-	case PINGRESP:
-	case DISCONNECT:
-		strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
+	do {
+		int frc;
+		if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES)
+			goto exit;
+		if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1)
+			goto exit;
+		if (frc == 0){
+			rc = 0;
+			goto exit;
+		}
+		++(trp->len);
+		trp->rem_len += (c & 127) * trp->multiplier;
+		trp->multiplier *= 128;
+	} while ((c & 128) != 0);
+	rc = trp->len;
+exit:
+	FUNC_EXIT_RC(rc);
+	return rc;
+}
+
+/**
+ * Helper function to read packet data from some source into a buffer, non-blocking
+ * @param buf the buffer into which the packet will be serialized
+ * @param buflen the length in bytes of the supplied buffer
+ * @param trp pointer to a transport structure holding what is needed to solve getting data from it
+ * @return integer MQTT packet type, 0 for call again, or -1 on error
+ * @note  the whole message must fit into the caller's buffer
+ */
+int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
+{
+	int rc = -1, frc;
+	MQTTHeader header = {0};
+
+	switch(trp->state){
+	default:
+		trp->state = 0;
+		/*FALLTHROUGH*/
+	case 0:
+		/* read the header byte.  This has the packet type in it */
+		if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
+			goto exit;
+		if (frc == 0)
+			return 0;
+		trp->len = 0;
+		++trp->state;
+		/*FALLTHROUGH*/
+		/* read the remaining length.  This is variable in itself */
+	case 1:
+		if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
+			goto exit;
+		if(frc == 0)
+			return 0;
+		trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
+		if((trp->rem_len + trp->len) > buflen)
+			goto exit;
+		++trp->state;
+		/*FALLTHROUGH*/
+	case 2:
+		if(trp->rem_len){
+			/* read the rest of the buffer using a callback to supply the rest of the data */
+			if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
+				goto exit;
+			if (frc == 0)
+				return 0;
+			trp->rem_len -= frc;
+			trp->len += frc;
+			if(trp->rem_len)
+				return 0;
+		}
+		header.byte = buf[0];
+		rc = header.bits.type;
 		break;
 	}
-	return strbuf;
+
+exit:
+	trp->state = 0;
+	return rc;
 }