A small footprint MQTT library

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

mqtt_client.c

Committer:
AzureIoTClient
Date:
2016-07-29
Revision:
5:34779607059c
Parent:
4:e7167dabd6e4
Child:
6:d9c4702f91ca

File content as of revision 5:34779607059c:

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

#include <stdlib.h>
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/platform.h"
#include "azure_c_shared_utility/tickcounter.h"
#include "azure_c_shared_utility/crt_abstractions.h"
#include "azure_c_shared_utility/xlogging.h"

#include "azure_umqtt_c/mqtt_client.h"
#include "azure_umqtt_c/mqtt_codec.h"
#include <time.h>

#define KEEP_ALIVE_BUFFER_SEC           10
#define VARIABLE_HEADER_OFFSET          2
#define RETAIN_FLAG_MASK                0x1
#define QOS_LEAST_ONCE_FLAG_MASK        0x2
#define QOS_EXACTLY_ONCE_FLAG_MASK      0x4
#define DUPLICATE_FLAG_MASK             0x8
#define CONNECT_PACKET_MASK             0xf0
#define TIME_MAX_BUFFER                 16
#define DEFAULT_MAX_PING_RESPONSE_TIME  90

static const char* FORMAT_HEX_CHAR = "0x%02x ";

/* Per-connection state of the MQTT client; MQTT_CLIENT_HANDLE values handed to callers are cast back to this type. */
typedef struct MQTT_CLIENT_TAG
{
    /* Transport handle supplied to mqtt_client_connect; used for xio_open/xio_send/xio_close. */
    XIO_HANDLE xioHandle;
    /* Codec created in mqtt_client_init; receives every incoming byte buffer. */
    MQTTCODEC_HANDLE codec_handle;
    /* Type of the most recent packet this client initiated (checked for DISCONNECT_TYPE when a send completes). */
    CONTROL_PACKET_TYPE packetState;
    /* Tick counter used to timestamp outgoing packets. */
    TICK_COUNTER_HANDLE packetTickCntr;
    /* Tick-counter reading (ms) captured at the most recent send attempt. */
    uint64_t packetSendTimeMs;
    /* Optional operation/event callback; may be NULL because mqtt_client_init does not require it. */
    ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
    /* Callback for received PUBLISH messages; mqtt_client_init rejects NULL. */
    ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
    /* Opaque user context passed back to both callbacks. */
    void* ctx;
    /* QoS copied from the connect options. */
    QOS_VALUE qosValue;
    /* Keep-alive interval copied from the connect options (NOTE(review): presumably seconds - confirm). */
    uint16_t keepAliveInterval;
    /* Client-owned copy of the connect options; the string members are heap allocated and freed in mqtt_client_deinit. */
    MQTT_CLIENT_OPTIONS mqttOptions;
    /* Set to true once a CONNACK with CONNECTION_ACCEPTED is received; cleared after DISCONNECT is sent. */
    bool clientConnected;
    /* Set to true when the transport open completes; cleared on IO error or after DISCONNECT is sent. */
    bool socketConnected;
    /* Enables the packet trace logging helpers. */
    bool logTrace;
    /* Initialised to false; NOTE(review): no reader is visible in the code reviewed here. */
    bool rawBytesTrace;
    /* Reset to 0 whenever a PINGRESP arrives. */
    uint64_t timeSincePing;
    /* Smaller of DEFAULT_MAX_PING_RESPONSE_TIME and half the keep-alive interval (set in mqtt_client_connect). */
    uint16_t maxPingRespTime;
} MQTT_CLIENT;

/*
 * Decodes a big-endian 16-bit value from the two bytes at *buffer and moves
 * *buffer past them.
 *
 * buffer: address of the read cursor; when NULL nothing is read.
 * Returns the decoded value, or 0 when buffer is NULL.
 */
static uint16_t byteutil_read_uint16(uint8_t** buffer)
{
    if (buffer == NULL)
    {
        return 0;
    }

    const uint8_t* cursor = *buffer;
    uint16_t value = (uint16_t)(256 * cursor[0] + cursor[1]);

    /* Step the caller's cursor over the two bytes just consumed. */
    *buffer += 2;
    return value;
}

/*
 * Reads a length-prefixed string (16-bit big-endian length followed by that
 * many bytes) starting at *buffer and returns it as a newly allocated,
 * NUL-terminated copy. The caller owns the returned memory and must free it.
 *
 * buffer:  address of the read cursor; advanced past the length prefix and,
 *          on success, past the string bytes as well.
 * byteLen: optional; receives the string length on success only.
 *
 * Returns NULL when buffer is NULL, the encoded length is zero, or the
 * allocation fails.
 */
