A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_client.c
- Revision:
- 0:ef4901974abc
- Child:
- 1:8dba42ff9701
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mqtt_client.c Fri Apr 08 12:01:23 2016 -0700 @@ -0,0 +1,795 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include <stdlib.h> +#include "azure_c_shared_utility/gballoc.h" +#include "azure_c_shared_utility/platform.h" +#include "azure_c_shared_utility/tickcounter.h" +#include "azure_c_shared_utility/crt_abstractions.h" + +#include "azure_umqtt_c/mqtt_client.h" +#include "azure_umqtt_c/mqtt_codec.h" + +#define KEEP_ALIVE_BUFFER_SEC 10 +#define VARIABLE_HEADER_OFFSET 2 +#define RETAIN_FLAG_MASK 0x1 +#define QOS_LEAST_ONCE_FLAG_MASK 0x2 +#define QOS_EXACTLY_ONCE_FLAG_MASK 0x4 +#define DUPLICATE_FLAG_MASK 0x8 +#define CONNECT_PACKET_MASK 0xf0 + +static const char* FORMAT_HEX_CHAR = "0x%02x "; + +typedef struct MQTT_CLIENT_TAG +{ + XIO_HANDLE xioHandle; + MQTTCODEC_HANDLE codec_handle; + CONTROL_PACKET_TYPE packetState; + LOGGER_LOG logFunc; + TICK_COUNTER_HANDLE packetTickCntr; + uint64_t packetSendTimeMs; + ON_MQTT_OPERATION_CALLBACK fnOperationCallback; + ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv; + void* ctx; + QOS_VALUE qosValue; + uint16_t keepAliveInterval; + MQTT_CLIENT_OPTIONS mqttOptions; + bool clientConnected; + bool socketConnected; + bool logTrace; + bool rawBytesTrace; +} MQTT_CLIENT; + +static uint16_t byteutil_read_uint16(uint8_t** buffer) +{ + uint16_t result = 0; + if (buffer != NULL) + { + result = 256 * ((uint8_t)(**buffer)) + (uint8_t)(*(*buffer + 1)); + *buffer += 2; // Move the ptr + } + return result; +} + +static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen) +{ + char* result = NULL; + if (buffer != NULL) + { + // Get the length of the string + int len = byteutil_read_uint16(buffer); + if (len > 0) + { + result = (char*)malloc(len + 1); + if (result != NULL) + { + (void)memcpy(result, *buffer, len); + result[len] = '\0'; + *buffer += len; + if (byteLen != NULL) + { + *byteLen = len; + } + } + } + } + return result; +} + +static uint8_t byteutil_readByte(uint8_t** buffer) +{ + uint8_t result = 0; + if (buffer != NULL) + { + result = **buffer; + (*buffer)++; + } + return result; +} + +static void sendComplete(void* context, IO_SEND_RESULT send_result) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; + if (mqttData != NULL && mqttData->fnOperationCallback != NULL) + { + if (mqttData->packetState == DISCONNECT_TYPE) + { + /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/ + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx); + + // close the xio + (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx); + mqttData->socketConnected = false; + mqttData->clientConnected = false; + } + } +} + +static const char* retrievePacketType(CONTROL_PACKET_TYPE packet) +{ + switch (packet&CONNECT_PACKET_MASK) + { + case CONNECT_TYPE: return "CONNECT"; + case CONNACK_TYPE: return "CONNACK"; + case PUBLISH_TYPE: return "PUBLISH"; + case PUBACK_TYPE: return "PUBACK"; + case PUBREC_TYPE: return "PUBREC"; + case PUBREL_TYPE: return "PUBREL"; + case SUBSCRIBE_TYPE: return "SUBSCRIBE"; + case SUBACK_TYPE: return "SUBACK"; + case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE"; + case UNSUBACK_TYPE: return "UNSUBACK"; + case PINGREQ_TYPE: return "PINGREQ"; + case PINGRESP_TYPE: return "PINGRESP"; + case DISCONNECT_TYPE: return "DISCONNECT"; + default: + case PACKET_TYPE_ERROR: + case UNKNOWN_TYPE: + return "UNKNOWN"; + } +} + +static void logOutgoingingMsgTrace(MQTT_CLIENT* clientData, const uint8_t* data, size_t length) +{ + if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace) + { + LOG(clientData->logFunc, 0, "-> %s: ", retrievePacketType((unsigned char)data[0])); + for (size_t index = 0; index < length; index++) + { + LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, (unsigned char)data[index]); + } + LOG(clientData->logFunc, LOG_LINE, ""); + } +} + +static void logIncomingMsgTrace(MQTT_CLIENT* clientData, CONTROL_PACKET_TYPE packet, int flags, const uint8_t* data, size_t length) +{ + if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace) + { + LOG(clientData->logFunc, 0, "<- %s: 0x%02x 0x%02x ", retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length); + for (size_t index = 0; index < length; index++) + { + LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, (unsigned char)data[index]); + } + LOG(clientData->logFunc, LOG_LINE, ""); + } +} + +static int sendPacketItem(MQTT_CLIENT* clientData, const int8_t* data, size_t length) +{ + int result; + logOutgoingingMsgTrace(clientData, data, length); + + if (tickcounter_get_current_ms(clientData->packetTickCntr, &clientData->packetSendTimeMs) != 0) + { + LOG(clientData->logFunc, LOG_LINE, "Failure getting current ms tickcounter"); + result = __LINE__; + } + else + { + result = xio_send(clientData->xioHandle, data, length, sendComplete, clientData); + if (result != 0) + { + LOG(clientData->logFunc, LOG_LINE, "%d: Failure sending control packet data", result); + result = __LINE__; + } + } + return result; +} + +static void onOpenComplete(void* context, IO_OPEN_RESULT open_result) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; + if (mqttData != NULL) + { + if (open_result == IO_OPEN_OK && !mqttData->socketConnected) + { + mqttData->packetState = CONNECT_TYPE; + mqttData->socketConnected = true; + // Send the Connect packet + BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqttData->mqttOptions); + if (connPacket == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_connect failed"); + } + else + { + /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/ + if (sendPacketItem(mqttData, BUFFER_u_char(connPacket), BUFFER_length(connPacket)) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_connect failed"); + } + BUFFER_delete(connPacket); + } + } + else if (open_result == IO_OPEN_ERROR) + { + (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } +} + +static void onBytesReceived(void* context, const unsigned char* buffer, size_t size) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; + if (mqttData != NULL) + { + if (mqtt_codec_bytesReceived(mqttData->codec_handle, buffer, size) != 0) + { + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } + } +} + +static void onIoError(void* context) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; + if (mqttData != NULL && mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/ + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + mqttData->socketConnected = false; + } +} + +static int cloneMqttOptions(MQTT_CLIENT* mqttData, const MQTT_CLIENT_OPTIONS* mqttOptions) +{ + int result = 0; + if (mqttOptions->clientId != NULL) + { + if (mallocAndStrcpy_s(&mqttData->mqttOptions.clientId, mqttOptions->clientId) != 0) + { + result = __LINE__; + } + } + if (result == 0 && mqttOptions->willTopic != NULL) + { + if (mallocAndStrcpy_s(&mqttData->mqttOptions.willTopic, mqttOptions->willTopic) != 0) + { + result = __LINE__; + } + } + if (result == 0 && mqttOptions->willMessage != NULL) + { + if (mallocAndStrcpy_s(&mqttData->mqttOptions.willMessage, mqttOptions->willMessage) != 0) + { + result = __LINE__; + } + } + if (result == 0 && mqttOptions->username != NULL) + { + if (mallocAndStrcpy_s(&mqttData->mqttOptions.username, mqttOptions->username) != 0) + { + result = __LINE__; + } + } + if (result == 0 && mqttOptions->password != NULL) + { + if (mallocAndStrcpy_s(&mqttData->mqttOptions.password, mqttOptions->password) != 0) + { + result = __LINE__; + } + } + if (result == 0) + { + mqttData->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval; + mqttData->mqttOptions.messageRetain = mqttOptions->messageRetain; + mqttData->mqttOptions.useCleanSession = mqttOptions->useCleanSession; + mqttData->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue; + } + else + { + free(mqttData->mqttOptions.clientId); + free(mqttData->mqttOptions.willTopic); + free(mqttData->mqttOptions.willMessage); + free(mqttData->mqttOptions.username); + free(mqttData->mqttOptions.password); + } + return result; +} + +static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; + if (mqttData != NULL && headerData != NULL) + { + size_t len = BUFFER_length(headerData); + uint8_t* iterator = BUFFER_u_char(headerData); + + logIncomingMsgTrace(mqttData, packet, flags, iterator, len); + + if (iterator != NULL && len > 0) + { + switch (packet) + { + case CONNACK_TYPE: + { + if (mqttData->fnOperationCallback != NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/ + CONNECT_ACK connack = { 0 }; + connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false; + connack.returnCode = byteutil_readByte(&iterator); + + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx); + + if (connack.returnCode == CONNECTION_ACCEPTED) + { + mqttData->clientConnected = true; + } + } + break; + } + case PUBLISH_TYPE: + { + if (mqttData->fnMessageRecv != NULL) + { + //uint8_t ctrlPacket = byteutil_readByte(&iterator); + bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false; + bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false; + QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE; + + uint8_t* initialPos = iterator; + char* topicName = byteutil_readUTF(&iterator, NULL); + uint16_t packetId = 0; + if (qosValue != DELIVER_AT_MOST_ONCE) + { + packetId = byteutil_read_uint16(&iterator); + } + size_t length = len - (iterator - initialPos); + + MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); + if (msgHandle == NULL) + { + LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create"); + } + else + { + (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg); + (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg); + mqttData->fnMessageRecv(msgHandle, mqttData->ctx); + + BUFFER_HANDLE pubRel = NULL; + if (qosValue == DELIVER_EXACTLY_ONCE) + { + pubRel = mqtt_codec_publishReceived(packetId); + } + else if (qosValue == DELIVER_AT_LEAST_ONCE) + { + pubRel = mqtt_codec_publishAck(packetId); + } + if (pubRel != NULL) + { + (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); + BUFFER_delete(pubRel); + } + free(topicName); + mqttmessage_destroy(msgHandle); + } + } + break; + } + case PUBACK_TYPE: + case PUBREC_TYPE: + case PUBREL_TYPE: + case PUBCOMP_TYPE: + { + if (mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/ + MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK : + (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV : + (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP; + + PUBLISH_ACK publish_ack = { 0 }; + publish_ack.packetId = byteutil_read_uint16(&iterator); + + BUFFER_HANDLE pubRel = NULL; + mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx); + if (packet == PUBREC_TYPE) + { + pubRel = mqtt_codec_publishRelease(publish_ack.packetId); + } + else if (packet == PUBREL_TYPE) + { + pubRel = mqtt_codec_publishComplete(publish_ack.packetId); + } + if (pubRel != NULL) + { + (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); + BUFFER_delete(pubRel); + } + } + break; + } + case SUBACK_TYPE: + { + if (mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/ + SUBSCRIBE_ACK suback = { 0 }; + + size_t remainLen = len; + suback.packetId = byteutil_read_uint16(&iterator); + remainLen -= 2; + + // Allocate the remaining len + suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen); + if (suback.qosReturn != NULL) + { + while (remainLen > 0) + { + suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator); + remainLen--; + } + (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx); + free(suback.qosReturn); + } + } + break; + } + case UNSUBACK_TYPE: + { + if (mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/ + UNSUBSCRIBE_ACK unsuback = { 0 }; + iterator += VARIABLE_HEADER_OFFSET; + unsuback.packetId = byteutil_read_uint16(&iterator); + + (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx); + } + break; + } + case PINGRESP_TYPE: + // Ping responses do not get forwarded + break; + default: + break; + } + } + } +} + +MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx, LOGGER_LOG logger) +{ + MQTT_CLIENT* result; + /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/ + if (msgRecv == NULL) + { + result = NULL; + } + else + { + result = malloc(sizeof(MQTT_CLIENT)); + if (result == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ + LOG(logger, LOG_LINE, "mqtt_client_inti failure: Allocation Failure"); + } + else + { + /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/ + result->xioHandle = NULL; + result->packetState = UNKNOWN_TYPE; + result->logFunc = logger; + result->packetSendTimeMs = 0; + result->fnOperationCallback = opCallback; + result->fnMessageRecv = msgRecv; + result->ctx = callbackCtx; + result->qosValue = DELIVER_AT_MOST_ONCE; + result->keepAliveInterval = 0; + result->packetTickCntr = tickcounter_create(); + result->mqttOptions.clientId = NULL; + result->mqttOptions.willTopic = NULL; + result->mqttOptions.willMessage = NULL; + result->mqttOptions.username = NULL; + result->mqttOptions.password = NULL; + result->socketConnected = false; + result->clientConnected = false; + result->logTrace = false; + result->rawBytesTrace = false; + if (result->packetTickCntr == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ + LOG(logger, LOG_LINE, "mqtt_client_init failure: tickcounter_create failure"); + free(result); + result = NULL; + } + else + { + result->codec_handle = mqtt_codec_create(recvCompleteCallback, result); + if (result->codec_handle == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ + LOG(logger, LOG_LINE, "mqtt_client_init failure: mqtt_codec_create failure"); + tickcounter_destroy(result->packetTickCntr); + free(result); + result = NULL; + } + } + } + } + return result; +} + +void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle) +{ + /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/ + if (handle != NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + tickcounter_destroy(mqttData->packetTickCntr); + mqtt_codec_destroy(mqttData->codec_handle); + free(mqttData->mqttOptions.clientId); + free(mqttData->mqttOptions.willTopic); + free(mqttData->mqttOptions.willMessage); + free(mqttData->mqttOptions.username); + free(mqttData->mqttOptions.password); + free(mqttData); + } +} + +int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions) +{ + int result; + /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/ + if (handle == NULL || mqttOptions == NULL) + { + result = __LINE__; + } + else + { + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (xioHandle == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqttcodec_connect failed"); + result = __LINE__; + } + else + { + mqttData->xioHandle = xioHandle; + mqttData->packetState = UNKNOWN_TYPE; + mqttData->qosValue = mqttOptions->qualityOfServiceValue; + mqttData->keepAliveInterval = mqttOptions->keepAliveInterval; + if (cloneMqttOptions(mqttData, mqttOptions) != 0) + { + LOG(mqttData->logFunc, LOG_LINE, "Error: Clone Mqtt Options failed"); + result = __LINE__; + } + /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/ + else if (xio_open(xioHandle, onOpenComplete, mqttData, onBytesReceived, mqttData, onIoError, mqttData) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: io_open failed"); + result = __LINE__; + } + else + { + result = 0; + } + } + } + return result; +} + +int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle) +{ + int result; + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (mqttData == NULL || msgHandle == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/ + result = __LINE__; + } + else + { + /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/ + const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); + if (payload == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqttmessage_getApplicationMsg failed"); + result = __LINE__; + } + else + { + BUFFER_HANDLE publishPacket = mqtt_codec_publish(mqttmessage_getQosType(msgHandle), mqttmessage_getIsDuplicateMsg(msgHandle), + mqttmessage_getIsRetained(msgHandle), mqttmessage_getPacketId(msgHandle), mqttmessage_getTopicName(msgHandle), payload->message, payload->length); + if (publishPacket == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_publish failed"); + result = __LINE__; + } + else + { + mqttData->packetState = PUBLISH_TYPE; + + /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/ + if (sendPacketItem(mqttData, BUFFER_u_char(publishPacket), BUFFER_length(publishPacket)) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_publish send failed"); + result = __LINE__; + } + else + { + result = 0; + } + BUFFER_delete(publishPacket); + } + } + } + return result; +} + +int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count) +{ + int result; + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (mqttData == NULL || subscribeList == NULL || count == 0) + { + /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/ + result = __LINE__; + } + else + { + BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count); + if (subPacket == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_subscribe failed"); + result = __LINE__; + } + else + { + mqttData->packetState = SUBSCRIBE_TYPE; + + /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/ + if (sendPacketItem(mqttData, BUFFER_u_char(subPacket), BUFFER_length(subPacket)) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_subscribe send failed"); + result = __LINE__; + } + else + { + result = 0; + } + BUFFER_delete(subPacket); + } + } + return result; +} + +int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count) +{ + int result; + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (mqttData == NULL || unsubscribeList == NULL || count == 0) + { + /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/ + result = __LINE__; + } + else + { + BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count); + if (unsubPacket == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_unsubscribe failed"); + result = __LINE__; + } + else + { + mqttData->packetState = UNSUBSCRIBE_TYPE; + + /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/ + LOG(mqttData->logFunc, LOG_LINE, "MQTT unsubscribe"); + if (sendPacketItem(mqttData, BUFFER_u_char(unsubPacket), BUFFER_length(unsubPacket)) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_unsubscribe send failed"); + result = __LINE__; + } + else + { + result = 0; + } + BUFFER_delete(unsubPacket); + } + } + return result; +} + +int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle) +{ + int result; + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (mqttData == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/ + result = __LINE__; + } + else + { + mqttData->packetState = DISCONNECT_TYPE; + BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect(); + if (disconnectPacket == NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_disconnect failed"); + result = __LINE__; + } + else + { + mqttData->packetState = DISCONNECT_TYPE; + + /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/ + if (sendPacketItem(mqttData, BUFFER_u_char(disconnectPacket), BUFFER_length(disconnectPacket)) != 0) + { + /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/ + LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_disconnect send failed"); + result = __LINE__; + } + else + { + result = 0; + } + BUFFER_delete(disconnectPacket); + } + } + return result; +} + +void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/ + if (mqttData != NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/ + xio_dowork(mqttData->xioHandle); + + /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/ + if (mqttData->socketConnected && mqttData->clientConnected && mqttData->keepAliveInterval > 0) + { + uint64_t current_ms; + if (tickcounter_get_current_ms(mqttData->packetTickCntr, ¤t_ms) != 0) + { + LOG(mqttData->logFunc, LOG_LINE, "Error: tickcounter_get_current_ms failed"); + } + else + { + if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval) + { + /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/ + BUFFER_HANDLE pingPacket = mqtt_codec_ping(); + if (pingPacket != NULL) + { + (void)sendPacketItem(mqttData, BUFFER_u_char(pingPacket), BUFFER_length(pingPacket)); + BUFFER_delete(pingPacket); + } + } + } + } + } +} + +void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn) +{ + MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle; + if (mqttData != NULL) + { + mqttData->logTrace = traceOn; + mqttData->rawBytesTrace = rawBytesOn; + } +}