Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Revision 0:ef4901974abc, committed 2016-04-08
- Comitter:
- Azure.IoT Build
- Date:
- Fri Apr 08 12:01:23 2016 -0700
- Child:
- 1:8dba42ff9701
- Commit message:
- 1.0.4
Changed in this revision
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_umqtt_c/mqtt_client.h Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,58 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef MQTT_CLIENT_H
+#define MQTT_CLIENT_H
+
+#ifdef __cplusplus
+#include <cstdint>
+extern "C" {
+#else
+#include <stdint.h>
+#endif // __cplusplus
+
+#include "azure_c_shared_utility/xio.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/macro_utils.h"
+#include "azure_c_shared_utility/list.h"
+#include "azure_umqtt_c/mqttconst.h"
+#include "azure_umqtt_c/mqtt_message.h"
+
+typedef struct MQTT_CLIENT_TAG* MQTT_CLIENT_HANDLE;
+
+#define MQTT_CLIENT_EVENT_VALUES \
+ MQTT_CLIENT_ON_CONNACK, \
+ MQTT_CLIENT_ON_PUBLISH_ACK, \
+ MQTT_CLIENT_ON_PUBLISH_RECV, \
+ MQTT_CLIENT_ON_PUBLISH_REL, \
+ MQTT_CLIENT_ON_PUBLISH_COMP, \
+ MQTT_CLIENT_ON_SUBSCRIBE_ACK, \
+ MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, \
+ MQTT_CLIENT_ON_DISCONNECT, \
+ MQTT_CLIENT_ON_ERROR
+
+DEFINE_ENUM(MQTT_CLIENT_EVENT_RESULT, MQTT_CLIENT_EVENT_VALUES);
+
+typedef void(*ON_MQTT_OPERATION_CALLBACK)(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx);
+typedef void(*ON_MQTT_MESSAGE_RECV_CALLBACK)(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx);
+
+extern MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx, LOGGER_LOG logger);
+extern void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle);
+
+extern int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions);
+extern int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle);
+
+extern int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count);
+extern int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count);
+
+extern int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle);
+
+extern void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle);
+
+extern void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // MQTTCLIENT_H
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_umqtt_c/mqtt_codec.h Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,44 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef MQTT_CODEC_H
+#define MQTT_CODEC_H
+
+#ifdef __cplusplus
+#include <cstdint>
+#include <cstdbool>
+extern "C" {
+#else
+#include <stdint.h>
+#include <stdbool.h>
+#endif // __cplusplus
+
+#include "azure_c_shared_utility/xio.h"
+#include "azure_c_shared_utility/buffer_.h"
+#include "azure_umqtt_c/mqttconst.h"
+
+typedef struct MQTTCODEC_INSTANCE_TAG* MQTTCODEC_HANDLE;
+
+typedef void(*ON_PACKET_COMPLETE_CALLBACK)(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData);
+
+extern MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx);
+extern void mqtt_codec_destroy(MQTTCODEC_HANDLE handle);
+
+extern BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions);
+extern BUFFER_HANDLE mqtt_codec_disconnect();
+extern BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen);
+extern BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId);
+extern BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId);
+extern BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId);
+extern BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId);
+extern BUFFER_HANDLE mqtt_codec_ping();
+extern BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count);
+extern BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count);
+
+extern int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // MQTT_CODEC_H
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_umqtt_c/mqtt_message.h Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,35 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef MQTT_MESSAGE_H
+#define MQTT_MESSAGE_H
+
+#ifdef __cplusplus
+#include <cstdint>
+extern "C" {
+#else
+#include <stdint.h>
+#endif // __cplusplus
+
+#include "azure_umqtt_c/mqttconst.h"
+
+typedef struct MQTT_MESSAGE_TAG* MQTT_MESSAGE_HANDLE;
+
+extern MQTT_MESSAGE_HANDLE mqttmessage_create(uint16_t packetId, const char* topicName, QOS_VALUE qosValue, const uint8_t* appMsg, size_t appMsgLength);
+extern void mqttmessage_destroy(MQTT_MESSAGE_HANDLE handle);
+extern MQTT_MESSAGE_HANDLE mqttmessage_clone(MQTT_MESSAGE_HANDLE handle);
+
+extern uint16_t mqttmessage_getPacketId(MQTT_MESSAGE_HANDLE handle);
+extern const char* mqttmessage_getTopicName(MQTT_MESSAGE_HANDLE handle);
+extern QOS_VALUE mqttmessage_getQosType(MQTT_MESSAGE_HANDLE handle);
+extern bool mqttmessage_getIsDuplicateMsg(MQTT_MESSAGE_HANDLE handle);
+extern bool mqttmessage_getIsRetained(MQTT_MESSAGE_HANDLE handle);
+extern int mqttmessage_setIsDuplicateMsg(MQTT_MESSAGE_HANDLE handle, bool duplicateMsg);
+extern int mqttmessage_setIsRetained(MQTT_MESSAGE_HANDLE handle, bool retainMsg);
+extern const APP_PAYLOAD* mqttmessage_getApplicationMsg(MQTT_MESSAGE_HANDLE handle);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // MQTT_MESSAGE_H
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_umqtt_c/mqttconst.h Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,110 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef MQTTCONST_H
+#define MQTTCONST_H
+
+#ifdef __cplusplus
+#include <cstddef>
+#include <cstdint>
+extern "C" {
+#else
+#include <stddef.h>
+#include <stdint.h>
+#endif /* __cplusplus */
+
+#include "azure_c_shared_utility/crt_abstractions.h"
+
+typedef enum CONTROL_PACKET_TYPE_TAG
+{
+ CONNECT_TYPE = 0x10,
+ CONNACK_TYPE = 0x20,
+ PUBLISH_TYPE = 0x30,
+ PUBACK_TYPE = 0x40,
+ PUBREC_TYPE = 0x50,
+ PUBREL_TYPE = 0x60,
+ PUBCOMP_TYPE = 0x70,
+ SUBSCRIBE_TYPE = 0x80,
+ SUBACK_TYPE = 0x90,
+ UNSUBSCRIBE_TYPE = 0xA0,
+ UNSUBACK_TYPE = 0xB0,
+ PINGREQ_TYPE = 0xC0,
+ PINGRESP_TYPE = 0xD0,
+ DISCONNECT_TYPE = 0xE0,
+ PACKET_TYPE_ERROR,
+ UNKNOWN_TYPE
+} CONTROL_PACKET_TYPE;
+
+typedef enum QOS_VALUE_TAG
+{
+ DELIVER_AT_MOST_ONCE = 0x00,
+ DELIVER_AT_LEAST_ONCE = 0x01,
+ DELIVER_EXACTLY_ONCE = 0x02,
+ DELIVER_FAILURE = 0x80
+} QOS_VALUE;
+
+typedef struct APP_PAYLOAD_TAG
+{
+ uint8_t* message;
+ size_t length;
+} APP_PAYLOAD;
+
+typedef struct MQTT_CLIENT_OPTIONS_TAG
+{
+ char* clientId;
+ char* willTopic;
+ char* willMessage;
+ char* username;
+ char* password;
+ int keepAliveInterval;
+ bool messageRetain;
+ bool useCleanSession;
+ QOS_VALUE qualityOfServiceValue;
+ bool log_trace;
+} MQTT_CLIENT_OPTIONS;
+
+typedef enum CONNECT_RETURN_CODE_TAG
+{
+ CONNECTION_ACCEPTED = 0x00,
+ CONN_REFUSED_UNACCEPTABLE_VERSION = 0x01,
+ CONN_REFUSED_ID_REJECTED = 0x02,
+ CONN_REFUSED_SERVER_UNAVAIL = 0x03,
+ CONN_REFUSED_BAD_USERNAME_PASSWORD = 0x04,
+ CONN_REFUSED_NOT_AUTHORIZED = 0x05,
+ CONN_REFUSED_UNKNOWN
+} CONNECT_RETURN_CODE;
+
+typedef struct CONNECT_ACK_TAG
+{
+ bool isSessionPresent;
+ CONNECT_RETURN_CODE returnCode;
+} CONNECT_ACK;
+
+typedef struct SUBSCRIBE_PAYLOAD_TAG
+{
+ const char* subscribeTopic;
+ QOS_VALUE qosReturn;
+} SUBSCRIBE_PAYLOAD;
+
+typedef struct SUBSCRIBE_ACK_TAG
+{
+ uint16_t packetId;
+ QOS_VALUE* qosReturn;
+ size_t qosCount;
+} SUBSCRIBE_ACK;
+
+typedef struct UNSUBSCRIBE_ACK_TAG
+{
+ uint16_t packetId;
+} UNSUBSCRIBE_ACK;
+
+typedef struct PUBLISH_ACK_TAG
+{
+ uint16_t packetId;
+} PUBLISH_ACK;
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif // MQTTCONST_H
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt_client.c Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,795 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/platform.h"
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_c_shared_utility/crt_abstractions.h"
+
+#include "azure_umqtt_c/mqtt_client.h"
+#include "azure_umqtt_c/mqtt_codec.h"
+
+#define KEEP_ALIVE_BUFFER_SEC 10
+#define VARIABLE_HEADER_OFFSET 2
+#define RETAIN_FLAG_MASK 0x1
+#define QOS_LEAST_ONCE_FLAG_MASK 0x2
+#define QOS_EXACTLY_ONCE_FLAG_MASK 0x4
+#define DUPLICATE_FLAG_MASK 0x8
+#define CONNECT_PACKET_MASK 0xf0
+
+static const char* FORMAT_HEX_CHAR = "0x%02x ";
+
+typedef struct MQTT_CLIENT_TAG
+{
+ XIO_HANDLE xioHandle;
+ MQTTCODEC_HANDLE codec_handle;
+ CONTROL_PACKET_TYPE packetState;
+ LOGGER_LOG logFunc;
+ TICK_COUNTER_HANDLE packetTickCntr;
+ uint64_t packetSendTimeMs;
+ ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
+ ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
+ void* ctx;
+ QOS_VALUE qosValue;
+ uint16_t keepAliveInterval;
+ MQTT_CLIENT_OPTIONS mqttOptions;
+ bool clientConnected;
+ bool socketConnected;
+ bool logTrace;
+ bool rawBytesTrace;
+} MQTT_CLIENT;
+
+static uint16_t byteutil_read_uint16(uint8_t** buffer)
+{
+ uint16_t result = 0;
+ if (buffer != NULL)
+ {
+ result = 256 * ((uint8_t)(**buffer)) + (uint8_t)(*(*buffer + 1));
+ *buffer += 2; // Move the ptr
+ }
+ return result;
+}
+
+static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
+{
+ char* result = NULL;
+ if (buffer != NULL)
+ {
+ // Get the length of the string
+ int len = byteutil_read_uint16(buffer);
+ if (len > 0)
+ {
+ result = (char*)malloc(len + 1);
+ if (result != NULL)
+ {
+ (void)memcpy(result, *buffer, len);
+ result[len] = '\0';
+ *buffer += len;
+ if (byteLen != NULL)
+ {
+ *byteLen = len;
+ }
+ }
+ }
+ }
+ return result;
+}
+
+static uint8_t byteutil_readByte(uint8_t** buffer)
+{
+ uint8_t result = 0;
+ if (buffer != NULL)
+ {
+ result = **buffer;
+ (*buffer)++;
+ }
+ return result;
+}
+
+static void sendComplete(void* context, IO_SEND_RESULT send_result)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
+ if (mqttData != NULL && mqttData->fnOperationCallback != NULL)
+ {
+ if (mqttData->packetState == DISCONNECT_TYPE)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx);
+
+ // close the xio
+ (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx);
+ mqttData->socketConnected = false;
+ mqttData->clientConnected = false;
+ }
+ }
+}
+
+static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
+{
+ switch (packet&CONNECT_PACKET_MASK)
+ {
+ case CONNECT_TYPE: return "CONNECT";
+ case CONNACK_TYPE: return "CONNACK";
+ case PUBLISH_TYPE: return "PUBLISH";
+ case PUBACK_TYPE: return "PUBACK";
+ case PUBREC_TYPE: return "PUBREC";
+ case PUBREL_TYPE: return "PUBREL";
+ case SUBSCRIBE_TYPE: return "SUBSCRIBE";
+ case SUBACK_TYPE: return "SUBACK";
+ case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
+ case UNSUBACK_TYPE: return "UNSUBACK";
+ case PINGREQ_TYPE: return "PINGREQ";
+ case PINGRESP_TYPE: return "PINGRESP";
+ case DISCONNECT_TYPE: return "DISCONNECT";
+ default:
+ case PACKET_TYPE_ERROR:
+ case UNKNOWN_TYPE:
+ return "UNKNOWN";
+ }
+}
+
+static void logOutgoingingMsgTrace(MQTT_CLIENT* clientData, const uint8_t* data, size_t length)
+{
+ if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace)
+ {
+ LOG(clientData->logFunc, 0, "-> %s: ", retrievePacketType((unsigned char)data[0]));
+ for (size_t index = 0; index < length; index++)
+ {
+ LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, (unsigned char)data[index]);
+ }
+ LOG(clientData->logFunc, LOG_LINE, "");
+ }
+}
+
+static void logIncomingMsgTrace(MQTT_CLIENT* clientData, CONTROL_PACKET_TYPE packet, int flags, const uint8_t* data, size_t length)
+{
+ if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace)
+ {
+ LOG(clientData->logFunc, 0, "<- %s: 0x%02x 0x%02x ", retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length);
+ for (size_t index = 0; index < length; index++)
+ {
+ LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, (unsigned char)data[index]);
+ }
+ LOG(clientData->logFunc, LOG_LINE, "");
+ }
+}
+
+static int sendPacketItem(MQTT_CLIENT* clientData, const int8_t* data, size_t length)
+{
+ int result;
+ logOutgoingingMsgTrace(clientData, data, length);
+
+ if (tickcounter_get_current_ms(clientData->packetTickCntr, &clientData->packetSendTimeMs) != 0)
+ {
+ LOG(clientData->logFunc, LOG_LINE, "Failure getting current ms tickcounter");
+ result = __LINE__;
+ }
+ else
+ {
+ result = xio_send(clientData->xioHandle, data, length, sendComplete, clientData);
+ if (result != 0)
+ {
+ LOG(clientData->logFunc, LOG_LINE, "%d: Failure sending control packet data", result);
+ result = __LINE__;
+ }
+ }
+ return result;
+}
+
+static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
+ if (mqttData != NULL)
+ {
+ if (open_result == IO_OPEN_OK && !mqttData->socketConnected)
+ {
+ mqttData->packetState = CONNECT_TYPE;
+ mqttData->socketConnected = true;
+ // Send the Connect packet
+ BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqttData->mqttOptions);
+ if (connPacket == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_connect failed");
+ }
+ else
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
+ if (sendPacketItem(mqttData, BUFFER_u_char(connPacket), BUFFER_length(connPacket)) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_connect failed");
+ }
+ BUFFER_delete(connPacket);
+ }
+ }
+ else if (open_result == IO_OPEN_ERROR)
+ {
+ (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
+}
+
+static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
+ if (mqttData != NULL)
+ {
+ if (mqtt_codec_bytesReceived(mqttData->codec_handle, buffer, size) != 0)
+ {
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
+ }
+}
+
+static void onIoError(void* context)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
+ if (mqttData != NULL && mqttData->fnOperationCallback)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ mqttData->socketConnected = false;
+ }
+}
+
+static int cloneMqttOptions(MQTT_CLIENT* mqttData, const MQTT_CLIENT_OPTIONS* mqttOptions)
+{
+ int result = 0;
+ if (mqttOptions->clientId != NULL)
+ {
+ if (mallocAndStrcpy_s(&mqttData->mqttOptions.clientId, mqttOptions->clientId) != 0)
+ {
+ result = __LINE__;
+ }
+ }
+ if (result == 0 && mqttOptions->willTopic != NULL)
+ {
+ if (mallocAndStrcpy_s(&mqttData->mqttOptions.willTopic, mqttOptions->willTopic) != 0)
+ {
+ result = __LINE__;
+ }
+ }
+ if (result == 0 && mqttOptions->willMessage != NULL)
+ {
+ if (mallocAndStrcpy_s(&mqttData->mqttOptions.willMessage, mqttOptions->willMessage) != 0)
+ {
+ result = __LINE__;
+ }
+ }
+ if (result == 0 && mqttOptions->username != NULL)
+ {
+ if (mallocAndStrcpy_s(&mqttData->mqttOptions.username, mqttOptions->username) != 0)
+ {
+ result = __LINE__;
+ }
+ }
+ if (result == 0 && mqttOptions->password != NULL)
+ {
+ if (mallocAndStrcpy_s(&mqttData->mqttOptions.password, mqttOptions->password) != 0)
+ {
+ result = __LINE__;
+ }
+ }
+ if (result == 0)
+ {
+ mqttData->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
+ mqttData->mqttOptions.messageRetain = mqttOptions->messageRetain;
+ mqttData->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
+ mqttData->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
+ }
+ else
+ {
+ free(mqttData->mqttOptions.clientId);
+ free(mqttData->mqttOptions.willTopic);
+ free(mqttData->mqttOptions.willMessage);
+ free(mqttData->mqttOptions.username);
+ free(mqttData->mqttOptions.password);
+ }
+ return result;
+}
+
+static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
+ if (mqttData != NULL && headerData != NULL)
+ {
+ size_t len = BUFFER_length(headerData);
+ uint8_t* iterator = BUFFER_u_char(headerData);
+
+ logIncomingMsgTrace(mqttData, packet, flags, iterator, len);
+
+ if (iterator != NULL && len > 0)
+ {
+ switch (packet)
+ {
+ case CONNACK_TYPE:
+ {
+ if (mqttData->fnOperationCallback != NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
+ CONNECT_ACK connack = { 0 };
+ connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
+ connack.returnCode = byteutil_readByte(&iterator);
+
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx);
+
+ if (connack.returnCode == CONNECTION_ACCEPTED)
+ {
+ mqttData->clientConnected = true;
+ }
+ }
+ break;
+ }
+ case PUBLISH_TYPE:
+ {
+ if (mqttData->fnMessageRecv != NULL)
+ {
+ //uint8_t ctrlPacket = byteutil_readByte(&iterator);
+ bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
+ bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
+ QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
+
+ uint8_t* initialPos = iterator;
+ char* topicName = byteutil_readUTF(&iterator, NULL);
+ uint16_t packetId = 0;
+ if (qosValue != DELIVER_AT_MOST_ONCE)
+ {
+ packetId = byteutil_read_uint16(&iterator);
+ }
+ size_t length = len - (iterator - initialPos);
+
+ MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
+ if (msgHandle == NULL)
+ {
+ LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create");
+ }
+ else
+ {
+ (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
+ (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
+ mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+
+ BUFFER_HANDLE pubRel = NULL;
+ if (qosValue == DELIVER_EXACTLY_ONCE)
+ {
+ pubRel = mqtt_codec_publishReceived(packetId);
+ }
+ else if (qosValue == DELIVER_AT_LEAST_ONCE)
+ {
+ pubRel = mqtt_codec_publishAck(packetId);
+ }
+ if (pubRel != NULL)
+ {
+ (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+ BUFFER_delete(pubRel);
+ }
+ free(topicName);
+ mqttmessage_destroy(msgHandle);
+ }
+ }
+ break;
+ }
+ case PUBACK_TYPE:
+ case PUBREC_TYPE:
+ case PUBREL_TYPE:
+ case PUBCOMP_TYPE:
+ {
+ if (mqttData->fnOperationCallback)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
+ MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
+ (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
+ (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
+
+ PUBLISH_ACK publish_ack = { 0 };
+ publish_ack.packetId = byteutil_read_uint16(&iterator);
+
+ BUFFER_HANDLE pubRel = NULL;
+ mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx);
+ if (packet == PUBREC_TYPE)
+ {
+ pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
+ }
+ else if (packet == PUBREL_TYPE)
+ {
+ pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
+ }
+ if (pubRel != NULL)
+ {
+ (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+ BUFFER_delete(pubRel);
+ }
+ }
+ break;
+ }
+ case SUBACK_TYPE:
+ {
+ if (mqttData->fnOperationCallback)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
+ SUBSCRIBE_ACK suback = { 0 };
+
+ size_t remainLen = len;
+ suback.packetId = byteutil_read_uint16(&iterator);
+ remainLen -= 2;
+
+ // Allocate the remaining len
+ suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
+ if (suback.qosReturn != NULL)
+ {
+ while (remainLen > 0)
+ {
+ suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
+ remainLen--;
+ }
+ (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
+ free(suback.qosReturn);
+ }
+ }
+ break;
+ }
+ case UNSUBACK_TYPE:
+ {
+ if (mqttData->fnOperationCallback)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
+ UNSUBSCRIBE_ACK unsuback = { 0 };
+ iterator += VARIABLE_HEADER_OFFSET;
+ unsuback.packetId = byteutil_read_uint16(&iterator);
+
+ (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+ }
+ break;
+ }
+ case PINGRESP_TYPE:
+ // Ping responses do not get forwarded
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
+
+MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx, LOGGER_LOG logger)
+{
+ MQTT_CLIENT* result;
+ /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
+ if (msgRecv == NULL)
+ {
+ result = NULL;
+ }
+ else
+ {
+ result = malloc(sizeof(MQTT_CLIENT));
+ if (result == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
+ LOG(logger, LOG_LINE, "mqtt_client_inti failure: Allocation Failure");
+ }
+ else
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
+ result->xioHandle = NULL;
+ result->packetState = UNKNOWN_TYPE;
+ result->logFunc = logger;
+ result->packetSendTimeMs = 0;
+ result->fnOperationCallback = opCallback;
+ result->fnMessageRecv = msgRecv;
+ result->ctx = callbackCtx;
+ result->qosValue = DELIVER_AT_MOST_ONCE;
+ result->keepAliveInterval = 0;
+ result->packetTickCntr = tickcounter_create();
+ result->mqttOptions.clientId = NULL;
+ result->mqttOptions.willTopic = NULL;
+ result->mqttOptions.willMessage = NULL;
+ result->mqttOptions.username = NULL;
+ result->mqttOptions.password = NULL;
+ result->socketConnected = false;
+ result->clientConnected = false;
+ result->logTrace = false;
+ result->rawBytesTrace = false;
+ if (result->packetTickCntr == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
+ LOG(logger, LOG_LINE, "mqtt_client_init failure: tickcounter_create failure");
+ free(result);
+ result = NULL;
+ }
+ else
+ {
+ result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
+ if (result->codec_handle == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
+ LOG(logger, LOG_LINE, "mqtt_client_init failure: mqtt_codec_create failure");
+ tickcounter_destroy(result->packetTickCntr);
+ free(result);
+ result = NULL;
+ }
+ }
+ }
+ }
+ return result;
+}
+
+void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
+{
+ /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
+ if (handle != NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ tickcounter_destroy(mqttData->packetTickCntr);
+ mqtt_codec_destroy(mqttData->codec_handle);
+ free(mqttData->mqttOptions.clientId);
+ free(mqttData->mqttOptions.willTopic);
+ free(mqttData->mqttOptions.willMessage);
+ free(mqttData->mqttOptions.username);
+ free(mqttData->mqttOptions.password);
+ free(mqttData);
+ }
+}
+
+int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
+{
+ int result;
+ /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
+ if (handle == NULL || mqttOptions == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (xioHandle == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqttcodec_connect failed");
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->xioHandle = xioHandle;
+ mqttData->packetState = UNKNOWN_TYPE;
+ mqttData->qosValue = mqttOptions->qualityOfServiceValue;
+ mqttData->keepAliveInterval = mqttOptions->keepAliveInterval;
+ if (cloneMqttOptions(mqttData, mqttOptions) != 0)
+ {
+ LOG(mqttData->logFunc, LOG_LINE, "Error: Clone Mqtt Options failed");
+ result = __LINE__;
+ }
+ /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
+ else if (xio_open(xioHandle, onOpenComplete, mqttData, onBytesReceived, mqttData, onIoError, mqttData) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: io_open failed");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+ }
+ return result;
+}
+
+int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
+{
+ int result;
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (mqttData == NULL || msgHandle == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
+ result = __LINE__;
+ }
+ else
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
+ const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
+ if (payload == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqttmessage_getApplicationMsg failed");
+ result = __LINE__;
+ }
+ else
+ {
+ BUFFER_HANDLE publishPacket = mqtt_codec_publish(mqttmessage_getQosType(msgHandle), mqttmessage_getIsDuplicateMsg(msgHandle),
+ mqttmessage_getIsRetained(msgHandle), mqttmessage_getPacketId(msgHandle), mqttmessage_getTopicName(msgHandle), payload->message, payload->length);
+ if (publishPacket == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_publish failed");
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->packetState = PUBLISH_TYPE;
+
+ /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
+ if (sendPacketItem(mqttData, BUFFER_u_char(publishPacket), BUFFER_length(publishPacket)) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_publish send failed");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ BUFFER_delete(publishPacket);
+ }
+ }
+ }
+ return result;
+}
+
+int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
+{
+ int result;
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (mqttData == NULL || subscribeList == NULL || count == 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
+ result = __LINE__;
+ }
+ else
+ {
+ BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count);
+ if (subPacket == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_subscribe failed");
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->packetState = SUBSCRIBE_TYPE;
+
+ /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
+ if (sendPacketItem(mqttData, BUFFER_u_char(subPacket), BUFFER_length(subPacket)) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_subscribe send failed");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ BUFFER_delete(subPacket);
+ }
+ }
+ return result;
+}
+
+int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
+{
+ int result;
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (mqttData == NULL || unsubscribeList == NULL || count == 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
+ result = __LINE__;
+ }
+ else
+ {
+ BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count);
+ if (unsubPacket == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_codec_unsubscribe failed");
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->packetState = UNSUBSCRIBE_TYPE;
+
+ /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "MQTT unsubscribe");
+ if (sendPacketItem(mqttData, BUFFER_u_char(unsubPacket), BUFFER_length(unsubPacket)) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_unsubscribe send failed");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ BUFFER_delete(unsubPacket);
+ }
+ }
+ return result;
+}
+
+int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle)
+{
+ int result;
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (mqttData == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->packetState = DISCONNECT_TYPE;
+ BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
+ if (disconnectPacket == NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_disconnect failed");
+ result = __LINE__;
+ }
+ else
+ {
+ mqttData->packetState = DISCONNECT_TYPE;
+
+ /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
+ if (sendPacketItem(mqttData, BUFFER_u_char(disconnectPacket), BUFFER_length(disconnectPacket)) != 0)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
+ LOG(mqttData->logFunc, LOG_LINE, "Error: mqtt_client_disconnect send failed");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ BUFFER_delete(disconnectPacket);
+ }
+ }
+ return result;
+}
+
+void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
+ if (mqttData != NULL)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
+ xio_dowork(mqttData->xioHandle);
+
+ /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
+ if (mqttData->socketConnected && mqttData->clientConnected && mqttData->keepAliveInterval > 0)
+ {
+ uint64_t current_ms;
+ if (tickcounter_get_current_ms(mqttData->packetTickCntr, ¤t_ms) != 0)
+ {
+ LOG(mqttData->logFunc, LOG_LINE, "Error: tickcounter_get_current_ms failed");
+ }
+ else
+ {
+ if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval)
+ {
+ /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
+ BUFFER_HANDLE pingPacket = mqtt_codec_ping();
+ if (pingPacket != NULL)
+ {
+ (void)sendPacketItem(mqttData, BUFFER_u_char(pingPacket), BUFFER_length(pingPacket));
+ BUFFER_delete(pingPacket);
+ }
+ }
+ }
+ }
+ }
+}
+
+void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
+{
+ MQTT_CLIENT* mqttData = (MQTT_CLIENT*)handle;
+ if (mqttData != NULL)
+ {
+ mqttData->logTrace = traceOn;
+ mqttData->rawBytesTrace = rawBytesOn;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt_codec.c Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,956 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include <limits.h>
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/buffer_.h"
+#include "azure_c_shared_utility/macro_utils.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_umqtt_c/mqtt_codec.h"
+
+#define PAYLOAD_OFFSET 5
+#define PACKET_TYPE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf0))
+#define FLAG_VALUE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf))
+
+#define USERNAME_FLAG 0x80
+#define PASSWORD_FLAG 0x40
+#define WILL_RETAIN_FLAG 0x20
+#define WILL_QOS_FLAG_ 0x18
+#define WILL_FLAG_FLAG 0x04
+#define CLEAN_SESSION_FLAG 0x02
+
+#define NEXT_128_CHUNK 0x80
+#define PUBLISH_DUP_FLAG 0x8
+#define PUBLISH_QOS_EXACTLY_ONCE 0x4
+#define PUBLISH_QOS_AT_LEAST_ONCE 0x2
+#define PUBLISH_QOS_RETAIN 0x1
+
+#define PROTOCOL_NUMBER 4
+#define CONN_FLAG_BYTE_OFFSET 7
+
+#define CONNECT_FIXED_HEADER_SIZE 2
+#define CONNECT_VARIABLE_HEADER_SIZE 10
+#define SUBSCRIBE_FIXED_HEADER_FLAG 0x2
+#define UNSUBSCRIBE_FIXED_HEADER_FLAG 0x2
+
+#define MAX_SEND_SIZE 0xFFFFFF7F
+
+#define CODEC_STATE_VALUES \
+ CODEC_STATE_FIXED_HEADER, \
+ CODEC_STATE_VAR_HEADER, \
+ CODEC_STATE_PAYLOAD
+
+DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
+
+typedef struct MQTTCODEC_INSTANCE_TAG
+{
+ CONTROL_PACKET_TYPE currPacket;
+ CODEC_STATE_RESULT codecState;
+ size_t bufferOffset;
+ int headerFlags;
+ BUFFER_HANDLE headerData;
+ ON_PACKET_COMPLETE_CALLBACK packetComplete;
+ void* callContext;
+ uint8_t storeRemainLen[4];
+ size_t remainLenIndex;
+} MQTTCODEC_INSTANCE;
+
+typedef struct PUBLISH_HEADER_INFO_TAG
+{
+ const char* topicName;
+ uint16_t packetId;
+ const char* msgBuffer;
+ QOS_VALUE qualityOfServiceValue;
+} PUBLISH_HEADER_INFO;
+
+void byteutil_writeByte(uint8_t** buffer, uint8_t value)
+{
+ if (buffer != NULL)
+ {
+ **buffer = value;
+ (*buffer)++;
+ }
+}
+
+void byteutil_writeInt(uint8_t** buffer, uint16_t value)
+{
+ if (buffer != NULL)
+ {
+ **buffer = (char)(value / 256);
+ (*buffer)++;
+ **buffer = (char)(value % 256);
+ (*buffer)++;
+ }
+}
+
+void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len)
+{
+ if (buffer != NULL)
+ {
+ byteutil_writeInt(buffer, len);
+ (void)memcpy(*buffer, stringData, len);
+ *buffer += len;
+ }
+}
+
+CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags)
+{
+ CONTROL_PACKET_TYPE result;
+ result = PACKET_TYPE_BYTE(pktByte);
+ if (flags != NULL)
+ {
+ *flags = FLAG_VALUE_BYTE(pktByte);
+ }
+ return result;
+}
+
+static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount)
+{
+ int result = 0;
+ if (payloadList == NULL || ctrlPacket == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ for (size_t index = 0; index < payloadCount && result == 0; index++)
+ {
+ // Add the Payload
+ size_t offsetLen = BUFFER_length(ctrlPacket);
+ size_t topicLen = strlen(payloadList[index]);
+ if (topicLen > USHRT_MAX)
+ {
+ result = __LINE__;
+ }
+ else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(ctrlPacket);
+ iterator += offsetLen;
+ byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
+ }
+ }
+ }
+ return result;
+}
+
+static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount)
+{
+ int result = 0;
+ if (payloadList == NULL || ctrlPacket == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ for (size_t index = 0; index < payloadCount && result == 0; index++)
+ {
+ // Add the Payload
+ size_t offsetLen = BUFFER_length(ctrlPacket);
+ size_t topicLen = strlen(payloadList[index].subscribeTopic);
+ if (topicLen > USHRT_MAX)
+ {
+ result = __LINE__;
+ }
+ else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(ctrlPacket);
+ iterator += offsetLen;
+ byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
+ *iterator = payloadList[index].qosReturn;
+ }
+ }
+ }
+ return result;
+}
+
+static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions)
+{
+ int result = 0;
+ if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(ctrlPacket);
+ if (iterator == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ byteutil_writeUTF(&iterator, "MQTT", 4);
+ byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
+ byteutil_writeByte(&iterator, 0); // Flags will be entered later
+ byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval);
+ result = 0;
+ }
+ }
+ return result;
+}
+
+static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader)
+{
+ int result = 0;
+ size_t topicLen = 0;
+ size_t spaceLen = 0;
+ size_t idLen = 0;
+
+ size_t currLen = BUFFER_length(ctrlPacket);
+
+ topicLen = strlen(publishHeader->topicName);
+ spaceLen += 2;
+
+ if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE)
+ {
+ // Packet Id is only set if the QOS is not 0
+ idLen = 2;
+ }
+
+ if (topicLen > USHRT_MAX)
+ {
+ result = __LINE__;
+ }
+ else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(ctrlPacket);
+ if (iterator == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ iterator += currLen;
+ /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
+ byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
+ if (idLen > 0)
+ {
+ byteutil_writeInt(&iterator, publishHeader->packetId);
+ }
+ result = 0;
+ }
+ }
+ return result;
+}
+
+static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId)
+{
+ int result = 0;
+ if (BUFFER_enlarge(ctrlPacket, 2) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(ctrlPacket);
+ if (iterator == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ byteutil_writeInt(&iterator, packetId);
+ result = 0;
+ }
+ }
+ return result;
+}
+
+static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, int flags, uint16_t packetId)
+{
+ BUFFER_HANDLE result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (BUFFER_pre_build(result, 4) != 0)
+ {
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(result);
+ if (iterator == NULL)
+ {
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ *iterator = type | flags;
+ iterator++;
+ *iterator = 0x2;
+ iterator++;
+ byteutil_writeInt(&iterator, packetId);
+ }
+ }
+ }
+ return result;
+}
+
+static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags)
+{
+ int result;
+ if (ctrlPacket == NULL)
+ {
+ return __LINE__;
+ }
+ else
+ {
+ size_t packetLen = BUFFER_length(ctrlPacket);
+ uint8_t remainSize[4] = { 0 };
+ size_t index = 0;
+
+ // Calculate the length of packet
+ do
+ {
+ uint8_t encode = packetLen % 128;
+ packetLen /= 128;
+ // if there are more data to encode, set the top bit of this byte
+ if (packetLen > 0)
+ {
+ encode |= NEXT_128_CHUNK;
+ }
+ remainSize[index++] = encode;
+ } while (packetLen > 0);
+
+ BUFFER_HANDLE fixedHeader = BUFFER_new();
+ if (fixedHeader == NULL)
+ {
+ result = __LINE__;
+ }
+ else if (BUFFER_pre_build(fixedHeader, index + 1) != 0)
+ {
+ BUFFER_delete(fixedHeader);
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(fixedHeader);
+ *iterator = packetType | flags;
+ iterator++;
+ (void)memcpy(iterator, remainSize, index);
+
+ result = BUFFER_prepend(ctrlPacket, fixedHeader);
+ BUFFER_delete(fixedHeader);
+ }
+ }
+ return result;
+}
+
+static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions)
+{
+ int result = 0;
+ if (mqttOptions == NULL || ctrlPacket == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ size_t clientLen = 0;
+ size_t usernameLen = 0;
+ size_t passwordLen = 0;
+ size_t willMessageLen = 0;
+ size_t willTopicLen = 0;
+ size_t spaceLen = 0;
+
+ if (mqttOptions->clientId != NULL)
+ {
+ spaceLen += 2;
+ clientLen = strlen(mqttOptions->clientId);
+ }
+ if (mqttOptions->username != NULL)
+ {
+ spaceLen += 2;
+ usernameLen = strlen(mqttOptions->username);
+ }
+ if (mqttOptions->password != NULL)
+ {
+ spaceLen += 2;
+ passwordLen = strlen(mqttOptions->password);
+ }
+ if (mqttOptions->willMessage != NULL)
+ {
+ spaceLen += 2;
+ willMessageLen = strlen(mqttOptions->willMessage);
+ }
+ if (mqttOptions->willTopic != NULL)
+ {
+ spaceLen += 2;
+ willTopicLen = strlen(mqttOptions->willTopic);
+ }
+
+ size_t currLen = BUFFER_length(ctrlPacket);
+ size_t totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen;
+
+ // Validate the Username & Password
+ if (clientLen > USHRT_MAX)
+ {
+ result = __LINE__;
+ }
+ else if (usernameLen > 0 && passwordLen == 0)
+ {
+ result = __LINE__;
+ }
+ else if ( (willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0) )
+ {
+ result = __LINE__;
+ }
+ else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ uint8_t* packet = BUFFER_u_char(ctrlPacket);
+ uint8_t* iterator = packet;
+
+ iterator += currLen;
+ byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen);
+
+ // TODO: Read on the Will Topic
+ if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ if (willMessageLen > 0 && willTopicLen > 0)
+ {
+ packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
+ byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
+ packet[CONN_FLAG_BYTE_OFFSET] |= mqttOptions->qualityOfServiceValue;
+ if (mqttOptions->messageRetain)
+ {
+ packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG;
+ }
+ byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen);
+ }
+ if (usernameLen > 0)
+ {
+ packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG | PASSWORD_FLAG;
+ byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
+ byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
+ }
+ // TODO: Get the rest of the flags
+ if (mqttOptions->useCleanSession)
+ {
+ packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
+ }
+ result = 0;
+ }
+ }
+ }
+ return result;
+}
+
+static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen)
+{
+ int result;
+ if (codecData == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen;
+ if (remainLen < 0x7f)
+ {
+ int multiplier = 1;
+ int totalLen = 0;
+ size_t index = 0;
+ uint8_t encodeByte = 0;
+ do
+ {
+ encodeByte = codecData->storeRemainLen[index++];
+ totalLen += (encodeByte & 127) * multiplier;
+ multiplier *= NEXT_128_CHUNK;
+
+ if (multiplier > 128 * 128 * 128)
+ {
+ result = __LINE__;
+ break;
+ }
+ } while ( (encodeByte & NEXT_128_CHUNK) != 0);
+
+ if (totalLen > 0)
+ {
+ codecData->headerData = BUFFER_new();
+ (void)BUFFER_pre_build(codecData->headerData, totalLen);
+ codecData->bufferOffset = 0;
+ }
+ codecData->codecState = CODEC_STATE_VAR_HEADER;
+
+ // Reset remainLen Index
+ codecData->remainLenIndex = 0;
+ memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t));
+ }
+ }
+ return result;
+}
+
+static void completePacketData(MQTTCODEC_INSTANCE* codecData)
+{
+ if (codecData)
+ {
+ if (codecData->packetComplete != NULL)
+ {
+ codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData);
+ }
+
+ // Clean up data
+ codecData->currPacket = UNKNOWN_TYPE;
+ codecData->codecState = CODEC_STATE_FIXED_HEADER;
+ codecData->headerFlags = 0;
+ BUFFER_delete(codecData->headerData);
+ codecData->headerData = NULL;
+ }
+}
+
+MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
+{
+ MQTTCODEC_HANDLE result;
+ result = malloc(sizeof(MQTTCODEC_INSTANCE) );
+ /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
+ if (result != NULL)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */
+ result->currPacket = UNKNOWN_TYPE;
+ result->codecState = CODEC_STATE_FIXED_HEADER;
+ result->headerFlags = 0;
+ result->bufferOffset = 0;
+ result->packetComplete = packetComplete;
+ result->callContext = callbackCtx;
+ result->headerData = NULL;
+ memset(result->storeRemainLen, 0, 4 * sizeof(uint8_t));
+ result->remainLenIndex = 0;
+ }
+ return result;
+}
+
+void mqtt_codec_destroy(MQTTCODEC_HANDLE handle)
+{
+ /* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */
+ if (handle != NULL)
+ {
+ MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle;
+ /* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */
+ BUFFER_delete(codecData->headerData);
+ free(codecData);
+ }
+}
+
+BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions)
+{
+ BUFFER_HANDLE result;
+ /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
+ if (mqttOptions == NULL)
+ {
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */
+ result = BUFFER_new();
+ if (result != NULL)
+ {
+ // Add Variable Header Information
+ if (constructConnectVariableHeader(result, mqttOptions) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ if (constructConnPayload(result, mqttOptions) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ }
+ }
+ }
+ }
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_disconnect()
+{
+ /* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */
+ BUFFER_HANDLE result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (BUFFER_enlarge(result, 2) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(result);
+ if (iterator == NULL)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ iterator[0] = DISCONNECT_TYPE;
+ iterator[1] = 0;
+ }
+ }
+ }
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen)
+{
+ BUFFER_HANDLE result;
+ /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
+ if (topicName == NULL)
+ {
+ result = NULL;
+ }
+ /* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */
+ else if (buffLen > MAX_SEND_SIZE)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
+ result = NULL;
+ }
+ else
+ {
+ PUBLISH_HEADER_INFO publishInfo = { 0 };
+ publishInfo.topicName = topicName;
+ publishInfo.packetId = packetId;
+ publishInfo.qualityOfServiceValue = qosValue;
+
+ uint8_t headerFlags = 0;
+ if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG;
+ if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN;
+ if (qosValue != DELIVER_AT_MOST_ONCE)
+ {
+ if (qosValue == DELIVER_AT_LEAST_ONCE)
+ {
+ headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE;
+ }
+ else
+ {
+ headerFlags |= PUBLISH_QOS_EXACTLY_ONCE;
+ }
+ }
+
+ /* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */
+ result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (constructPublishVariableHeader(result, &publishInfo) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ size_t payloadOffset = BUFFER_length(result);
+ if (buffLen > 0)
+ {
+ if (BUFFER_enlarge(result, buffLen) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(result);
+ if (iterator == NULL)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ iterator += payloadOffset;
+ // Write Message
+ memcpy(iterator, msgBuffer, buffLen);
+ }
+ }
+ }
+
+ if (result != NULL)
+ {
+ if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ }
+ }
+ }
+ }
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId)
+{
+ /* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */
+ /* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */
+ BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId);
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId)
+{
+ /* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */
+ /* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */
+ BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId);
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId)
+{
+ /* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */
+ /* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */
+ BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId);
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId)
+{
+ /* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */
+ /* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */
+ BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId);
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_ping()
+{
+ /* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */
+ BUFFER_HANDLE result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (BUFFER_enlarge(result, 2) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ uint8_t* iterator = BUFFER_u_char(result);
+ if (iterator == NULL)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ iterator[0] = PINGREQ_TYPE;
+ iterator[1] = 0;
+ }
+ }
+ }
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
+{
+ BUFFER_HANDLE result;
+ /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
+ if (subscribeList == NULL || count == 0)
+ {
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/
+ result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
+ if (addListItemsToSubscribePacket(result, subscribeList, count) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ }
+ }
+ }
+ }
+ return result;
+}
+
+BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count)
+{
+ BUFFER_HANDLE result;
+ /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
+ if (unsubscribeList == NULL || count == 0)
+ {
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */
+ result = BUFFER_new();
+ if (result != NULL)
+ {
+ if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
+ if (addListItemsToUnsubscribePacket(result, unsubscribeList, count) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ else
+ {
+ if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
+ BUFFER_delete(result);
+ result = NULL;
+ }
+ }
+ }
+ }
+ }
+ return result;
+}
+
+int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
+{
+ int result;
+ MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
+ /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
+ if (codec_Data == NULL)
+ {
+ result = __LINE__;
+ }
+ /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
+ /* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
+ else if (buffer == NULL || size == 0)
+ {
+ codec_Data->currPacket = PACKET_TYPE_ERROR;
+ result = __LINE__;
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
+ result = 0;
+ for (size_t index = 0; index < size && result == 0; index++)
+ {
+ uint8_t iterator = ((int8_t*)buffer)[index];
+ if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
+ {
+ if (codec_Data->currPacket == UNKNOWN_TYPE)
+ {
+ codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
+ }
+ else
+ {
+ if (prepareheaderDataInfo(codec_Data, iterator) != 0)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
+ codec_Data->currPacket = PACKET_TYPE_ERROR;
+ result = __LINE__;
+ }
+ if (codec_Data->currPacket == PINGRESP_TYPE)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
+ completePacketData(codec_Data);
+ }
+ }
+ }
+ else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
+ {
+ size_t payloadLen = 0;
+ if (codec_Data->headerData == NULL)
+ {
+ codec_Data->codecState = CODEC_STATE_PAYLOAD;
+ }
+ else
+ {
+ uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
+ if (dataBytes == NULL)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
+ codec_Data->currPacket = PACKET_TYPE_ERROR;
+ result = __LINE__;
+ }
+ else
+ {
+ // Increment the data
+ dataBytes += codec_Data->bufferOffset++;
+ *dataBytes = iterator;
+
+ size_t totalLen = BUFFER_length(codec_Data->headerData);
+ if (codec_Data->bufferOffset >= totalLen)
+ {
+ /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
+ completePacketData(codec_Data);
+ }
+ }
+ }
+ }
+ else
+ {
+ /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
+ codec_Data->currPacket = PACKET_TYPE_ERROR;
+ result = __LINE__;
+ }
+ }
+ }
+ return result;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt_message.c Fri Apr 08 12:01:23 2016 -0700
@@ -0,0 +1,247 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include "azure_umqtt_c/mqtt_message.h"
+#include "azure_c_shared_utility/gballoc.h"
+
+typedef struct MQTT_MESSAGE_TAG
+{
+ uint16_t packetId;
+ char* topicName;
+ QOS_VALUE qosInfo;
+ APP_PAYLOAD appPayload;
+ bool isDuplicateMsg;
+ bool isMessageRetained;
+} MQTT_MESSAGE;
+
+MQTT_MESSAGE_HANDLE mqttmessage_create(uint16_t packetId, const char* topicName, QOS_VALUE qosValue, const uint8_t* appMsg, size_t appMsgLength)
+{
+ /* Codes_SRS_MQTTMESSAGE_07_001:[If the parameters topicName is NULL is zero then mqttmessage_create shall return NULL.] */
+ MQTT_MESSAGE* result;
+ if (topicName == NULL)
+ {
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_002: [mqttmessage_create shall allocate and copy the topicName and appMsg parameters.] */
+ result = malloc(sizeof(MQTT_MESSAGE));
+ if (result != NULL)
+ {
+ if (mallocAndStrcpy_s(&result->topicName, topicName) != 0)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_003: [If any memory allocation fails mqttmessage_create shall free any allocated memory and return NULL.] */
+ free(result);
+ result = NULL;
+ }
+ else
+ {
+ result->packetId = packetId;
+ result->isDuplicateMsg = false;
+ result->isMessageRetained = false;
+ result->qosInfo = qosValue;
+
+ /* Codes_SRS_MQTTMESSAGE_07_002: [mqttmessage_create shall allocate and copy the topicName and appMsg parameters.] */
+ result->appPayload.length = appMsgLength;
+ if (result->appPayload.length > 0)
+ {
+ result->appPayload.message = malloc(appMsgLength);
+ if (result->appPayload.message == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_003: [If any memory allocation fails mqttmessage_create shall free any allocated memory and return NULL.] */
+ free(result->topicName);
+ free(result);
+ result = NULL;
+ }
+ else
+ {
+ memcpy(result->appPayload.message, appMsg, appMsgLength);
+ }
+ }
+ else
+ {
+ result->appPayload.message = NULL;
+ }
+ }
+ }
+ }
+ /* Codes_SRS_MQTTMESSAGE_07_004: [If mqttmessage_createMessage succeeds the it shall return a NON-NULL MQTT_MESSAGE_HANDLE value.] */
+ return result;
+}
+
+void mqttmessage_destroy(MQTT_MESSAGE_HANDLE handle)
+{
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ /* Codes_SRS_MQTTMESSAGE_07_005: [If the handle parameter is NULL then mqttmessage_destroyMessage shall do nothing] */
+ if (msgInfo != NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_006: [mqttmessage_destroyMessage shall free all resources associated with the MQTT_MESSAGE_HANDLE value] */
+ free(msgInfo->topicName);
+ if (msgInfo->appPayload.message != NULL)
+ {
+ free(msgInfo->appPayload.message);
+ }
+ free(msgInfo);
+ }
+}
+
+MQTT_MESSAGE_HANDLE mqttmessage_clone(MQTT_MESSAGE_HANDLE handle)
+{
+ MQTT_MESSAGE_HANDLE result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_007: [If handle parameter is NULL then mqttmessage_clone shall return NULL.] */
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_008: [mqttmessage_clone shall create a new MQTT_MESSAGE_HANDLE with data content identical of the handle value.] */
+ MQTT_MESSAGE* mqtt_message = (MQTT_MESSAGE*)handle;
+ result = mqttmessage_create(mqtt_message->packetId, mqtt_message->topicName, mqtt_message->qosInfo, mqtt_message->appPayload.message, mqtt_message->appPayload.length);
+ if (result != NULL)
+ {
+ (void)mqttmessage_setIsDuplicateMsg(result, mqtt_message->isDuplicateMsg);
+ (void)mqttmessage_setIsRetained(result, mqtt_message->isMessageRetained);
+ }
+ }
+ return result;
+}
+
+uint16_t mqttmessage_getPacketId(MQTT_MESSAGE_HANDLE handle)
+{
+ uint16_t result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_010: [If handle is NULL then mqttmessage_getPacketId shall return 0.] */
+ result = 0;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_011: [mqttmessage_getPacketId shall return the packetId value contained in MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = msgInfo->packetId;
+ }
+ return result;
+}
+
+const char* mqttmessage_getTopicName(MQTT_MESSAGE_HANDLE handle)
+{
+ const char* result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_012: [If handle is NULL then mqttmessage_getTopicName shall return a NULL string.] */
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_013: [mqttmessage_getTopicName shall return the topicName contained in MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = msgInfo->topicName;
+ }
+ return result;
+}
+
+QOS_VALUE mqttmessage_getQosType(MQTT_MESSAGE_HANDLE handle)
+{
+ QOS_VALUE result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_014: [If handle is NULL then mqttmessage_getQosType shall return the default DELIVER_AT_MOST_ONCE value.] */
+ result = DELIVER_AT_MOST_ONCE;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_015: [mqttmessage_getQosType shall return the QOS Type value contained in MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = msgInfo->qosInfo;
+ }
+ return result;
+}
+
+bool mqttmessage_getIsDuplicateMsg(MQTT_MESSAGE_HANDLE handle)
+{
+ bool result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_016: [If handle is NULL then mqttmessage_getIsDuplicateMsg shall return false.] */
+ result = false;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_017: [mqttmessage_getIsDuplicateMsg shall return the isDuplicateMsg value contained in MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = msgInfo->isDuplicateMsg;
+ }
+ return result;
+}
+
+bool mqttmessage_getIsRetained(MQTT_MESSAGE_HANDLE handle)
+{
+ bool result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_018: [If handle is NULL then mqttmessage_getIsRetained shall return false.] */
+ result = false;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_019: [mqttmessage_getIsRetained shall return the isRetained value contained in MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = msgInfo->isMessageRetained;
+ }
+ return result;
+}
+
+int mqttmessage_setIsDuplicateMsg(MQTT_MESSAGE_HANDLE handle, bool duplicateMsg)
+{
+ int result;
+ /* Codes_SRS_MQTTMESSAGE_07_022: [If handle is NULL then mqttmessage_setIsDuplicateMsg shall return a non-zero value.] */
+ if (handle == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_023: [mqttmessage_setIsDuplicateMsg shall store the duplicateMsg value in the MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ msgInfo->isDuplicateMsg = duplicateMsg;
+ result = 0;
+ }
+ return result;
+}
+
+int mqttmessage_setIsRetained(MQTT_MESSAGE_HANDLE handle, bool retainMsg)
+{
+ int result;
+ /* Codes_SRS_MQTTMESSAGE_07_024: [If handle is NULL then mqttmessage_setIsRetained shall return a non-zero value.] */
+ if (handle == NULL)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_025: [mqttmessage_setIsRetained shall store the retainMsg value in the MQTT_MESSAGE_HANDLE handle.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ msgInfo->isMessageRetained = retainMsg;
+ result = 0;
+ }
+ return result;
+}
+
+const APP_PAYLOAD* mqttmessage_getApplicationMsg(MQTT_MESSAGE_HANDLE handle)
+{
+ const APP_PAYLOAD* result;
+ if (handle == NULL)
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_020: [If handle is NULL or if msgLen is 0 then mqttmessage_getApplicationMsg shall return NULL.] */
+ result = NULL;
+ }
+ else
+ {
+ /* Codes_SRS_MQTTMESSAGE_07_021: [mqttmessage_getApplicationMsg shall return the applicationMsg value contained in MQTT_MESSAGE_HANDLE handle and the length of the appMsg in the msgLen parameter.] */
+ MQTT_MESSAGE* msgInfo = (MQTT_MESSAGE*)handle;
+ result = &msgInfo->appPayload;
+ }
+ return result;
+}