static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
{
    if (buffer == NULL)
    {
        return NULL;
    }

    /* The length prefix is consumed even when the string turns out to be empty. */
    int textLen = byteutil_read_uint16(buffer);
    if (textLen <= 0)
    {
        return NULL;
    }

    char* text = (char*)malloc(textLen + 1);
    if (text == NULL)
    {
        return NULL;
    }

    (void)memcpy(text, *buffer, textLen);
    text[textLen] = '\0';
    *buffer += textLen;

    if (byteLen != NULL)
    {
        *byteLen = textLen;
    }
    return text;
}

/*
 * Returns the byte at *buffer and advances *buffer by one.
 * Returns 0 without reading anything when buffer is NULL.
 */
static uint8_t byteutil_readByte(uint8_t** buffer)
{
    if (buffer == NULL)
    {
        return 0;
    }

    /* Post-increment the caller's cursor and yield the byte it pointed at. */
    return *(*buffer)++;
}

/*
 * xio send-completion callback.
 *
 * context:     the MQTT_CLIENT that issued the send (passed to xio_send).
 * send_result: outcome reported by the transport.
 *
 * On IO_SEND_OK for a DISCONNECT packet the user is notified (when an
 * operation callback is registered), the transport is closed and both
 * connection flags are cleared. Any other send result is reported through the
 * operation callback as MQTT_CLIENT_ON_ERROR.
 *
 * A NULL context is only logged; it is never dereferenced. A successful send
 * is no longer reported as a failure merely because no operation callback is
 * registered, and the transport is closed after DISCONNECT in that case too.
 */
static void sendComplete(void* context, IO_SEND_RESULT send_result)
{
    MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
    if (mqttData == NULL)
    {
        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure");
    }
    else if (send_result == IO_SEND_OK)
    {
        if (mqttData->packetState == DISCONNECT_TYPE)
        {
            /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
            if (mqttData->fnOperationCallback != NULL)
            {
                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx);
            }

            // close the xio
            if (xio_close(mqttData->xioHandle, NULL, mqttData->ctx) != 0)
            {
                LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result.");
            }
            mqttData->socketConnected = false;
            mqttData->clientConnected = false;
        }
    }
    else
    {
        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure");
        if (mqttData->fnOperationCallback != NULL)
        {
            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
        }
    }
}

/*
 * Maps a control packet type to a printable name for trace output.
 *
 * packet: packet type, or a raw first packet byte; only the bits selected by
 *         CONNECT_PACKET_MASK are inspected.
 *
 * Returns a string literal; "UNKNOWN" for anything not recognised.
 * PUBCOMP is now named explicitly instead of being reported as "UNKNOWN".
 */
static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
{
    switch (packet&CONNECT_PACKET_MASK)
    {
        case CONNECT_TYPE: return "CONNECT";
        case CONNACK_TYPE:  return "CONNACK";
        case PUBLISH_TYPE:  return "PUBLISH";
        case PUBACK_TYPE:  return "PUBACK";
        case PUBREC_TYPE:  return "PUBREC";
        case PUBREL_TYPE:  return "PUBREL";
        case PUBCOMP_TYPE:  return "PUBCOMP";
        case SUBSCRIBE_TYPE:  return "SUBSCRIBE";
        case SUBACK_TYPE:  return "SUBACK";
        case UNSUBSCRIBE_TYPE:  return "UNSUBSCRIBE";
        case UNSUBACK_TYPE:  return "UNSUBACK";
        case PINGREQ_TYPE:  return "PINGREQ";
        case PINGRESP_TYPE:  return "PINGRESP";
        case DISCONNECT_TYPE:  return "DISCONNECT";
        default:
        case PACKET_TYPE_ERROR:
        case UNKNOWN_TYPE:
            return "UNKNOWN";
    }
}

/*
 * Writes the current local time as "HH:MM:SS" into timeResult.
 *
 * timeResult: destination buffer; nothing happens when it is NULL.
 * len:        capacity of timeResult in bytes; nothing is written when 0.
 *
 * If the local time cannot be obtained or the text does not fit, timeResult
 * is set to the empty string.
 */
static void getLogTime(char* timeResult, size_t len)
{
    /* A zero-capacity buffer cannot even hold the terminator, so leave it alone. */
    if (timeResult != NULL && len > 0)
    {
        time_t now = time(NULL);
        struct tm* tmInfo = localtime(&now);

        /* localtime may return NULL; strftime must not be handed a NULL tm. */
        if (tmInfo == NULL || strftime(timeResult, len, "%H:%M:%S", tmInfo) == 0)
        {
            timeResult[0] = '\0';
        }
    }
}

