Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

User | Revision | Line number | New contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 6 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 7 #include "azure_c_shared_utility/platform.h"
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/tickcounter.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_c_shared_utility/crt_abstractions.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_c_shared_utility/strings.h"
XinZhangMS 0:f7f1f0d76dd6 12 #include "azure_c_shared_utility/agenttime.h"
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_c_shared_utility/threadapi.h"
XinZhangMS 0:f7f1f0d76dd6 14
XinZhangMS 0:f7f1f0d76dd6 15 #include "azure_umqtt_c/mqtt_client.h"
XinZhangMS 0:f7f1f0d76dd6 16 #include "azure_umqtt_c/mqtt_codec.h"
XinZhangMS 0:f7f1f0d76dd6 17 #include <inttypes.h>
XinZhangMS 0:f7f1f0d76dd6 18
// Constants private to this translation unit.
#define VARIABLE_HEADER_OFFSET          2

// Bit masks. retrievePacketType() applies CONNECT_PACKET_MASK to isolate the
// packet type before matching it.
// NOTE(review): the four *_FLAG_MASK values look like MQTT fixed-header flag
// bits; confirm against the code that consumes them.
#define RETAIN_FLAG_MASK                0x01
#define QOS_LEAST_ONCE_FLAG_MASK        0x02
#define QOS_EXACTLY_ONCE_FLAG_MASK      0x04
#define DUPLICATE_FLAG_MASK             0x08
#define CONNECT_PACKET_MASK             0xF0

// Size of the stack buffers the trace helpers hand to getLogTime().
#define TIME_MAX_BUFFER                 16
#define DEFAULT_MAX_PING_RESPONSE_TIME  80  // % of time to send pings
// Upper bound on the xio_dowork() polls performed by close_connection().
#define MAX_CLOSE_RETRIES               2

// Text emitted for boolean values in trace output.
static const char* const TRUE_CONST = "true";
static const char* const FALSE_CONST = "false";

DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES);
XinZhangMS 0:f7f1f0d76dd6 33
XinZhangMS 0:f7f1f0d76dd6 34 typedef struct MQTT_CLIENT_TAG
XinZhangMS 0:f7f1f0d76dd6 35 {
XinZhangMS 0:f7f1f0d76dd6 36 XIO_HANDLE xioHandle;
XinZhangMS 0:f7f1f0d76dd6 37 MQTTCODEC_HANDLE codec_handle;
XinZhangMS 0:f7f1f0d76dd6 38 CONTROL_PACKET_TYPE packetState;
XinZhangMS 0:f7f1f0d76dd6 39 TICK_COUNTER_HANDLE packetTickCntr;
XinZhangMS 0:f7f1f0d76dd6 40 tickcounter_ms_t packetSendTimeMs;
XinZhangMS 0:f7f1f0d76dd6 41 ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
XinZhangMS 0:f7f1f0d76dd6 42 ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
XinZhangMS 0:f7f1f0d76dd6 43 void* ctx;
XinZhangMS 0:f7f1f0d76dd6 44 ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
XinZhangMS 0:f7f1f0d76dd6 45 void* errorCBCtx;
XinZhangMS 0:f7f1f0d76dd6 46 ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb;
XinZhangMS 0:f7f1f0d76dd6 47 void* disconnect_ctx;
XinZhangMS 0:f7f1f0d76dd6 48 QOS_VALUE qosValue;
XinZhangMS 0:f7f1f0d76dd6 49 uint16_t keepAliveInterval;
XinZhangMS 0:f7f1f0d76dd6 50 MQTT_CLIENT_OPTIONS mqttOptions;
XinZhangMS 0:f7f1f0d76dd6 51 bool clientConnected;
XinZhangMS 0:f7f1f0d76dd6 52 bool socketConnected;
XinZhangMS 0:f7f1f0d76dd6 53 bool logTrace;
XinZhangMS 0:f7f1f0d76dd6 54 bool rawBytesTrace;
XinZhangMS 0:f7f1f0d76dd6 55 tickcounter_ms_t timeSincePing;
XinZhangMS 0:f7f1f0d76dd6 56 uint16_t maxPingRespTime;
XinZhangMS 0:f7f1f0d76dd6 57 } MQTT_CLIENT;
XinZhangMS 0:f7f1f0d76dd6 58
XinZhangMS 0:f7f1f0d76dd6 59 static void on_connection_closed(void* context)
XinZhangMS 0:f7f1f0d76dd6 60 {
XinZhangMS 0:f7f1f0d76dd6 61 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 62 if (mqtt_client != NULL)
XinZhangMS 0:f7f1f0d76dd6 63 {
XinZhangMS 0:f7f1f0d76dd6 64 mqtt_client->socketConnected = false;
XinZhangMS 0:f7f1f0d76dd6 65 mqtt_client->clientConnected = false;
XinZhangMS 0:f7f1f0d76dd6 66 if (mqtt_client->disconnect_cb)
XinZhangMS 0:f7f1f0d76dd6 67 {
XinZhangMS 0:f7f1f0d76dd6 68 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
XinZhangMS 0:f7f1f0d76dd6 69 }
XinZhangMS 0:f7f1f0d76dd6 70 }
XinZhangMS 0:f7f1f0d76dd6 71 }
XinZhangMS 0:f7f1f0d76dd6 72
XinZhangMS 0:f7f1f0d76dd6 73 static void close_connection(MQTT_CLIENT* mqtt_client)
XinZhangMS 0:f7f1f0d76dd6 74 {
XinZhangMS 0:f7f1f0d76dd6 75 if (mqtt_client->socketConnected)
XinZhangMS 0:f7f1f0d76dd6 76 {
XinZhangMS 0:f7f1f0d76dd6 77 (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 78 if (mqtt_client->disconnect_cb == NULL)
XinZhangMS 0:f7f1f0d76dd6 79 {
XinZhangMS 0:f7f1f0d76dd6 80 size_t counter = 0;
XinZhangMS 0:f7f1f0d76dd6 81 do
XinZhangMS 0:f7f1f0d76dd6 82 {
XinZhangMS 0:f7f1f0d76dd6 83 xio_dowork(mqtt_client->xioHandle);
XinZhangMS 0:f7f1f0d76dd6 84 counter++;
XinZhangMS 0:f7f1f0d76dd6 85 ThreadAPI_Sleep(2);
XinZhangMS 0:f7f1f0d76dd6 86 } while (mqtt_client->clientConnected && counter < MAX_CLOSE_RETRIES);
XinZhangMS 0:f7f1f0d76dd6 87 }
XinZhangMS 0:f7f1f0d76dd6 88 }
XinZhangMS 0:f7f1f0d76dd6 89 else
XinZhangMS 0:f7f1f0d76dd6 90 {
XinZhangMS 0:f7f1f0d76dd6 91 if (mqtt_client->disconnect_cb)
XinZhangMS 0:f7f1f0d76dd6 92 {
XinZhangMS 0:f7f1f0d76dd6 93 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
XinZhangMS 0:f7f1f0d76dd6 94 }
XinZhangMS 0:f7f1f0d76dd6 95 }
XinZhangMS 0:f7f1f0d76dd6 96 mqtt_client->xioHandle = NULL;
XinZhangMS 0:f7f1f0d76dd6 97 }
XinZhangMS 0:f7f1f0d76dd6 98
XinZhangMS 0:f7f1f0d76dd6 99 static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
XinZhangMS 0:f7f1f0d76dd6 100 {
XinZhangMS 0:f7f1f0d76dd6 101 if (mqtt_client->fnOnErrorCallBack)
XinZhangMS 0:f7f1f0d76dd6 102 {
XinZhangMS 0:f7f1f0d76dd6 103 mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
XinZhangMS 0:f7f1f0d76dd6 104 }
XinZhangMS 0:f7f1f0d76dd6 105 close_connection(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 106 }
XinZhangMS 0:f7f1f0d76dd6 107
XinZhangMS 0:f7f1f0d76dd6 108 static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
XinZhangMS 0:f7f1f0d76dd6 109 {
XinZhangMS 0:f7f1f0d76dd6 110 STRING_HANDLE trace_log;
XinZhangMS 0:f7f1f0d76dd6 111 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 112 {
XinZhangMS 0:f7f1f0d76dd6 113 trace_log = STRING_new();
XinZhangMS 0:f7f1f0d76dd6 114 }
XinZhangMS 0:f7f1f0d76dd6 115 else
XinZhangMS 0:f7f1f0d76dd6 116 {
XinZhangMS 0:f7f1f0d76dd6 117 trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 118 }
XinZhangMS 0:f7f1f0d76dd6 119 return trace_log;
XinZhangMS 0:f7f1f0d76dd6 120 }
XinZhangMS 0:f7f1f0d76dd6 121
XinZhangMS 0:f7f1f0d76dd6 122 static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t len)
XinZhangMS 0:f7f1f0d76dd6 123 {
XinZhangMS 0:f7f1f0d76dd6 124 uint16_t result = 0;
XinZhangMS 0:f7f1f0d76dd6 125 if (buffer != NULL && *buffer != NULL && len >= 2)
XinZhangMS 0:f7f1f0d76dd6 126 {
XinZhangMS 0:f7f1f0d76dd6 127 result = 256 * (**buffer) + (*(*buffer + 1));
XinZhangMS 0:f7f1f0d76dd6 128 *buffer += 2; // Move the ptr
XinZhangMS 0:f7f1f0d76dd6 129 }
XinZhangMS 0:f7f1f0d76dd6 130 else
XinZhangMS 0:f7f1f0d76dd6 131 {
XinZhangMS 0:f7f1f0d76dd6 132 LOG(AZ_LOG_ERROR, LOG_LINE, "byteutil_read_uint16 == NULL or less than 2");
XinZhangMS 0:f7f1f0d76dd6 133 }
XinZhangMS 0:f7f1f0d76dd6 134 return result;
XinZhangMS 0:f7f1f0d76dd6 135 }
XinZhangMS 0:f7f1f0d76dd6 136
XinZhangMS 0:f7f1f0d76dd6 137 static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
XinZhangMS 0:f7f1f0d76dd6 138 {
XinZhangMS 0:f7f1f0d76dd6 139 char* result = NULL;
XinZhangMS 0:f7f1f0d76dd6 140 if (buffer != NULL)
XinZhangMS 0:f7f1f0d76dd6 141 {
XinZhangMS 0:f7f1f0d76dd6 142 // Get the length of the string
XinZhangMS 0:f7f1f0d76dd6 143 uint16_t len = byteutil_read_uint16(buffer, *byteLen);
XinZhangMS 0:f7f1f0d76dd6 144 if (len > 0)
XinZhangMS 0:f7f1f0d76dd6 145 {
XinZhangMS 0:f7f1f0d76dd6 146 result = (char*)malloc(len + 1);
XinZhangMS 0:f7f1f0d76dd6 147 if (result != NULL)
XinZhangMS 0:f7f1f0d76dd6 148 {
XinZhangMS 0:f7f1f0d76dd6 149 (void)memcpy(result, *buffer, len);
XinZhangMS 0:f7f1f0d76dd6 150 result[len] = '\0';
XinZhangMS 0:f7f1f0d76dd6 151 *buffer += len;
XinZhangMS 0:f7f1f0d76dd6 152 if (byteLen != NULL)
XinZhangMS 0:f7f1f0d76dd6 153 {
XinZhangMS 0:f7f1f0d76dd6 154 *byteLen = len;
XinZhangMS 0:f7f1f0d76dd6 155 }
XinZhangMS 0:f7f1f0d76dd6 156 }
XinZhangMS 0:f7f1f0d76dd6 157 }
XinZhangMS 0:f7f1f0d76dd6 158 }
XinZhangMS 0:f7f1f0d76dd6 159 else
XinZhangMS 0:f7f1f0d76dd6 160 {
XinZhangMS 0:f7f1f0d76dd6 161 LOG(AZ_LOG_ERROR, LOG_LINE, "readByte buffer == NULL.");
XinZhangMS 0:f7f1f0d76dd6 162 }
XinZhangMS 0:f7f1f0d76dd6 163 return result;
XinZhangMS 0:f7f1f0d76dd6 164 }
XinZhangMS 0:f7f1f0d76dd6 165
XinZhangMS 0:f7f1f0d76dd6 166 static uint8_t byteutil_readByte(uint8_t** buffer)
XinZhangMS 0:f7f1f0d76dd6 167 {
XinZhangMS 0:f7f1f0d76dd6 168 uint8_t result = 0;
XinZhangMS 0:f7f1f0d76dd6 169 if (buffer != NULL)
XinZhangMS 0:f7f1f0d76dd6 170 {
XinZhangMS 0:f7f1f0d76dd6 171 result = **buffer;
XinZhangMS 0:f7f1f0d76dd6 172 (*buffer)++;
XinZhangMS 0:f7f1f0d76dd6 173 }
XinZhangMS 0:f7f1f0d76dd6 174 else
XinZhangMS 0:f7f1f0d76dd6 175 {
XinZhangMS 0:f7f1f0d76dd6 176 LOG(AZ_LOG_ERROR, LOG_LINE, "readByte buffer == NULL.");
XinZhangMS 0:f7f1f0d76dd6 177 }
XinZhangMS 0:f7f1f0d76dd6 178 return result;
XinZhangMS 0:f7f1f0d76dd6 179 }
XinZhangMS 0:f7f1f0d76dd6 180
XinZhangMS 0:f7f1f0d76dd6 181 static void sendComplete(void* context, IO_SEND_RESULT send_result)
XinZhangMS 0:f7f1f0d76dd6 182 {
XinZhangMS 0:f7f1f0d76dd6 183 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 184 if (mqtt_client != NULL)
XinZhangMS 0:f7f1f0d76dd6 185 {
XinZhangMS 0:f7f1f0d76dd6 186 if (send_result == IO_SEND_OK)
XinZhangMS 0:f7f1f0d76dd6 187 {
XinZhangMS 0:f7f1f0d76dd6 188 if (mqtt_client->packetState == DISCONNECT_TYPE)
XinZhangMS 0:f7f1f0d76dd6 189 {
XinZhangMS 0:f7f1f0d76dd6 190 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 191 if (mqtt_client->fnOperationCallback != NULL)
XinZhangMS 0:f7f1f0d76dd6 192 {
XinZhangMS 0:f7f1f0d76dd6 193 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 194 }
XinZhangMS 0:f7f1f0d76dd6 195 // close the xio
XinZhangMS 0:f7f1f0d76dd6 196 close_connection(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 197 }
XinZhangMS 0:f7f1f0d76dd6 198 }
XinZhangMS 0:f7f1f0d76dd6 199 else if (send_result == IO_SEND_ERROR)
XinZhangMS 0:f7f1f0d76dd6 200 {
XinZhangMS 0:f7f1f0d76dd6 201 LOG(AZ_LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result);
XinZhangMS 0:f7f1f0d76dd6 202 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
XinZhangMS 0:f7f1f0d76dd6 203 }
XinZhangMS 0:f7f1f0d76dd6 204 }
XinZhangMS 0:f7f1f0d76dd6 205 else
XinZhangMS 0:f7f1f0d76dd6 206 {
XinZhangMS 0:f7f1f0d76dd6 207 LOG(AZ_LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure with NULL mqtt_client");
XinZhangMS 0:f7f1f0d76dd6 208 }
XinZhangMS 0:f7f1f0d76dd6 209 }
XinZhangMS 0:f7f1f0d76dd6 210
XinZhangMS 0:f7f1f0d76dd6 211 #ifndef NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 212 static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
XinZhangMS 0:f7f1f0d76dd6 213 {
XinZhangMS 0:f7f1f0d76dd6 214 switch (packet&CONNECT_PACKET_MASK)
XinZhangMS 0:f7f1f0d76dd6 215 {
XinZhangMS 0:f7f1f0d76dd6 216 case CONNECT_TYPE: return "CONNECT";
XinZhangMS 0:f7f1f0d76dd6 217 case CONNACK_TYPE: return "CONNACK";
XinZhangMS 0:f7f1f0d76dd6 218 case PUBLISH_TYPE: return "PUBLISH";
XinZhangMS 0:f7f1f0d76dd6 219 case PUBACK_TYPE: return "PUBACK";
XinZhangMS 0:f7f1f0d76dd6 220 case PUBREC_TYPE: return "PUBREC";
XinZhangMS 0:f7f1f0d76dd6 221 case PUBREL_TYPE: return "PUBREL";
XinZhangMS 0:f7f1f0d76dd6 222 case SUBSCRIBE_TYPE: return "SUBSCRIBE";
XinZhangMS 0:f7f1f0d76dd6 223 case SUBACK_TYPE: return "SUBACK";
XinZhangMS 0:f7f1f0d76dd6 224 case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
XinZhangMS 0:f7f1f0d76dd6 225 case UNSUBACK_TYPE: return "UNSUBACK";
XinZhangMS 0:f7f1f0d76dd6 226 case PINGREQ_TYPE: return "PINGREQ";
XinZhangMS 0:f7f1f0d76dd6 227 case PINGRESP_TYPE: return "PINGRESP";
XinZhangMS 0:f7f1f0d76dd6 228 case DISCONNECT_TYPE: return "DISCONNECT";
XinZhangMS 0:f7f1f0d76dd6 229 default:
XinZhangMS 0:f7f1f0d76dd6 230 case PACKET_TYPE_ERROR:
XinZhangMS 0:f7f1f0d76dd6 231 case UNKNOWN_TYPE:
XinZhangMS 0:f7f1f0d76dd6 232 return "UNKNOWN";
XinZhangMS 0:f7f1f0d76dd6 233 }
XinZhangMS 0:f7f1f0d76dd6 234 }
XinZhangMS 0:f7f1f0d76dd6 235 #endif // NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 236
// Writes the current local time as "HH:MM:SS" into timeResult (capacity
// len). If the time cannot be obtained, converted or formatted, timeResult
// becomes the empty string. A NULL timeResult is ignored.
static void getLogTime(char* timeResult, size_t len)
{
    if (timeResult == NULL)
    {
        return;
    }

    bool formatted = false;
    time_t now = get_time(NULL);
    if (now != (time_t)-1)
    {
        struct tm* local = localtime(&now);
        if (local != NULL && strftime(timeResult, len, "%H:%M:%S", local) != 0)
        {
            formatted = true;
        }
    }

    if (!formatted)
    {
        timeResult[0] = '\0';
    }
}
XinZhangMS 0:f7f1f0d76dd6 263
XinZhangMS 0:f7f1f0d76dd6 264 static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
XinZhangMS 0:f7f1f0d76dd6 265 {
XinZhangMS 0:f7f1f0d76dd6 266 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 267 {
XinZhangMS 0:f7f1f0d76dd6 268 char tmBuffer[TIME_MAX_BUFFER];
XinZhangMS 0:f7f1f0d76dd6 269 getLogTime(tmBuffer, TIME_MAX_BUFFER);
XinZhangMS 0:f7f1f0d76dd6 270 LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log));
XinZhangMS 0:f7f1f0d76dd6 271 }
XinZhangMS 0:f7f1f0d76dd6 272 }
XinZhangMS 0:f7f1f0d76dd6 273
XinZhangMS 0:f7f1f0d76dd6 274 static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
XinZhangMS 0:f7f1f0d76dd6 275 {
XinZhangMS 0:f7f1f0d76dd6 276 if (mqtt_client != NULL && data != NULL && length > 0 && mqtt_client->rawBytesTrace)
XinZhangMS 0:f7f1f0d76dd6 277 {
XinZhangMS 0:f7f1f0d76dd6 278 char tmBuffer[TIME_MAX_BUFFER];
XinZhangMS 0:f7f1f0d76dd6 279 getLogTime(tmBuffer, TIME_MAX_BUFFER);
XinZhangMS 0:f7f1f0d76dd6 280
XinZhangMS 0:f7f1f0d76dd6 281 LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0]));
XinZhangMS 0:f7f1f0d76dd6 282 size_t index = 0;
XinZhangMS 0:f7f1f0d76dd6 283 for (index = 0; index < length; index++)
XinZhangMS 0:f7f1f0d76dd6 284 {
XinZhangMS 0:f7f1f0d76dd6 285 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
XinZhangMS 0:f7f1f0d76dd6 286 }
XinZhangMS 0:f7f1f0d76dd6 287 LOG(AZ_LOG_TRACE, LOG_LINE, "");
XinZhangMS 0:f7f1f0d76dd6 288 }
XinZhangMS 0:f7f1f0d76dd6 289 }
XinZhangMS 0:f7f1f0d76dd6 290
XinZhangMS 0:f7f1f0d76dd6 291 static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
XinZhangMS 0:f7f1f0d76dd6 292 {
XinZhangMS 0:f7f1f0d76dd6 293 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 294 {
XinZhangMS 0:f7f1f0d76dd6 295 char tmBuffer[TIME_MAX_BUFFER];
XinZhangMS 0:f7f1f0d76dd6 296 getLogTime(tmBuffer, TIME_MAX_BUFFER);
XinZhangMS 0:f7f1f0d76dd6 297 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) );
XinZhangMS 0:f7f1f0d76dd6 298 }
XinZhangMS 0:f7f1f0d76dd6 299 }
XinZhangMS 0:f7f1f0d76dd6 300
XinZhangMS 0:f7f1f0d76dd6 301 static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
XinZhangMS 0:f7f1f0d76dd6 302 {
XinZhangMS 0:f7f1f0d76dd6 303 #ifdef NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 304 UNUSED(flags);
XinZhangMS 0:f7f1f0d76dd6 305 #endif
XinZhangMS 0:f7f1f0d76dd6 306 if (mqtt_client != NULL && mqtt_client->rawBytesTrace)
XinZhangMS 0:f7f1f0d76dd6 307 {
XinZhangMS 0:f7f1f0d76dd6 308 if (data != NULL && length > 0)
XinZhangMS 0:f7f1f0d76dd6 309 {
XinZhangMS 0:f7f1f0d76dd6 310 char tmBuffer[TIME_MAX_BUFFER];
XinZhangMS 0:f7f1f0d76dd6 311 getLogTime(tmBuffer, TIME_MAX_BUFFER);
XinZhangMS 0:f7f1f0d76dd6 312
XinZhangMS 0:f7f1f0d76dd6 313 LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), length);
XinZhangMS 0:f7f1f0d76dd6 314 size_t index = 0;
XinZhangMS 0:f7f1f0d76dd6 315 for (index = 0; index < length; index++)
XinZhangMS 0:f7f1f0d76dd6 316 {
XinZhangMS 0:f7f1f0d76dd6 317 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
XinZhangMS 0:f7f1f0d76dd6 318 }
XinZhangMS 0:f7f1f0d76dd6 319 LOG(AZ_LOG_TRACE, LOG_LINE, "");
XinZhangMS 0:f7f1f0d76dd6 320 }
XinZhangMS 0:f7f1f0d76dd6 321 else if (packet == PINGRESP_TYPE)
XinZhangMS 0:f7f1f0d76dd6 322 {
XinZhangMS 0:f7f1f0d76dd6 323 char tmBuffer[TIME_MAX_BUFFER];
XinZhangMS 0:f7f1f0d76dd6 324 getLogTime(tmBuffer, TIME_MAX_BUFFER);
XinZhangMS 0:f7f1f0d76dd6 325 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned char)(packet | flags), length);
XinZhangMS 0:f7f1f0d76dd6 326 }
XinZhangMS 0:f7f1f0d76dd6 327 }
XinZhangMS 0:f7f1f0d76dd6 328 }
XinZhangMS 0:f7f1f0d76dd6 329
XinZhangMS 0:f7f1f0d76dd6 330 static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length)
XinZhangMS 0:f7f1f0d76dd6 331 {
XinZhangMS 0:f7f1f0d76dd6 332 int result;
XinZhangMS 0:f7f1f0d76dd6 333
XinZhangMS 0:f7f1f0d76dd6 334 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0)
XinZhangMS 0:f7f1f0d76dd6 335 {
XinZhangMS 0:f7f1f0d76dd6 336 LOG(AZ_LOG_ERROR, LOG_LINE, "Failure getting current ms tickcounter");
XinZhangMS 0:f7f1f0d76dd6 337 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 338 }
XinZhangMS 0:f7f1f0d76dd6 339 else
XinZhangMS 0:f7f1f0d76dd6 340 {
XinZhangMS 0:f7f1f0d76dd6 341 result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 342 if (result != 0)
XinZhangMS 0:f7f1f0d76dd6 343 {
XinZhangMS 0:f7f1f0d76dd6 344 LOG(AZ_LOG_ERROR, LOG_LINE, "%d: Failure sending control packet data", result);
XinZhangMS 0:f7f1f0d76dd6 345 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 346 }
XinZhangMS 0:f7f1f0d76dd6 347 else
XinZhangMS 0:f7f1f0d76dd6 348 {
XinZhangMS 0:f7f1f0d76dd6 349 logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length);
XinZhangMS 0:f7f1f0d76dd6 350 }
XinZhangMS 0:f7f1f0d76dd6 351 }
XinZhangMS 0:f7f1f0d76dd6 352 return result;
XinZhangMS 0:f7f1f0d76dd6 353 }
XinZhangMS 0:f7f1f0d76dd6 354
XinZhangMS 0:f7f1f0d76dd6 355 static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
XinZhangMS 0:f7f1f0d76dd6 356 {
XinZhangMS 0:f7f1f0d76dd6 357 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 358 if (mqtt_client != NULL)
XinZhangMS 0:f7f1f0d76dd6 359 {
XinZhangMS 0:f7f1f0d76dd6 360 if (open_result == IO_OPEN_OK && !mqtt_client->socketConnected)
XinZhangMS 0:f7f1f0d76dd6 361 {
XinZhangMS 0:f7f1f0d76dd6 362 mqtt_client->packetState = CONNECT_TYPE;
XinZhangMS 0:f7f1f0d76dd6 363 mqtt_client->socketConnected = true;
XinZhangMS 0:f7f1f0d76dd6 364
XinZhangMS 0:f7f1f0d76dd6 365 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 366
XinZhangMS 0:f7f1f0d76dd6 367 // Send the Connect packet
XinZhangMS 0:f7f1f0d76dd6 368 BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
XinZhangMS 0:f7f1f0d76dd6 369 if (connPacket == NULL)
XinZhangMS 0:f7f1f0d76dd6 370 {
XinZhangMS 0:f7f1f0d76dd6 371 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_connect failed");
XinZhangMS 0:f7f1f0d76dd6 372 }
XinZhangMS 0:f7f1f0d76dd6 373 else
XinZhangMS 0:f7f1f0d76dd6 374 {
XinZhangMS 0:f7f1f0d76dd6 375 size_t size = BUFFER_length(connPacket);
XinZhangMS 0:f7f1f0d76dd6 376 /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
XinZhangMS 0:f7f1f0d76dd6 377 if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
XinZhangMS 0:f7f1f0d76dd6 378 {
XinZhangMS 0:f7f1f0d76dd6 379 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_connect failed");
XinZhangMS 0:f7f1f0d76dd6 380 }
XinZhangMS 0:f7f1f0d76dd6 381 else
XinZhangMS 0:f7f1f0d76dd6 382 {
XinZhangMS 0:f7f1f0d76dd6 383 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 384 }
XinZhangMS 0:f7f1f0d76dd6 385 BUFFER_delete(connPacket);
XinZhangMS 0:f7f1f0d76dd6 386 }
XinZhangMS 0:f7f1f0d76dd6 387 if (trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 388 {
XinZhangMS 0:f7f1f0d76dd6 389 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 390 }
XinZhangMS 0:f7f1f0d76dd6 391 }
XinZhangMS 0:f7f1f0d76dd6 392 else
XinZhangMS 0:f7f1f0d76dd6 393 {
XinZhangMS 0:f7f1f0d76dd6 394 if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack)
XinZhangMS 0:f7f1f0d76dd6 395 {
XinZhangMS 0:f7f1f0d76dd6 396 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
XinZhangMS 0:f7f1f0d76dd6 397 }
XinZhangMS 0:f7f1f0d76dd6 398 close_connection(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 399 }
XinZhangMS 0:f7f1f0d76dd6 400 }
XinZhangMS 0:f7f1f0d76dd6 401 else
XinZhangMS 0:f7f1f0d76dd6 402 {
XinZhangMS 0:f7f1f0d76dd6 403 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client is NULL");
XinZhangMS 0:f7f1f0d76dd6 404 }
XinZhangMS 0:f7f1f0d76dd6 405 }
XinZhangMS 0:f7f1f0d76dd6 406
XinZhangMS 0:f7f1f0d76dd6 407 static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
XinZhangMS 0:f7f1f0d76dd6 408 {
XinZhangMS 0:f7f1f0d76dd6 409 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 410 if (mqtt_client != NULL)
XinZhangMS 0:f7f1f0d76dd6 411 {
XinZhangMS 0:f7f1f0d76dd6 412 if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
XinZhangMS 0:f7f1f0d76dd6 413 {
XinZhangMS 0:f7f1f0d76dd6 414 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 415 }
XinZhangMS 0:f7f1f0d76dd6 416 }
XinZhangMS 0:f7f1f0d76dd6 417 else
XinZhangMS 0:f7f1f0d76dd6 418 {
XinZhangMS 0:f7f1f0d76dd6 419 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client is NULL");
XinZhangMS 0:f7f1f0d76dd6 420 }
XinZhangMS 0:f7f1f0d76dd6 421 }
XinZhangMS 0:f7f1f0d76dd6 422
XinZhangMS 0:f7f1f0d76dd6 423 static void onIoError(void* context)
XinZhangMS 0:f7f1f0d76dd6 424 {
XinZhangMS 0:f7f1f0d76dd6 425 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 426 if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
XinZhangMS 0:f7f1f0d76dd6 427 {
XinZhangMS 0:f7f1f0d76dd6 428 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 429 /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
XinZhangMS 0:f7f1f0d76dd6 430 set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
XinZhangMS 0:f7f1f0d76dd6 431 }
XinZhangMS 0:f7f1f0d76dd6 432 else
XinZhangMS 0:f7f1f0d76dd6 433 {
XinZhangMS 0:f7f1f0d76dd6 434 LOG(AZ_LOG_ERROR, LOG_LINE, "Error invalid parameter: mqtt_client: %p", mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 435 }
XinZhangMS 0:f7f1f0d76dd6 436 }
XinZhangMS 0:f7f1f0d76dd6 437
XinZhangMS 0:f7f1f0d76dd6 438 static void clear_mqtt_options(MQTT_CLIENT* mqtt_client)
XinZhangMS 0:f7f1f0d76dd6 439 {
XinZhangMS 0:f7f1f0d76dd6 440 if (mqtt_client->mqttOptions.clientId != NULL)
XinZhangMS 0:f7f1f0d76dd6 441 {
XinZhangMS 0:f7f1f0d76dd6 442 free(mqtt_client->mqttOptions.clientId);
XinZhangMS 0:f7f1f0d76dd6 443 mqtt_client->mqttOptions.clientId = NULL;
XinZhangMS 0:f7f1f0d76dd6 444 }
XinZhangMS 0:f7f1f0d76dd6 445
XinZhangMS 0:f7f1f0d76dd6 446 if (mqtt_client->mqttOptions.willTopic != NULL)
XinZhangMS 0:f7f1f0d76dd6 447 {
XinZhangMS 0:f7f1f0d76dd6 448 free(mqtt_client->mqttOptions.willTopic);
XinZhangMS 0:f7f1f0d76dd6 449 mqtt_client->mqttOptions.willTopic = NULL;
XinZhangMS 0:f7f1f0d76dd6 450 }
XinZhangMS 0:f7f1f0d76dd6 451
XinZhangMS 0:f7f1f0d76dd6 452 if (mqtt_client->mqttOptions.willMessage != NULL)
XinZhangMS 0:f7f1f0d76dd6 453 {
XinZhangMS 0:f7f1f0d76dd6 454 free(mqtt_client->mqttOptions.willMessage);
XinZhangMS 0:f7f1f0d76dd6 455 mqtt_client->mqttOptions.willMessage = NULL;
XinZhangMS 0:f7f1f0d76dd6 456 }
XinZhangMS 0:f7f1f0d76dd6 457
XinZhangMS 0:f7f1f0d76dd6 458 if (mqtt_client->mqttOptions.username != NULL)
XinZhangMS 0:f7f1f0d76dd6 459 {
XinZhangMS 0:f7f1f0d76dd6 460 free(mqtt_client->mqttOptions.username);
XinZhangMS 0:f7f1f0d76dd6 461 mqtt_client->mqttOptions.username = NULL;
XinZhangMS 0:f7f1f0d76dd6 462 }
XinZhangMS 0:f7f1f0d76dd6 463
XinZhangMS 0:f7f1f0d76dd6 464 if (mqtt_client->mqttOptions.password != NULL)
XinZhangMS 0:f7f1f0d76dd6 465 {
XinZhangMS 0:f7f1f0d76dd6 466 free(mqtt_client->mqttOptions.password);
XinZhangMS 0:f7f1f0d76dd6 467 mqtt_client->mqttOptions.password = NULL;
XinZhangMS 0:f7f1f0d76dd6 468 }
XinZhangMS 0:f7f1f0d76dd6 469 }
XinZhangMS 0:f7f1f0d76dd6 470
XinZhangMS 0:f7f1f0d76dd6 471 static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions)
XinZhangMS 0:f7f1f0d76dd6 472 {
XinZhangMS 0:f7f1f0d76dd6 473 int result = 0;
XinZhangMS 0:f7f1f0d76dd6 474
XinZhangMS 0:f7f1f0d76dd6 475 if (mqttOptions->clientId != NULL)
XinZhangMS 0:f7f1f0d76dd6 476 {
XinZhangMS 0:f7f1f0d76dd6 477 char* clientId;
XinZhangMS 0:f7f1f0d76dd6 478
XinZhangMS 0:f7f1f0d76dd6 479 if (mallocAndStrcpy_s(&clientId, mqttOptions->clientId) != 0)
XinZhangMS 0:f7f1f0d76dd6 480 {
XinZhangMS 0:f7f1f0d76dd6 481 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 482 LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s clientId");
XinZhangMS 0:f7f1f0d76dd6 483 }
XinZhangMS 0:f7f1f0d76dd6 484 else
XinZhangMS 0:f7f1f0d76dd6 485 {
XinZhangMS 0:f7f1f0d76dd6 486 if (mqtt_client->mqttOptions.clientId != NULL)
XinZhangMS 0:f7f1f0d76dd6 487 {
XinZhangMS 0:f7f1f0d76dd6 488 free(mqtt_client->mqttOptions.clientId);
XinZhangMS 0:f7f1f0d76dd6 489 }
XinZhangMS 0:f7f1f0d76dd6 490
XinZhangMS 0:f7f1f0d76dd6 491 mqtt_client->mqttOptions.clientId = clientId;
XinZhangMS 0:f7f1f0d76dd6 492 }
XinZhangMS 0:f7f1f0d76dd6 493 }
XinZhangMS 0:f7f1f0d76dd6 494 if (result == 0 && mqttOptions->willTopic != NULL)
XinZhangMS 0:f7f1f0d76dd6 495 {
XinZhangMS 0:f7f1f0d76dd6 496 char* willTopic;
XinZhangMS 0:f7f1f0d76dd6 497
XinZhangMS 0:f7f1f0d76dd6 498 if (mallocAndStrcpy_s(&willTopic, mqttOptions->willTopic) != 0)
XinZhangMS 0:f7f1f0d76dd6 499 {
XinZhangMS 0:f7f1f0d76dd6 500 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 501 LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s willTopic");
XinZhangMS 0:f7f1f0d76dd6 502 }
XinZhangMS 0:f7f1f0d76dd6 503 else
XinZhangMS 0:f7f1f0d76dd6 504 {
XinZhangMS 0:f7f1f0d76dd6 505 if (mqtt_client->mqttOptions.willTopic != NULL)
XinZhangMS 0:f7f1f0d76dd6 506 {
XinZhangMS 0:f7f1f0d76dd6 507 free(mqtt_client->mqttOptions.willTopic);
XinZhangMS 0:f7f1f0d76dd6 508 }
XinZhangMS 0:f7f1f0d76dd6 509
XinZhangMS 0:f7f1f0d76dd6 510 mqtt_client->mqttOptions.willTopic = willTopic;
XinZhangMS 0:f7f1f0d76dd6 511 }
XinZhangMS 0:f7f1f0d76dd6 512 }
XinZhangMS 0:f7f1f0d76dd6 513 if (result == 0 && mqttOptions->willMessage != NULL)
XinZhangMS 0:f7f1f0d76dd6 514 {
XinZhangMS 0:f7f1f0d76dd6 515 char* willMessage;
XinZhangMS 0:f7f1f0d76dd6 516
XinZhangMS 0:f7f1f0d76dd6 517 if (mallocAndStrcpy_s(&willMessage, mqttOptions->willMessage) != 0)
XinZhangMS 0:f7f1f0d76dd6 518 {
XinZhangMS 0:f7f1f0d76dd6 519 LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s willMessage");
XinZhangMS 0:f7f1f0d76dd6 520 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 521 }
XinZhangMS 0:f7f1f0d76dd6 522 else
XinZhangMS 0:f7f1f0d76dd6 523 {
XinZhangMS 0:f7f1f0d76dd6 524 if (mqtt_client->mqttOptions.willMessage != NULL)
XinZhangMS 0:f7f1f0d76dd6 525 {
XinZhangMS 0:f7f1f0d76dd6 526 free(mqtt_client->mqttOptions.willMessage);
XinZhangMS 0:f7f1f0d76dd6 527 }
XinZhangMS 0:f7f1f0d76dd6 528
XinZhangMS 0:f7f1f0d76dd6 529 mqtt_client->mqttOptions.willMessage = willMessage;
XinZhangMS 0:f7f1f0d76dd6 530 }
XinZhangMS 0:f7f1f0d76dd6 531 }
XinZhangMS 0:f7f1f0d76dd6 532 if (result == 0 && mqttOptions->username != NULL)
XinZhangMS 0:f7f1f0d76dd6 533 {
XinZhangMS 0:f7f1f0d76dd6 534 char* username;
XinZhangMS 0:f7f1f0d76dd6 535
XinZhangMS 0:f7f1f0d76dd6 536 if (mallocAndStrcpy_s(&username, mqttOptions->username) != 0)
XinZhangMS 0:f7f1f0d76dd6 537 {
XinZhangMS 0:f7f1f0d76dd6 538 LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s username");
XinZhangMS 0:f7f1f0d76dd6 539 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 540 }
XinZhangMS 0:f7f1f0d76dd6 541 else
XinZhangMS 0:f7f1f0d76dd6 542 {
XinZhangMS 0:f7f1f0d76dd6 543 if (mqtt_client->mqttOptions.username != NULL)
XinZhangMS 0:f7f1f0d76dd6 544 {
XinZhangMS 0:f7f1f0d76dd6 545 free(mqtt_client->mqttOptions.username);
XinZhangMS 0:f7f1f0d76dd6 546 }
XinZhangMS 0:f7f1f0d76dd6 547
XinZhangMS 0:f7f1f0d76dd6 548 mqtt_client->mqttOptions.username = username;
XinZhangMS 0:f7f1f0d76dd6 549 }
XinZhangMS 0:f7f1f0d76dd6 550 }
XinZhangMS 0:f7f1f0d76dd6 551 if (result == 0 && mqttOptions->password != NULL)
XinZhangMS 0:f7f1f0d76dd6 552 {
XinZhangMS 0:f7f1f0d76dd6 553 char* password;
XinZhangMS 0:f7f1f0d76dd6 554
XinZhangMS 0:f7f1f0d76dd6 555 if (mallocAndStrcpy_s(&password, mqttOptions->password) != 0)
XinZhangMS 0:f7f1f0d76dd6 556 {
XinZhangMS 0:f7f1f0d76dd6 557 LOG(AZ_LOG_ERROR, LOG_LINE, "mallocAndStrcpy_s password");
XinZhangMS 0:f7f1f0d76dd6 558 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 559 }
XinZhangMS 0:f7f1f0d76dd6 560 else
XinZhangMS 0:f7f1f0d76dd6 561 {
XinZhangMS 0:f7f1f0d76dd6 562 if (mqtt_client->mqttOptions.password != NULL)
XinZhangMS 0:f7f1f0d76dd6 563 {
XinZhangMS 0:f7f1f0d76dd6 564 free(mqtt_client->mqttOptions.password);
XinZhangMS 0:f7f1f0d76dd6 565 }
XinZhangMS 0:f7f1f0d76dd6 566
XinZhangMS 0:f7f1f0d76dd6 567 mqtt_client->mqttOptions.password = password;
XinZhangMS 0:f7f1f0d76dd6 568 }
XinZhangMS 0:f7f1f0d76dd6 569 }
XinZhangMS 0:f7f1f0d76dd6 570 if (result == 0)
XinZhangMS 0:f7f1f0d76dd6 571 {
XinZhangMS 0:f7f1f0d76dd6 572 mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
XinZhangMS 0:f7f1f0d76dd6 573 mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain;
XinZhangMS 0:f7f1f0d76dd6 574 mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
XinZhangMS 0:f7f1f0d76dd6 575 mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
XinZhangMS 0:f7f1f0d76dd6 576 }
XinZhangMS 0:f7f1f0d76dd6 577 else
XinZhangMS 0:f7f1f0d76dd6 578 {
XinZhangMS 0:f7f1f0d76dd6 579 clear_mqtt_options(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 580 }
XinZhangMS 0:f7f1f0d76dd6 581 return result;
XinZhangMS 0:f7f1f0d76dd6 582 }
XinZhangMS 0:f7f1f0d76dd6 583
XinZhangMS 0:f7f1f0d76dd6 584 static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
XinZhangMS 0:f7f1f0d76dd6 585 {
XinZhangMS 0:f7f1f0d76dd6 586 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
XinZhangMS 0:f7f1f0d76dd6 587 if ((mqtt_client != NULL && headerData != NULL) || packet == PINGRESP_TYPE)
XinZhangMS 0:f7f1f0d76dd6 588 {
XinZhangMS 0:f7f1f0d76dd6 589 size_t len = BUFFER_length(headerData);
XinZhangMS 0:f7f1f0d76dd6 590 uint8_t* iterator = BUFFER_u_char(headerData);
XinZhangMS 0:f7f1f0d76dd6 591
XinZhangMS 0:f7f1f0d76dd6 592 logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, len);
XinZhangMS 0:f7f1f0d76dd6 593
XinZhangMS 0:f7f1f0d76dd6 594 if ((iterator != NULL && len > 0) || packet == PINGRESP_TYPE)
XinZhangMS 0:f7f1f0d76dd6 595 {
XinZhangMS 0:f7f1f0d76dd6 596 switch (packet)
XinZhangMS 0:f7f1f0d76dd6 597 {
XinZhangMS 0:f7f1f0d76dd6 598 case CONNACK_TYPE:
XinZhangMS 0:f7f1f0d76dd6 599 {
XinZhangMS 0:f7f1f0d76dd6 600 if (mqtt_client->fnOperationCallback != NULL)
XinZhangMS 0:f7f1f0d76dd6 601 {
XinZhangMS 0:f7f1f0d76dd6 602 STRING_HANDLE trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 603
XinZhangMS 0:f7f1f0d76dd6 604 /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
XinZhangMS 0:f7f1f0d76dd6 605 CONNECT_ACK connack = { 0 };
XinZhangMS 0:f7f1f0d76dd6 606 connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
XinZhangMS 0:f7f1f0d76dd6 607 uint8_t rc = byteutil_readByte(&iterator);
XinZhangMS 0:f7f1f0d76dd6 608 connack.returnCode =
XinZhangMS 0:f7f1f0d76dd6 609 (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
XinZhangMS 0:f7f1f0d76dd6 610 (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
XinZhangMS 0:f7f1f0d76dd6 611
XinZhangMS 0:f7f1f0d76dd6 612 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 613 {
XinZhangMS 0:f7f1f0d76dd6 614 trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
XinZhangMS 0:f7f1f0d76dd6 615 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 616 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 617 }
XinZhangMS 0:f7f1f0d76dd6 618 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 619
XinZhangMS 0:f7f1f0d76dd6 620 if (connack.returnCode == CONNECTION_ACCEPTED)
XinZhangMS 0:f7f1f0d76dd6 621 {
XinZhangMS 0:f7f1f0d76dd6 622 mqtt_client->clientConnected = true;
XinZhangMS 0:f7f1f0d76dd6 623 }
XinZhangMS 0:f7f1f0d76dd6 624 }
XinZhangMS 0:f7f1f0d76dd6 625 else
XinZhangMS 0:f7f1f0d76dd6 626 {
XinZhangMS 0:f7f1f0d76dd6 627 LOG(AZ_LOG_ERROR, LOG_LINE, "fnOperationCallback NULL");
XinZhangMS 0:f7f1f0d76dd6 628 }
XinZhangMS 0:f7f1f0d76dd6 629 break;
XinZhangMS 0:f7f1f0d76dd6 630 }
XinZhangMS 0:f7f1f0d76dd6 631 case PUBLISH_TYPE:
XinZhangMS 0:f7f1f0d76dd6 632 {
XinZhangMS 0:f7f1f0d76dd6 633 if (mqtt_client->fnMessageRecv != NULL)
XinZhangMS 0:f7f1f0d76dd6 634 {
XinZhangMS 0:f7f1f0d76dd6 635 STRING_HANDLE trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 636
XinZhangMS 0:f7f1f0d76dd6 637 bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
XinZhangMS 0:f7f1f0d76dd6 638 bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
XinZhangMS 0:f7f1f0d76dd6 639 QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
XinZhangMS 0:f7f1f0d76dd6 640
XinZhangMS 0:f7f1f0d76dd6 641 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 642 {
XinZhangMS 0:f7f1f0d76dd6 643 trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
XinZhangMS 0:f7f1f0d76dd6 644 isRetainMsg ? 1 : 0, ENUM_TO_STRING(QOS_VALUE, qosValue) );
XinZhangMS 0:f7f1f0d76dd6 645 }
XinZhangMS 0:f7f1f0d76dd6 646
XinZhangMS 0:f7f1f0d76dd6 647 uint8_t* initialPos = iterator;
XinZhangMS 0:f7f1f0d76dd6 648 size_t length = len - (iterator - initialPos);
XinZhangMS 0:f7f1f0d76dd6 649 char* topicName = byteutil_readUTF(&iterator, &length);
XinZhangMS 0:f7f1f0d76dd6 650 if (topicName == NULL)
XinZhangMS 0:f7f1f0d76dd6 651 {
XinZhangMS 0:f7f1f0d76dd6 652 LOG(AZ_LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
XinZhangMS 0:f7f1f0d76dd6 653 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 654 if (trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 655 {
XinZhangMS 0:f7f1f0d76dd6 656 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 657 }
XinZhangMS 0:f7f1f0d76dd6 658 }
XinZhangMS 0:f7f1f0d76dd6 659 else
XinZhangMS 0:f7f1f0d76dd6 660 {
XinZhangMS 0:f7f1f0d76dd6 661 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 662 {
XinZhangMS 0:f7f1f0d76dd6 663 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", topicName);
XinZhangMS 0:f7f1f0d76dd6 664 }
XinZhangMS 0:f7f1f0d76dd6 665 uint16_t packetId = 0;
XinZhangMS 0:f7f1f0d76dd6 666 length = len - (iterator - initialPos);
XinZhangMS 0:f7f1f0d76dd6 667 if (qosValue != DELIVER_AT_MOST_ONCE)
XinZhangMS 0:f7f1f0d76dd6 668 {
XinZhangMS 0:f7f1f0d76dd6 669 packetId = byteutil_read_uint16(&iterator, length);
XinZhangMS 0:f7f1f0d76dd6 670 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 671 {
XinZhangMS 0:f7f1f0d76dd6 672 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
XinZhangMS 0:f7f1f0d76dd6 673 }
XinZhangMS 0:f7f1f0d76dd6 674 }
XinZhangMS 0:f7f1f0d76dd6 675 length = len - (iterator - initialPos);
XinZhangMS 0:f7f1f0d76dd6 676
XinZhangMS 0:f7f1f0d76dd6 677 MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
XinZhangMS 0:f7f1f0d76dd6 678 if (msgHandle == NULL)
XinZhangMS 0:f7f1f0d76dd6 679 {
XinZhangMS 0:f7f1f0d76dd6 680 LOG(AZ_LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
XinZhangMS 0:f7f1f0d76dd6 681 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 682 if (trace_log != NULL) {
XinZhangMS 0:f7f1f0d76dd6 683 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 684 }
XinZhangMS 0:f7f1f0d76dd6 685 }
XinZhangMS 0:f7f1f0d76dd6 686 else
XinZhangMS 0:f7f1f0d76dd6 687 {
XinZhangMS 0:f7f1f0d76dd6 688 if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
XinZhangMS 0:f7f1f0d76dd6 689 mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
XinZhangMS 0:f7f1f0d76dd6 690 {
XinZhangMS 0:f7f1f0d76dd6 691 LOG(AZ_LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
XinZhangMS 0:f7f1f0d76dd6 692 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 693 if (trace_log != NULL) {
XinZhangMS 0:f7f1f0d76dd6 694 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 695 }
XinZhangMS 0:f7f1f0d76dd6 696 }
XinZhangMS 0:f7f1f0d76dd6 697 else
XinZhangMS 0:f7f1f0d76dd6 698 {
XinZhangMS 0:f7f1f0d76dd6 699 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 700 {
XinZhangMS 0:f7f1f0d76dd6 701 STRING_sprintf(trace_log, " | PAYLOAD_LEN: %zu", length);
XinZhangMS 0:f7f1f0d76dd6 702 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 703 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 704 }
XinZhangMS 0:f7f1f0d76dd6 705
XinZhangMS 0:f7f1f0d76dd6 706 mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 707
XinZhangMS 0:f7f1f0d76dd6 708 BUFFER_HANDLE pubRel = NULL;
XinZhangMS 0:f7f1f0d76dd6 709 if (qosValue == DELIVER_EXACTLY_ONCE)
XinZhangMS 0:f7f1f0d76dd6 710 {
XinZhangMS 0:f7f1f0d76dd6 711 pubRel = mqtt_codec_publishReceived(packetId);
XinZhangMS 0:f7f1f0d76dd6 712 if (pubRel == NULL)
XinZhangMS 0:f7f1f0d76dd6 713 {
XinZhangMS 0:f7f1f0d76dd6 714 LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
XinZhangMS 0:f7f1f0d76dd6 715 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 716 }
XinZhangMS 0:f7f1f0d76dd6 717 }
XinZhangMS 0:f7f1f0d76dd6 718 else if (qosValue == DELIVER_AT_LEAST_ONCE)
XinZhangMS 0:f7f1f0d76dd6 719 {
XinZhangMS 0:f7f1f0d76dd6 720 pubRel = mqtt_codec_publishAck(packetId);
XinZhangMS 0:f7f1f0d76dd6 721 if (pubRel == NULL)
XinZhangMS 0:f7f1f0d76dd6 722 {
XinZhangMS 0:f7f1f0d76dd6 723 LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
XinZhangMS 0:f7f1f0d76dd6 724 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 725 }
XinZhangMS 0:f7f1f0d76dd6 726 }
XinZhangMS 0:f7f1f0d76dd6 727 if (pubRel != NULL)
XinZhangMS 0:f7f1f0d76dd6 728 {
XinZhangMS 0:f7f1f0d76dd6 729 size_t size = BUFFER_length(pubRel);
XinZhangMS 0:f7f1f0d76dd6 730 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
XinZhangMS 0:f7f1f0d76dd6 731 BUFFER_delete(pubRel);
XinZhangMS 0:f7f1f0d76dd6 732 }
XinZhangMS 0:f7f1f0d76dd6 733 }
XinZhangMS 0:f7f1f0d76dd6 734 mqttmessage_destroy(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 735 }
XinZhangMS 0:f7f1f0d76dd6 736 free(topicName);
XinZhangMS 0:f7f1f0d76dd6 737 }
XinZhangMS 0:f7f1f0d76dd6 738 }
XinZhangMS 0:f7f1f0d76dd6 739 break;
XinZhangMS 0:f7f1f0d76dd6 740 }
XinZhangMS 0:f7f1f0d76dd6 741 case PUBACK_TYPE:
XinZhangMS 0:f7f1f0d76dd6 742 case PUBREC_TYPE:
XinZhangMS 0:f7f1f0d76dd6 743 case PUBREL_TYPE:
XinZhangMS 0:f7f1f0d76dd6 744 case PUBCOMP_TYPE:
XinZhangMS 0:f7f1f0d76dd6 745 {
XinZhangMS 0:f7f1f0d76dd6 746 if (mqtt_client->fnOperationCallback)
XinZhangMS 0:f7f1f0d76dd6 747 {
XinZhangMS 0:f7f1f0d76dd6 748 STRING_HANDLE trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 749
XinZhangMS 0:f7f1f0d76dd6 750 /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
XinZhangMS 0:f7f1f0d76dd6 751 MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
XinZhangMS 0:f7f1f0d76dd6 752 (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
XinZhangMS 0:f7f1f0d76dd6 753 (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
XinZhangMS 0:f7f1f0d76dd6 754
XinZhangMS 0:f7f1f0d76dd6 755 PUBLISH_ACK publish_ack = { 0 };
XinZhangMS 0:f7f1f0d76dd6 756 publish_ack.packetId = byteutil_read_uint16(&iterator, len);
XinZhangMS 0:f7f1f0d76dd6 757
XinZhangMS 0:f7f1f0d76dd6 758 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 759 {
XinZhangMS 0:f7f1f0d76dd6 760 trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
XinZhangMS 0:f7f1f0d76dd6 761 publish_ack.packetId);
XinZhangMS 0:f7f1f0d76dd6 762
XinZhangMS 0:f7f1f0d76dd6 763 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 764 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 765 }
XinZhangMS 0:f7f1f0d76dd6 766
XinZhangMS 0:f7f1f0d76dd6 767 BUFFER_HANDLE pubRel = NULL;
XinZhangMS 0:f7f1f0d76dd6 768 mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 769 if (packet == PUBREC_TYPE)
XinZhangMS 0:f7f1f0d76dd6 770 {
XinZhangMS 0:f7f1f0d76dd6 771 pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
XinZhangMS 0:f7f1f0d76dd6 772 if (pubRel == NULL)
XinZhangMS 0:f7f1f0d76dd6 773 {
XinZhangMS 0:f7f1f0d76dd6 774 LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
XinZhangMS 0:f7f1f0d76dd6 775 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 776 }
XinZhangMS 0:f7f1f0d76dd6 777 }
XinZhangMS 0:f7f1f0d76dd6 778 else if (packet == PUBREL_TYPE)
XinZhangMS 0:f7f1f0d76dd6 779 {
XinZhangMS 0:f7f1f0d76dd6 780 pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
XinZhangMS 0:f7f1f0d76dd6 781 if (pubRel == NULL)
XinZhangMS 0:f7f1f0d76dd6 782 {
XinZhangMS 0:f7f1f0d76dd6 783 LOG(AZ_LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
XinZhangMS 0:f7f1f0d76dd6 784 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 785 }
XinZhangMS 0:f7f1f0d76dd6 786 }
XinZhangMS 0:f7f1f0d76dd6 787 if (pubRel != NULL)
XinZhangMS 0:f7f1f0d76dd6 788 {
XinZhangMS 0:f7f1f0d76dd6 789 size_t size = BUFFER_length(pubRel);
XinZhangMS 0:f7f1f0d76dd6 790 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
XinZhangMS 0:f7f1f0d76dd6 791 BUFFER_delete(pubRel);
XinZhangMS 0:f7f1f0d76dd6 792 }
XinZhangMS 0:f7f1f0d76dd6 793 }
XinZhangMS 0:f7f1f0d76dd6 794 break;
XinZhangMS 0:f7f1f0d76dd6 795 }
XinZhangMS 0:f7f1f0d76dd6 796 case SUBACK_TYPE:
XinZhangMS 0:f7f1f0d76dd6 797 {
XinZhangMS 0:f7f1f0d76dd6 798 if (mqtt_client->fnOperationCallback)
XinZhangMS 0:f7f1f0d76dd6 799 {
XinZhangMS 0:f7f1f0d76dd6 800 STRING_HANDLE trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 801
XinZhangMS 0:f7f1f0d76dd6 802 /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
XinZhangMS 0:f7f1f0d76dd6 803 SUBSCRIBE_ACK suback = { 0 };
XinZhangMS 0:f7f1f0d76dd6 804
XinZhangMS 0:f7f1f0d76dd6 805 size_t remainLen = len;
XinZhangMS 0:f7f1f0d76dd6 806 suback.packetId = byteutil_read_uint16(&iterator, len);
XinZhangMS 0:f7f1f0d76dd6 807 remainLen -= 2;
XinZhangMS 0:f7f1f0d76dd6 808
XinZhangMS 0:f7f1f0d76dd6 809 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 810 {
XinZhangMS 0:f7f1f0d76dd6 811 trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
XinZhangMS 0:f7f1f0d76dd6 812 }
XinZhangMS 0:f7f1f0d76dd6 813
XinZhangMS 0:f7f1f0d76dd6 814 // Allocate the remaining len
XinZhangMS 0:f7f1f0d76dd6 815 suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
XinZhangMS 0:f7f1f0d76dd6 816 if (suback.qosReturn != NULL)
XinZhangMS 0:f7f1f0d76dd6 817 {
XinZhangMS 0:f7f1f0d76dd6 818 while (remainLen > 0)
XinZhangMS 0:f7f1f0d76dd6 819 {
XinZhangMS 0:f7f1f0d76dd6 820 uint8_t qosRet = byteutil_readByte(&iterator);
XinZhangMS 0:f7f1f0d76dd6 821 suback.qosReturn[suback.qosCount++] =
XinZhangMS 0:f7f1f0d76dd6 822 (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
XinZhangMS 0:f7f1f0d76dd6 823 (QOS_VALUE)qosRet : DELIVER_FAILURE;
XinZhangMS 0:f7f1f0d76dd6 824 remainLen--;
XinZhangMS 0:f7f1f0d76dd6 825 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 826 {
XinZhangMS 0:f7f1f0d76dd6 827 STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
XinZhangMS 0:f7f1f0d76dd6 828 }
XinZhangMS 0:f7f1f0d76dd6 829 }
XinZhangMS 0:f7f1f0d76dd6 830
XinZhangMS 0:f7f1f0d76dd6 831 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 832 {
XinZhangMS 0:f7f1f0d76dd6 833 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 834 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 835 }
XinZhangMS 0:f7f1f0d76dd6 836 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 837 free(suback.qosReturn);
XinZhangMS 0:f7f1f0d76dd6 838 }
XinZhangMS 0:f7f1f0d76dd6 839 else
XinZhangMS 0:f7f1f0d76dd6 840 {
XinZhangMS 0:f7f1f0d76dd6 841 LOG(AZ_LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
XinZhangMS 0:f7f1f0d76dd6 842 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
XinZhangMS 0:f7f1f0d76dd6 843 }
XinZhangMS 0:f7f1f0d76dd6 844 }
XinZhangMS 0:f7f1f0d76dd6 845 break;
XinZhangMS 0:f7f1f0d76dd6 846 }
XinZhangMS 0:f7f1f0d76dd6 847 case UNSUBACK_TYPE:
XinZhangMS 0:f7f1f0d76dd6 848 {
XinZhangMS 0:f7f1f0d76dd6 849 if (mqtt_client->fnOperationCallback)
XinZhangMS 0:f7f1f0d76dd6 850 {
XinZhangMS 0:f7f1f0d76dd6 851 STRING_HANDLE trace_log = NULL;
XinZhangMS 0:f7f1f0d76dd6 852
XinZhangMS 0:f7f1f0d76dd6 853 /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
XinZhangMS 0:f7f1f0d76dd6 854 UNSUBSCRIBE_ACK unsuback = { 0 };
XinZhangMS 0:f7f1f0d76dd6 855 unsuback.packetId = byteutil_read_uint16(&iterator, len);
XinZhangMS 0:f7f1f0d76dd6 856
XinZhangMS 0:f7f1f0d76dd6 857 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 858 {
XinZhangMS 0:f7f1f0d76dd6 859 trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
XinZhangMS 0:f7f1f0d76dd6 860 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 861 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 862 }
XinZhangMS 0:f7f1f0d76dd6 863 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 864 }
XinZhangMS 0:f7f1f0d76dd6 865 break;
XinZhangMS 0:f7f1f0d76dd6 866 }
XinZhangMS 0:f7f1f0d76dd6 867 case PINGRESP_TYPE:
XinZhangMS 0:f7f1f0d76dd6 868 mqtt_client->timeSincePing = 0;
XinZhangMS 0:f7f1f0d76dd6 869 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 870 {
XinZhangMS 0:f7f1f0d76dd6 871 STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
XinZhangMS 0:f7f1f0d76dd6 872 log_incoming_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 873 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 874 }
XinZhangMS 0:f7f1f0d76dd6 875 // Forward ping response to operation callback
XinZhangMS 0:f7f1f0d76dd6 876 if (mqtt_client->fnOperationCallback)
XinZhangMS 0:f7f1f0d76dd6 877 {
XinZhangMS 0:f7f1f0d76dd6 878 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
XinZhangMS 0:f7f1f0d76dd6 879 }
XinZhangMS 0:f7f1f0d76dd6 880 break;
XinZhangMS 0:f7f1f0d76dd6 881 default:
XinZhangMS 0:f7f1f0d76dd6 882 break;
XinZhangMS 0:f7f1f0d76dd6 883 }
XinZhangMS 0:f7f1f0d76dd6 884 }
XinZhangMS 0:f7f1f0d76dd6 885 }
XinZhangMS 0:f7f1f0d76dd6 886 }
XinZhangMS 0:f7f1f0d76dd6 887
XinZhangMS 0:f7f1f0d76dd6 888 MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
XinZhangMS 0:f7f1f0d76dd6 889 {
XinZhangMS 0:f7f1f0d76dd6 890 MQTT_CLIENT* result;
XinZhangMS 0:f7f1f0d76dd6 891 /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 892 if (msgRecv == NULL)
XinZhangMS 0:f7f1f0d76dd6 893 {
XinZhangMS 0:f7f1f0d76dd6 894 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 895 }
XinZhangMS 0:f7f1f0d76dd6 896 else
XinZhangMS 0:f7f1f0d76dd6 897 {
XinZhangMS 0:f7f1f0d76dd6 898 result = malloc(sizeof(MQTT_CLIENT));
XinZhangMS 0:f7f1f0d76dd6 899 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 900 {
XinZhangMS 0:f7f1f0d76dd6 901 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 902 LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: Allocation Failure");
XinZhangMS 0:f7f1f0d76dd6 903 }
XinZhangMS 0:f7f1f0d76dd6 904 else
XinZhangMS 0:f7f1f0d76dd6 905 {
XinZhangMS 0:f7f1f0d76dd6 906 memset(result, 0, sizeof(MQTT_CLIENT));
XinZhangMS 0:f7f1f0d76dd6 907 /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
XinZhangMS 0:f7f1f0d76dd6 908 result->packetState = UNKNOWN_TYPE;
XinZhangMS 0:f7f1f0d76dd6 909 result->fnOperationCallback = opCallback;
XinZhangMS 0:f7f1f0d76dd6 910 result->ctx = opCallbackCtx;
XinZhangMS 0:f7f1f0d76dd6 911 result->fnMessageRecv = msgRecv;
XinZhangMS 0:f7f1f0d76dd6 912 result->fnOnErrorCallBack = onErrorCallBack;
XinZhangMS 0:f7f1f0d76dd6 913 result->errorCBCtx = errorCBCtx;
XinZhangMS 0:f7f1f0d76dd6 914 result->qosValue = DELIVER_AT_MOST_ONCE;
XinZhangMS 0:f7f1f0d76dd6 915 result->packetTickCntr = tickcounter_create();
XinZhangMS 0:f7f1f0d76dd6 916 result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
XinZhangMS 0:f7f1f0d76dd6 917 if (result->packetTickCntr == NULL)
XinZhangMS 0:f7f1f0d76dd6 918 {
XinZhangMS 0:f7f1f0d76dd6 919 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 920 LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: tickcounter_create failure");
XinZhangMS 0:f7f1f0d76dd6 921 free(result);
XinZhangMS 0:f7f1f0d76dd6 922 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 923 }
XinZhangMS 0:f7f1f0d76dd6 924 else
XinZhangMS 0:f7f1f0d76dd6 925 {
XinZhangMS 0:f7f1f0d76dd6 926 result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
XinZhangMS 0:f7f1f0d76dd6 927 if (result->codec_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 928 {
XinZhangMS 0:f7f1f0d76dd6 929 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
XinZhangMS 0:f7f1f0d76dd6 930 LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_init failure: mqtt_codec_create failure");
XinZhangMS 0:f7f1f0d76dd6 931 tickcounter_destroy(result->packetTickCntr);
XinZhangMS 0:f7f1f0d76dd6 932 free(result);
XinZhangMS 0:f7f1f0d76dd6 933 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 934 }
XinZhangMS 0:f7f1f0d76dd6 935 }
XinZhangMS 0:f7f1f0d76dd6 936 }
XinZhangMS 0:f7f1f0d76dd6 937 }
XinZhangMS 0:f7f1f0d76dd6 938 return result;
XinZhangMS 0:f7f1f0d76dd6 939 }
XinZhangMS 0:f7f1f0d76dd6 940
XinZhangMS 0:f7f1f0d76dd6 941 void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
XinZhangMS 0:f7f1f0d76dd6 942 {
XinZhangMS 0:f7f1f0d76dd6 943 /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
XinZhangMS 0:f7f1f0d76dd6 944 if (handle != NULL)
XinZhangMS 0:f7f1f0d76dd6 945 {
XinZhangMS 0:f7f1f0d76dd6 946 /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
XinZhangMS 0:f7f1f0d76dd6 947 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 948 tickcounter_destroy(mqtt_client->packetTickCntr);
XinZhangMS 0:f7f1f0d76dd6 949 mqtt_codec_destroy(mqtt_client->codec_handle);
XinZhangMS 0:f7f1f0d76dd6 950 clear_mqtt_options(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 951 free(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 952 }
XinZhangMS 0:f7f1f0d76dd6 953 }
XinZhangMS 0:f7f1f0d76dd6 954
XinZhangMS 0:f7f1f0d76dd6 955 int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
XinZhangMS 0:f7f1f0d76dd6 956 {
XinZhangMS 0:f7f1f0d76dd6 957 int result;
XinZhangMS 0:f7f1f0d76dd6 958 /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 959 if (handle == NULL || mqttOptions == NULL)
XinZhangMS 0:f7f1f0d76dd6 960 {
XinZhangMS 0:f7f1f0d76dd6 961 LOG(AZ_LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions);
XinZhangMS 0:f7f1f0d76dd6 962 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 963 }
XinZhangMS 0:f7f1f0d76dd6 964 else
XinZhangMS 0:f7f1f0d76dd6 965 {
XinZhangMS 0:f7f1f0d76dd6 966 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 967 if (xioHandle == NULL)
XinZhangMS 0:f7f1f0d76dd6 968 {
XinZhangMS 0:f7f1f0d76dd6 969 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 970 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqttcodec_connect failed");
XinZhangMS 0:f7f1f0d76dd6 971 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 972 }
XinZhangMS 0:f7f1f0d76dd6 973 else
XinZhangMS 0:f7f1f0d76dd6 974 {
XinZhangMS 0:f7f1f0d76dd6 975 mqtt_client->xioHandle = xioHandle;
XinZhangMS 0:f7f1f0d76dd6 976 mqtt_client->packetState = UNKNOWN_TYPE;
XinZhangMS 0:f7f1f0d76dd6 977 mqtt_client->qosValue = mqttOptions->qualityOfServiceValue;
XinZhangMS 0:f7f1f0d76dd6 978 mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval;
XinZhangMS 0:f7f1f0d76dd6 979 mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
XinZhangMS 0:f7f1f0d76dd6 980 if (cloneMqttOptions(mqtt_client, mqttOptions) != 0)
XinZhangMS 0:f7f1f0d76dd6 981 {
XinZhangMS 0:f7f1f0d76dd6 982 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: Clone Mqtt Options failed");
XinZhangMS 0:f7f1f0d76dd6 983 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 984 }
XinZhangMS 0:f7f1f0d76dd6 985 /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
XinZhangMS 0:f7f1f0d76dd6 986 else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0)
XinZhangMS 0:f7f1f0d76dd6 987 {
XinZhangMS 0:f7f1f0d76dd6 988 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 989 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: io_open failed");
XinZhangMS 0:f7f1f0d76dd6 990 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 991 // Remove cloned options
XinZhangMS 0:f7f1f0d76dd6 992 clear_mqtt_options(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 993 }
XinZhangMS 0:f7f1f0d76dd6 994 else
XinZhangMS 0:f7f1f0d76dd6 995 {
XinZhangMS 0:f7f1f0d76dd6 996 result = 0;
XinZhangMS 0:f7f1f0d76dd6 997 }
XinZhangMS 0:f7f1f0d76dd6 998 }
XinZhangMS 0:f7f1f0d76dd6 999 }
XinZhangMS 0:f7f1f0d76dd6 1000 return result;
XinZhangMS 0:f7f1f0d76dd6 1001 }
XinZhangMS 0:f7f1f0d76dd6 1002
XinZhangMS 0:f7f1f0d76dd6 1003 int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
XinZhangMS 0:f7f1f0d76dd6 1004 {
XinZhangMS 0:f7f1f0d76dd6 1005 int result;
XinZhangMS 0:f7f1f0d76dd6 1006 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1007 if (mqtt_client == NULL || msgHandle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1008 {
XinZhangMS 0:f7f1f0d76dd6 1009 /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1010 LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1011 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1012 }
XinZhangMS 0:f7f1f0d76dd6 1013 else
XinZhangMS 0:f7f1f0d76dd6 1014 {
XinZhangMS 0:f7f1f0d76dd6 1015 /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
XinZhangMS 0:f7f1f0d76dd6 1016 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1017 if (payload == NULL)
XinZhangMS 0:f7f1f0d76dd6 1018 {
XinZhangMS 0:f7f1f0d76dd6 1019 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1020 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqttmessage_getApplicationMsg failed");
XinZhangMS 0:f7f1f0d76dd6 1021 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1022 }
XinZhangMS 0:f7f1f0d76dd6 1023 else
XinZhangMS 0:f7f1f0d76dd6 1024 {
XinZhangMS 0:f7f1f0d76dd6 1025 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1026
XinZhangMS 0:f7f1f0d76dd6 1027 QOS_VALUE qos = mqttmessage_getQosType(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1028 bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1029 bool isRetained = mqttmessage_getIsRetained(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1030 uint16_t packetId = mqttmessage_getPacketId(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1031 const char* topicName = mqttmessage_getTopicName(msgHandle);
XinZhangMS 0:f7f1f0d76dd6 1032 BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1033 if (publishPacket == NULL)
XinZhangMS 0:f7f1f0d76dd6 1034 {
XinZhangMS 0:f7f1f0d76dd6 1035 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1036 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_publish failed");
XinZhangMS 0:f7f1f0d76dd6 1037 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1038 }
XinZhangMS 0:f7f1f0d76dd6 1039 else
XinZhangMS 0:f7f1f0d76dd6 1040 {
XinZhangMS 0:f7f1f0d76dd6 1041 mqtt_client->packetState = PUBLISH_TYPE;
XinZhangMS 0:f7f1f0d76dd6 1042
XinZhangMS 0:f7f1f0d76dd6 1043 /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
XinZhangMS 0:f7f1f0d76dd6 1044 size_t size = BUFFER_length(publishPacket);
XinZhangMS 0:f7f1f0d76dd6 1045 if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0)
XinZhangMS 0:f7f1f0d76dd6 1046 {
XinZhangMS 0:f7f1f0d76dd6 1047 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1048 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_publish send failed");
XinZhangMS 0:f7f1f0d76dd6 1049 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1050 }
XinZhangMS 0:f7f1f0d76dd6 1051 else
XinZhangMS 0:f7f1f0d76dd6 1052 {
XinZhangMS 0:f7f1f0d76dd6 1053 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1054 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1055 }
XinZhangMS 0:f7f1f0d76dd6 1056 BUFFER_delete(publishPacket);
XinZhangMS 0:f7f1f0d76dd6 1057 }
XinZhangMS 0:f7f1f0d76dd6 1058 if (trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 1059 {
XinZhangMS 0:f7f1f0d76dd6 1060 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 1061 }
XinZhangMS 0:f7f1f0d76dd6 1062 }
XinZhangMS 0:f7f1f0d76dd6 1063 }
XinZhangMS 0:f7f1f0d76dd6 1064 return result;
XinZhangMS 0:f7f1f0d76dd6 1065 }
XinZhangMS 0:f7f1f0d76dd6 1066
XinZhangMS 0:f7f1f0d76dd6 1067 int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
XinZhangMS 0:f7f1f0d76dd6 1068 {
XinZhangMS 0:f7f1f0d76dd6 1069 int result;
XinZhangMS 0:f7f1f0d76dd6 1070 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1071 if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0)
XinZhangMS 0:f7f1f0d76dd6 1072 {
XinZhangMS 0:f7f1f0d76dd6 1073 /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1074 LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %d, packetId: %d", mqtt_client, subscribeList, count, packetId);
XinZhangMS 0:f7f1f0d76dd6 1075 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1076 }
XinZhangMS 0:f7f1f0d76dd6 1077 else
XinZhangMS 0:f7f1f0d76dd6 1078 {
XinZhangMS 0:f7f1f0d76dd6 1079 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1080
XinZhangMS 0:f7f1f0d76dd6 1081 BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1082 if (subPacket == NULL)
XinZhangMS 0:f7f1f0d76dd6 1083 {
XinZhangMS 0:f7f1f0d76dd6 1084 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1085 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_subscribe failed");
XinZhangMS 0:f7f1f0d76dd6 1086 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1087 }
XinZhangMS 0:f7f1f0d76dd6 1088 else
XinZhangMS 0:f7f1f0d76dd6 1089 {
XinZhangMS 0:f7f1f0d76dd6 1090 mqtt_client->packetState = SUBSCRIBE_TYPE;
XinZhangMS 0:f7f1f0d76dd6 1091
XinZhangMS 0:f7f1f0d76dd6 1092 size_t size = BUFFER_length(subPacket);
XinZhangMS 0:f7f1f0d76dd6 1093 /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
XinZhangMS 0:f7f1f0d76dd6 1094 if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0)
XinZhangMS 0:f7f1f0d76dd6 1095 {
XinZhangMS 0:f7f1f0d76dd6 1096 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1097 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_subscribe send failed");
XinZhangMS 0:f7f1f0d76dd6 1098 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1099 }
XinZhangMS 0:f7f1f0d76dd6 1100 else
XinZhangMS 0:f7f1f0d76dd6 1101 {
XinZhangMS 0:f7f1f0d76dd6 1102 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1103 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1104 }
XinZhangMS 0:f7f1f0d76dd6 1105 BUFFER_delete(subPacket);
XinZhangMS 0:f7f1f0d76dd6 1106 }
XinZhangMS 0:f7f1f0d76dd6 1107 if (trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 1108 {
XinZhangMS 0:f7f1f0d76dd6 1109 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 1110 }
XinZhangMS 0:f7f1f0d76dd6 1111 }
XinZhangMS 0:f7f1f0d76dd6 1112 return result;
XinZhangMS 0:f7f1f0d76dd6 1113 }
XinZhangMS 0:f7f1f0d76dd6 1114
XinZhangMS 0:f7f1f0d76dd6 1115 int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
XinZhangMS 0:f7f1f0d76dd6 1116 {
XinZhangMS 0:f7f1f0d76dd6 1117 int result;
XinZhangMS 0:f7f1f0d76dd6 1118 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1119 if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0)
XinZhangMS 0:f7f1f0d76dd6 1120 {
XinZhangMS 0:f7f1f0d76dd6 1121 /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1122 LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %d, packetId: %d", mqtt_client, unsubscribeList, count, packetId);
XinZhangMS 0:f7f1f0d76dd6 1123 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1124 }
XinZhangMS 0:f7f1f0d76dd6 1125 else
XinZhangMS 0:f7f1f0d76dd6 1126 {
XinZhangMS 0:f7f1f0d76dd6 1127 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1128
XinZhangMS 0:f7f1f0d76dd6 1129 BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1130 if (unsubPacket == NULL)
XinZhangMS 0:f7f1f0d76dd6 1131 {
XinZhangMS 0:f7f1f0d76dd6 1132 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1133 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_codec_unsubscribe failed");
XinZhangMS 0:f7f1f0d76dd6 1134 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1135 }
XinZhangMS 0:f7f1f0d76dd6 1136 else
XinZhangMS 0:f7f1f0d76dd6 1137 {
XinZhangMS 0:f7f1f0d76dd6 1138 mqtt_client->packetState = UNSUBSCRIBE_TYPE;
XinZhangMS 0:f7f1f0d76dd6 1139
XinZhangMS 0:f7f1f0d76dd6 1140 size_t size = BUFFER_length(unsubPacket);
XinZhangMS 0:f7f1f0d76dd6 1141 /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
XinZhangMS 0:f7f1f0d76dd6 1142 if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0)
XinZhangMS 0:f7f1f0d76dd6 1143 {
XinZhangMS 0:f7f1f0d76dd6 1144 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
XinZhangMS 0:f7f1f0d76dd6 1145 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_unsubscribe send failed");
XinZhangMS 0:f7f1f0d76dd6 1146 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1147 }
XinZhangMS 0:f7f1f0d76dd6 1148 else
XinZhangMS 0:f7f1f0d76dd6 1149 {
XinZhangMS 0:f7f1f0d76dd6 1150 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1151 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1152 }
XinZhangMS 0:f7f1f0d76dd6 1153 BUFFER_delete(unsubPacket);
XinZhangMS 0:f7f1f0d76dd6 1154 }
XinZhangMS 0:f7f1f0d76dd6 1155 if (trace_log != NULL)
XinZhangMS 0:f7f1f0d76dd6 1156 {
XinZhangMS 0:f7f1f0d76dd6 1157 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 1158 }
XinZhangMS 0:f7f1f0d76dd6 1159 }
XinZhangMS 0:f7f1f0d76dd6 1160 return result;
XinZhangMS 0:f7f1f0d76dd6 1161 }
XinZhangMS 0:f7f1f0d76dd6 1162
XinZhangMS 0:f7f1f0d76dd6 1163 int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx)
XinZhangMS 0:f7f1f0d76dd6 1164 {
XinZhangMS 0:f7f1f0d76dd6 1165 int result;
XinZhangMS 0:f7f1f0d76dd6 1166 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1167 if (mqtt_client == NULL)
XinZhangMS 0:f7f1f0d76dd6 1168 {
XinZhangMS 0:f7f1f0d76dd6 1169 /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1170 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1171 }
XinZhangMS 0:f7f1f0d76dd6 1172 else
XinZhangMS 0:f7f1f0d76dd6 1173 {
XinZhangMS 0:f7f1f0d76dd6 1174 if (mqtt_client->clientConnected)
XinZhangMS 0:f7f1f0d76dd6 1175 {
XinZhangMS 0:f7f1f0d76dd6 1176 BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
XinZhangMS 0:f7f1f0d76dd6 1177 if (disconnectPacket == NULL)
XinZhangMS 0:f7f1f0d76dd6 1178 {
XinZhangMS 0:f7f1f0d76dd6 1179 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1180 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect failed");
XinZhangMS 0:f7f1f0d76dd6 1181 mqtt_client->packetState = PACKET_TYPE_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1182 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1183 }
XinZhangMS 0:f7f1f0d76dd6 1184 else
XinZhangMS 0:f7f1f0d76dd6 1185 {
XinZhangMS 0:f7f1f0d76dd6 1186 /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */
XinZhangMS 0:f7f1f0d76dd6 1187 mqtt_client->disconnect_cb = callback;
XinZhangMS 0:f7f1f0d76dd6 1188 mqtt_client->disconnect_ctx = ctx;
XinZhangMS 0:f7f1f0d76dd6 1189 mqtt_client->packetState = DISCONNECT_TYPE;
XinZhangMS 0:f7f1f0d76dd6 1190
XinZhangMS 0:f7f1f0d76dd6 1191 size_t size = BUFFER_length(disconnectPacket);
XinZhangMS 0:f7f1f0d76dd6 1192 /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
XinZhangMS 0:f7f1f0d76dd6 1193 if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0)
XinZhangMS 0:f7f1f0d76dd6 1194 {
XinZhangMS 0:f7f1f0d76dd6 1195 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
XinZhangMS 0:f7f1f0d76dd6 1196 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: mqtt_client_disconnect send failed");
XinZhangMS 0:f7f1f0d76dd6 1197 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1198 }
XinZhangMS 0:f7f1f0d76dd6 1199 else
XinZhangMS 0:f7f1f0d76dd6 1200 {
XinZhangMS 0:f7f1f0d76dd6 1201 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 1202 {
XinZhangMS 0:f7f1f0d76dd6 1203 STRING_HANDLE trace_log = STRING_construct("DISCONNECT");
XinZhangMS 0:f7f1f0d76dd6 1204 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1205 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 1206 }
XinZhangMS 0:f7f1f0d76dd6 1207 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1208 }
XinZhangMS 0:f7f1f0d76dd6 1209 BUFFER_delete(disconnectPacket);
XinZhangMS 0:f7f1f0d76dd6 1210 }
XinZhangMS 0:f7f1f0d76dd6 1211 clear_mqtt_options(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1212 mqtt_client->xioHandle = NULL;
XinZhangMS 0:f7f1f0d76dd6 1213 }
XinZhangMS 0:f7f1f0d76dd6 1214 else
XinZhangMS 0:f7f1f0d76dd6 1215 {
XinZhangMS 0:f7f1f0d76dd6 1216 // If the client is not connected then just close the underlying socket
XinZhangMS 0:f7f1f0d76dd6 1217 mqtt_client->disconnect_cb = callback;
XinZhangMS 0:f7f1f0d76dd6 1218 mqtt_client->disconnect_ctx = ctx;
XinZhangMS 0:f7f1f0d76dd6 1219
XinZhangMS 0:f7f1f0d76dd6 1220 close_connection(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1221 clear_mqtt_options(mqtt_client);
XinZhangMS 0:f7f1f0d76dd6 1222 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1223 }
XinZhangMS 0:f7f1f0d76dd6 1224 }
XinZhangMS 0:f7f1f0d76dd6 1225 return result;
XinZhangMS 0:f7f1f0d76dd6 1226 }
XinZhangMS 0:f7f1f0d76dd6 1227
XinZhangMS 0:f7f1f0d76dd6 1228 void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
XinZhangMS 0:f7f1f0d76dd6 1229 {
XinZhangMS 0:f7f1f0d76dd6 1230 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1231 /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/
XinZhangMS 0:f7f1f0d76dd6 1232 /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
XinZhangMS 0:f7f1f0d76dd6 1233 if (mqtt_client != NULL && mqtt_client->xioHandle != NULL)
XinZhangMS 0:f7f1f0d76dd6 1234 {
XinZhangMS 0:f7f1f0d76dd6 1235 /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
XinZhangMS 0:f7f1f0d76dd6 1236 xio_dowork(mqtt_client->xioHandle);
XinZhangMS 0:f7f1f0d76dd6 1237
XinZhangMS 0:f7f1f0d76dd6 1238 /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
XinZhangMS 0:f7f1f0d76dd6 1239 if (mqtt_client->socketConnected && mqtt_client->clientConnected && mqtt_client->keepAliveInterval > 0)
XinZhangMS 0:f7f1f0d76dd6 1240 {
XinZhangMS 0:f7f1f0d76dd6 1241 tickcounter_ms_t current_ms;
XinZhangMS 0:f7f1f0d76dd6 1242 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &current_ms) != 0)
XinZhangMS 0:f7f1f0d76dd6 1243 {
XinZhangMS 0:f7f1f0d76dd6 1244 LOG(AZ_LOG_ERROR, LOG_LINE, "Error: tickcounter_get_current_ms failed");
XinZhangMS 0:f7f1f0d76dd6 1245 }
XinZhangMS 0:f7f1f0d76dd6 1246 else
XinZhangMS 0:f7f1f0d76dd6 1247 {
XinZhangMS 0:f7f1f0d76dd6 1248 /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
XinZhangMS 0:f7f1f0d76dd6 1249 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
XinZhangMS 0:f7f1f0d76dd6 1250 {
XinZhangMS 0:f7f1f0d76dd6 1251 // We haven't gotten a ping response in the alloted time
XinZhangMS 0:f7f1f0d76dd6 1252 set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
XinZhangMS 0:f7f1f0d76dd6 1253 mqtt_client->timeSincePing = 0;
XinZhangMS 0:f7f1f0d76dd6 1254 mqtt_client->packetSendTimeMs = 0;
XinZhangMS 0:f7f1f0d76dd6 1255 mqtt_client->packetState = UNKNOWN_TYPE;
XinZhangMS 0:f7f1f0d76dd6 1256 }
XinZhangMS 0:f7f1f0d76dd6 1257 else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval)
XinZhangMS 0:f7f1f0d76dd6 1258 {
XinZhangMS 0:f7f1f0d76dd6 1259 /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
XinZhangMS 0:f7f1f0d76dd6 1260 BUFFER_HANDLE pingPacket = mqtt_codec_ping();
XinZhangMS 0:f7f1f0d76dd6 1261 if (pingPacket != NULL)
XinZhangMS 0:f7f1f0d76dd6 1262 {
XinZhangMS 0:f7f1f0d76dd6 1263 size_t size = BUFFER_length(pingPacket);
XinZhangMS 0:f7f1f0d76dd6 1264 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size);
XinZhangMS 0:f7f1f0d76dd6 1265 BUFFER_delete(pingPacket);
XinZhangMS 0:f7f1f0d76dd6 1266 (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing);
XinZhangMS 0:f7f1f0d76dd6 1267
XinZhangMS 0:f7f1f0d76dd6 1268 if (mqtt_client->logTrace)
XinZhangMS 0:f7f1f0d76dd6 1269 {
XinZhangMS 0:f7f1f0d76dd6 1270 STRING_HANDLE trace_log = STRING_construct("PINGREQ");
XinZhangMS 0:f7f1f0d76dd6 1271 log_outgoing_trace(mqtt_client, trace_log);
XinZhangMS 0:f7f1f0d76dd6 1272 STRING_delete(trace_log);
XinZhangMS 0:f7f1f0d76dd6 1273 }
XinZhangMS 0:f7f1f0d76dd6 1274 }
XinZhangMS 0:f7f1f0d76dd6 1275 }
XinZhangMS 0:f7f1f0d76dd6 1276 }
XinZhangMS 0:f7f1f0d76dd6 1277 }
XinZhangMS 0:f7f1f0d76dd6 1278 }
XinZhangMS 0:f7f1f0d76dd6 1279 }
XinZhangMS 0:f7f1f0d76dd6 1280
XinZhangMS 0:f7f1f0d76dd6 1281 void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
XinZhangMS 0:f7f1f0d76dd6 1282 {
XinZhangMS 0:f7f1f0d76dd6 1283 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
XinZhangMS 0:f7f1f0d76dd6 1284 if (mqtt_client != NULL)
XinZhangMS 0:f7f1f0d76dd6 1285 {
XinZhangMS 0:f7f1f0d76dd6 1286 mqtt_client->logTrace = traceOn;
XinZhangMS 0:f7f1f0d76dd6 1287 mqtt_client->rawBytesTrace = rawBytesOn;
XinZhangMS 0:f7f1f0d76dd6 1288 }
XinZhangMS 0:f7f1f0d76dd6 1289 }