A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_codec.c
- Revision:
- 0:ef4901974abc
- Child:
- 5:34779607059c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mqtt_codec.c Fri Apr 08 12:01:23 2016 -0700 @@ -0,0 +1,956 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include <stdlib.h> +#include <limits.h> +#include "azure_c_shared_utility/gballoc.h" +#include "azure_c_shared_utility/buffer_.h" +#include "azure_c_shared_utility/macro_utils.h" +#include "azure_c_shared_utility/xlogging.h" +#include "azure_umqtt_c/mqtt_codec.h" + +#define PAYLOAD_OFFSET 5 +#define PACKET_TYPE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf0)) +#define FLAG_VALUE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf)) + +#define USERNAME_FLAG 0x80 +#define PASSWORD_FLAG 0x40 +#define WILL_RETAIN_FLAG 0x20 +#define WILL_QOS_FLAG_ 0x18 +#define WILL_FLAG_FLAG 0x04 +#define CLEAN_SESSION_FLAG 0x02 + +#define NEXT_128_CHUNK 0x80 +#define PUBLISH_DUP_FLAG 0x8 +#define PUBLISH_QOS_EXACTLY_ONCE 0x4 +#define PUBLISH_QOS_AT_LEAST_ONCE 0x2 +#define PUBLISH_QOS_RETAIN 0x1 + +#define PROTOCOL_NUMBER 4 +#define CONN_FLAG_BYTE_OFFSET 7 + +#define CONNECT_FIXED_HEADER_SIZE 2 +#define CONNECT_VARIABLE_HEADER_SIZE 10 +#define SUBSCRIBE_FIXED_HEADER_FLAG 0x2 +#define UNSUBSCRIBE_FIXED_HEADER_FLAG 0x2 + +#define MAX_SEND_SIZE 0xFFFFFF7F + +#define CODEC_STATE_VALUES \ + CODEC_STATE_FIXED_HEADER, \ + CODEC_STATE_VAR_HEADER, \ + CODEC_STATE_PAYLOAD + +DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES); + +typedef struct MQTTCODEC_INSTANCE_TAG +{ + CONTROL_PACKET_TYPE currPacket; + CODEC_STATE_RESULT codecState; + size_t bufferOffset; + int headerFlags; + BUFFER_HANDLE headerData; + ON_PACKET_COMPLETE_CALLBACK packetComplete; + void* callContext; + uint8_t storeRemainLen[4]; + size_t remainLenIndex; +} MQTTCODEC_INSTANCE; + +typedef struct PUBLISH_HEADER_INFO_TAG +{ + const char* topicName; + uint16_t packetId; + const char* msgBuffer; + QOS_VALUE qualityOfServiceValue; +} PUBLISH_HEADER_INFO; + +void byteutil_writeByte(uint8_t** buffer, uint8_t value) +{ + if (buffer != NULL) + { + **buffer = value; + (*buffer)++; + } +} + +void byteutil_writeInt(uint8_t** buffer, uint16_t value) +{ + if (buffer != NULL) + { + **buffer = (char)(value / 256); + (*buffer)++; + **buffer = (char)(value % 256); + (*buffer)++; + } +} + +void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len) +{ + if (buffer != NULL) + { + byteutil_writeInt(buffer, len); + (void)memcpy(*buffer, stringData, len); + *buffer += len; + } +} + +CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags) +{ + CONTROL_PACKET_TYPE result; + result = PACKET_TYPE_BYTE(pktByte); + if (flags != NULL) + { + *flags = FLAG_VALUE_BYTE(pktByte); + } + return result; +} + +static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount) +{ + int result = 0; + if (payloadList == NULL || ctrlPacket == NULL) + { + result = __LINE__; + } + else + { + for (size_t index = 0; index < payloadCount && result == 0; index++) + { + // Add the Payload + size_t offsetLen = BUFFER_length(ctrlPacket); + size_t topicLen = strlen(payloadList[index]); + if (topicLen > USHRT_MAX) + { + result = __LINE__; + } + else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0) + { + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(ctrlPacket); + iterator += offsetLen; + byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen); + } + } + } + return result; +} + +static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount) +{ + int result = 0; + if (payloadList == NULL || ctrlPacket == NULL) + { + result = __LINE__; + } + else + { + for (size_t index = 0; index < payloadCount && result == 0; index++) + { + // Add the Payload + size_t offsetLen = BUFFER_length(ctrlPacket); + size_t topicLen = strlen(payloadList[index].subscribeTopic); + if (topicLen > USHRT_MAX) + { + result = __LINE__; + } + else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0) + { + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(ctrlPacket); + iterator += offsetLen; + byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen); + *iterator = payloadList[index].qosReturn; + } + } + } + return result; +} + +static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions) +{ + int result = 0; + if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0) + { + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(ctrlPacket); + if (iterator == NULL) + { + result = __LINE__; + } + else + { + byteutil_writeUTF(&iterator, "MQTT", 4); + byteutil_writeByte(&iterator, PROTOCOL_NUMBER); + byteutil_writeByte(&iterator, 0); // Flags will be entered later + byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval); + result = 0; + } + } + return result; +} + +static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader) +{ + int result = 0; + size_t topicLen = 0; + size_t spaceLen = 0; + size_t idLen = 0; + + size_t currLen = BUFFER_length(ctrlPacket); + + topicLen = strlen(publishHeader->topicName); + spaceLen += 2; + + if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE) + { + // Packet Id is only set if the QOS is not 0 + idLen = 2; + } + + if (topicLen > USHRT_MAX) + { + result = __LINE__; + } + else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0) + { + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(ctrlPacket); + if (iterator == NULL) + { + result = __LINE__; + } + else + { + iterator += currLen; + /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/ + byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen); + if (idLen > 0) + { + byteutil_writeInt(&iterator, publishHeader->packetId); + } + result = 0; + } + } + return result; +} + +static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId) +{ + int result = 0; + if (BUFFER_enlarge(ctrlPacket, 2) != 0) + { + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(ctrlPacket); + if (iterator == NULL) + { + result = __LINE__; + } + else + { + byteutil_writeInt(&iterator, packetId); + result = 0; + } + } + return result; +} + +static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, int flags, uint16_t packetId) +{ + BUFFER_HANDLE result = BUFFER_new(); + if (result != NULL) + { + if (BUFFER_pre_build(result, 4) != 0) + { + BUFFER_delete(result); + result = NULL; + } + else + { + uint8_t* iterator = BUFFER_u_char(result); + if (iterator == NULL) + { + BUFFER_delete(result); + result = NULL; + } + else + { + *iterator = type | flags; + iterator++; + *iterator = 0x2; + iterator++; + byteutil_writeInt(&iterator, packetId); + } + } + } + return result; +} + +static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags) +{ + int result; + if (ctrlPacket == NULL) + { + return __LINE__; + } + else + { + size_t packetLen = BUFFER_length(ctrlPacket); + uint8_t remainSize[4] = { 0 }; + size_t index = 0; + + // Calculate the length of packet + do + { + uint8_t encode = packetLen % 128; + packetLen /= 128; + // if there are more data to encode, set the top bit of this byte + if (packetLen > 0) + { + encode |= NEXT_128_CHUNK; + } + remainSize[index++] = encode; + } while (packetLen > 0); + + BUFFER_HANDLE fixedHeader = BUFFER_new(); + if (fixedHeader == NULL) + { + result = __LINE__; + } + else if (BUFFER_pre_build(fixedHeader, index + 1) != 0) + { + BUFFER_delete(fixedHeader); + result = __LINE__; + } + else + { + uint8_t* iterator = BUFFER_u_char(fixedHeader); + *iterator = packetType | flags; + iterator++; + (void)memcpy(iterator, remainSize, index); + + result = BUFFER_prepend(ctrlPacket, fixedHeader); + BUFFER_delete(fixedHeader); + } + } + return result; +} + +static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions) +{ + int result = 0; + if (mqttOptions == NULL || ctrlPacket == NULL) + { + result = __LINE__; + } + else + { + size_t clientLen = 0; + size_t usernameLen = 0; + size_t passwordLen = 0; + size_t willMessageLen = 0; + size_t willTopicLen = 0; + size_t spaceLen = 0; + + if (mqttOptions->clientId != NULL) + { + spaceLen += 2; + clientLen = strlen(mqttOptions->clientId); + } + if (mqttOptions->username != NULL) + { + spaceLen += 2; + usernameLen = strlen(mqttOptions->username); + } + if (mqttOptions->password != NULL) + { + spaceLen += 2; + passwordLen = strlen(mqttOptions->password); + } + if (mqttOptions->willMessage != NULL) + { + spaceLen += 2; + willMessageLen = strlen(mqttOptions->willMessage); + } + if (mqttOptions->willTopic != NULL) + { + spaceLen += 2; + willTopicLen = strlen(mqttOptions->willTopic); + } + + size_t currLen = BUFFER_length(ctrlPacket); + size_t totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen; + + // Validate the Username & Password + if (clientLen > USHRT_MAX) + { + result = __LINE__; + } + else if (usernameLen > 0 && passwordLen == 0) + { + result = __LINE__; + } + else if ( (willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0) ) + { + result = __LINE__; + } + else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0) + { + result = __LINE__; + } + else + { + uint8_t* packet = BUFFER_u_char(ctrlPacket); + uint8_t* iterator = packet; + + iterator += currLen; + byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen); + + // TODO: Read on the Will Topic + if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX) + { + result = __LINE__; + } + else + { + if (willMessageLen > 0 && willTopicLen > 0) + { + packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG; + byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen); + packet[CONN_FLAG_BYTE_OFFSET] |= mqttOptions->qualityOfServiceValue; + if (mqttOptions->messageRetain) + { + packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG; + } + byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen); + } + if (usernameLen > 0) + { + packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG | PASSWORD_FLAG; + byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen); + byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen); + } + // TODO: Get the rest of the flags + if (mqttOptions->useCleanSession) + { + packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG; + } + result = 0; + } + } + } + return result; +} + +static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen) +{ + int result; + if (codecData == NULL) + { + result = __LINE__; + } + else + { + result = 0; + codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen; + if (remainLen < 0x7f) + { + int multiplier = 1; + int totalLen = 0; + size_t index = 0; + uint8_t encodeByte = 0; + do + { + encodeByte = codecData->storeRemainLen[index++]; + totalLen += (encodeByte & 127) * multiplier; + multiplier *= NEXT_128_CHUNK; + + if (multiplier > 128 * 128 * 128) + { + result = __LINE__; + break; + } + } while ( (encodeByte & NEXT_128_CHUNK) != 0); + + if (totalLen > 0) + { + codecData->headerData = BUFFER_new(); + (void)BUFFER_pre_build(codecData->headerData, totalLen); + codecData->bufferOffset = 0; + } + codecData->codecState = CODEC_STATE_VAR_HEADER; + + // Reset remainLen Index + codecData->remainLenIndex = 0; + memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t)); + } + } + return result; +} + +static void completePacketData(MQTTCODEC_INSTANCE* codecData) +{ + if (codecData) + { + if (codecData->packetComplete != NULL) + { + codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData); + } + + // Clean up data + codecData->currPacket = UNKNOWN_TYPE; + codecData->codecState = CODEC_STATE_FIXED_HEADER; + codecData->headerFlags = 0; + BUFFER_delete(codecData->headerData); + codecData->headerData = NULL; + } +} + +MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx) +{ + MQTTCODEC_HANDLE result; + result = malloc(sizeof(MQTTCODEC_INSTANCE) ); + /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */ + if (result != NULL) + { + /* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */ + result->currPacket = UNKNOWN_TYPE; + result->codecState = CODEC_STATE_FIXED_HEADER; + result->headerFlags = 0; + result->bufferOffset = 0; + result->packetComplete = packetComplete; + result->callContext = callbackCtx; + result->headerData = NULL; + memset(result->storeRemainLen, 0, 4 * sizeof(uint8_t)); + result->remainLenIndex = 0; + } + return result; +} + +void mqtt_codec_destroy(MQTTCODEC_HANDLE handle) +{ + /* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */ + if (handle != NULL) + { + MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle; + /* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */ + BUFFER_delete(codecData->headerData); + free(codecData); + } +} + +BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions) +{ + BUFFER_HANDLE result; + /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */ + if (mqttOptions == NULL) + { + result = NULL; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */ + result = BUFFER_new(); + if (result != NULL) + { + // Add Variable Header Information + if (constructConnectVariableHeader(result, mqttOptions) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + if (constructConnPayload(result, mqttOptions) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + } + } + } + } + return result; +} + +BUFFER_HANDLE mqtt_codec_disconnect() +{ + /* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */ + BUFFER_HANDLE result = BUFFER_new(); + if (result != NULL) + { + if (BUFFER_enlarge(result, 2) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + uint8_t* iterator = BUFFER_u_char(result); + if (iterator == NULL) + { + /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + iterator[0] = DISCONNECT_TYPE; + iterator[1] = 0; + } + } + } + return result; +} + +BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen) +{ + BUFFER_HANDLE result; + /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */ + if (topicName == NULL) + { + result = NULL; + } + /* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */ + else if (buffLen > MAX_SEND_SIZE) + { + /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ + result = NULL; + } + else + { + PUBLISH_HEADER_INFO publishInfo = { 0 }; + publishInfo.topicName = topicName; + publishInfo.packetId = packetId; + publishInfo.qualityOfServiceValue = qosValue; + + uint8_t headerFlags = 0; + if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG; + if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN; + if (qosValue != DELIVER_AT_MOST_ONCE) + { + if (qosValue == DELIVER_AT_LEAST_ONCE) + { + headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE; + } + else + { + headerFlags |= PUBLISH_QOS_EXACTLY_ONCE; + } + } + + /* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */ + result = BUFFER_new(); + if (result != NULL) + { + if (constructPublishVariableHeader(result, &publishInfo) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + size_t payloadOffset = BUFFER_length(result); + if (buffLen > 0) + { + if (BUFFER_enlarge(result, buffLen) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + uint8_t* iterator = BUFFER_u_char(result); + if (iterator == NULL) + { + /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + iterator += payloadOffset; + // Write Message + memcpy(iterator, msgBuffer, buffLen); + } + } + } + + if (result != NULL) + { + if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + } + } + } + } + return result; +} + +BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId) +{ + /* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */ + /* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */ + BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId); + return result; +} + +BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId) +{ + /* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */ + /* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */ + BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId); + return result; +} + +BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId) +{ + /* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */ + /* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */ + BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId); + return result; +} + +BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId) +{ + /* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */ + /* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */ + BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId); + return result; +} + +BUFFER_HANDLE mqtt_codec_ping() +{ + /* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */ + BUFFER_HANDLE result = BUFFER_new(); + if (result != NULL) + { + if (BUFFER_enlarge(result, 2) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + uint8_t* iterator = BUFFER_u_char(result); + if (iterator == NULL) + { + /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + iterator[0] = PINGREQ_TYPE; + iterator[1] = 0; + } + } + } + return result; +} + +BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count) +{ + BUFFER_HANDLE result; + /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */ + if (subscribeList == NULL || count == 0) + { + result = NULL; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/ + result = BUFFER_new(); + if (result != NULL) + { + if (constructSubscibeTypeVariableHeader(result, packetId) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */ + if (addListItemsToSubscribePacket(result, subscribeList, count) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + } + } + } + } + return result; +} + +BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count) +{ + BUFFER_HANDLE result; + /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */ + if (unsubscribeList == NULL || count == 0) + { + result = NULL; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */ + result = BUFFER_new(); + if (result != NULL) + { + if (constructSubscibeTypeVariableHeader(result, packetId) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */ + if (addListItemsToUnsubscribePacket(result, unsubscribeList, count) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + else + { + if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */ + BUFFER_delete(result); + result = NULL; + } + } + } + } + } + return result; +} + +int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size) +{ + int result; + MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle; + /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */ + if (codec_Data == NULL) + { + result = __LINE__; + } + /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */ + /* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */ + else if (buffer == NULL || size == 0) + { + codec_Data->currPacket = PACKET_TYPE_ERROR; + result = __LINE__; + } + else + { + /* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */ + result = 0; + for (size_t index = 0; index < size && result == 0; index++) + { + uint8_t iterator = ((int8_t*)buffer)[index]; + if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER) + { + if (codec_Data->currPacket == UNKNOWN_TYPE) + { + codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags); + } + else + { + if (prepareheaderDataInfo(codec_Data, iterator) != 0) + { + /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */ + codec_Data->currPacket = PACKET_TYPE_ERROR; + result = __LINE__; + } + if (codec_Data->currPacket == PINGRESP_TYPE) + { + /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */ + completePacketData(codec_Data); + } + } + } + else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER) + { + size_t payloadLen = 0; + if (codec_Data->headerData == NULL) + { + codec_Data->codecState = CODEC_STATE_PAYLOAD; + } + else + { + uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData); + if (dataBytes == NULL) + { + /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */ + codec_Data->currPacket = PACKET_TYPE_ERROR; + result = __LINE__; + } + else + { + // Increment the data + dataBytes += codec_Data->bufferOffset++; + *dataBytes = iterator; + + size_t totalLen = BUFFER_length(codec_Data->headerData); + if (codec_Data->bufferOffset >= totalLen) + { + /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */ + completePacketData(codec_Data); + } + } + } + } + else + { + /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */ + codec_Data->currPacket = PACKET_TYPE_ERROR; + result = __LINE__; + } + } + } + return result; +}