/*
 * Traces an outgoing packet as a timestamp, the packet name derived from the
 * first byte, and every byte in hexadecimal.
 *
 * Nothing is logged when clientData or data is NULL, length is 0, or tracing
 * is disabled on the client.
 */
static void logOutgoingingMsgTrace(MQTT_CLIENT* clientData, const uint8_t* data, size_t length)
{
    if (clientData == NULL || data == NULL || length == 0 || !clientData->logTrace)
    {
        return;
    }

    char timestamp[TIME_MAX_BUFFER];
    getLogTime(timestamp, TIME_MAX_BUFFER);

    LOG(LOG_TRACE, 0, "-> %s %s: ", timestamp, retrievePacketType((unsigned char)data[0]));

    /* Dump the packet one byte at a time on the same trace line. */
    const uint8_t* const end = data + length;
    for (const uint8_t* cursor = data; cursor != end; ++cursor)
    {
        LOG(LOG_TRACE, 0, (char*)FORMAT_HEX_CHAR, *cursor);
    }

    /* Terminate the trace line. */
    LOG(LOG_TRACE, LOG_LINE, "");
}

/*
 * Traces an incoming packet when tracing is enabled on the client.
 *
 * clientData: client whose logTrace flag gates the output; NULL logs nothing.
 * packet:     received packet type.
 * flags:      flag bits delivered with the packet; OR-ed with the type to
 *             print the first header byte.
 * data/length: bytes that followed the fixed header. When empty, only
 *             PINGRESP packets are traced.
 *
 * The length is converted to unsigned int before being formatted with %x;
 * passing a size_t to that conversion is a format/argument mismatch.
 */
static void logIncomingMsgTrace(MQTT_CLIENT* clientData, CONTROL_PACKET_TYPE packet, int flags, const uint8_t* data, size_t length)
{
    if (clientData != NULL && clientData->logTrace)
    {
        if (data != NULL && length > 0)
        {
            char tmBuffer[TIME_MAX_BUFFER];
            getLogTime(tmBuffer, TIME_MAX_BUFFER);

            LOG(LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), (unsigned int)length);
            for (size_t index = 0; index < length; index++)
            {
                LOG(LOG_TRACE, 0, (char*)FORMAT_HEX_CHAR, data[index]);
            }

            LOG(LOG_TRACE, LOG_LINE, "");
        }
        else if (packet == PINGRESP_TYPE)
        {
            char tmBuffer[TIME_MAX_BUFFER];
            getLogTime(tmBuffer, TIME_MAX_BUFFER);

            LOG(LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), (unsigned int)length);
        }
    }
}

/*
 * Records the send time on the client and hands the encoded packet to the
 * transport; on success the packet is also written to the trace log.
 *
 * clientData: client owning the tick counter and the xio handle.
 * data/length: encoded packet bytes.
 *
 * Returns 0 on success and a non-zero value (the failing source line) when
 * the tick counter cannot be read or xio_send reports an error.
 */
static int sendPacketItem(MQTT_CLIENT* clientData, const unsigned char* data, size_t length)
{
    if (tickcounter_get_current_ms(clientData->packetTickCntr, &clientData->packetSendTimeMs) != 0)
    {
        LOG(LOG_ERROR, LOG_LINE, "Failure getting current ms tickcounter");
        return __LINE__;
    }

    int sendStatus = xio_send(clientData->xioHandle, (const void*)data, length, sendComplete, clientData);
    if (sendStatus != 0)
    {
        LOG(LOG_ERROR, LOG_LINE, "%d: Failure sending control packet data", sendStatus);
        return __LINE__;
    }

    logOutgoingingMsgTrace(clientData, (const uint8_t*)data, length);
    return 0;
}

/*
 * xio open-completion callback.
 *
 * context:     the MQTT_CLIENT passed to xio_open; NULL is ignored.
 * open_result: outcome of the transport open.
 *
 * On the first successful open the socket is marked connected and the MQTT
 * CONNECT packet, built from the client's stored options, is sent. On
 * IO_OPEN_ERROR the operation callback (when registered) is told about the
 * error. The callback is optional, so it is checked before being invoked.
 */
static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
{
    MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
    if (mqttData != NULL)
    {
        if (open_result == IO_OPEN_OK && !mqttData->socketConnected)
        {
            mqttData->packetState = CONNECT_TYPE;
            mqttData->socketConnected = true;
            // Send the Connect packet
            BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqttData->mqttOptions);
            if (connPacket == NULL)
            {
                /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
                LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_codec_connect failed");
            }
            else
            {
                /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
                if (sendPacketItem(mqttData, BUFFER_u_char(connPacket), BUFFER_length(connPacket)) != 0)
                {
                    /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
                    /* The packet was encoded successfully; it is the send that failed. */
                    LOG(LOG_ERROR, LOG_LINE, "Error: sending the MQTT CONNECT packet failed");
                }
                BUFFER_delete(connPacket);
            }
        }
        else if (open_result == IO_OPEN_ERROR)
        {
            if (mqttData->fnOperationCallback != NULL)
            {
                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
            }
        }
    }
}

