Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
umqtt/src/mqtt_client.c@0:f7f1f0d76dd6, 2018-08-23 (annotated)
- Committer:
- XinZhangMS
- Date:
- Thu Aug 23 06:52:14 2018 +0000
- Revision:
- 0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| XinZhangMS | 0:f7f1f0d76dd6 | 1 | // Copyright (c) Microsoft. All rights reserved. |
| XinZhangMS | 0:f7f1f0d76dd6 | 2 | // Licensed under the MIT license. See LICENSE file in the project root for full license information. |
| XinZhangMS | 0:f7f1f0d76dd6 | 3 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 4 | #include <stdlib.h> |
| XinZhangMS | 0:f7f1f0d76dd6 | 5 | #include "azure_c_shared_utility/optimize_size.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 6 | #include "azure_c_shared_utility/gballoc.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 7 | #include "azure_c_shared_utility/platform.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 8 | #include "azure_c_shared_utility/tickcounter.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 9 | #include "azure_c_shared_utility/crt_abstractions.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 10 | #include "azure_c_shared_utility/xlogging.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 11 | #include "azure_c_shared_utility/strings.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 12 | #include "azure_c_shared_utility/agenttime.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 13 | #include "azure_c_shared_utility/threadapi.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 14 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 15 | #include "azure_umqtt_c/mqtt_client.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 16 | #include "azure_umqtt_c/mqtt_codec.h" |
| XinZhangMS | 0:f7f1f0d76dd6 | 17 | #include <inttypes.h> |
| XinZhangMS | 0:f7f1f0d76dd6 | 18 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 19 | #define VARIABLE_HEADER_OFFSET 2 |
| XinZhangMS | 0:f7f1f0d76dd6 | 20 | #define RETAIN_FLAG_MASK 0x1 |
| XinZhangMS | 0:f7f1f0d76dd6 | 21 | #define QOS_LEAST_ONCE_FLAG_MASK 0x2 |
| XinZhangMS | 0:f7f1f0d76dd6 | 22 | #define QOS_EXACTLY_ONCE_FLAG_MASK 0x4 |
| XinZhangMS | 0:f7f1f0d76dd6 | 23 | #define DUPLICATE_FLAG_MASK 0x8 |
| XinZhangMS | 0:f7f1f0d76dd6 | 24 | #define CONNECT_PACKET_MASK 0xf0 |
| XinZhangMS | 0:f7f1f0d76dd6 | 25 | #define TIME_MAX_BUFFER 16 |
| XinZhangMS | 0:f7f1f0d76dd6 | 26 | #define DEFAULT_MAX_PING_RESPONSE_TIME 80 // % of time to send pings |
| XinZhangMS | 0:f7f1f0d76dd6 | 27 | #define MAX_CLOSE_RETRIES 2 |
| XinZhangMS | 0:f7f1f0d76dd6 | 28 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 29 | static const char* const TRUE_CONST = "true"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 30 | static const char* const FALSE_CONST = "false"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 31 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 32 | DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES); |
| XinZhangMS | 0:f7f1f0d76dd6 | 33 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 34 | typedef struct MQTT_CLIENT_TAG |
| XinZhangMS | 0:f7f1f0d76dd6 | 35 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 36 | XIO_HANDLE xioHandle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 37 | MQTTCODEC_HANDLE codec_handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 38 | CONTROL_PACKET_TYPE packetState; |
| XinZhangMS | 0:f7f1f0d76dd6 | 39 | TICK_COUNTER_HANDLE packetTickCntr; |
| XinZhangMS | 0:f7f1f0d76dd6 | 40 | tickcounter_ms_t packetSendTimeMs; |
| XinZhangMS | 0:f7f1f0d76dd6 | 41 | ON_MQTT_OPERATION_CALLBACK fnOperationCallback; |
| XinZhangMS | 0:f7f1f0d76dd6 | 42 | ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv; |
| XinZhangMS | 0:f7f1f0d76dd6 | 43 | void* ctx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 44 | ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack; |
| XinZhangMS | 0:f7f1f0d76dd6 | 45 | void* errorCBCtx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 46 | ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb; |
| XinZhangMS | 0:f7f1f0d76dd6 | 47 | void* disconnect_ctx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 48 | QOS_VALUE qosValue; |
| XinZhangMS | 0:f7f1f0d76dd6 | 49 | uint16_t keepAliveInterval; |
| XinZhangMS | 0:f7f1f0d76dd6 | 50 | MQTT_CLIENT_OPTIONS mqttOptions; |
| XinZhangMS | 0:f7f1f0d76dd6 | 51 | bool clientConnected; |
| XinZhangMS | 0:f7f1f0d76dd6 | 52 | bool socketConnected; |
| XinZhangMS | 0:f7f1f0d76dd6 | 53 | bool logTrace; |
| XinZhangMS | 0:f7f1f0d76dd6 | 54 | bool rawBytesTrace; |
| XinZhangMS | 0:f7f1f0d76dd6 | 55 | tickcounter_ms_t timeSincePing; |
| XinZhangMS | 0:f7f1f0d76dd6 | 56 | uint16_t maxPingRespTime; |
| XinZhangMS | 0:f7f1f0d76dd6 | 57 | } MQTT_CLIENT; |
| XinZhangMS | 0:f7f1f0d76dd6 | 58 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 59 | static void on_connection_closed(void* context) |
| XinZhangMS | 0:f7f1f0d76dd6 | 60 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 61 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 62 | if (mqtt_client != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 63 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 64 | mqtt_client->socketConnected = false; |
| XinZhangMS | 0:f7f1f0d76dd6 | 65 | mqtt_client->clientConnected = false; |
| XinZhangMS | 0:f7f1f0d76dd6 | 66 | if (mqtt_client->disconnect_cb) |
| XinZhangMS | 0:f7f1f0d76dd6 | 67 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 68 | mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 69 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 70 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 71 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 72 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 73 | static void close_connection(MQTT_CLIENT* mqtt_client) |
| XinZhangMS | 0:f7f1f0d76dd6 | 74 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 75 | if (mqtt_client->socketConnected) |
| XinZhangMS | 0:f7f1f0d76dd6 | 76 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 77 | (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 78 | if (mqtt_client->disconnect_cb == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 79 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 80 | size_t counter = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 81 | do |
| XinZhangMS | 0:f7f1f0d76dd6 | 82 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 83 | xio_dowork(mqtt_client->xioHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 84 | counter++; |
| XinZhangMS | 0:f7f1f0d76dd6 | 85 | ThreadAPI_Sleep(2); |
| XinZhangMS | 0:f7f1f0d76dd6 | 86 | } while (mqtt_client->clientConnected && counter < MAX_CLOSE_RETRIES); |
| XinZhangMS | 0:f7f1f0d76dd6 | 87 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 88 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 89 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 90 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 91 | if (mqtt_client->disconnect_cb) |
| XinZhangMS | 0:f7f1f0d76dd6 | 92 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 93 | mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 94 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 95 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 96 | mqtt_client->xioHandle = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 97 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 98 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 99 | static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type) |
| XinZhangMS | 0:f7f1f0d76dd6 | 100 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 101 | if (mqtt_client->fnOnErrorCallBack) |
| XinZhangMS | 0:f7f1f0d76dd6 | 102 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 103 | mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 104 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 105 | close_connection(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 106 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 107 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 108 | static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client) |
| XinZhangMS | 0:f7f1f0d76dd6 | 109 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 110 | STRING_HANDLE trace_log; |
| XinZhangMS | 0:f7f1f0d76dd6 | 111 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 112 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 113 | trace_log = STRING_new(); |
| XinZhangMS | 0:f7f1f0d76dd6 | 114 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 115 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 116 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 117 | trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 118 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 119 | return trace_log; |
| XinZhangMS | 0:f7f1f0d76dd6 | 120 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 121 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 122 | static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t len) |
| XinZhangMS | 0:f7f1f0d76dd6 | 123 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 124 | uint16_t result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 125 | if (buffer != NULL && *buffer != NULL && len >= 2) |
| XinZhangMS | 0:f7f1f0d76dd6 | 126 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 127 | result = 256 * (**buffer) + (*(*buffer + 1)); |
| XinZhangMS | 0:f7f1f0d76dd6 | 128 | *buffer += 2; // Move the ptr |
| XinZhangMS | 0:f7f1f0d76dd6 | 129 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 130 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 131 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 132 | LOG(AZ_LOG_ERROR, LOG_LINE, "byteutil_read_uint16 == NULL or less than 2"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 133 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 134 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 135 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 136 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 137 | static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen) |
| XinZhangMS | 0:f7f1f0d76dd6 | 138 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 139 | char* result = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 140 | if (buffer != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 141 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 142 | // Get the length of the string |
| XinZhangMS | 0:f7f1f0d76dd6 | 143 | uint16_t len = byteutil_read_uint16(buffer, *byteLen); |
| XinZhangMS | 0:f7f1f0d76dd6 | 144 | if (len > 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 145 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 146 | result = (char*)malloc(len + 1); |
| XinZhangMS | 0:f7f1f0d76dd6 | 147 | if (result != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 148 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 149 | (void)memcpy(result, *buffer, len); |
| XinZhangMS | 0:f7f1f0d76dd6 | 150 | result[len] = '\0'; |
| XinZhangMS | 0:f7f1f0d76dd6 | 151 | *buffer += len; |
| XinZhangMS | 0:f7f1f0d76dd6 | 152 | if (byteLen != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 153 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 154 | *byteLen = len; |
| XinZhangMS | 0:f7f1f0d76dd6 | 155 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 156 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 157 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 158 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 159 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 160 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 161 | LOG(AZ_LOG_ERROR, LOG_LINE, "readByte buffer == NULL."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 162 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 163 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 164 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 165 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 166 | static uint8_t byteutil_readByte(uint8_t** buffer) |
| XinZhangMS | 0:f7f1f0d76dd6 | 167 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 168 | uint8_t result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 169 | if (buffer != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 170 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 171 | result = **buffer; |
| XinZhangMS | 0:f7f1f0d76dd6 | 172 | (*buffer)++; |
| XinZhangMS | 0:f7f1f0d76dd6 | 173 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 174 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 175 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 176 | LOG(AZ_LOG_ERROR, LOG_LINE, "readByte buffer == NULL."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 177 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 178 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 179 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 180 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 181 | static void sendComplete(void* context, IO_SEND_RESULT send_result) |
| XinZhangMS | 0:f7f1f0d76dd6 | 182 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 183 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 184 | if (mqtt_client != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 185 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 186 | if (send_result == IO_SEND_OK) |
| XinZhangMS | 0:f7f1f0d76dd6 | 187 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 188 | if (mqtt_client->packetState == DISCONNECT_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 189 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 190 | /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 191 | if (mqtt_client->fnOperationCallback != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 192 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 193 | mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 194 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 195 | // close the xio |
| XinZhangMS | 0:f7f1f0d76dd6 | 196 | close_connection(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 197 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 198 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 199 | else if (send_result == IO_SEND_ERROR) |
| XinZhangMS | 0:f7f1f0d76dd6 | 200 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 201 | LOG(AZ_LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result); |
| XinZhangMS | 0:f7f1f0d76dd6 | 202 | set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 203 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 204 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 205 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 206 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 207 | LOG(AZ_LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure with NULL mqtt_client"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 208 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 209 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 210 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 211 | #ifndef NO_LOGGING |
| XinZhangMS | 0:f7f1f0d76dd6 | 212 | static const char* retrievePacketType(CONTROL_PACKET_TYPE packet) |
| XinZhangMS | 0:f7f1f0d76dd6 | 213 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 214 | switch (packet&CONNECT_PACKET_MASK) |
| XinZhangMS | 0:f7f1f0d76dd6 | 215 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 216 | case CONNECT_TYPE: return "CONNECT"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 217 | case CONNACK_TYPE: return "CONNACK"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 218 | case PUBLISH_TYPE: return "PUBLISH"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 219 | case PUBACK_TYPE: return "PUBACK"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 220 | case PUBREC_TYPE: return "PUBREC"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 221 | case PUBREL_TYPE: return "PUBREL"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 222 | case SUBSCRIBE_TYPE: return "SUBSCRIBE"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 223 | case SUBACK_TYPE: return "SUBACK"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 224 | case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 225 | case UNSUBACK_TYPE: return "UNSUBACK"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 226 | case PINGREQ_TYPE: return "PINGREQ"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 227 | case PINGRESP_TYPE: return "PINGRESP"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 228 | case DISCONNECT_TYPE: return "DISCONNECT"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 229 | default: |
| XinZhangMS | 0:f7f1f0d76dd6 | 230 | case PACKET_TYPE_ERROR: |
| XinZhangMS | 0:f7f1f0d76dd6 | 231 | case UNKNOWN_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 232 | return "UNKNOWN"; |
| XinZhangMS | 0:f7f1f0d76dd6 | 233 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 234 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 235 | #endif // NO_LOGGING |
| XinZhangMS | 0:f7f1f0d76dd6 | 236 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 237 | static void getLogTime(char* timeResult, size_t len) |
| XinZhangMS | 0:f7f1f0d76dd6 | 238 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 239 | if (timeResult != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 240 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 241 | time_t agent_time = get_time(NULL); |
| XinZhangMS | 0:f7f1f0d76dd6 | 242 | if (agent_time == (time_t)-1) |
| XinZhangMS | 0:f7f1f0d76dd6 | 243 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 244 | timeResult[0] = '\0'; |
| XinZhangMS | 0:f7f1f0d76dd6 | 245 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 246 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 247 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 248 | struct tm* tmInfo = localtime(&agent_time); |
| XinZhangMS | 0:f7f1f0d76dd6 | 249 | if (tmInfo == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 250 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 251 | timeResult[0] = '\0'; |
| XinZhangMS | 0:f7f1f0d76dd6 | 252 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 253 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 254 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 255 | if (strftime(timeResult, len, "%H:%M:%S", tmInfo) == 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 256 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 257 | timeResult[0] = '\0'; |
| XinZhangMS | 0:f7f1f0d76dd6 | 258 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 259 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 260 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 261 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 262 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 263 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 264 | static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log) |
| XinZhangMS | 0:f7f1f0d76dd6 | 265 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 266 | if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 267 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 268 | char tmBuffer[TIME_MAX_BUFFER]; |
| XinZhangMS | 0:f7f1f0d76dd6 | 269 | getLogTime(tmBuffer, TIME_MAX_BUFFER); |
| XinZhangMS | 0:f7f1f0d76dd6 | 270 | LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log)); |
| XinZhangMS | 0:f7f1f0d76dd6 | 271 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 272 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 273 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 274 | static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length) |
| XinZhangMS | 0:f7f1f0d76dd6 | 275 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 276 | if (mqtt_client != NULL && data != NULL && length > 0 && mqtt_client->rawBytesTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 277 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 278 | char tmBuffer[TIME_MAX_BUFFER]; |
| XinZhangMS | 0:f7f1f0d76dd6 | 279 | getLogTime(tmBuffer, TIME_MAX_BUFFER); |
| XinZhangMS | 0:f7f1f0d76dd6 | 280 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 281 | LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0])); |
| XinZhangMS | 0:f7f1f0d76dd6 | 282 | size_t index = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 283 | for (index = 0; index < length; index++) |
| XinZhangMS | 0:f7f1f0d76dd6 | 284 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 285 | LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]); |
| XinZhangMS | 0:f7f1f0d76dd6 | 286 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 287 | LOG(AZ_LOG_TRACE, LOG_LINE, ""); |
| XinZhangMS | 0:f7f1f0d76dd6 | 288 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 289 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 290 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 291 | static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log) |
| XinZhangMS | 0:f7f1f0d76dd6 | 292 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 293 | if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 294 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 295 | char tmBuffer[TIME_MAX_BUFFER]; |
| XinZhangMS | 0:f7f1f0d76dd6 | 296 | getLogTime(tmBuffer, TIME_MAX_BUFFER); |
| XinZhangMS | 0:f7f1f0d76dd6 | 297 | LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) ); |
| XinZhangMS | 0:f7f1f0d76dd6 | 298 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 299 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 300 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 301 | static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length) |
| XinZhangMS | 0:f7f1f0d76dd6 | 302 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 303 | #ifdef NO_LOGGING |
| XinZhangMS | 0:f7f1f0d76dd6 | 304 | UNUSED(flags); |
| XinZhangMS | 0:f7f1f0d76dd6 | 305 | #endif |
| XinZhangMS | 0:f7f1f0d76dd6 | 306 | if (mqtt_client != NULL && mqtt_client->rawBytesTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 307 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 308 | if (data != NULL && length > 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 309 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 310 | char tmBuffer[TIME_MAX_BUFFER]; |
| XinZhangMS | 0:f7f1f0d76dd6 | 311 | getLogTime(tmBuffer, TIME_MAX_BUFFER); |
| XinZhangMS | 0:f7f1f0d76dd6 | 312 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 313 | LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 314 | size_t index = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 315 | for (index = 0; index < length; index++) |
| XinZhangMS | 0:f7f1f0d76dd6 | 316 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 317 | LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]); |
| XinZhangMS | 0:f7f1f0d76dd6 | 318 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 319 | LOG(AZ_LOG_TRACE, LOG_LINE, ""); |
| XinZhangMS | 0:f7f1f0d76dd6 | 320 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 321 | else if (packet == PINGRESP_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 322 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 323 | char tmBuffer[TIME_MAX_BUFFER]; |
| XinZhangMS | 0:f7f1f0d76dd6 | 324 | getLogTime(tmBuffer, TIME_MAX_BUFFER); |
| XinZhangMS | 0:f7f1f0d76dd6 | 325 | LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 326 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 327 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 328 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 329 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 330 | static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length) |
| XinZhangMS | 0:f7f1f0d76dd6 | 331 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 332 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 333 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 334 | if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 335 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 336 | LOG(AZ_LOG_ERROR, LOG_LINE, "Failure getting current ms tickcounter"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 337 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 338 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 339 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 340 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 341 | result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 342 | if (result != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 343 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 344 | LOG(AZ_LOG_ERROR, LOG_LINE, "%d: Failure sending control packet data", result); |
| XinZhangMS | 0:f7f1f0d76dd6 | 345 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 346 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 347 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 348 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 349 | logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 350 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 351 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 352 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 353 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 354 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 355 | static void onOpenComplete(void* context, IO_OPEN_RESULT open_result) |
| XinZhangMS | 0:f7f1f0d76dd6 | 356 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 357 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 358 | if (mqtt_client != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 359 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 360 | if (open_result == IO_OPEN_OK && !mqtt_client->socketConnected) |
| XinZhangMS | 0:f7f1f0d76dd6 | 361 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 362 | mqtt_client->packetState = CONNECT_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 363 | mqtt_client->socketConnected = true; |
| XinZhangMS | 0:f7f1f0d76dd6 | 364 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 365 | STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 366 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 367 | // Send the Connect packet |
| XinZhangMS | 0:f7f1f0d76dd6 | 368 | BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 369 | if (connPacket == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 370 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 371 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_connect failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 372 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 373 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 374 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 375 | size_t size = BUFFER_length(connPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 376 | /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 377 | if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 378 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 379 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_connect failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 380 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 381 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 382 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 383 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 384 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 385 | BUFFER_delete(connPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 386 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 387 | if (trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 388 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 389 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 390 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 391 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 392 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 393 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 394 | if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack) |
| XinZhangMS | 0:f7f1f0d76dd6 | 395 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 396 | mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 397 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 398 | close_connection(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 399 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 400 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 401 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 402 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 403 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client is NULL"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 404 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 405 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 406 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 407 | static void onBytesReceived(void* context, const unsigned char* buffer, size_t size) |
| XinZhangMS | 0:f7f1f0d76dd6 | 408 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 409 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 410 | if (mqtt_client != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 411 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 412 | if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 413 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 414 | set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 415 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 416 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 417 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 418 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 419 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client is NULL"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 420 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 421 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 422 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 423 | static void onIoError(void* context) |
| XinZhangMS | 0:f7f1f0d76dd6 | 424 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 425 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 426 | if (mqtt_client != NULL && mqtt_client->fnOperationCallback) |
| XinZhangMS | 0:f7f1f0d76dd6 | 427 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 428 | /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 429 | /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */ |
| XinZhangMS | 0:f7f1f0d76dd6 | 430 | set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 431 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 432 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 433 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 434 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error invalid parameter: mqtt_client: %p", mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 435 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 436 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 437 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 438 | static void clear_mqtt_options(MQTT_CLIENT* mqtt_client) |
| XinZhangMS | 0:f7f1f0d76dd6 | 439 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 440 | if (mqtt_client->mqttOptions.clientId != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 441 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 442 | free(mqtt_client->mqttOptions.clientId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 443 | mqtt_client->mqttOptions.clientId = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 444 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 445 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 446 | if (mqtt_client->mqttOptions.willTopic != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 447 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 448 | free(mqtt_client->mqttOptions.willTopic); |
| XinZhangMS | 0:f7f1f0d76dd6 | 449 | mqtt_client->mqttOptions.willTopic = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 450 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 451 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 452 | if (mqtt_client->mqttOptions.willMessage != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 453 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 454 | free(mqtt_client->mqttOptions.willMessage); |
| XinZhangMS | 0:f7f1f0d76dd6 | 455 | mqtt_client->mqttOptions.willMessage = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 456 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 457 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 458 | if (mqtt_client->mqttOptions.username != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 459 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 460 | free(mqtt_client->mqttOptions.username); |
| XinZhangMS | 0:f7f1f0d76dd6 | 461 | mqtt_client->mqttOptions.username = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 462 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 463 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 464 | if (mqtt_client->mqttOptions.password != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 465 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 466 | free(mqtt_client->mqttOptions.password); |
| XinZhangMS | 0:f7f1f0d76dd6 | 467 | mqtt_client->mqttOptions.password = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 468 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 469 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 470 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 471 | static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions) |
| XinZhangMS | 0:f7f1f0d76dd6 | 472 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 473 | int result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 474 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 475 | if (mqttOptions->clientId != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 476 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 477 | char* clientId; |
| XinZhangMS | 0:f7f1f0d76dd6 | 478 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 479 | if (mallocAndStrcpy_s(&clientId, mqttOptions->clientId) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 480 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 481 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 482 | LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s clientId"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 483 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 484 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 485 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 486 | if (mqtt_client->mqttOptions.clientId != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 487 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 488 | free(mqtt_client->mqttOptions.clientId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 489 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 490 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 491 | mqtt_client->mqttOptions.clientId = clientId; |
| XinZhangMS | 0:f7f1f0d76dd6 | 492 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 493 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 494 | if (result == 0 && mqttOptions->willTopic != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 495 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 496 | char* willTopic; |
| XinZhangMS | 0:f7f1f0d76dd6 | 497 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 498 | if (mallocAndStrcpy_s(&willTopic, mqttOptions->willTopic) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 499 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 500 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 501 | LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s willTopic"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 502 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 503 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 504 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 505 | if (mqtt_client->mqttOptions.willTopic != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 506 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 507 | free(mqtt_client->mqttOptions.willTopic); |
| XinZhangMS | 0:f7f1f0d76dd6 | 508 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 509 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 510 | mqtt_client->mqttOptions.willTopic = willTopic; |
| XinZhangMS | 0:f7f1f0d76dd6 | 511 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 512 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 513 | if (result == 0 && mqttOptions->willMessage != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 514 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 515 | char* willMessage; |
| XinZhangMS | 0:f7f1f0d76dd6 | 516 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 517 | if (mallocAndStrcpy_s(&willMessage, mqttOptions->willMessage) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 518 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 519 | LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s willMessage"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 520 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 521 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 522 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 523 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 524 | if (mqtt_client->mqttOptions.willMessage != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 525 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 526 | free(mqtt_client->mqttOptions.willMessage); |
| XinZhangMS | 0:f7f1f0d76dd6 | 527 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 528 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 529 | mqtt_client->mqttOptions.willMessage = willMessage; |
| XinZhangMS | 0:f7f1f0d76dd6 | 530 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 531 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 532 | if (result == 0 && mqttOptions->username != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 533 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 534 | char* username; |
| XinZhangMS | 0:f7f1f0d76dd6 | 535 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 536 | if (mallocAndStrcpy_s(&username, mqttOptions->username) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 537 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 538 | LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s username"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 539 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 540 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 541 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 542 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 543 | if (mqtt_client->mqttOptions.username != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 544 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 545 | free(mqtt_client->mqttOptions.username); |
| XinZhangMS | 0:f7f1f0d76dd6 | 546 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 547 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 548 | mqtt_client->mqttOptions.username = username; |
| XinZhangMS | 0:f7f1f0d76dd6 | 549 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 550 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 551 | if (result == 0 && mqttOptions->password != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 552 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 553 | char* password; |
| XinZhangMS | 0:f7f1f0d76dd6 | 554 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 555 | if (mallocAndStrcpy_s(&password, mqttOptions->password) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 556 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 557 | LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s password"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 558 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 559 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 560 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 561 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 562 | if (mqtt_client->mqttOptions.password != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 563 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 564 | free(mqtt_client->mqttOptions.password); |
| XinZhangMS | 0:f7f1f0d76dd6 | 565 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 566 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 567 | mqtt_client->mqttOptions.password = password; |
| XinZhangMS | 0:f7f1f0d76dd6 | 568 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 569 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 570 | if (result == 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 571 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 572 | mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval; |
| XinZhangMS | 0:f7f1f0d76dd6 | 573 | mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain; |
| XinZhangMS | 0:f7f1f0d76dd6 | 574 | mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession; |
| XinZhangMS | 0:f7f1f0d76dd6 | 575 | mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue; |
| XinZhangMS | 0:f7f1f0d76dd6 | 576 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 577 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 578 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 579 | clear_mqtt_options(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 580 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 581 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 582 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 583 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 584 | static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData) |
| XinZhangMS | 0:f7f1f0d76dd6 | 585 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 586 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; |
| XinZhangMS | 0:f7f1f0d76dd6 | 587 | if ((mqtt_client != NULL && headerData != NULL) || packet == PINGRESP_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 588 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 589 | size_t len = BUFFER_length(headerData); |
| XinZhangMS | 0:f7f1f0d76dd6 | 590 | uint8_t* iterator = BUFFER_u_char(headerData); |
| XinZhangMS | 0:f7f1f0d76dd6 | 591 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 592 | logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, len); |
| XinZhangMS | 0:f7f1f0d76dd6 | 593 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 594 | if ((iterator != NULL && len > 0) || packet == PINGRESP_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 595 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 596 | switch (packet) |
| XinZhangMS | 0:f7f1f0d76dd6 | 597 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 598 | case CONNACK_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 599 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 600 | if (mqtt_client->fnOperationCallback != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 601 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 602 | STRING_HANDLE trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 603 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 604 | /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 605 | CONNECT_ACK connack = { 0 }; |
| XinZhangMS | 0:f7f1f0d76dd6 | 606 | connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false; |
| XinZhangMS | 0:f7f1f0d76dd6 | 607 | uint8_t rc = byteutil_readByte(&iterator); |
| XinZhangMS | 0:f7f1f0d76dd6 | 608 | connack.returnCode = |
| XinZhangMS | 0:f7f1f0d76dd6 | 609 | (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ? |
| XinZhangMS | 0:f7f1f0d76dd6 | 610 | (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN; |
| XinZhangMS | 0:f7f1f0d76dd6 | 611 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 612 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 613 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 614 | trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode); |
| XinZhangMS | 0:f7f1f0d76dd6 | 615 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 616 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 617 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 618 | mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 619 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 620 | if (connack.returnCode == CONNECTION_ACCEPTED) |
| XinZhangMS | 0:f7f1f0d76dd6 | 621 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 622 | mqtt_client->clientConnected = true; |
| XinZhangMS | 0:f7f1f0d76dd6 | 623 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 624 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 625 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 626 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 627 | LOG(AZ_LOG_ERROR, LOG_LINE, "fnOperationCallback NULL"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 628 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 629 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 630 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 631 | case PUBLISH_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 632 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 633 | if (mqtt_client->fnMessageRecv != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 634 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 635 | STRING_HANDLE trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 636 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 637 | bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false; |
| XinZhangMS | 0:f7f1f0d76dd6 | 638 | bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false; |
| XinZhangMS | 0:f7f1f0d76dd6 | 639 | QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 640 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 641 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 642 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 643 | trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST, |
| XinZhangMS | 0:f7f1f0d76dd6 | 644 | isRetainMsg ? 1 : 0, ENUM_TO_STRING(QOS_VALUE, qosValue) ); |
| XinZhangMS | 0:f7f1f0d76dd6 | 645 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 646 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 647 | uint8_t* initialPos = iterator; |
| XinZhangMS | 0:f7f1f0d76dd6 | 648 | size_t length = len - (iterator - initialPos); |
| XinZhangMS | 0:f7f1f0d76dd6 | 649 | char* topicName = byteutil_readUTF(&iterator, &length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 650 | if (topicName == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 651 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 652 | LOG(AZ_LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 653 | set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 654 | if (trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 655 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 656 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 657 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 658 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 659 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 660 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 661 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 662 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 663 | STRING_sprintf(trace_log, " | TOPIC_NAME: %s", topicName); |
| XinZhangMS | 0:f7f1f0d76dd6 | 664 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 665 | uint16_t packetId = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 666 | length = len - (iterator - initialPos); |
| XinZhangMS | 0:f7f1f0d76dd6 | 667 | if (qosValue != DELIVER_AT_MOST_ONCE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 668 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 669 | packetId = byteutil_read_uint16(&iterator, length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 670 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 671 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 672 | STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 673 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 674 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 675 | length = len - (iterator - initialPos); |
| XinZhangMS | 0:f7f1f0d76dd6 | 676 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 677 | MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 678 | if (msgHandle == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 679 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 680 | LOG(AZ_LOG_ERROR, LOG_LINE, "failure in mqttmessage_create"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 681 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 682 | if (trace_log != NULL) { |
| XinZhangMS | 0:f7f1f0d76dd6 | 683 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 684 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 685 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 686 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 687 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 688 | if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 || |
| XinZhangMS | 0:f7f1f0d76dd6 | 689 | mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 690 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 691 | LOG(AZ_LOG_ERROR, LOG_LINE, "failure setting mqtt message property"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 692 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 693 | if (trace_log != NULL) { |
| XinZhangMS | 0:f7f1f0d76dd6 | 694 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 695 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 696 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 697 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 698 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 699 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 700 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 701 | STRING_sprintf(trace_log, " | PAYLOAD_LEN: %zu", length); |
| XinZhangMS | 0:f7f1f0d76dd6 | 702 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 703 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 704 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 705 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 706 | mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 707 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 708 | BUFFER_HANDLE pubRel = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 709 | if (qosValue == DELIVER_EXACTLY_ONCE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 710 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 711 | pubRel = mqtt_codec_publishReceived(packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 712 | if (pubRel == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 713 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 714 | LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 715 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 716 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 717 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 718 | else if (qosValue == DELIVER_AT_LEAST_ONCE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 719 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 720 | pubRel = mqtt_codec_publishAck(packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 721 | if (pubRel == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 722 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 723 | LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 724 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 725 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 726 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 727 | if (pubRel != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 728 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 729 | size_t size = BUFFER_length(pubRel); |
| XinZhangMS | 0:f7f1f0d76dd6 | 730 | (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size); |
| XinZhangMS | 0:f7f1f0d76dd6 | 731 | BUFFER_delete(pubRel); |
| XinZhangMS | 0:f7f1f0d76dd6 | 732 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 733 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 734 | mqttmessage_destroy(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 735 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 736 | free(topicName); |
| XinZhangMS | 0:f7f1f0d76dd6 | 737 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 738 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 739 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 740 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 741 | case PUBACK_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 742 | case PUBREC_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 743 | case PUBREL_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 744 | case PUBCOMP_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 745 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 746 | if (mqtt_client->fnOperationCallback) |
| XinZhangMS | 0:f7f1f0d76dd6 | 747 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 748 | STRING_HANDLE trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 749 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 750 | /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 751 | MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK : |
| XinZhangMS | 0:f7f1f0d76dd6 | 752 | (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV : |
| XinZhangMS | 0:f7f1f0d76dd6 | 753 | (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP; |
| XinZhangMS | 0:f7f1f0d76dd6 | 754 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 755 | PUBLISH_ACK publish_ack = { 0 }; |
| XinZhangMS | 0:f7f1f0d76dd6 | 756 | publish_ack.packetId = byteutil_read_uint16(&iterator, len); |
| XinZhangMS | 0:f7f1f0d76dd6 | 757 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 758 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 759 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 760 | trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP", |
| XinZhangMS | 0:f7f1f0d76dd6 | 761 | publish_ack.packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 762 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 763 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 764 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 765 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 766 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 767 | BUFFER_HANDLE pubRel = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 768 | mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 769 | if (packet == PUBREC_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 770 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 771 | pubRel = mqtt_codec_publishRelease(publish_ack.packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 772 | if (pubRel == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 773 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 774 | LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish release message."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 775 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 776 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 777 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 778 | else if (packet == PUBREL_TYPE) |
| XinZhangMS | 0:f7f1f0d76dd6 | 779 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 780 | pubRel = mqtt_codec_publishComplete(publish_ack.packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 781 | if (pubRel == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 782 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 783 | LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 784 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 785 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 786 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 787 | if (pubRel != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 788 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 789 | size_t size = BUFFER_length(pubRel); |
| XinZhangMS | 0:f7f1f0d76dd6 | 790 | (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size); |
| XinZhangMS | 0:f7f1f0d76dd6 | 791 | BUFFER_delete(pubRel); |
| XinZhangMS | 0:f7f1f0d76dd6 | 792 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 793 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 794 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 795 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 796 | case SUBACK_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 797 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 798 | if (mqtt_client->fnOperationCallback) |
| XinZhangMS | 0:f7f1f0d76dd6 | 799 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 800 | STRING_HANDLE trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 801 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 802 | /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 803 | SUBSCRIBE_ACK suback = { 0 }; |
| XinZhangMS | 0:f7f1f0d76dd6 | 804 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 805 | size_t remainLen = len; |
| XinZhangMS | 0:f7f1f0d76dd6 | 806 | suback.packetId = byteutil_read_uint16(&iterator, len); |
| XinZhangMS | 0:f7f1f0d76dd6 | 807 | remainLen -= 2; |
| XinZhangMS | 0:f7f1f0d76dd6 | 808 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 809 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 810 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 811 | trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 812 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 813 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 814 | // Allocate the remaining len |
| XinZhangMS | 0:f7f1f0d76dd6 | 815 | suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen); |
| XinZhangMS | 0:f7f1f0d76dd6 | 816 | if (suback.qosReturn != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 817 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 818 | while (remainLen > 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 819 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 820 | uint8_t qosRet = byteutil_readByte(&iterator); |
| XinZhangMS | 0:f7f1f0d76dd6 | 821 | suback.qosReturn[suback.qosCount++] = |
| XinZhangMS | 0:f7f1f0d76dd6 | 822 | (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ? |
| XinZhangMS | 0:f7f1f0d76dd6 | 823 | (QOS_VALUE)qosRet : DELIVER_FAILURE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 824 | remainLen--; |
| XinZhangMS | 0:f7f1f0d76dd6 | 825 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 826 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 827 | STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]); |
| XinZhangMS | 0:f7f1f0d76dd6 | 828 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 829 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 830 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 831 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 832 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 833 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 834 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 835 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 836 | mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 837 | free(suback.qosReturn); |
| XinZhangMS | 0:f7f1f0d76dd6 | 838 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 839 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 840 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 841 | LOG(AZ_LOG_ERROR, LOG_LINE, "allocation of quality of service value failed."); |
| XinZhangMS | 0:f7f1f0d76dd6 | 842 | set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); |
| XinZhangMS | 0:f7f1f0d76dd6 | 843 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 844 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 845 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 846 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 847 | case UNSUBACK_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 848 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 849 | if (mqtt_client->fnOperationCallback) |
| XinZhangMS | 0:f7f1f0d76dd6 | 850 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 851 | STRING_HANDLE trace_log = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 852 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 853 | /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 854 | UNSUBSCRIBE_ACK unsuback = { 0 }; |
| XinZhangMS | 0:f7f1f0d76dd6 | 855 | unsuback.packetId = byteutil_read_uint16(&iterator, len); |
| XinZhangMS | 0:f7f1f0d76dd6 | 856 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 857 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 858 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 859 | trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 860 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 861 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 862 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 863 | mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 864 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 865 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 866 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 867 | case PINGRESP_TYPE: |
| XinZhangMS | 0:f7f1f0d76dd6 | 868 | mqtt_client->timeSincePing = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 869 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 870 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 871 | STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 872 | log_incoming_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 873 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 874 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 875 | // Forward ping response to operation callback |
| XinZhangMS | 0:f7f1f0d76dd6 | 876 | if (mqtt_client->fnOperationCallback) |
| XinZhangMS | 0:f7f1f0d76dd6 | 877 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 878 | mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx); |
| XinZhangMS | 0:f7f1f0d76dd6 | 879 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 880 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 881 | default: |
| XinZhangMS | 0:f7f1f0d76dd6 | 882 | break; |
| XinZhangMS | 0:f7f1f0d76dd6 | 883 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 884 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 885 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 886 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 887 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 888 | MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx) |
| XinZhangMS | 0:f7f1f0d76dd6 | 889 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 890 | MQTT_CLIENT* result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 891 | /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 892 | if (msgRecv == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 893 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 894 | result = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 895 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 896 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 897 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 898 | result = malloc(sizeof(MQTT_CLIENT)); |
| XinZhangMS | 0:f7f1f0d76dd6 | 899 | if (result == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 900 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 901 | /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 902 | LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: Allocation Failure"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 903 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 904 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 905 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 906 | memset(result, 0, sizeof(MQTT_CLIENT)); |
| XinZhangMS | 0:f7f1f0d76dd6 | 907 | /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 908 | result->packetState = UNKNOWN_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 909 | result->fnOperationCallback = opCallback; |
| XinZhangMS | 0:f7f1f0d76dd6 | 910 | result->ctx = opCallbackCtx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 911 | result->fnMessageRecv = msgRecv; |
| XinZhangMS | 0:f7f1f0d76dd6 | 912 | result->fnOnErrorCallBack = onErrorCallBack; |
| XinZhangMS | 0:f7f1f0d76dd6 | 913 | result->errorCBCtx = errorCBCtx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 914 | result->qosValue = DELIVER_AT_MOST_ONCE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 915 | result->packetTickCntr = tickcounter_create(); |
| XinZhangMS | 0:f7f1f0d76dd6 | 916 | result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME; |
| XinZhangMS | 0:f7f1f0d76dd6 | 917 | if (result->packetTickCntr == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 918 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 919 | /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 920 | LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: tickcounter_create failure"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 921 | free(result); |
| XinZhangMS | 0:f7f1f0d76dd6 | 922 | result = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 923 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 924 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 925 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 926 | result->codec_handle = mqtt_codec_create(recvCompleteCallback, result); |
| XinZhangMS | 0:f7f1f0d76dd6 | 927 | if (result->codec_handle == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 928 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 929 | /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 930 | LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: mqtt_codec_create failure"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 931 | tickcounter_destroy(result->packetTickCntr); |
| XinZhangMS | 0:f7f1f0d76dd6 | 932 | free(result); |
| XinZhangMS | 0:f7f1f0d76dd6 | 933 | result = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 934 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 935 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 936 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 937 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 938 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 939 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 940 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 941 | void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle) |
| XinZhangMS | 0:f7f1f0d76dd6 | 942 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 943 | /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 944 | if (handle != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 945 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 946 | /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 947 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 948 | tickcounter_destroy(mqtt_client->packetTickCntr); |
| XinZhangMS | 0:f7f1f0d76dd6 | 949 | mqtt_codec_destroy(mqtt_client->codec_handle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 950 | clear_mqtt_options(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 951 | free(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 952 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 953 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 954 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 955 | int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions) |
| XinZhangMS | 0:f7f1f0d76dd6 | 956 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 957 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 958 | /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 959 | if (handle == NULL || mqttOptions == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 960 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 961 | LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions); |
| XinZhangMS | 0:f7f1f0d76dd6 | 962 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 963 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 964 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 965 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 966 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 967 | if (xioHandle == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 968 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 969 | /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 970 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqttcodec_connect failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 971 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 972 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 973 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 974 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 975 | mqtt_client->xioHandle = xioHandle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 976 | mqtt_client->packetState = UNKNOWN_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 977 | mqtt_client->qosValue = mqttOptions->qualityOfServiceValue; |
| XinZhangMS | 0:f7f1f0d76dd6 | 978 | mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval; |
| XinZhangMS | 0:f7f1f0d76dd6 | 979 | mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2; |
| XinZhangMS | 0:f7f1f0d76dd6 | 980 | if (cloneMqttOptions(mqtt_client, mqttOptions) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 981 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 982 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: Clone Mqtt Options failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 983 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 984 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 985 | /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 986 | else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 987 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 988 | /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 989 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: io_open failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 990 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 991 | // Remove cloned options |
| XinZhangMS | 0:f7f1f0d76dd6 | 992 | clear_mqtt_options(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 993 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 994 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 995 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 996 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 997 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 998 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 999 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1000 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1001 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1002 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1003 | int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1004 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1005 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1006 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1007 | if (mqtt_client == NULL || msgHandle == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1008 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1009 | /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1010 | LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1011 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1012 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1013 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1014 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1015 | /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1016 | const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1017 | if (payload == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1018 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1019 | /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1020 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqttmessage_getApplicationMsg failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1021 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1022 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1023 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1024 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1025 | STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1026 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1027 | QOS_VALUE qos = mqttmessage_getQosType(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1028 | bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1029 | bool isRetained = mqttmessage_getIsRetained(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1030 | uint16_t packetId = mqttmessage_getPacketId(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1031 | const char* topicName = mqttmessage_getTopicName(msgHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1032 | BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1033 | if (publishPacket == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1034 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1035 | /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1036 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_publish failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1037 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1038 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1039 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1040 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1041 | mqtt_client->packetState = PUBLISH_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1042 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1043 | /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1044 | size_t size = BUFFER_length(publishPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1045 | if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1046 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1047 | /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1048 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_publish send failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1049 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1050 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1051 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1052 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1053 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1054 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1055 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1056 | BUFFER_delete(publishPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1057 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1058 | if (trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1059 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1060 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1061 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1062 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1063 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1064 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1065 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1066 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1067 | int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1068 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1069 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1070 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1071 | if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1072 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1073 | /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1074 | LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %d, packetId: %d", mqtt_client, subscribeList, count, packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1075 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1076 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1077 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1078 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1079 | STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1080 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1081 | BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1082 | if (subPacket == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1083 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1084 | /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1085 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_subscribe failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1086 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1087 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1088 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1089 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1090 | mqtt_client->packetState = SUBSCRIBE_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1091 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1092 | size_t size = BUFFER_length(subPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1093 | /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1094 | if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1095 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1096 | /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1097 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_subscribe send failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1098 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1099 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1100 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1101 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1102 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1103 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1104 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1105 | BUFFER_delete(subPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1106 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1107 | if (trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1108 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1109 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1110 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1111 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1112 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1113 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1114 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1115 | int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1116 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1117 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1118 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1119 | if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1120 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1121 | /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1122 | LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %d, packetId: %d", mqtt_client, unsubscribeList, count, packetId); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1123 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1124 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1125 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1126 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1127 | STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1128 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1129 | BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1130 | if (unsubPacket == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1131 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1132 | /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1133 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_unsubscribe failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1134 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1135 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1136 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1137 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1138 | mqtt_client->packetState = UNSUBSCRIBE_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1139 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1140 | size_t size = BUFFER_length(unsubPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1141 | /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1142 | if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1143 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1144 | /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1145 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_unsubscribe send failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1146 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1147 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1148 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1149 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1150 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1151 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1152 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1153 | BUFFER_delete(unsubPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1154 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1155 | if (trace_log != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1156 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1157 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1158 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1159 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1160 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1161 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1162 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1163 | int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1164 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1165 | int result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1166 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1167 | if (mqtt_client == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1168 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1169 | /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1170 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1171 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1172 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1173 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1174 | if (mqtt_client->clientConnected) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1175 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1176 | BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect(); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1177 | if (disconnectPacket == NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1178 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1179 | /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1180 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1181 | mqtt_client->packetState = PACKET_TYPE_ERROR; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1182 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1183 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1184 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1185 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1186 | /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1187 | mqtt_client->disconnect_cb = callback; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1188 | mqtt_client->disconnect_ctx = ctx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1189 | mqtt_client->packetState = DISCONNECT_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1190 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1191 | size_t size = BUFFER_length(disconnectPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1192 | /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1193 | if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1194 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1195 | /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1196 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect send failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1197 | result = __FAILURE__; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1198 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1199 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1200 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1201 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1202 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1203 | STRING_HANDLE trace_log = STRING_construct("DISCONNECT"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1204 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1205 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1206 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1207 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1208 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1209 | BUFFER_delete(disconnectPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1210 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1211 | clear_mqtt_options(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1212 | mqtt_client->xioHandle = NULL; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1213 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1214 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1215 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1216 | // If the client is not connected then just close the underlying socket |
| XinZhangMS | 0:f7f1f0d76dd6 | 1217 | mqtt_client->disconnect_cb = callback; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1218 | mqtt_client->disconnect_ctx = ctx; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1219 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1220 | close_connection(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1221 | clear_mqtt_options(mqtt_client); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1222 | result = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1223 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1224 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1225 | return result; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1226 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1227 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1228 | void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1229 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1230 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1231 | /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1232 | /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1233 | if (mqtt_client != NULL && mqtt_client->xioHandle != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1234 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1235 | /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1236 | xio_dowork(mqtt_client->xioHandle); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1237 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1238 | /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1239 | if (mqtt_client->socketConnected && mqtt_client->clientConnected && mqtt_client->keepAliveInterval > 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1240 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1241 | tickcounter_ms_t current_ms; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1242 | if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, ¤t_ms) != 0) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1243 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1244 | LOG(AZ_LOG_ERROR, LOG_LINE, "Error: tickcounter_get_current_ms failed"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1245 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1246 | else |
| XinZhangMS | 0:f7f1f0d76dd6 | 1247 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1248 | /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1249 | if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1250 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1251 | // We haven't gotten a ping response in the alloted time |
| XinZhangMS | 0:f7f1f0d76dd6 | 1252 | set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1253 | mqtt_client->timeSincePing = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1254 | mqtt_client->packetSendTimeMs = 0; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1255 | mqtt_client->packetState = UNKNOWN_TYPE; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1256 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1257 | else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1258 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1259 | /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/ |
| XinZhangMS | 0:f7f1f0d76dd6 | 1260 | BUFFER_HANDLE pingPacket = mqtt_codec_ping(); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1261 | if (pingPacket != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1262 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1263 | size_t size = BUFFER_length(pingPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1264 | (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1265 | BUFFER_delete(pingPacket); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1266 | (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1267 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1268 | if (mqtt_client->logTrace) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1269 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1270 | STRING_HANDLE trace_log = STRING_construct("PINGREQ"); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1271 | log_outgoing_trace(mqtt_client, trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1272 | STRING_delete(trace_log); |
| XinZhangMS | 0:f7f1f0d76dd6 | 1273 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1274 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1275 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1276 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1277 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1278 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1279 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1280 | |
| XinZhangMS | 0:f7f1f0d76dd6 | 1281 | void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1282 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1283 | MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1284 | if (mqtt_client != NULL) |
| XinZhangMS | 0:f7f1f0d76dd6 | 1285 | { |
| XinZhangMS | 0:f7f1f0d76dd6 | 1286 | mqtt_client->logTrace = traceOn; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1287 | mqtt_client->rawBytesTrace = rawBytesOn; |
| XinZhangMS | 0:f7f1f0d76dd6 | 1288 | } |
| XinZhangMS | 0:f7f1f0d76dd6 | 1289 | } |