MQTTPacket fixes
Fork of MQTTPacket by
Diff: MQTTPacket.c
- Revision:
- 23:0cfb74c5a621
- Parent:
- 19:99773f597e90
--- a/MQTTPacket.c Mon Sep 25 12:03:27 2017 +0000 +++ b/MQTTPacket.c Tue Oct 03 17:02:36 2017 +0000 @@ -12,6 +12,7 @@ * * Contributors: * Ian Craggs - initial API and implementation and/or initial documentation + * Sergio R. Caprile - non-blocking packet read functions for stream transport *******************************************************************************/ #include "StackTrace.h" @@ -284,6 +285,7 @@ * @param buflen the length in bytes of the supplied buffer * @param getfn pointer to a function which will read any number of bytes from the needed source * @return integer MQTT packet type, or -1 on error + * @note the whole message must fit into the caller's buffer */ int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)) { @@ -302,7 +304,9 @@ len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */ /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if ((*getfn)(buf + len, rem_len) != rem_len) + if((rem_len + len) > buflen) + goto exit; + if (rem_len && ((*getfn)(buf + len, rem_len) != rem_len)) goto exit; header.byte = buf[0]; @@ -311,143 +315,97 @@ return rc; } - -const char* MQTTPacket_names[] = +/** + * Decodes the message length according to the MQTT algorithm, non-blocking + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @param value the decoded length returned + * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error + */ +static int MQTTPacket_decodenb(MQTTTransport *trp) { - "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", - "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", - "PINGREQ", "PINGRESP", "DISCONNECT" -}; - - -char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen) -{ - int index = 0; - int rem_length = 0; - MQTTHeader header = {0}; - int strindex = 0; - - header.byte = buf[index++]; - index += MQTTPacket_decodeBuf(&buf[index], &rem_length); + unsigned char c; + int rc = MQTTPACKET_READ_ERROR; - switch (header.bits.type) - { - case CONNECT: - { - MQTTPacket_connectData data; - if (MQTTDeserialize_connect(&data, buf, buflen) == 1) - { - strindex = snprintf(strbuf, strbuflen, - "CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %hd", - (int)data.MQTTVersion, data.clientID.lenstring.len, data.clientID.lenstring.data, - (int)data.cleansession, data.keepAliveInterval); - if (data.willFlag) - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", will QoS %d, will retain %d, will topic %.*s, will message %.*s", - data.will.qos, data.will.retained, - data.will.topicName.lenstring.len, data.will.topicName.lenstring.data, - data.will.message.lenstring.len, data.will.message.lenstring.data); - if (data.username.lenstring.data && data.username.lenstring.len > 0) - { - printf("user name\n"); - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", user name %.*s", data.username.lenstring.len, data.username.lenstring.data); - } - if (data.password.lenstring.data && data.password.lenstring.len > 0) - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", password %.*s", data.password.lenstring.len, data.password.lenstring.data); - } - } - break; - case CONNACK: - { - unsigned char sessionPresent, connack_rc; - if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "CONNACK session present %d, rc %d", sessionPresent, connack_rc); + FUNC_ENTRY; + if(trp->len == 0){ /* initialize on first call */ + trp->multiplier = 1; + trp->rem_len = 0; } - break; - case PUBLISH: - { - unsigned char dup, retained, *payload; - unsigned short packetid; - int qos, payloadlen; - MQTTString topicName = MQTTString_initializer; - if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, - &payload, &payloadlen, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s", - dup, qos, retained, packetid, - (topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data, - payloadlen, (payloadlen < 20) ? payloadlen : 20, payload); - } - break; - case PUBACK: - case PUBREC: - case PUBREL: - case PUBCOMP: - { - unsigned char packettype, dup; - unsigned short packetid; - if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "%s dup %d, packet id %d", - MQTTPacket_names[packettype], dup, packetid); - } - break; - case SUBSCRIBE: - { - unsigned char dup; - unsigned short packetid; - int maxcount = 1, count = 0; - MQTTString topicFilters[1]; - int requestedQoSs[1]; - if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count, - topicFilters, requestedQoSs, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d", - dup, packetid, count, - topicFilters[0].lenstring.len, topicFilters[0].lenstring.data, - requestedQoSs[0]); - } - break; - case SUBACK: - { - unsigned short packetid; - int maxcount = 1, count = 0; - int grantedQoSs[1]; - if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "SUBACK packet id %d count %d granted qos %d", - packetid, count, grantedQoSs[0]); - } - break; - case UNSUBSCRIBE: - { - unsigned char dup; - unsigned short packetid; - int maxcount = 1, count = 0; - MQTTString topicFilters[1]; - if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s", - dup, packetid, count, - topicFilters[0].lenstring.len, topicFilters[0].lenstring.data); - } - break; - case UNSUBACK: - { - unsigned short packetid; - if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "UNSUBACK packet id %d", packetid); - } - break; - case PINGREQ: - case PINGRESP: - case DISCONNECT: - strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); + do { + int frc; + if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES) + goto exit; + if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1) + goto exit; + if (frc == 0){ + rc = 0; + goto exit; + } + ++(trp->len); + trp->rem_len += (c & 127) * trp->multiplier; + trp->multiplier *= 128; + } while ((c & 128) != 0); + rc = trp->len; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + +/** + * Helper function to read packet data from some source into a buffer, non-blocking + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @return integer MQTT packet type, 0 for call again, or -1 on error + * @note the whole message must fit into the caller's buffer + */ +int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp) +{ + int rc = -1, frc; + MQTTHeader header = {0}; + + switch(trp->state){ + default: + trp->state = 0; + /*FALLTHROUGH*/ + case 0: + /* read the header byte. This has the packet type in it */ + if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1) + goto exit; + if (frc == 0) + return 0; + trp->len = 0; + ++trp->state; + /*FALLTHROUGH*/ + /* read the remaining length. This is variable in itself */ + case 1: + if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR) + goto exit; + if(frc == 0) + return 0; + trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */ + if((trp->rem_len + trp->len) > buflen) + goto exit; + ++trp->state; + /*FALLTHROUGH*/ + case 2: + if(trp->rem_len){ + /* read the rest of the buffer using a callback to supply the rest of the data */ + if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1) + goto exit; + if (frc == 0) + return 0; + trp->rem_len -= frc; + trp->len += frc; + if(trp->rem_len) + return 0; + } + header.byte = buf[0]; + rc = header.bits.type; break; } - return strbuf; + +exit: + trp->state = 0; + return rc; }