/*
 * xio bytes-received callback: forwards the raw bytes to the codec and, when
 * the codec rejects them, reports MQTT_CLIENT_ON_ERROR through the operation
 * callback (if one is registered). A NULL context is ignored.
 */
static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)context;
    if (client == NULL)
    {
        return;
    }

    int decodeStatus = mqtt_codec_bytesReceived(client->codec_handle, buffer, size);
    if (decodeStatus != 0 && client->fnOperationCallback)
    {
        client->fnOperationCallback(client, MQTT_CLIENT_ON_ERROR, NULL, client->ctx);
    }
}

/*
 * xio error callback.
 *
 * context: the MQTT_CLIENT passed to xio_open; NULL is ignored.
 *
 * Reports MQTT_CLIENT_ON_ERROR through the operation callback when one is
 * registered, then marks the socket as no longer connected. The flag is
 * cleared whether or not a callback exists, because onOpenComplete only sends
 * CONNECT while socketConnected is false.
 */
static void onIoError(void* context)
{
    MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
    if (mqttData != NULL)
    {
        if (mqttData->fnOperationCallback != NULL)
        {
            /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
        }
        mqttData->socketConnected = false;
    }
}

/*
 * Stores a private copy of the caller's connect options in the client.
 *
 * mqttData:    client that receives the copy.
 * mqttOptions: options to copy; NULL string members are stored as NULL.
 *
 * Returns 0 on success and a non-zero value when a string copy fails.
 *
 * The copy is transactional: every string is duplicated into a temporary
 * first, and the client's options are replaced only when all copies have
 * succeeded. On failure the temporaries are released and the client's
 * existing options are left untouched, so no dangling pointer is left for
 * mqtt_client_deinit to free a second time. On success the strings from a
 * previous connect are released instead of being leaked.
 */
static int cloneMqttOptions(MQTT_CLIENT* mqttData, const MQTT_CLIENT_OPTIONS* mqttOptions)
{
    int result = 0;
    char* clientId = NULL;
    char* willTopic = NULL;
    char* willMessage = NULL;
    char* username = NULL;
    char* password = NULL;

    /* Each branch is taken only when that copy fails, which ends the chain. */
    if (mqttOptions->clientId != NULL && mallocAndStrcpy_s(&clientId, mqttOptions->clientId) != 0)
    {
        result = __LINE__;
    }
    else if (mqttOptions->willTopic != NULL && mallocAndStrcpy_s(&willTopic, mqttOptions->willTopic) != 0)
    {
        result = __LINE__;
    }
    else if (mqttOptions->willMessage != NULL && mallocAndStrcpy_s(&willMessage, mqttOptions->willMessage) != 0)
    {
        result = __LINE__;
    }
    else if (mqttOptions->username != NULL && mallocAndStrcpy_s(&username, mqttOptions->username) != 0)
    {
        result = __LINE__;
    }
    else if (mqttOptions->password != NULL && mallocAndStrcpy_s(&password, mqttOptions->password) != 0)
    {
        result = __LINE__;
    }

    if (result == 0)
    {
        /* Release whatever an earlier connect stored before taking ownership of the new copies. */
        free(mqttData->mqttOptions.clientId);
        free(mqttData->mqttOptions.willTopic);
        free(mqttData->mqttOptions.willMessage);
        free(mqttData->mqttOptions.username);
        free(mqttData->mqttOptions.password);

        mqttData->mqttOptions.clientId = clientId;
        mqttData->mqttOptions.willTopic = willTopic;
        mqttData->mqttOptions.willMessage = willMessage;
        mqttData->mqttOptions.username = username;
        mqttData->mqttOptions.password = password;

        mqttData->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
        mqttData->mqttOptions.messageRetain = mqttOptions->messageRetain;
        mqttData->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
        mqttData->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
    }
    else
    {
        /* NOTE(review): assumes mallocAndStrcpy_s leaves the destination NULL or unchanged on failure - confirm. */
        free(clientId);
        free(willTopic);
        free(willMessage);
        free(username);
        free(password);
    }
    return result;
}

static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
{
    MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
    if ((mqttData != NULL && headerData != NULL) || packet == PINGRESP_TYPE)
    {
        size_t len = BUFFER_length(headerData);
        uint8_t* iterator = BUFFER_u_char(headerData);

        logIncomingMsgTrace(mqttData, packet, flags, iterator, len);

        if ((iterator != NULL && len > 0) || packet == PINGRESP_TYPE)
        {
            switch (packet)
            {
                case CONNACK_TYPE:
                {
                    if (mqttData->fnOperationCallback != NULL)
                    {
                        /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
                        CONNECT_ACK connack = { 0 };
                        connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
                        connack.returnCode = byteutil_readByte(&iterator);

                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx);

                        if (connack.returnCode == CONNECTION_ACCEPTED)
                        {
                            mqttData->clientConnected = true;
                        }
                    }
                    break;
                }
                case PUBLISH_TYPE:
                {
                    if (mqttData->fnMessageRecv != NULL)
                    {
                        bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
                        bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
                        QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;

                        uint8_t* initialPos = iterator;
                        char* topicName = byteutil_readUTF(&iterator, NULL);
                        if (topicName == NULL)
                        {
                            LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
                            if (mqttData->fnOperationCallback)
                            {
                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                            }
                        }
                        else
                        {
                            uint16_t packetId = 0;
                            if (qosValue != DELIVER_AT_MOST_ONCE)
                            {
                                packetId = byteutil_read_uint16(&iterator);
                            }
                            size_t length = len - (iterator - initialPos);

                            MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
                            if (msgHandle == NULL)
                            {
                                LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
                                if (mqttData->fnOperationCallback)
                                {
                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                }
                            }
                            else
                            {
                                if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
                                    mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
                                {
                                    LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
                                    if (mqttData->fnOperationCallback)
                                    {
                                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                    }
                                }
                                else
                                {
                                    mqttData->fnMessageRecv(msgHandle, mqttData->ctx);

                                    BUFFER_HANDLE pubRel = NULL;
                                    if (qosValue == DELIVER_EXACTLY_ONCE)
                                    {
                                        pubRel = mqtt_codec_publishReceived(packetId);
                                        if (pubRel == NULL)
                                        {
                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
                                            if (mqttData->fnOperationCallback)
                                            {
                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                            }
                                        }
                                    }
                                    else if (qosValue == DELIVER_AT_LEAST_ONCE)
                                    {
                                        pubRel = mqtt_codec_publishAck(packetId);
                                        if (pubRel == NULL)
                                        {
                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
                                            if (mqttData->fnOperationCallback)
                                            {
                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                            }
                                        }
                                    }
                                    if (pubRel != NULL)
                                    {
                                        (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
                                        BUFFER_delete(pubRel);
                                    }
                                }
                                mqttmessage_destroy(msgHandle);
                            }
                            free(topicName);
                        }
                    }
                    break;
                }
                case PUBACK_TYPE:
                case PUBREC_TYPE:
                case PUBREL_TYPE:
                case PUBCOMP_TYPE:
                {
                    if (mqttData->fnOperationCallback)
                    {
                        /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
                        MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
                            (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
                            (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;

                        PUBLISH_ACK publish_ack = { 0 };
                        publish_ack.packetId = byteutil_read_uint16(&iterator);

                        BUFFER_HANDLE pubRel = NULL;
                        mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx);
                        if (packet == PUBREC_TYPE)
                        {
                            pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
                            if (pubRel == NULL)
                            {
                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
                                if (mqttData->fnOperationCallback)
                                {
                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                }
                            }
                        }
                        else if (packet == PUBREL_TYPE)
                        {
                            pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
                            if (pubRel == NULL)
                            {
                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
                                if (mqttData->fnOperationCallback)
                                {
                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                                }
                            }
                        }
                        if (pubRel != NULL)
                        {
                            (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
                            BUFFER_delete(pubRel);
                        }
                    }
                    break;
                }
                case SUBACK_TYPE:
                {
                    if (mqttData->fnOperationCallback)
                    {
                        /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
                        SUBSCRIBE_ACK suback = { 0 };

                        size_t remainLen = len;
                        suback.packetId = byteutil_read_uint16(&iterator);
                        remainLen -= 2;

                        // Allocate the remaining len
                        suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
                        if (suback.qosReturn != NULL)
                        {
                            while (remainLen > 0)
                            {
                                suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
                                remainLen--;
                            }
                            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
                            free(suback.qosReturn);
                        }
                        else
                        {
                            LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
                            if (mqttData->fnOperationCallback)
                            {
                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
                            }
                        }
                    }
                    break;
                }
                case UNSUBACK_TYPE:
                {
                    if (mqttData->fnOperationCallback)
                    {
                        /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
                        UNSUBSCRIBE_ACK unsuback = { 0 };
                        iterator += VARIABLE_HEADER_OFFSET;
                        unsuback.packetId = byteutil_read_uint16(&iterator);

                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
                    }
                    break;
                }
                case PINGRESP_TYPE:
                    mqttData->timeSincePing = 0;
                    // Ping responses do not get forwarded
                    break;
                default:
                    break;
            }
        }
    }
}

/*
 * Allocates and initialises an MQTT client instance.
 *
 * msgRecv:     callback for received PUBLISH messages; must not be NULL.
 * opCallback:  callback for operation results and errors; stored as given.
 * callbackCtx: opaque pointer handed back to both callbacks.
 *
 * Returns the new client handle, or NULL when msgRecv is NULL or when the
 * allocation, the tick counter or the codec cannot be created.
 */
MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx)
{
    /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
    if (msgRecv == NULL)
    {
        return NULL;
    }

    MQTT_CLIENT* client = malloc(sizeof(MQTT_CLIENT));
    if (client == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_init failure: Allocation Failure");
        return NULL;
    }

    /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
    /* Callbacks and user context. */
    client->fnMessageRecv = msgRecv;
    client->fnOperationCallback = opCallback;
    client->ctx = callbackCtx;

    /* Connection state: nothing is open or connected yet. */
    client->xioHandle = NULL;
    client->packetState = UNKNOWN_TYPE;
    client->packetSendTimeMs = 0;
    client->socketConnected = false;
    client->clientConnected = false;
    client->timeSincePing = 0;
    client->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;

    /* Session settings, replaced when mqtt_client_connect is called. */
    client->qosValue = DELIVER_AT_MOST_ONCE;
    client->keepAliveInterval = 0;
    client->mqttOptions.clientId = NULL;
    client->mqttOptions.willTopic = NULL;
    client->mqttOptions.willMessage = NULL;
    client->mqttOptions.username = NULL;
    client->mqttOptions.password = NULL;

    /* Tracing is off by default. */
    client->logTrace = false;
    client->rawBytesTrace = false;

    client->packetTickCntr = tickcounter_create();
    if (client->packetTickCntr == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_init failure: tickcounter_create failure");
        free(client);
        return NULL;
    }

    client->codec_handle = mqtt_codec_create(recvCompleteCallback, client);
    if (client->codec_handle == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_init failure: mqtt_codec_create failure");
        tickcounter_destroy(client->packetTickCntr);
        free(client);
        return NULL;
    }

    return client;
}

/*
 * Releases everything owned by a client created with mqtt_client_init: the
 * tick counter, the codec, the copied option strings and the client itself.
 * A NULL handle is ignored.
 */
void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
{
    /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
    if (handle == NULL)
    {
        return;
    }

    /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;

    tickcounter_destroy(client->packetTickCntr);
    mqtt_codec_destroy(client->codec_handle);

    /* Option strings are private copies owned by the client. */
    free(client->mqttOptions.clientId);
    free(client->mqttOptions.willTopic);
    free(client->mqttOptions.willMessage);
    free(client->mqttOptions.username);
    free(client->mqttOptions.password);

    free(client);
}

/*
 * Starts an MQTT connection over the given transport.
 *
 * handle:      client created by mqtt_client_init.
 * xioHandle:   transport to open; stored on the client for later sends.
 * mqttOptions: connect options; copied into the client.
 *
 * Returns 0 when the transport open was started successfully and a non-zero
 * value when an argument is NULL, the options cannot be copied or xio_open
 * fails. The MQTT CONNECT packet itself is sent from the open-complete
 * callback.
 *
 * The pointers logged with %p are converted to void*, as that conversion
 * requires, and a NULL transport is reported as such rather than as a codec
 * failure.
 */
int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
{
    int result;
    /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
    if (handle == NULL || mqttOptions == NULL)
    {
        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", (void*)handle, (void*)mqttOptions);
        result = __LINE__;
    }
    else
    {
        MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
        if (xioHandle == NULL)
        {
            /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
            LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (xioHandle)");
            result = __LINE__;
        }
        else
        {
            mqttData->xioHandle = xioHandle;
            mqttData->packetState = UNKNOWN_TYPE;
            mqttData->qosValue = mqttOptions->qualityOfServiceValue;
            mqttData->keepAliveInterval = mqttOptions->keepAliveInterval;
            /* Wait for a ping response no longer than half the keep-alive interval, capped at the default. */
            mqttData->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
            if (cloneMqttOptions(mqttData, mqttOptions) != 0)
            {
                LOG(LOG_ERROR, LOG_LINE, "Error: Clone Mqtt Options failed");
                result = __LINE__;
            }
            /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
            else if (xio_open(xioHandle, onOpenComplete, mqttData, onBytesReceived, mqttData, onIoError, mqttData) != 0)
            {
                /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
                LOG(LOG_ERROR, LOG_LINE, "Error: io_open failed");
                result = __LINE__;
            }
            else
            {
                result = 0;
            }
        }
    }
    return result;
}

/*
 * Encodes the given message as an MQTT PUBLISH packet and sends it.
 *
 * handle:    client created by mqtt_client_init.
 * msgHandle: message to publish.
 *
 * Returns 0 when the packet was handed to the transport, and a non-zero
 * value when an argument is NULL, the payload cannot be read, the packet
 * cannot be encoded or the send fails.
 */
int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;
    if (client == NULL || msgHandle == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
        return __LINE__;
    }

    /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
    const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
    if (payload == NULL)
    {
        /* Any failure makes mqtt_client_publish return a non-zero value. */
        LOG(LOG_ERROR, LOG_LINE, "Error: mqttmessage_getApplicationMsg failed");
        return __LINE__;
    }

    BUFFER_HANDLE packetBuffer = mqtt_codec_publish(mqttmessage_getQosType(msgHandle), mqttmessage_getIsDuplicateMsg(msgHandle),
        mqttmessage_getIsRetained(msgHandle), mqttmessage_getPacketId(msgHandle), mqttmessage_getTopicName(msgHandle), payload->message, payload->length);
    if (packetBuffer == NULL)
    {
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_codec_publish failed");
        return __LINE__;
    }

    client->packetState = PUBLISH_TYPE;

    /* On success the encoded PUBLISH packet is sent to the endpoint. */
    int status = 0;
    if (sendPacketItem(client, BUFFER_u_char(packetBuffer), BUFFER_length(packetBuffer)) != 0)
    {
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_client_publish send failed");
        status = __LINE__;
    }

    /* The encoded packet is released whether or not the send succeeded. */
    BUFFER_delete(packetBuffer);
    return status;
}

/*
 * Encodes an MQTT SUBSCRIBE packet via mqtt_codec_subscribe and passes it to
 * sendPacketItem.
 *
 * handle         client handle; must not be NULL.
 * packetId       packet identifier forwarded unchanged to the codec.
 * subscribeList  subscription entries forwarded to the codec; must not be NULL.
 * count          number of entries in subscribeList; must not be 0.
 *
 * Returns 0 when sendPacketItem reports success, otherwise a non-zero value
 * (the source line of the failing check).
 */
int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;
    BUFFER_HANDLE packet;
    int status;

    /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
    if (client == NULL || subscribeList == NULL || count == 0)
    {
        return __LINE__;
    }

    packet = mqtt_codec_subscribe(packetId, subscribeList, count);
    if (packet == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_codec_subscribe failed");
        return __LINE__;
    }

    /* The packet state is recorded before the send is attempted and is left
       as-is even when the send fails. */
    client->packetState = SUBSCRIBE_TYPE;

    /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
    status = sendPacketItem(client, BUFFER_u_char(packet), BUFFER_length(packet));
    if (status != 0)
    {
        /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_client_subscribe send failed");
        status = __LINE__;
    }

    /* The encoded buffer is owned by this function on both outcomes. */
    BUFFER_delete(packet);
    return status;
}

/*
 * Encodes an MQTT UNSUBSCRIBE packet via mqtt_codec_unsubscribe and passes it
 * to sendPacketItem.
 *
 * handle           client handle; must not be NULL.
 * packetId         packet identifier forwarded unchanged to the codec.
 * unsubscribeList  topic strings forwarded to the codec; must not be NULL.
 * count            number of entries in unsubscribeList; must not be 0.
 *
 * Returns 0 when sendPacketItem reports success, otherwise a non-zero value
 * (the source line of the failing check).
 */
int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;
    BUFFER_HANDLE packet;
    int status;

    /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
    if (client == NULL || unsubscribeList == NULL || count == 0)
    {
        return __LINE__;
    }

    packet = mqtt_codec_unsubscribe(packetId, unsubscribeList, count);
    if (packet == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_codec_unsubscribe failed");
        return __LINE__;
    }

    /* The packet state is recorded before the send is attempted and is left
       as-is even when the send fails. */
    client->packetState = UNSUBSCRIBE_TYPE;

    /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
    status = sendPacketItem(client, BUFFER_u_char(packet), BUFFER_length(packet));
    if (status != 0)
    {
        /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_client_unsubscribe send failed");
        status = __LINE__;
    }

    /* The encoded buffer is owned by this function on both outcomes. */
    BUFFER_delete(packet);
    return status;
}

/*
 * Encodes an MQTT DISCONNECT packet via mqtt_codec_disconnect and passes it
 * to sendPacketItem.
 *
 * handle  client handle; must not be NULL.
 *
 * Returns 0 when sendPacketItem reports success, otherwise a non-zero value
 * (the source line of the failing check). When the codec cannot produce the
 * packet, the client's packetState is set to PACKET_TYPE_ERROR.
 */
int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;
    BUFFER_HANDLE packet;
    int status;

    /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
    if (client == NULL)
    {
        return __LINE__;
    }

    packet = mqtt_codec_disconnect();
    if (packet == NULL)
    {
        /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect failed");
        client->packetState = PACKET_TYPE_ERROR;
        return __LINE__;
    }

    /* The packet state is recorded before the send is attempted and is left
       as-is even when the send fails. */
    client->packetState = DISCONNECT_TYPE;

    /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
    status = sendPacketItem(client, BUFFER_u_char(packet), BUFFER_length(packet));
    if (status != 0)
    {
        /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
        LOG(LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect send failed");
        status = __LINE__;
    }

    /* The encoded buffer is owned by this function on both outcomes. */
    BUFFER_delete(packet);
    return status;
}

/*
 * Periodic worker for the client: pumps the underlying xio transport and,
 * while both connection flags are set and a keep-alive interval is
 * configured, runs the keep-alive logic:
 *   - if a ping timestamp is recorded (timeSincePing > 0) and more than
 *     maxPingRespTime seconds have elapsed since it, the operation callback
 *     is invoked with MQTT_CLIENT_NO_PING_RESPONSE and the connection state
 *     is reset;
 *   - otherwise, if the time since the last recorded send plus
 *     KEEP_ALIVE_BUFFER_SEC exceeds keepAliveInterval, a PINGREQ packet is
 *     built and sent and the ping timestamp is refreshed.
 *
 * handle  client handle; a NULL handle makes the call a no-op.
 */
void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;
    uint64_t nowMs;
    uint64_t secondsSinceSend;
    BUFFER_HANDLE pingPacket;

    /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
    if (client == NULL)
    {
        return;
    }

    /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
    xio_dowork(client->xioHandle);

    /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
    /* Keep-alive handling only applies to a fully connected client with a non-zero interval. */
    if (!client->socketConnected || !client->clientConnected || client->keepAliveInterval == 0)
    {
        return;
    }

    if (tickcounter_get_current_ms(client->packetTickCntr, &nowMs) != 0)
    {
        LOG(LOG_ERROR, LOG_LINE, "Error: tickcounter_get_current_ms failed");
        return;
    }

    /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Operation Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
    /* timeSincePing == 0 is treated as "no ping timestamp recorded"; the
       elapsed time is compared in whole seconds. */
    if (client->timeSincePing > 0 && ((nowMs - client->timeSincePing) / 1000) > client->maxPingRespTime)
    {
        /* No ping response within the allotted time: notify first, then reset state. */
        if (client->fnOperationCallback != NULL)
        {
            client->fnOperationCallback(client, MQTT_CLIENT_NO_PING_RESPONSE, NULL, client->ctx);
        }
        client->socketConnected = false;
        client->clientConnected = false;
        client->timeSincePing = 0;
        client->packetSendTimeMs = 0;
        client->packetState = UNKNOWN_TYPE;
        return;
    }

    /* Ping KEEP_ALIVE_BUFFER_SEC seconds ahead of the configured interval. */
    secondsSinceSend = (nowMs - client->packetSendTimeMs) / 1000;
    if ((secondsSinceSend + KEEP_ALIVE_BUFFER_SEC) > client->keepAliveInterval)
    {
        /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
        pingPacket = mqtt_codec_ping();
        if (pingPacket != NULL)
        {
            /* Best effort: neither the send result nor the tick counter result is checked. */
            (void)sendPacketItem(client, BUFFER_u_char(pingPacket), BUFFER_length(pingPacket));
            BUFFER_delete(pingPacket);
            (void)tickcounter_get_current_ms(client->packetTickCntr, &client->timeSincePing);
        }
    }
}

/*
 * Stores the two trace flags on the client.
 *
 * handle      client handle; a NULL handle makes the call a no-op.
 * traceOn     value written to the client's logTrace flag.
 * rawBytesOn  value written to the client's rawBytesTrace flag.
 */
void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
{
    MQTT_CLIENT* client = (MQTT_CLIENT*)handle;

    if (client == NULL)
    {
        return;
    }

    client->logTrace = traceOn;
    client->rawBytesTrace = rawBytesOn;
}