Azure IoT / iothub_amqp_transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

Committer:
AzureIoTClient
Date:
Mon Jun 11 15:38:09 2018 -0700
Revision:
53:e21e1e88460f
Parent:
51:269e65571b39
Child:
54:830550fef7ea
1.2.5

Who changed what in which revision?

User | Revision | Line number | New contents of line
AzureIoTClient 39:e98d5df6dc74 1 // Copyright (c) Microsoft. All rights reserved.
AzureIoTClient 39:e98d5df6dc74 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
AzureIoTClient 39:e98d5df6dc74 3
AzureIoTClient 39:e98d5df6dc74 4 #include <stdlib.h>
AzureIoTClient 39:e98d5df6dc74 5 #include <stdbool.h>
AzureIoTClient 39:e98d5df6dc74 6 #include "azure_c_shared_utility/optimize_size.h"
AzureIoTClient 39:e98d5df6dc74 7 #include "azure_c_shared_utility/crt_abstractions.h"
AzureIoTClient 39:e98d5df6dc74 8 #include "azure_c_shared_utility/gballoc.h"
AzureIoTClient 39:e98d5df6dc74 9 #include "azure_c_shared_utility/agenttime.h"
AzureIoTClient 39:e98d5df6dc74 10 #include "azure_c_shared_utility/xlogging.h"
AzureIoTClient 39:e98d5df6dc74 11 #include "azure_c_shared_utility/uniqueid.h"
AzureIoTClient 39:e98d5df6dc74 12 #include "azure_uamqp_c/link.h"
AzureIoTClient 39:e98d5df6dc74 13 #include "azure_uamqp_c/messaging.h"
AzureIoTClient 39:e98d5df6dc74 14 #include "azure_uamqp_c/message_sender.h"
AzureIoTClient 39:e98d5df6dc74 15 #include "azure_uamqp_c/message_receiver.h"
AzureIoTClient 53:e21e1e88460f 16 #include "internal/message_queue.h"
AzureIoTClient 53:e21e1e88460f 17 #include "internal/iothub_client_retry_control.h"
AzureIoTClient 53:e21e1e88460f 18 #include "internal/iothubtransport_amqp_messenger.h"
AzureIoTClient 39:e98d5df6dc74 19
AzureIoTClient 39:e98d5df6dc74 20 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_STATUS, AMQP_MESSENGER_SEND_STATUS_VALUES);
AzureIoTClient 39:e98d5df6dc74 21 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_RESULT, AMQP_MESSENGER_SEND_RESULT_VALUES);
AzureIoTClient 39:e98d5df6dc74 22 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_REASON, AMQP_MESSENGER_REASON_VALUES);
AzureIoTClient 39:e98d5df6dc74 23 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_DISPOSITION_RESULT, AMQP_MESSENGER_DISPOSITION_RESULT_VALUES);
AzureIoTClient 39:e98d5df6dc74 24 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_STATE, AMQP_MESSENGER_STATE_VALUES);
AzureIoTClient 39:e98d5df6dc74 25
AzureIoTClient 39:e98d5df6dc74 26
// Success code returned by (and compared against in) the helpers of this file.
#define RESULT_OK                                       0
// Sentinel time_t value. NOTE(review): presumably means "no time recorded" - confirm against its users.
#define INDEFINITE_TIME                                 ((time_t)(-1))

// AMQP Link address format: "amqps://<iot hub fqdn>/devices/<device-id>/<suffix>"
#define LINK_ADDRESS_FORMAT                             "amqps://%s/devices/%s/%s"

// Link name prefixes and link size limits for the sending and receiving links.
#define SEND_LINK_NAME_PREFIX                           "link-snd"
#define MESSAGE_SENDER_MAX_LINK_SIZE                    UINT64_MAX
#define RECEIVE_LINK_NAME_PREFIX                        "link-rcv"
#define MESSAGE_RECEIVER_MAX_LINK_SIZE                  65536

// Default limits and timeouts (the *_SECS values are expressed in seconds, per their names).
#define DEFAULT_EVENT_SEND_RETRY_LIMIT                  0
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS                 600
#define DEFAULT_MAX_SEND_ERROR_COUNT                    10
#define MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS    300
#define MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS  300

// Size passed to UniqueId_Generate() when building link names.
#define UNIQUE_ID_BUFFER_SIZE                           37

// Option name string. NOTE(review): presumably the key under which message queue options are saved - confirm.
static const char* MESSENGER_SAVED_MQ_OPTIONS = "amqp_message_queue_options";
AzureIoTClient 39:e98d5df6dc74 44
AzureIoTClient 39:e98d5df6dc74 45 typedef struct AMQP_MESSENGER_INSTANCE_TAG
AzureIoTClient 39:e98d5df6dc74 46 {
AzureIoTClient 39:e98d5df6dc74 47 AMQP_MESSENGER_CONFIG* config;
AzureIoTClient 39:e98d5df6dc74 48
AzureIoTClient 39:e98d5df6dc74 49 bool receive_messages;
AzureIoTClient 39:e98d5df6dc74 50 ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback;
AzureIoTClient 39:e98d5df6dc74 51 void* on_message_received_context;
AzureIoTClient 39:e98d5df6dc74 52
AzureIoTClient 39:e98d5df6dc74 53 MESSAGE_QUEUE_HANDLE send_queue;
AzureIoTClient 39:e98d5df6dc74 54 AMQP_MESSENGER_STATE state;
AzureIoTClient 39:e98d5df6dc74 55
AzureIoTClient 39:e98d5df6dc74 56 SESSION_HANDLE session_handle;
AzureIoTClient 39:e98d5df6dc74 57
AzureIoTClient 39:e98d5df6dc74 58 LINK_HANDLE sender_link;
AzureIoTClient 39:e98d5df6dc74 59 MESSAGE_SENDER_HANDLE message_sender;
AzureIoTClient 39:e98d5df6dc74 60 MESSAGE_SENDER_STATE message_sender_current_state;
AzureIoTClient 39:e98d5df6dc74 61 MESSAGE_SENDER_STATE message_sender_previous_state;
AzureIoTClient 39:e98d5df6dc74 62
AzureIoTClient 39:e98d5df6dc74 63 LINK_HANDLE receiver_link;
AzureIoTClient 39:e98d5df6dc74 64 MESSAGE_RECEIVER_HANDLE message_receiver;
AzureIoTClient 39:e98d5df6dc74 65 MESSAGE_RECEIVER_STATE message_receiver_current_state;
AzureIoTClient 39:e98d5df6dc74 66 MESSAGE_RECEIVER_STATE message_receiver_previous_state;
AzureIoTClient 39:e98d5df6dc74 67
AzureIoTClient 39:e98d5df6dc74 68 size_t send_error_count;
AzureIoTClient 39:e98d5df6dc74 69 size_t max_send_error_count;
AzureIoTClient 39:e98d5df6dc74 70
AzureIoTClient 39:e98d5df6dc74 71 time_t last_message_sender_state_change_time;
AzureIoTClient 39:e98d5df6dc74 72 time_t last_message_receiver_state_change_time;
AzureIoTClient 39:e98d5df6dc74 73 } AMQP_MESSENGER_INSTANCE;
AzureIoTClient 39:e98d5df6dc74 74
AzureIoTClient 39:e98d5df6dc74 75 typedef struct MESSAGE_SEND_CONTEXT_TAG
AzureIoTClient 39:e98d5df6dc74 76 {
AzureIoTClient 39:e98d5df6dc74 77 MESSAGE_HANDLE message;
AzureIoTClient 39:e98d5df6dc74 78 bool is_destroyed;
AzureIoTClient 39:e98d5df6dc74 79
AzureIoTClient 39:e98d5df6dc74 80 AMQP_MESSENGER_INSTANCE* messenger;
AzureIoTClient 39:e98d5df6dc74 81
AzureIoTClient 39:e98d5df6dc74 82 AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_send_complete_callback;
AzureIoTClient 39:e98d5df6dc74 83 void* user_context;
AzureIoTClient 39:e98d5df6dc74 84
AzureIoTClient 39:e98d5df6dc74 85 PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback;
AzureIoTClient 39:e98d5df6dc74 86 } MESSAGE_SEND_CONTEXT;
AzureIoTClient 39:e98d5df6dc74 87
AzureIoTClient 39:e98d5df6dc74 88
AzureIoTClient 39:e98d5df6dc74 89
AzureIoTClient 39:e98d5df6dc74 90 static MESSAGE_SEND_CONTEXT* create_message_send_context()
AzureIoTClient 39:e98d5df6dc74 91 {
AzureIoTClient 39:e98d5df6dc74 92 MESSAGE_SEND_CONTEXT* result;
AzureIoTClient 39:e98d5df6dc74 93
AzureIoTClient 39:e98d5df6dc74 94 if ((result = (MESSAGE_SEND_CONTEXT*)malloc(sizeof(MESSAGE_SEND_CONTEXT))) == NULL)
AzureIoTClient 39:e98d5df6dc74 95 {
AzureIoTClient 39:e98d5df6dc74 96 LogError("Failed creating the message send context");
AzureIoTClient 39:e98d5df6dc74 97 }
AzureIoTClient 39:e98d5df6dc74 98 else
AzureIoTClient 39:e98d5df6dc74 99 {
AzureIoTClient 39:e98d5df6dc74 100 memset(result, 0, sizeof(MESSAGE_SEND_CONTEXT));
AzureIoTClient 39:e98d5df6dc74 101 }
AzureIoTClient 39:e98d5df6dc74 102
AzureIoTClient 39:e98d5df6dc74 103 return result;
AzureIoTClient 39:e98d5df6dc74 104 }
AzureIoTClient 39:e98d5df6dc74 105
AzureIoTClient 39:e98d5df6dc74 106 static bool is_valid_configuration(const AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 107 {
AzureIoTClient 39:e98d5df6dc74 108 bool result;
AzureIoTClient 39:e98d5df6dc74 109
AzureIoTClient 39:e98d5df6dc74 110 if (config == NULL)
AzureIoTClient 39:e98d5df6dc74 111 {
AzureIoTClient 39:e98d5df6dc74 112 LogError("Invalid configuration (NULL)");
AzureIoTClient 39:e98d5df6dc74 113 result = false;
AzureIoTClient 39:e98d5df6dc74 114 }
AzureIoTClient 39:e98d5df6dc74 115 else if (config->client_version == NULL ||
AzureIoTClient 39:e98d5df6dc74 116 config->device_id == NULL ||
AzureIoTClient 39:e98d5df6dc74 117 config->iothub_host_fqdn == NULL ||
AzureIoTClient 39:e98d5df6dc74 118 config->receive_link.source_suffix == NULL ||
AzureIoTClient 39:e98d5df6dc74 119 config->send_link.target_suffix == NULL)
AzureIoTClient 39:e98d5df6dc74 120 {
AzureIoTClient 39:e98d5df6dc74 121 LogError("Invalid configuration (client_version=%p, device_id=%p, iothub_host_fqdn=%p, receive_link (source_suffix=%p), send_link (target_suffix=%p))",
AzureIoTClient 39:e98d5df6dc74 122 config->client_version, config->device_id, config->iothub_host_fqdn,
AzureIoTClient 39:e98d5df6dc74 123 config->receive_link.source_suffix, config->send_link.target_suffix);
AzureIoTClient 39:e98d5df6dc74 124 result = false;
AzureIoTClient 39:e98d5df6dc74 125 }
AzureIoTClient 39:e98d5df6dc74 126 else
AzureIoTClient 39:e98d5df6dc74 127 {
AzureIoTClient 39:e98d5df6dc74 128 result = true;
AzureIoTClient 39:e98d5df6dc74 129 }
AzureIoTClient 39:e98d5df6dc74 130
AzureIoTClient 39:e98d5df6dc74 131 return result;
AzureIoTClient 39:e98d5df6dc74 132 }
AzureIoTClient 39:e98d5df6dc74 133
AzureIoTClient 39:e98d5df6dc74 134 static void destroy_link_configuration(AMQP_MESSENGER_LINK_CONFIG* link_config)
AzureIoTClient 39:e98d5df6dc74 135 {
AzureIoTClient 39:e98d5df6dc74 136 if (link_config->target_suffix != NULL)
AzureIoTClient 39:e98d5df6dc74 137 {
AzureIoTClient 39:e98d5df6dc74 138 free((void*)link_config->target_suffix);
AzureIoTClient 39:e98d5df6dc74 139 link_config->target_suffix = NULL;
AzureIoTClient 39:e98d5df6dc74 140 }
AzureIoTClient 39:e98d5df6dc74 141
AzureIoTClient 39:e98d5df6dc74 142 if (link_config->source_suffix != NULL)
AzureIoTClient 39:e98d5df6dc74 143 {
AzureIoTClient 39:e98d5df6dc74 144 free((void*)link_config->source_suffix);
AzureIoTClient 39:e98d5df6dc74 145 link_config->source_suffix = NULL;
AzureIoTClient 39:e98d5df6dc74 146 }
AzureIoTClient 39:e98d5df6dc74 147
AzureIoTClient 39:e98d5df6dc74 148 if (link_config->attach_properties != NULL)
AzureIoTClient 39:e98d5df6dc74 149 {
AzureIoTClient 39:e98d5df6dc74 150 Map_Destroy(link_config->attach_properties);
AzureIoTClient 39:e98d5df6dc74 151 link_config->attach_properties = NULL;
AzureIoTClient 39:e98d5df6dc74 152 }
AzureIoTClient 39:e98d5df6dc74 153 }
AzureIoTClient 39:e98d5df6dc74 154
AzureIoTClient 39:e98d5df6dc74 155 static void destroy_configuration(AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 156 {
AzureIoTClient 39:e98d5df6dc74 157 if (config != NULL)
AzureIoTClient 39:e98d5df6dc74 158 {
AzureIoTClient 39:e98d5df6dc74 159 if (config->client_version != NULL)
AzureIoTClient 39:e98d5df6dc74 160 {
AzureIoTClient 39:e98d5df6dc74 161 free((void*)config->client_version);
AzureIoTClient 39:e98d5df6dc74 162 }
AzureIoTClient 39:e98d5df6dc74 163
AzureIoTClient 39:e98d5df6dc74 164 if (config->device_id != NULL)
AzureIoTClient 39:e98d5df6dc74 165 {
AzureIoTClient 39:e98d5df6dc74 166 free((void*)config->device_id);
AzureIoTClient 39:e98d5df6dc74 167 }
AzureIoTClient 39:e98d5df6dc74 168
AzureIoTClient 39:e98d5df6dc74 169 if (config->iothub_host_fqdn != NULL)
AzureIoTClient 39:e98d5df6dc74 170 {
AzureIoTClient 39:e98d5df6dc74 171 free((void*)config->iothub_host_fqdn);
AzureIoTClient 39:e98d5df6dc74 172 }
AzureIoTClient 39:e98d5df6dc74 173
AzureIoTClient 39:e98d5df6dc74 174 destroy_link_configuration(&config->send_link);
AzureIoTClient 39:e98d5df6dc74 175 destroy_link_configuration(&config->receive_link);
AzureIoTClient 39:e98d5df6dc74 176
AzureIoTClient 39:e98d5df6dc74 177 free(config);
AzureIoTClient 39:e98d5df6dc74 178 }
AzureIoTClient 39:e98d5df6dc74 179 }
AzureIoTClient 39:e98d5df6dc74 180
AzureIoTClient 39:e98d5df6dc74 181 static int clone_link_configuration(role link_role, AMQP_MESSENGER_LINK_CONFIG* dst_config, const AMQP_MESSENGER_LINK_CONFIG* src_config)
AzureIoTClient 39:e98d5df6dc74 182 {
AzureIoTClient 39:e98d5df6dc74 183 int result;
AzureIoTClient 39:e98d5df6dc74 184
AzureIoTClient 39:e98d5df6dc74 185 if (link_role == role_sender &&
AzureIoTClient 39:e98d5df6dc74 186 mallocAndStrcpy_s(&dst_config->target_suffix, src_config->target_suffix) != 0)
AzureIoTClient 39:e98d5df6dc74 187 {
AzureIoTClient 39:e98d5df6dc74 188 LogError("Failed copying send_link_target_suffix");
AzureIoTClient 39:e98d5df6dc74 189 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 190 }
AzureIoTClient 39:e98d5df6dc74 191 else if (link_role == role_receiver &&
AzureIoTClient 39:e98d5df6dc74 192 mallocAndStrcpy_s(&dst_config->source_suffix, src_config->source_suffix) != 0)
AzureIoTClient 39:e98d5df6dc74 193 {
AzureIoTClient 39:e98d5df6dc74 194 LogError("Failed copying receive_link_source_suffix");
AzureIoTClient 39:e98d5df6dc74 195 destroy_link_configuration(dst_config);
AzureIoTClient 39:e98d5df6dc74 196 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 197 }
AzureIoTClient 39:e98d5df6dc74 198 else if (src_config->attach_properties != NULL &&
AzureIoTClient 39:e98d5df6dc74 199 (dst_config->attach_properties = Map_Clone(src_config->attach_properties)) == NULL)
AzureIoTClient 39:e98d5df6dc74 200 {
AzureIoTClient 39:e98d5df6dc74 201 LogError("Failed copying link attach properties");
AzureIoTClient 39:e98d5df6dc74 202 destroy_link_configuration(dst_config);
AzureIoTClient 39:e98d5df6dc74 203 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 204 }
AzureIoTClient 39:e98d5df6dc74 205 else
AzureIoTClient 39:e98d5df6dc74 206 {
AzureIoTClient 39:e98d5df6dc74 207 dst_config->snd_settle_mode = src_config->snd_settle_mode;
AzureIoTClient 39:e98d5df6dc74 208 dst_config->rcv_settle_mode = src_config->rcv_settle_mode;
AzureIoTClient 39:e98d5df6dc74 209
AzureIoTClient 39:e98d5df6dc74 210 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 211 }
AzureIoTClient 39:e98d5df6dc74 212
AzureIoTClient 39:e98d5df6dc74 213 return result;
AzureIoTClient 39:e98d5df6dc74 214 }
AzureIoTClient 39:e98d5df6dc74 215
AzureIoTClient 39:e98d5df6dc74 216 static AMQP_MESSENGER_CONFIG* clone_configuration(const AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 217 {
AzureIoTClient 39:e98d5df6dc74 218 AMQP_MESSENGER_CONFIG* result;
AzureIoTClient 39:e98d5df6dc74 219
AzureIoTClient 39:e98d5df6dc74 220 if ((result = (AMQP_MESSENGER_CONFIG*)malloc(sizeof(AMQP_MESSENGER_CONFIG))) == NULL)
AzureIoTClient 39:e98d5df6dc74 221 {
AzureIoTClient 39:e98d5df6dc74 222 LogError("Failed allocating AMQP_MESSENGER_CONFIG");
AzureIoTClient 39:e98d5df6dc74 223 }
AzureIoTClient 39:e98d5df6dc74 224 else
AzureIoTClient 39:e98d5df6dc74 225 {
AzureIoTClient 39:e98d5df6dc74 226 memset(result, 0, sizeof(AMQP_MESSENGER_CONFIG));
AzureIoTClient 39:e98d5df6dc74 227
AzureIoTClient 39:e98d5df6dc74 228 if (mallocAndStrcpy_s(&result->client_version, config->client_version) != 0)
AzureIoTClient 39:e98d5df6dc74 229 {
AzureIoTClient 39:e98d5df6dc74 230 LogError("Failed copying device_id");
AzureIoTClient 39:e98d5df6dc74 231 destroy_configuration(result);
AzureIoTClient 39:e98d5df6dc74 232 result = NULL;
AzureIoTClient 39:e98d5df6dc74 233 }
AzureIoTClient 39:e98d5df6dc74 234 else if (mallocAndStrcpy_s(&result->device_id, config->device_id) != 0)
AzureIoTClient 39:e98d5df6dc74 235 {
AzureIoTClient 39:e98d5df6dc74 236 LogError("Failed copying device_id");
AzureIoTClient 39:e98d5df6dc74 237 destroy_configuration(result);
AzureIoTClient 39:e98d5df6dc74 238 result = NULL;
AzureIoTClient 39:e98d5df6dc74 239 }
AzureIoTClient 39:e98d5df6dc74 240 else if (mallocAndStrcpy_s(&result->iothub_host_fqdn, config->iothub_host_fqdn) != 0)
AzureIoTClient 39:e98d5df6dc74 241 {
AzureIoTClient 39:e98d5df6dc74 242 LogError("Failed copying iothub_host_fqdn");
AzureIoTClient 39:e98d5df6dc74 243 destroy_configuration(result);
AzureIoTClient 39:e98d5df6dc74 244 result = NULL;
AzureIoTClient 39:e98d5df6dc74 245 }
AzureIoTClient 39:e98d5df6dc74 246 else if (clone_link_configuration(role_sender, &result->send_link, &config->send_link) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 247 {
AzureIoTClient 39:e98d5df6dc74 248 LogError("Failed copying send link configuration");
AzureIoTClient 39:e98d5df6dc74 249 destroy_configuration(result);
AzureIoTClient 39:e98d5df6dc74 250 result = NULL;
AzureIoTClient 39:e98d5df6dc74 251 }
AzureIoTClient 39:e98d5df6dc74 252 else if (clone_link_configuration(role_receiver, &result->receive_link, &config->receive_link) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 253 {
AzureIoTClient 39:e98d5df6dc74 254 LogError("Failed copying receive link configuration");
AzureIoTClient 39:e98d5df6dc74 255 destroy_configuration(result);
AzureIoTClient 39:e98d5df6dc74 256 result = NULL;
AzureIoTClient 39:e98d5df6dc74 257 }
AzureIoTClient 39:e98d5df6dc74 258 else
AzureIoTClient 39:e98d5df6dc74 259 {
AzureIoTClient 39:e98d5df6dc74 260 result->on_state_changed_callback = config->on_state_changed_callback;
AzureIoTClient 39:e98d5df6dc74 261 result->on_state_changed_context = config->on_state_changed_context;
AzureIoTClient 39:e98d5df6dc74 262 result->on_subscription_changed_callback = config->on_subscription_changed_callback;
AzureIoTClient 39:e98d5df6dc74 263 result->on_subscription_changed_context = config->on_subscription_changed_context;
AzureIoTClient 39:e98d5df6dc74 264 }
AzureIoTClient 39:e98d5df6dc74 265 }
AzureIoTClient 39:e98d5df6dc74 266
AzureIoTClient 39:e98d5df6dc74 267 return result;
AzureIoTClient 39:e98d5df6dc74 268 }
AzureIoTClient 39:e98d5df6dc74 269
AzureIoTClient 39:e98d5df6dc74 270 static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context)
AzureIoTClient 39:e98d5df6dc74 271 {
AzureIoTClient 39:e98d5df6dc74 272 free(context);
AzureIoTClient 39:e98d5df6dc74 273 }
AzureIoTClient 39:e98d5df6dc74 274
AzureIoTClient 39:e98d5df6dc74 275 static STRING_HANDLE create_link_address(const char* host_fqdn, const char* device_id, const char* address_suffix)
AzureIoTClient 39:e98d5df6dc74 276 {
AzureIoTClient 39:e98d5df6dc74 277 STRING_HANDLE link_address;
AzureIoTClient 39:e98d5df6dc74 278
AzureIoTClient 39:e98d5df6dc74 279 if ((link_address = STRING_new()) == NULL)
AzureIoTClient 39:e98d5df6dc74 280 {
AzureIoTClient 39:e98d5df6dc74 281 LogError("failed creating link_address (STRING_new failed)");
AzureIoTClient 39:e98d5df6dc74 282 }
AzureIoTClient 39:e98d5df6dc74 283 else if (STRING_sprintf(link_address, LINK_ADDRESS_FORMAT, host_fqdn, device_id, address_suffix) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 284 {
AzureIoTClient 39:e98d5df6dc74 285 LogError("Failed creating the link_address (STRING_sprintf failed)");
AzureIoTClient 39:e98d5df6dc74 286 STRING_delete(link_address);
AzureIoTClient 39:e98d5df6dc74 287 link_address = NULL;
AzureIoTClient 39:e98d5df6dc74 288 }
AzureIoTClient 39:e98d5df6dc74 289
AzureIoTClient 39:e98d5df6dc74 290 return link_address;
AzureIoTClient 39:e98d5df6dc74 291 }
AzureIoTClient 39:e98d5df6dc74 292
AzureIoTClient 39:e98d5df6dc74 293 static STRING_HANDLE create_link_terminus_name(STRING_HANDLE link_name, const char* suffix)
AzureIoTClient 39:e98d5df6dc74 294 {
AzureIoTClient 39:e98d5df6dc74 295 STRING_HANDLE terminus_name;
AzureIoTClient 39:e98d5df6dc74 296
AzureIoTClient 39:e98d5df6dc74 297 if ((terminus_name = STRING_new()) == NULL)
AzureIoTClient 39:e98d5df6dc74 298 {
AzureIoTClient 39:e98d5df6dc74 299 LogError("Failed creating the terminus name (STRING_new failed; %s)", suffix);
AzureIoTClient 39:e98d5df6dc74 300 }
AzureIoTClient 39:e98d5df6dc74 301 else
AzureIoTClient 39:e98d5df6dc74 302 {
AzureIoTClient 39:e98d5df6dc74 303 const char* link_name_char_ptr = STRING_c_str(link_name);
AzureIoTClient 39:e98d5df6dc74 304
AzureIoTClient 39:e98d5df6dc74 305 if (STRING_sprintf(terminus_name, "%s-%s", link_name_char_ptr, suffix) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 306 {
AzureIoTClient 39:e98d5df6dc74 307 STRING_delete(terminus_name);
AzureIoTClient 39:e98d5df6dc74 308 terminus_name = NULL;
AzureIoTClient 39:e98d5df6dc74 309 LogError("Failed creating the terminus name (STRING_sprintf failed; %s)", suffix);
AzureIoTClient 39:e98d5df6dc74 310 }
AzureIoTClient 39:e98d5df6dc74 311 }
AzureIoTClient 39:e98d5df6dc74 312
AzureIoTClient 39:e98d5df6dc74 313 return terminus_name;
AzureIoTClient 39:e98d5df6dc74 314 }
AzureIoTClient 39:e98d5df6dc74 315
AzureIoTClient 39:e98d5df6dc74 316 static STRING_HANDLE create_link_name(role link_role, const char* device_id)
AzureIoTClient 39:e98d5df6dc74 317 {
AzureIoTClient 39:e98d5df6dc74 318 char* unique_id;
AzureIoTClient 39:e98d5df6dc74 319 STRING_HANDLE result;
AzureIoTClient 39:e98d5df6dc74 320
AzureIoTClient 39:e98d5df6dc74 321 if ((unique_id = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL)
AzureIoTClient 39:e98d5df6dc74 322 {
AzureIoTClient 39:e98d5df6dc74 323 LogError("Failed generating an unique tag (malloc failed)");
AzureIoTClient 39:e98d5df6dc74 324 result = NULL;
AzureIoTClient 39:e98d5df6dc74 325 }
AzureIoTClient 39:e98d5df6dc74 326 else
AzureIoTClient 39:e98d5df6dc74 327 {
AzureIoTClient 39:e98d5df6dc74 328 memset(unique_id, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1);
AzureIoTClient 39:e98d5df6dc74 329
AzureIoTClient 39:e98d5df6dc74 330 if (UniqueId_Generate(unique_id, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK)
AzureIoTClient 39:e98d5df6dc74 331 {
AzureIoTClient 39:e98d5df6dc74 332 LogError("Failed generating an unique tag (UniqueId_Generate failed)");
AzureIoTClient 39:e98d5df6dc74 333 result = NULL;
AzureIoTClient 39:e98d5df6dc74 334 }
AzureIoTClient 39:e98d5df6dc74 335 else if ((result = STRING_new()) == NULL)
AzureIoTClient 39:e98d5df6dc74 336 {
AzureIoTClient 39:e98d5df6dc74 337 LogError("Failed generating an unique tag (STRING_new failed)");
AzureIoTClient 39:e98d5df6dc74 338 }
AzureIoTClient 39:e98d5df6dc74 339 else if (STRING_sprintf(result, "%s-%s-%s", (link_role == role_sender ? SEND_LINK_NAME_PREFIX : RECEIVE_LINK_NAME_PREFIX), device_id, unique_id) != 0)
AzureIoTClient 39:e98d5df6dc74 340 {
AzureIoTClient 39:e98d5df6dc74 341 LogError("Failed generating an unique tag (STRING_sprintf failed)");
AzureIoTClient 39:e98d5df6dc74 342 STRING_delete(result);
AzureIoTClient 39:e98d5df6dc74 343 result = NULL;
AzureIoTClient 39:e98d5df6dc74 344 }
AzureIoTClient 39:e98d5df6dc74 345
AzureIoTClient 39:e98d5df6dc74 346 free(unique_id);
AzureIoTClient 39:e98d5df6dc74 347 }
AzureIoTClient 39:e98d5df6dc74 348
AzureIoTClient 39:e98d5df6dc74 349 return result;
AzureIoTClient 39:e98d5df6dc74 350 }
AzureIoTClient 39:e98d5df6dc74 351
AzureIoTClient 39:e98d5df6dc74 352 static void update_messenger_state(AMQP_MESSENGER_INSTANCE* instance, AMQP_MESSENGER_STATE new_state)
AzureIoTClient 39:e98d5df6dc74 353 {
AzureIoTClient 39:e98d5df6dc74 354 if (new_state != instance->state)
AzureIoTClient 39:e98d5df6dc74 355 {
AzureIoTClient 39:e98d5df6dc74 356 AMQP_MESSENGER_STATE previous_state = instance->state;
AzureIoTClient 39:e98d5df6dc74 357 instance->state = new_state;
AzureIoTClient 39:e98d5df6dc74 358
AzureIoTClient 39:e98d5df6dc74 359 if (instance->config != NULL && instance->config->on_state_changed_callback != NULL)
AzureIoTClient 39:e98d5df6dc74 360 {
AzureIoTClient 39:e98d5df6dc74 361 instance->config->on_state_changed_callback(instance->config->on_state_changed_context, previous_state, new_state);
AzureIoTClient 39:e98d5df6dc74 362 }
AzureIoTClient 39:e98d5df6dc74 363 }
AzureIoTClient 39:e98d5df6dc74 364 }
AzureIoTClient 39:e98d5df6dc74 365
AzureIoTClient 39:e98d5df6dc74 366 static int add_link_attach_properties(LINK_HANDLE link, MAP_HANDLE user_defined_properties)
AzureIoTClient 39:e98d5df6dc74 367 {
AzureIoTClient 39:e98d5df6dc74 368 int result;
AzureIoTClient 39:e98d5df6dc74 369 fields attach_properties;
AzureIoTClient 39:e98d5df6dc74 370
AzureIoTClient 39:e98d5df6dc74 371 if ((attach_properties = amqpvalue_create_map()) == NULL)
AzureIoTClient 39:e98d5df6dc74 372 {
AzureIoTClient 39:e98d5df6dc74 373 LogError("Failed to create the map for attach properties.");
AzureIoTClient 39:e98d5df6dc74 374 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 375 }
AzureIoTClient 39:e98d5df6dc74 376 else
AzureIoTClient 39:e98d5df6dc74 377 {
AzureIoTClient 39:e98d5df6dc74 378 const char* const* keys;
AzureIoTClient 39:e98d5df6dc74 379 const char* const* values;
AzureIoTClient 39:e98d5df6dc74 380 size_t count;
AzureIoTClient 39:e98d5df6dc74 381
AzureIoTClient 39:e98d5df6dc74 382 if (Map_GetInternals(user_defined_properties, &keys, &values, &count) != MAP_OK)
AzureIoTClient 39:e98d5df6dc74 383 {
AzureIoTClient 39:e98d5df6dc74 384 LogError("failed getting user defined properties details.");
AzureIoTClient 39:e98d5df6dc74 385 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 386 }
AzureIoTClient 39:e98d5df6dc74 387 else
AzureIoTClient 39:e98d5df6dc74 388 {
AzureIoTClient 39:e98d5df6dc74 389 size_t i;
AzureIoTClient 39:e98d5df6dc74 390 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 391
AzureIoTClient 39:e98d5df6dc74 392 for (i = 0; i < count && result == RESULT_OK; i++)
AzureIoTClient 39:e98d5df6dc74 393 {
AzureIoTClient 39:e98d5df6dc74 394 AMQP_VALUE key;
AzureIoTClient 39:e98d5df6dc74 395 AMQP_VALUE value;
AzureIoTClient 39:e98d5df6dc74 396
AzureIoTClient 39:e98d5df6dc74 397 if ((key = amqpvalue_create_symbol(keys[i])) == NULL)
AzureIoTClient 39:e98d5df6dc74 398 {
AzureIoTClient 39:e98d5df6dc74 399 LogError("Failed creating AMQP_VALUE For key %s.", keys[i]);
AzureIoTClient 39:e98d5df6dc74 400 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 401 }
AzureIoTClient 39:e98d5df6dc74 402 else
AzureIoTClient 39:e98d5df6dc74 403 {
AzureIoTClient 39:e98d5df6dc74 404 if ((value = amqpvalue_create_string(values[i])) == NULL)
AzureIoTClient 39:e98d5df6dc74 405 {
AzureIoTClient 39:e98d5df6dc74 406 LogError("Failed creating AMQP_VALUE For key %s value", keys[i]);
AzureIoTClient 39:e98d5df6dc74 407 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 408 }
AzureIoTClient 39:e98d5df6dc74 409 else
AzureIoTClient 39:e98d5df6dc74 410 {
AzureIoTClient 39:e98d5df6dc74 411 if (amqpvalue_set_map_value(attach_properties, key, value) != 0)
AzureIoTClient 39:e98d5df6dc74 412 {
AzureIoTClient 39:e98d5df6dc74 413 LogError("Failed adding property %s to map", keys[i]);
AzureIoTClient 39:e98d5df6dc74 414 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 415 }
AzureIoTClient 39:e98d5df6dc74 416
AzureIoTClient 39:e98d5df6dc74 417 amqpvalue_destroy(value);
AzureIoTClient 39:e98d5df6dc74 418 }
AzureIoTClient 39:e98d5df6dc74 419
AzureIoTClient 39:e98d5df6dc74 420 amqpvalue_destroy(key);
AzureIoTClient 39:e98d5df6dc74 421 }
AzureIoTClient 39:e98d5df6dc74 422 }
AzureIoTClient 39:e98d5df6dc74 423
AzureIoTClient 39:e98d5df6dc74 424 if (result == RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 425 {
AzureIoTClient 39:e98d5df6dc74 426 if (link_set_attach_properties(link, attach_properties) != 0)
AzureIoTClient 39:e98d5df6dc74 427 {
AzureIoTClient 39:e98d5df6dc74 428 LogError("Failed attaching properties to link");
AzureIoTClient 39:e98d5df6dc74 429 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 430 }
AzureIoTClient 39:e98d5df6dc74 431 else
AzureIoTClient 39:e98d5df6dc74 432 {
AzureIoTClient 39:e98d5df6dc74 433 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 434 }
AzureIoTClient 39:e98d5df6dc74 435 }
AzureIoTClient 39:e98d5df6dc74 436 }
AzureIoTClient 39:e98d5df6dc74 437
AzureIoTClient 39:e98d5df6dc74 438 amqpvalue_destroy(attach_properties);
AzureIoTClient 39:e98d5df6dc74 439 }
AzureIoTClient 39:e98d5df6dc74 440
AzureIoTClient 39:e98d5df6dc74 441 return result;
AzureIoTClient 39:e98d5df6dc74 442 }
AzureIoTClient 39:e98d5df6dc74 443
AzureIoTClient 39:e98d5df6dc74 444 static int create_link_terminus(role link_role, STRING_HANDLE link_name, STRING_HANDLE link_address, AMQP_VALUE* source, AMQP_VALUE* target)
AzureIoTClient 39:e98d5df6dc74 445 {
AzureIoTClient 39:e98d5df6dc74 446 int result;
AzureIoTClient 39:e98d5df6dc74 447 STRING_HANDLE terminus_name;
AzureIoTClient 39:e98d5df6dc74 448 const char* source_name;
AzureIoTClient 39:e98d5df6dc74 449 const char* target_name;
AzureIoTClient 39:e98d5df6dc74 450
AzureIoTClient 39:e98d5df6dc74 451 if (link_role == role_sender)
AzureIoTClient 39:e98d5df6dc74 452 {
AzureIoTClient 39:e98d5df6dc74 453 if ((terminus_name = create_link_terminus_name(link_name, "source")) == NULL)
AzureIoTClient 39:e98d5df6dc74 454 {
AzureIoTClient 39:e98d5df6dc74 455 LogError("Failed creating terminus name");
AzureIoTClient 39:e98d5df6dc74 456 source_name = NULL;
AzureIoTClient 39:e98d5df6dc74 457 target_name = NULL;
AzureIoTClient 39:e98d5df6dc74 458 }
AzureIoTClient 39:e98d5df6dc74 459 else
AzureIoTClient 39:e98d5df6dc74 460 {
AzureIoTClient 39:e98d5df6dc74 461 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_077: [The AMQP link source shall be defined as "<link name>-source"]
AzureIoTClient 39:e98d5df6dc74 462 source_name = STRING_c_str(terminus_name);
AzureIoTClient 39:e98d5df6dc74 463 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_094: [The AMQP link source shall be defined as <link address>]
AzureIoTClient 39:e98d5df6dc74 464 target_name = STRING_c_str(link_address);
AzureIoTClient 39:e98d5df6dc74 465 }
AzureIoTClient 39:e98d5df6dc74 466 }
AzureIoTClient 39:e98d5df6dc74 467 else
AzureIoTClient 39:e98d5df6dc74 468 {
AzureIoTClient 39:e98d5df6dc74 469 if ((terminus_name = create_link_terminus_name(link_name, "target")) == NULL)
AzureIoTClient 39:e98d5df6dc74 470 {
AzureIoTClient 39:e98d5df6dc74 471 LogError("Failed creating terminus name");
AzureIoTClient 39:e98d5df6dc74 472 source_name = NULL;
AzureIoTClient 39:e98d5df6dc74 473 target_name = NULL;
AzureIoTClient 39:e98d5df6dc74 474 }
AzureIoTClient 39:e98d5df6dc74 475 else
AzureIoTClient 39:e98d5df6dc74 476 {
AzureIoTClient 39:e98d5df6dc74 477 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_078: [The AMQP link target shall be defined as <link address>]
AzureIoTClient 39:e98d5df6dc74 478 source_name = STRING_c_str(link_address);
AzureIoTClient 39:e98d5df6dc74 479 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_095: [The AMQP link target shall be defined as "<link name>-target"]
AzureIoTClient 39:e98d5df6dc74 480 target_name = STRING_c_str(terminus_name);
AzureIoTClient 39:e98d5df6dc74 481 }
AzureIoTClient 39:e98d5df6dc74 482 }
AzureIoTClient 39:e98d5df6dc74 483
AzureIoTClient 39:e98d5df6dc74 484 if (source_name == NULL || target_name == NULL)
AzureIoTClient 39:e98d5df6dc74 485 {
AzureIoTClient 39:e98d5df6dc74 486 LogError("Failed creating link source and/or target name (source=%p, target=%p)", source_name, target_name);
AzureIoTClient 39:e98d5df6dc74 487 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 488 }
AzureIoTClient 39:e98d5df6dc74 489 else
AzureIoTClient 39:e98d5df6dc74 490 {
AzureIoTClient 39:e98d5df6dc74 491 if ((*source = messaging_create_source(source_name)) == NULL)
AzureIoTClient 39:e98d5df6dc74 492 {
AzureIoTClient 39:e98d5df6dc74 493 LogError("Failed creating link source");
AzureIoTClient 39:e98d5df6dc74 494 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 495 }
AzureIoTClient 39:e98d5df6dc74 496 else
AzureIoTClient 39:e98d5df6dc74 497 {
AzureIoTClient 39:e98d5df6dc74 498 if ((*target = messaging_create_target(target_name)) == NULL)
AzureIoTClient 39:e98d5df6dc74 499 {
AzureIoTClient 39:e98d5df6dc74 500 LogError("Failed creating link target");
AzureIoTClient 39:e98d5df6dc74 501 amqpvalue_destroy(*source);
AzureIoTClient 39:e98d5df6dc74 502 *source = NULL;
AzureIoTClient 39:e98d5df6dc74 503 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 504 }
AzureIoTClient 39:e98d5df6dc74 505 else
AzureIoTClient 39:e98d5df6dc74 506 {
AzureIoTClient 39:e98d5df6dc74 507 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 508 }
AzureIoTClient 39:e98d5df6dc74 509 }
AzureIoTClient 39:e98d5df6dc74 510 }
AzureIoTClient 39:e98d5df6dc74 511
AzureIoTClient 39:e98d5df6dc74 512
AzureIoTClient 39:e98d5df6dc74 513 STRING_delete(terminus_name);
AzureIoTClient 39:e98d5df6dc74 514
AzureIoTClient 39:e98d5df6dc74 515 return result;
AzureIoTClient 39:e98d5df6dc74 516 }
AzureIoTClient 39:e98d5df6dc74 517
AzureIoTClient 39:e98d5df6dc74 518 static LINK_HANDLE create_link(role link_role, SESSION_HANDLE session_handle, AMQP_MESSENGER_LINK_CONFIG* link_config, const char* iothub_host_fqdn, const char* device_id)
AzureIoTClient 39:e98d5df6dc74 519 {
AzureIoTClient 39:e98d5df6dc74 520 LINK_HANDLE result = NULL;
AzureIoTClient 39:e98d5df6dc74 521 STRING_HANDLE link_address;
AzureIoTClient 39:e98d5df6dc74 522
AzureIoTClient 39:e98d5df6dc74 523 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_075: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->send_link.source_suffix`>"]
AzureIoTClient 39:e98d5df6dc74 524 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_092: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->receive_link.target_suffix`>"]
AzureIoTClient 39:e98d5df6dc74 525 if ((link_address = create_link_address(iothub_host_fqdn, device_id, (link_role == role_sender ? link_config->target_suffix : link_config->source_suffix))) == NULL)
AzureIoTClient 39:e98d5df6dc74 526 {
AzureIoTClient 39:e98d5df6dc74 527 LogError("Failed creating the message sender (failed creating the 'link_address')");
AzureIoTClient 39:e98d5df6dc74 528 result = NULL;
AzureIoTClient 39:e98d5df6dc74 529 }
AzureIoTClient 39:e98d5df6dc74 530 else
AzureIoTClient 39:e98d5df6dc74 531 {
AzureIoTClient 39:e98d5df6dc74 532 STRING_HANDLE link_name;
AzureIoTClient 39:e98d5df6dc74 533
AzureIoTClient 39:e98d5df6dc74 534 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_076: [The AMQP link name shall be defined as "link-snd-<`device_id`>-<locally generated UUID>"]
AzureIoTClient 39:e98d5df6dc74 535 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_093: [The AMQP link name shall be defined as "link-rcv-<`device_id`>-<locally generated UUID>"]
AzureIoTClient 39:e98d5df6dc74 536 if ((link_name = create_link_name(link_role, device_id)) == NULL)
AzureIoTClient 39:e98d5df6dc74 537 {
AzureIoTClient 39:e98d5df6dc74 538 LogError("Failed creating the link name");
AzureIoTClient 39:e98d5df6dc74 539 result = NULL;
AzureIoTClient 39:e98d5df6dc74 540 }
AzureIoTClient 39:e98d5df6dc74 541 else
AzureIoTClient 39:e98d5df6dc74 542 {
AzureIoTClient 39:e98d5df6dc74 543 AMQP_VALUE source = NULL;
AzureIoTClient 39:e98d5df6dc74 544 AMQP_VALUE target = NULL;
AzureIoTClient 39:e98d5df6dc74 545
AzureIoTClient 39:e98d5df6dc74 546 if (create_link_terminus(link_role, link_name, link_address, &source, &target) == RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 547 {
AzureIoTClient 39:e98d5df6dc74 548 if ((result = link_create(session_handle, STRING_c_str(link_name), link_role, source, target)) == NULL)
AzureIoTClient 39:e98d5df6dc74 549 {
AzureIoTClient 39:e98d5df6dc74 550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_079: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 551 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_096: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 552 LogError("Failed creating the AMQP link");
AzureIoTClient 39:e98d5df6dc74 553 }
AzureIoTClient 39:e98d5df6dc74 554 else
AzureIoTClient 39:e98d5df6dc74 555 {
AzureIoTClient 39:e98d5df6dc74 556 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_082: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
AzureIoTClient 39:e98d5df6dc74 557 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_099: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
AzureIoTClient 39:e98d5df6dc74 558 if (link_set_max_message_size(result, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 559 {
AzureIoTClient 39:e98d5df6dc74 560 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_083: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 39:e98d5df6dc74 561 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_100: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 39:e98d5df6dc74 562 LogError("Failed setting link max message size.");
AzureIoTClient 39:e98d5df6dc74 563 }
AzureIoTClient 39:e98d5df6dc74 564
AzureIoTClient 39:e98d5df6dc74 565 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_080: [The AMQP link shall have its ATTACH properties set using `instance->config->send_link.attach_properties`]
AzureIoTClient 39:e98d5df6dc74 566 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_097: [The AMQP link shall have its ATTACH properties set using `instance->config->receive_link.attach_properties`]
AzureIoTClient 39:e98d5df6dc74 567 if (link_config->attach_properties != NULL &&
AzureIoTClient 39:e98d5df6dc74 568 add_link_attach_properties(result, link_config->attach_properties) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 569 {
AzureIoTClient 39:e98d5df6dc74 570 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_081: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 571 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_098: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 572 LogError("Failed setting link attach properties");
AzureIoTClient 39:e98d5df6dc74 573 link_destroy(result);
AzureIoTClient 39:e98d5df6dc74 574 result = NULL;
AzureIoTClient 39:e98d5df6dc74 575 }
AzureIoTClient 39:e98d5df6dc74 576 }
AzureIoTClient 39:e98d5df6dc74 577
AzureIoTClient 39:e98d5df6dc74 578 amqpvalue_destroy(source);
AzureIoTClient 39:e98d5df6dc74 579 amqpvalue_destroy(target);
AzureIoTClient 39:e98d5df6dc74 580 }
AzureIoTClient 39:e98d5df6dc74 581
AzureIoTClient 39:e98d5df6dc74 582 STRING_delete(link_name);
AzureIoTClient 39:e98d5df6dc74 583 }
AzureIoTClient 39:e98d5df6dc74 584
AzureIoTClient 39:e98d5df6dc74 585 STRING_delete(link_address);
AzureIoTClient 39:e98d5df6dc74 586 }
AzureIoTClient 39:e98d5df6dc74 587
AzureIoTClient 39:e98d5df6dc74 588 return result;
AzureIoTClient 39:e98d5df6dc74 589 }
AzureIoTClient 39:e98d5df6dc74 590
AzureIoTClient 39:e98d5df6dc74 591 static void destroy_message_sender(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 592 {
AzureIoTClient 39:e98d5df6dc74 593 if (instance->message_sender != NULL)
AzureIoTClient 39:e98d5df6dc74 594 {
AzureIoTClient 39:e98d5df6dc74 595 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_090: [`instance->message_sender` shall be destroyed using messagesender_destroy()]
AzureIoTClient 39:e98d5df6dc74 596 messagesender_destroy(instance->message_sender);
AzureIoTClient 39:e98d5df6dc74 597 instance->message_sender = NULL;
AzureIoTClient 39:e98d5df6dc74 598 }
AzureIoTClient 39:e98d5df6dc74 599
AzureIoTClient 45:c09fac270e60 600 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 601 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 602 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 603
AzureIoTClient 39:e98d5df6dc74 604 if (instance->sender_link != NULL)
AzureIoTClient 39:e98d5df6dc74 605 {
AzureIoTClient 39:e98d5df6dc74 606 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_091: [`instance->sender_link` shall be destroyed using link_destroy()]
AzureIoTClient 39:e98d5df6dc74 607 link_destroy(instance->sender_link);
AzureIoTClient 39:e98d5df6dc74 608 instance->sender_link = NULL;
AzureIoTClient 39:e98d5df6dc74 609 }
AzureIoTClient 39:e98d5df6dc74 610 }
AzureIoTClient 39:e98d5df6dc74 611
AzureIoTClient 39:e98d5df6dc74 612 static void on_message_sender_state_changed_callback(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
AzureIoTClient 39:e98d5df6dc74 613 {
AzureIoTClient 39:e98d5df6dc74 614 if (context == NULL)
AzureIoTClient 39:e98d5df6dc74 615 {
AzureIoTClient 39:e98d5df6dc74 616 LogError("on_message_sender_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 39:e98d5df6dc74 617 }
AzureIoTClient 39:e98d5df6dc74 618 else if (new_state != previous_state)
AzureIoTClient 39:e98d5df6dc74 619 {
AzureIoTClient 39:e98d5df6dc74 620 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 39:e98d5df6dc74 621
AzureIoTClient 39:e98d5df6dc74 622 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_088: [`new_state`, `previous_state` shall be saved into `instance->message_sender_previous_state` and `instance->message_sender_current_state`]
AzureIoTClient 39:e98d5df6dc74 623 instance->message_sender_current_state = new_state;
AzureIoTClient 39:e98d5df6dc74 624 instance->message_sender_previous_state = previous_state;
AzureIoTClient 39:e98d5df6dc74 625 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_089: [`instance->last_message_sender_state_change_time` shall be set using get_time()]
AzureIoTClient 39:e98d5df6dc74 626 instance->last_message_sender_state_change_time = get_time(NULL);
AzureIoTClient 39:e98d5df6dc74 627 }
AzureIoTClient 39:e98d5df6dc74 628 }
AzureIoTClient 39:e98d5df6dc74 629
AzureIoTClient 39:e98d5df6dc74 630 static int create_message_sender(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 631 {
AzureIoTClient 39:e98d5df6dc74 632 int result;
AzureIoTClient 39:e98d5df6dc74 633
AzureIoTClient 39:e98d5df6dc74 634 if ((instance->sender_link = create_link(role_sender,
AzureIoTClient 39:e98d5df6dc74 635 instance->session_handle, &instance->config->send_link, instance->config->iothub_host_fqdn, instance->config->device_id)) == NULL)
AzureIoTClient 39:e98d5df6dc74 636 {
AzureIoTClient 39:e98d5df6dc74 637 LogError("Failed creating the message sender link");
AzureIoTClient 39:e98d5df6dc74 638 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 639 }
AzureIoTClient 39:e98d5df6dc74 640 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_084: [`instance->message_sender` shall be created using messagesender_create(), passing the `instance->sender_link` and `on_message_sender_state_changed_callback`]
AzureIoTClient 39:e98d5df6dc74 641 else if ((instance->message_sender = messagesender_create(instance->sender_link, on_message_sender_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 39:e98d5df6dc74 642 {
AzureIoTClient 39:e98d5df6dc74 643 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_085: [If messagesender_create() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 644 LogError("Failed creating the message sender (messagesender_create failed)");
AzureIoTClient 39:e98d5df6dc74 645 destroy_message_sender(instance);
AzureIoTClient 39:e98d5df6dc74 646 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 647 }
AzureIoTClient 39:e98d5df6dc74 648 else
AzureIoTClient 39:e98d5df6dc74 649 {
AzureIoTClient 39:e98d5df6dc74 650 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_086: [`instance->message_sender` shall be opened using messagesender_open()]
AzureIoTClient 39:e98d5df6dc74 651 if (messagesender_open(instance->message_sender) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 652 {
AzureIoTClient 39:e98d5df6dc74 653 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_087: [If messagesender_open() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 654 LogError("Failed opening the AMQP message sender.");
AzureIoTClient 39:e98d5df6dc74 655 destroy_message_sender(instance);
AzureIoTClient 39:e98d5df6dc74 656 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 657 }
AzureIoTClient 39:e98d5df6dc74 658 else
AzureIoTClient 39:e98d5df6dc74 659 {
AzureIoTClient 39:e98d5df6dc74 660 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 661 }
AzureIoTClient 39:e98d5df6dc74 662 }
AzureIoTClient 39:e98d5df6dc74 663
AzureIoTClient 39:e98d5df6dc74 664 return result;
AzureIoTClient 39:e98d5df6dc74 665 }
AzureIoTClient 39:e98d5df6dc74 666
AzureIoTClient 39:e98d5df6dc74 667 static void destroy_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 668 {
AzureIoTClient 39:e98d5df6dc74 669 if (instance->message_receiver != NULL)
AzureIoTClient 39:e98d5df6dc74 670 {
AzureIoTClient 39:e98d5df6dc74 671 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_113: [`instance->message_receiver` shall be closed using messagereceiver_close()]
AzureIoTClient 39:e98d5df6dc74 672 if (messagereceiver_close(instance->message_receiver) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 673 {
AzureIoTClient 39:e98d5df6dc74 674 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_114: [If messagereceiver_close() fails, it shall be logged and ignored]
AzureIoTClient 39:e98d5df6dc74 675 LogError("Failed closing the AMQP message receiver (this failure will be ignored).");
AzureIoTClient 39:e98d5df6dc74 676 }
AzureIoTClient 39:e98d5df6dc74 677
AzureIoTClient 39:e98d5df6dc74 678 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_115: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()]
AzureIoTClient 39:e98d5df6dc74 679 messagereceiver_destroy(instance->message_receiver);
AzureIoTClient 39:e98d5df6dc74 680
AzureIoTClient 39:e98d5df6dc74 681 instance->message_receiver = NULL;
AzureIoTClient 39:e98d5df6dc74 682 }
AzureIoTClient 39:e98d5df6dc74 683
AzureIoTClient 45:c09fac270e60 684 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 685 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 686 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 687
AzureIoTClient 39:e98d5df6dc74 688 if (instance->receiver_link != NULL)
AzureIoTClient 39:e98d5df6dc74 689 {
AzureIoTClient 39:e98d5df6dc74 690 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_116: [`instance->receiver_link` shall be destroyed using link_destroy()]
AzureIoTClient 39:e98d5df6dc74 691 link_destroy(instance->receiver_link);
AzureIoTClient 39:e98d5df6dc74 692 instance->receiver_link = NULL;
AzureIoTClient 39:e98d5df6dc74 693 }
AzureIoTClient 39:e98d5df6dc74 694 }
AzureIoTClient 39:e98d5df6dc74 695
AzureIoTClient 39:e98d5df6dc74 696 static void on_message_receiver_state_changed_callback(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
AzureIoTClient 39:e98d5df6dc74 697 {
AzureIoTClient 39:e98d5df6dc74 698 if (context == NULL)
AzureIoTClient 39:e98d5df6dc74 699 {
AzureIoTClient 39:e98d5df6dc74 700 LogError("on_message_receiver_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 39:e98d5df6dc74 701 }
AzureIoTClient 39:e98d5df6dc74 702 else
AzureIoTClient 39:e98d5df6dc74 703 {
AzureIoTClient 39:e98d5df6dc74 704 if (new_state != previous_state)
AzureIoTClient 39:e98d5df6dc74 705 {
AzureIoTClient 39:e98d5df6dc74 706 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 39:e98d5df6dc74 707
AzureIoTClient 39:e98d5df6dc74 708 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_105: [`new_state`, `previous_state` shall be saved into `instance->message_receiver_previous_state` and `instance->message_receiver_current_state`]
AzureIoTClient 39:e98d5df6dc74 709 instance->message_receiver_current_state = new_state;
AzureIoTClient 39:e98d5df6dc74 710 instance->message_receiver_previous_state = previous_state;
AzureIoTClient 39:e98d5df6dc74 711 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_106: [`instance->last_message_receiver_state_change_time` shall be set using get_time()]
AzureIoTClient 39:e98d5df6dc74 712 instance->last_message_receiver_state_change_time = get_time(NULL);
AzureIoTClient 39:e98d5df6dc74 713
AzureIoTClient 39:e98d5df6dc74 714 if (new_state == MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 39:e98d5df6dc74 715 {
AzureIoTClient 39:e98d5df6dc74 716 if (instance->config->on_subscription_changed_callback != NULL)
AzureIoTClient 39:e98d5df6dc74 717 {
AzureIoTClient 39:e98d5df6dc74 718 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, true);
AzureIoTClient 39:e98d5df6dc74 719 }
AzureIoTClient 39:e98d5df6dc74 720 }
AzureIoTClient 39:e98d5df6dc74 721 else if (previous_state == MESSAGE_RECEIVER_STATE_OPEN && new_state != MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 39:e98d5df6dc74 722 {
AzureIoTClient 39:e98d5df6dc74 723 if (instance->config->on_subscription_changed_callback != NULL)
AzureIoTClient 39:e98d5df6dc74 724 {
AzureIoTClient 39:e98d5df6dc74 725 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, false);
AzureIoTClient 39:e98d5df6dc74 726 }
AzureIoTClient 39:e98d5df6dc74 727 }
AzureIoTClient 39:e98d5df6dc74 728 }
AzureIoTClient 39:e98d5df6dc74 729 }
AzureIoTClient 39:e98d5df6dc74 730 }
AzureIoTClient 39:e98d5df6dc74 731
AzureIoTClient 39:e98d5df6dc74 732 static AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* create_message_disposition_info(AMQP_MESSENGER_INSTANCE* messenger)
AzureIoTClient 39:e98d5df6dc74 733 {
AzureIoTClient 39:e98d5df6dc74 734 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* result;
AzureIoTClient 39:e98d5df6dc74 735
AzureIoTClient 39:e98d5df6dc74 736 if ((result = (AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO))) == NULL)
AzureIoTClient 39:e98d5df6dc74 737 {
AzureIoTClient 39:e98d5df6dc74 738 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (malloc failed)");
AzureIoTClient 39:e98d5df6dc74 739 result = NULL;
AzureIoTClient 39:e98d5df6dc74 740 }
AzureIoTClient 39:e98d5df6dc74 741 else
AzureIoTClient 39:e98d5df6dc74 742 {
AzureIoTClient 39:e98d5df6dc74 743 delivery_number message_id;
AzureIoTClient 39:e98d5df6dc74 744
AzureIoTClient 39:e98d5df6dc74 745 if (messagereceiver_get_received_message_id(messenger->message_receiver, &message_id) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 746 {
AzureIoTClient 39:e98d5df6dc74 747 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_received_message_id failed)");
AzureIoTClient 39:e98d5df6dc74 748 free(result);
AzureIoTClient 39:e98d5df6dc74 749 result = NULL;
AzureIoTClient 39:e98d5df6dc74 750 }
AzureIoTClient 39:e98d5df6dc74 751 else
AzureIoTClient 39:e98d5df6dc74 752 {
AzureIoTClient 39:e98d5df6dc74 753 const char* link_name;
AzureIoTClient 39:e98d5df6dc74 754
AzureIoTClient 39:e98d5df6dc74 755 if (messagereceiver_get_link_name(messenger->message_receiver, &link_name) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 756 {
AzureIoTClient 39:e98d5df6dc74 757 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_link_name failed)");
AzureIoTClient 39:e98d5df6dc74 758 free(result);
AzureIoTClient 39:e98d5df6dc74 759 result = NULL;
AzureIoTClient 39:e98d5df6dc74 760 }
AzureIoTClient 39:e98d5df6dc74 761 else if (mallocAndStrcpy_s(&result->source, link_name) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 762 {
AzureIoTClient 39:e98d5df6dc74 763 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (failed copying link name)");
AzureIoTClient 39:e98d5df6dc74 764 free(result);
AzureIoTClient 39:e98d5df6dc74 765 result = NULL;
AzureIoTClient 39:e98d5df6dc74 766 }
AzureIoTClient 39:e98d5df6dc74 767 else
AzureIoTClient 39:e98d5df6dc74 768 {
AzureIoTClient 39:e98d5df6dc74 769 result->message_id = message_id;
AzureIoTClient 39:e98d5df6dc74 770 }
AzureIoTClient 39:e98d5df6dc74 771 }
AzureIoTClient 39:e98d5df6dc74 772 }
AzureIoTClient 39:e98d5df6dc74 773
AzureIoTClient 39:e98d5df6dc74 774 return result;
AzureIoTClient 39:e98d5df6dc74 775 }
AzureIoTClient 39:e98d5df6dc74 776
AzureIoTClient 39:e98d5df6dc74 777 static void destroy_message_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
AzureIoTClient 39:e98d5df6dc74 778 {
AzureIoTClient 39:e98d5df6dc74 779 free(disposition_info->source);
AzureIoTClient 39:e98d5df6dc74 780 free(disposition_info);
AzureIoTClient 39:e98d5df6dc74 781 }
AzureIoTClient 39:e98d5df6dc74 782
AzureIoTClient 39:e98d5df6dc74 783 static AMQP_VALUE create_uamqp_disposition_result_from(AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 39:e98d5df6dc74 784 {
AzureIoTClient 39:e98d5df6dc74 785 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 786
AzureIoTClient 39:e98d5df6dc74 787 if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_NONE)
AzureIoTClient 39:e98d5df6dc74 788 {
AzureIoTClient 39:e98d5df6dc74 789 uamqp_disposition_result = NULL; // intentionally not sending an answer.
AzureIoTClient 39:e98d5df6dc74 790 }
AzureIoTClient 39:e98d5df6dc74 791 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED)
AzureIoTClient 39:e98d5df6dc74 792 {
AzureIoTClient 39:e98d5df6dc74 793 uamqp_disposition_result = messaging_delivery_accepted();
AzureIoTClient 39:e98d5df6dc74 794 }
AzureIoTClient 39:e98d5df6dc74 795 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED)
AzureIoTClient 39:e98d5df6dc74 796 {
AzureIoTClient 39:e98d5df6dc74 797 uamqp_disposition_result = messaging_delivery_released();
AzureIoTClient 39:e98d5df6dc74 798 }
AzureIoTClient 39:e98d5df6dc74 799 else // id est, if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED)
AzureIoTClient 39:e98d5df6dc74 800 {
AzureIoTClient 39:e98d5df6dc74 801 uamqp_disposition_result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
AzureIoTClient 39:e98d5df6dc74 802 }
AzureIoTClient 39:e98d5df6dc74 803
AzureIoTClient 39:e98d5df6dc74 804 return uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 805 }
AzureIoTClient 39:e98d5df6dc74 806
AzureIoTClient 39:e98d5df6dc74 807 static AMQP_VALUE on_message_received_internal_callback(const void* context, MESSAGE_HANDLE message)
AzureIoTClient 39:e98d5df6dc74 808 {
AzureIoTClient 39:e98d5df6dc74 809 AMQP_VALUE result;
AzureIoTClient 39:e98d5df6dc74 810 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 39:e98d5df6dc74 811 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* message_disposition_info;
AzureIoTClient 39:e98d5df6dc74 812
AzureIoTClient 39:e98d5df6dc74 813 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_107: [A AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be created containing the source link name and message delivery ID]
AzureIoTClient 39:e98d5df6dc74 814 if ((message_disposition_info = create_message_disposition_info(instance)) == NULL)
AzureIoTClient 39:e98d5df6dc74 815 {
AzureIoTClient 39:e98d5df6dc74 816 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_108: [If the AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance fails to be created, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
AzureIoTClient 39:e98d5df6dc74 817 LogError("on_message_received_internal_callback failed (failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO).");
AzureIoTClient 39:e98d5df6dc74 818 result = messaging_delivery_released();
AzureIoTClient 39:e98d5df6dc74 819 }
AzureIoTClient 39:e98d5df6dc74 820 else
AzureIoTClient 39:e98d5df6dc74 821 {
AzureIoTClient 39:e98d5df6dc74 822 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_109: [`instance->on_message_received_callback` shall be invoked passing the `message` and AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance]
AzureIoTClient 39:e98d5df6dc74 823 AMQP_MESSENGER_DISPOSITION_RESULT disposition_result = instance->on_message_received_callback(message, message_disposition_info, instance->on_message_received_context);
AzureIoTClient 39:e98d5df6dc74 824
AzureIoTClient 39:e98d5df6dc74 825 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_110: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED, on_message_received_internal_callback shall return the result of messaging_delivery_accepted()]
AzureIoTClient 39:e98d5df6dc74 826 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_111: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
AzureIoTClient 39:e98d5df6dc74 827 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_112: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()]
AzureIoTClient 39:e98d5df6dc74 828 result = create_uamqp_disposition_result_from(disposition_result);
AzureIoTClient 39:e98d5df6dc74 829 }
AzureIoTClient 39:e98d5df6dc74 830
AzureIoTClient 39:e98d5df6dc74 831 return result;
AzureIoTClient 39:e98d5df6dc74 832 }
AzureIoTClient 39:e98d5df6dc74 833
AzureIoTClient 39:e98d5df6dc74 834 static int create_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 835 {
AzureIoTClient 39:e98d5df6dc74 836 int result;
AzureIoTClient 39:e98d5df6dc74 837
AzureIoTClient 39:e98d5df6dc74 838 if ((instance->receiver_link = create_link(role_receiver,
AzureIoTClient 39:e98d5df6dc74 839 instance->session_handle, &instance->config->receive_link, instance->config->iothub_host_fqdn, instance->config->device_id)) == NULL)
AzureIoTClient 39:e98d5df6dc74 840 {
AzureIoTClient 39:e98d5df6dc74 841 LogError("Failed creating the message receiver link");
AzureIoTClient 39:e98d5df6dc74 842 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 843 }
AzureIoTClient 39:e98d5df6dc74 844 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_101: [`instance->message_receiver` shall be created using messagereceiver_create(), passing the `instance->receiver_link` and `on_message_receiver_state_changed_callback`]
AzureIoTClient 39:e98d5df6dc74 845 else if ((instance->message_receiver = messagereceiver_create(instance->receiver_link, on_message_receiver_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 39:e98d5df6dc74 846 {
AzureIoTClient 39:e98d5df6dc74 847 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_102: [If messagereceiver_create() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 848 LogError("Failed creating the message receiver (messagereceiver_create failed)");
AzureIoTClient 39:e98d5df6dc74 849 destroy_message_receiver(instance);
AzureIoTClient 39:e98d5df6dc74 850 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 851 }
AzureIoTClient 39:e98d5df6dc74 852 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_103: [`instance->message_receiver` shall be opened using messagereceiver_open() passing `on_message_received_internal_callback`]
AzureIoTClient 39:e98d5df6dc74 853 else if (messagereceiver_open(instance->message_receiver, on_message_received_internal_callback, (void*)instance) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 854 {
AzureIoTClient 39:e98d5df6dc74 855 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_104: [If messagereceiver_open() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 856 LogError("Failed opening the AMQP message receiver.");
AzureIoTClient 39:e98d5df6dc74 857 destroy_message_receiver(instance);
AzureIoTClient 39:e98d5df6dc74 858 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 859 }
AzureIoTClient 39:e98d5df6dc74 860 else
AzureIoTClient 39:e98d5df6dc74 861 {
AzureIoTClient 39:e98d5df6dc74 862 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 863 }
AzureIoTClient 39:e98d5df6dc74 864
AzureIoTClient 39:e98d5df6dc74 865 return result;
AzureIoTClient 39:e98d5df6dc74 866 }
AzureIoTClient 39:e98d5df6dc74 867
AzureIoTClient 39:e98d5df6dc74 868 static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result)
AzureIoTClient 39:e98d5df6dc74 869 {
AzureIoTClient 39:e98d5df6dc74 870 if (context != NULL)
AzureIoTClient 39:e98d5df6dc74 871 {
AzureIoTClient 39:e98d5df6dc74 872 MESSAGE_QUEUE_RESULT mq_result;
AzureIoTClient 39:e98d5df6dc74 873 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)context;
AzureIoTClient 39:e98d5df6dc74 874
AzureIoTClient 39:e98d5df6dc74 875 if (send_result == MESSAGE_SEND_OK)
AzureIoTClient 39:e98d5df6dc74 876 {
AzureIoTClient 39:e98d5df6dc74 877 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_121: [If no failure occurs, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_SUCCESS]
AzureIoTClient 39:e98d5df6dc74 878 mq_result = MESSAGE_QUEUE_SUCCESS;
AzureIoTClient 39:e98d5df6dc74 879 }
AzureIoTClient 39:e98d5df6dc74 880 else
AzureIoTClient 39:e98d5df6dc74 881 {
AzureIoTClient 39:e98d5df6dc74 882 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_122: [If a failure occurred, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
AzureIoTClient 39:e98d5df6dc74 883 mq_result = MESSAGE_QUEUE_ERROR;
AzureIoTClient 39:e98d5df6dc74 884 }
AzureIoTClient 39:e98d5df6dc74 885
AzureIoTClient 39:e98d5df6dc74 886 msg_ctx->on_process_message_completed_callback(msg_ctx->messenger->send_queue, (MQ_MESSAGE_HANDLE)msg_ctx->message, mq_result, NULL);
AzureIoTClient 39:e98d5df6dc74 887 }
AzureIoTClient 39:e98d5df6dc74 888 }
AzureIoTClient 39:e98d5df6dc74 889
AzureIoTClient 39:e98d5df6dc74 890 static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* context)
AzureIoTClient 39:e98d5df6dc74 891 {
AzureIoTClient 39:e98d5df6dc74 892 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_117: [If any argument is NULL, `on_process_message_callback` shall return immediatelly]
AzureIoTClient 39:e98d5df6dc74 893 if (message_queue == NULL || message == NULL || on_process_message_completed_callback == NULL || context == NULL)
AzureIoTClient 39:e98d5df6dc74 894 {
AzureIoTClient 39:e98d5df6dc74 895 LogError("Invalid argument (message_queue=%p, message=%p, on_process_message_completed_callback=%p, context=%p)", message_queue, message, on_process_message_completed_callback, context);
AzureIoTClient 39:e98d5df6dc74 896 }
AzureIoTClient 39:e98d5df6dc74 897 else
AzureIoTClient 39:e98d5df6dc74 898 {
AzureIoTClient 39:e98d5df6dc74 899 MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context;
AzureIoTClient 39:e98d5df6dc74 900 message_context->on_process_message_completed_callback = on_process_message_completed_callback;
AzureIoTClient 39:e98d5df6dc74 901
AzureIoTClient 39:e98d5df6dc74 902 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_118: [The MESSAGE_HANDLE shall be submitted for sending using messagesender_send(), passing `on_send_complete_callback`]
AzureIoTClient 43:3da2d93bb955 903 if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL)
AzureIoTClient 39:e98d5df6dc74 904 {
AzureIoTClient 39:e98d5df6dc74 905 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_119: [If messagesender_send() fails, `on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
AzureIoTClient 39:e98d5df6dc74 906 LogError("Failed sending AMQP message");
AzureIoTClient 39:e98d5df6dc74 907 on_process_message_completed_callback(message_queue, message, MESSAGE_QUEUE_ERROR, NULL);
AzureIoTClient 39:e98d5df6dc74 908 }
AzureIoTClient 39:e98d5df6dc74 909
AzureIoTClient 39:e98d5df6dc74 910 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_120: [The MESSAGE_HANDLE shall be destroyed using message_destroy() and marked as destroyed in the context provided]
AzureIoTClient 39:e98d5df6dc74 911 message_destroy((MESSAGE_HANDLE)message);
AzureIoTClient 39:e98d5df6dc74 912 message_context->is_destroyed = true;
AzureIoTClient 39:e98d5df6dc74 913 }
AzureIoTClient 39:e98d5df6dc74 914 }
AzureIoTClient 39:e98d5df6dc74 915
AzureIoTClient 39:e98d5df6dc74 916 static void on_message_processing_completed_callback(MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason, void* message_context)
AzureIoTClient 39:e98d5df6dc74 917 {
AzureIoTClient 39:e98d5df6dc74 918 (void)reason;
AzureIoTClient 39:e98d5df6dc74 919
AzureIoTClient 39:e98d5df6dc74 920 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_020: [If `messenger_context` is NULL, `on_message_processing_completed_callback` shall return immediately]
AzureIoTClient 39:e98d5df6dc74 921 if (message_context == NULL)
AzureIoTClient 39:e98d5df6dc74 922 {
AzureIoTClient 39:e98d5df6dc74 923 LogError("on_message_processing_completed_callback invoked with NULL context");
AzureIoTClient 39:e98d5df6dc74 924 }
AzureIoTClient 39:e98d5df6dc74 925 else
AzureIoTClient 39:e98d5df6dc74 926 {
AzureIoTClient 39:e98d5df6dc74 927 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)message_context;
AzureIoTClient 39:e98d5df6dc74 928 AMQP_MESSENGER_SEND_RESULT messenger_send_result;
AzureIoTClient 39:e98d5df6dc74 929 AMQP_MESSENGER_REASON messenger_send_reason;
AzureIoTClient 39:e98d5df6dc74 930
AzureIoTClient 39:e98d5df6dc74 931 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_021: [If `result` is MESSAGE_QUEUE_SUCCESS, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_SUCCESS and AMQP_MESSENGER_REASON_NONE]
AzureIoTClient 39:e98d5df6dc74 932 if (result == MESSAGE_QUEUE_SUCCESS)
AzureIoTClient 39:e98d5df6dc74 933 {
AzureIoTClient 39:e98d5df6dc74 934 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_SUCCESS;
AzureIoTClient 39:e98d5df6dc74 935 messenger_send_reason = AMQP_MESSENGER_REASON_NONE;
AzureIoTClient 39:e98d5df6dc74 936
AzureIoTClient 39:e98d5df6dc74 937 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
AzureIoTClient 39:e98d5df6dc74 938 msg_ctx->messenger->send_error_count = 0;
AzureIoTClient 39:e98d5df6dc74 939 }
AzureIoTClient 39:e98d5df6dc74 940 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_022: [If `result` is MESSAGE_QUEUE_TIMEOUT, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_TIMEOUT]
AzureIoTClient 39:e98d5df6dc74 941 else if (result == MESSAGE_QUEUE_TIMEOUT)
AzureIoTClient 39:e98d5df6dc74 942 {
AzureIoTClient 39:e98d5df6dc74 943 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
AzureIoTClient 39:e98d5df6dc74 944 messenger_send_reason = AMQP_MESSENGER_REASON_TIMEOUT;
AzureIoTClient 39:e98d5df6dc74 945
AzureIoTClient 39:e98d5df6dc74 946 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
AzureIoTClient 39:e98d5df6dc74 947 msg_ctx->messenger->send_error_count++;
AzureIoTClient 39:e98d5df6dc74 948 }
AzureIoTClient 39:e98d5df6dc74 949 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_023: [If `result` is MESSAGE_QUEUE_CANCELLED and the messenger is STOPPED, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_CANCELLED and AMQP_MESSENGER_REASON_MESSENGER_DESTROYED]
AzureIoTClient 39:e98d5df6dc74 950 else if (result == MESSAGE_QUEUE_CANCELLED && msg_ctx->messenger->state == AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 39:e98d5df6dc74 951 {
AzureIoTClient 39:e98d5df6dc74 952 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_CANCELLED;
AzureIoTClient 39:e98d5df6dc74 953 messenger_send_reason = AMQP_MESSENGER_REASON_MESSENGER_DESTROYED;
AzureIoTClient 39:e98d5df6dc74 954
AzureIoTClient 39:e98d5df6dc74 955 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
AzureIoTClient 39:e98d5df6dc74 956 msg_ctx->messenger->send_error_count = 0;
AzureIoTClient 39:e98d5df6dc74 957 }
AzureIoTClient 39:e98d5df6dc74 958 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_024: [Otherwise `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_FAIL_SENDING]
AzureIoTClient 39:e98d5df6dc74 959 else
AzureIoTClient 39:e98d5df6dc74 960 {
AzureIoTClient 39:e98d5df6dc74 961 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
AzureIoTClient 39:e98d5df6dc74 962 messenger_send_reason = AMQP_MESSENGER_REASON_FAIL_SENDING;
AzureIoTClient 39:e98d5df6dc74 963
AzureIoTClient 39:e98d5df6dc74 964 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
AzureIoTClient 39:e98d5df6dc74 965 msg_ctx->messenger->send_error_count++;
AzureIoTClient 39:e98d5df6dc74 966 }
AzureIoTClient 39:e98d5df6dc74 967
AzureIoTClient 39:e98d5df6dc74 968 if (msg_ctx->on_send_complete_callback != NULL)
AzureIoTClient 39:e98d5df6dc74 969 {
AzureIoTClient 39:e98d5df6dc74 970 msg_ctx->on_send_complete_callback(messenger_send_result, messenger_send_reason, msg_ctx->user_context);
AzureIoTClient 39:e98d5df6dc74 971 }
AzureIoTClient 39:e98d5df6dc74 972
AzureIoTClient 39:e98d5df6dc74 973 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_027: [If `message` has not been destroyed, it shall be destroyed using message_destroy()]
AzureIoTClient 39:e98d5df6dc74 974 if (!msg_ctx->is_destroyed)
AzureIoTClient 39:e98d5df6dc74 975 {
AzureIoTClient 39:e98d5df6dc74 976 message_destroy((MESSAGE_HANDLE)message);
AzureIoTClient 39:e98d5df6dc74 977 }
AzureIoTClient 39:e98d5df6dc74 978
AzureIoTClient 39:e98d5df6dc74 979 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_028: [`send_ctx` shall be destroyed]
AzureIoTClient 39:e98d5df6dc74 980 destroy_message_send_context(msg_ctx);
AzureIoTClient 39:e98d5df6dc74 981 }
AzureIoTClient 39:e98d5df6dc74 982 }
AzureIoTClient 39:e98d5df6dc74 983
AzureIoTClient 39:e98d5df6dc74 984
AzureIoTClient 39:e98d5df6dc74 985 // ---------- Set/Retrieve Options Helpers ----------//
AzureIoTClient 39:e98d5df6dc74 986
AzureIoTClient 39:e98d5df6dc74 987 static void* amqp_messenger_clone_option(const char* name, const void* value)
AzureIoTClient 39:e98d5df6dc74 988 {
AzureIoTClient 39:e98d5df6dc74 989 void* result;
AzureIoTClient 39:e98d5df6dc74 990
AzureIoTClient 39:e98d5df6dc74 991 if (name == NULL || value == NULL)
AzureIoTClient 39:e98d5df6dc74 992 {
AzureIoTClient 39:e98d5df6dc74 993 LogError("invalid argument (name=%p, value=%p)", name, value);
AzureIoTClient 39:e98d5df6dc74 994 result = NULL;
AzureIoTClient 39:e98d5df6dc74 995 }
AzureIoTClient 39:e98d5df6dc74 996 else
AzureIoTClient 39:e98d5df6dc74 997 {
AzureIoTClient 39:e98d5df6dc74 998 if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
AzureIoTClient 39:e98d5df6dc74 999 {
AzureIoTClient 39:e98d5df6dc74 1000 if ((result = (void*)OptionHandler_Clone((OPTIONHANDLER_HANDLE)value)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1001 {
AzureIoTClient 39:e98d5df6dc74 1002 LogError("failed cloning option '%s'", name);
AzureIoTClient 39:e98d5df6dc74 1003 }
AzureIoTClient 39:e98d5df6dc74 1004 }
AzureIoTClient 39:e98d5df6dc74 1005 else
AzureIoTClient 39:e98d5df6dc74 1006 {
AzureIoTClient 39:e98d5df6dc74 1007 LogError("Failed to clone messenger option (option with name '%s' is not suppported)", name);
AzureIoTClient 39:e98d5df6dc74 1008 result = NULL;
AzureIoTClient 39:e98d5df6dc74 1009 }
AzureIoTClient 39:e98d5df6dc74 1010 }
AzureIoTClient 39:e98d5df6dc74 1011
AzureIoTClient 39:e98d5df6dc74 1012 return result;
AzureIoTClient 39:e98d5df6dc74 1013 }
AzureIoTClient 39:e98d5df6dc74 1014
AzureIoTClient 39:e98d5df6dc74 1015 static void amqp_messenger_destroy_option(const char* name, const void* value)
AzureIoTClient 39:e98d5df6dc74 1016 {
AzureIoTClient 39:e98d5df6dc74 1017 if (name == NULL || value == NULL)
AzureIoTClient 39:e98d5df6dc74 1018 {
AzureIoTClient 39:e98d5df6dc74 1019 LogError("invalid argument (name=%p, value=%p)", name, value);
AzureIoTClient 39:e98d5df6dc74 1020 }
AzureIoTClient 39:e98d5df6dc74 1021 else if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
AzureIoTClient 39:e98d5df6dc74 1022 {
AzureIoTClient 39:e98d5df6dc74 1023 OptionHandler_Destroy((OPTIONHANDLER_HANDLE)value);
AzureIoTClient 39:e98d5df6dc74 1024 }
AzureIoTClient 39:e98d5df6dc74 1025 else
AzureIoTClient 39:e98d5df6dc74 1026 {
AzureIoTClient 39:e98d5df6dc74 1027 LogError("invalid argument (option '%s' is not suppported)", name);
AzureIoTClient 39:e98d5df6dc74 1028 }
AzureIoTClient 39:e98d5df6dc74 1029 }
AzureIoTClient 39:e98d5df6dc74 1030
AzureIoTClient 39:e98d5df6dc74 1031
AzureIoTClient 39:e98d5df6dc74 1032 // Public API:
AzureIoTClient 39:e98d5df6dc74 1033
AzureIoTClient 39:e98d5df6dc74 1034 int amqp_messenger_subscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle, ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback, void* context)
AzureIoTClient 39:e98d5df6dc74 1035 {
AzureIoTClient 39:e98d5df6dc74 1036 int result;
AzureIoTClient 39:e98d5df6dc74 1037
AzureIoTClient 39:e98d5df6dc74 1038 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_035: [If `messenger_handle` or `on_message_received_callback` are NULL, amqp_messenger_subscribe_for_messages() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1039 if (messenger_handle == NULL || on_message_received_callback == NULL || context == NULL)
AzureIoTClient 39:e98d5df6dc74 1040 {
AzureIoTClient 39:e98d5df6dc74 1041 LogError("Invalid argument (messenger_handle=%p, on_message_received_callback=%p, context=%p)", messenger_handle, on_message_received_callback, context);
AzureIoTClient 39:e98d5df6dc74 1042 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1043 }
AzureIoTClient 39:e98d5df6dc74 1044 else
AzureIoTClient 39:e98d5df6dc74 1045 {
AzureIoTClient 39:e98d5df6dc74 1046 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1047
AzureIoTClient 39:e98d5df6dc74 1048 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_036: [`on_message_received_callback` and `context` shall be saved on `instance->on_message_received_callback`]
AzureIoTClient 39:e98d5df6dc74 1049 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_037: [amqp_messenger_subscribe_for_messages() shall set `instance->receive_messages` to true]
AzureIoTClient 39:e98d5df6dc74 1050 instance->on_message_received_callback = on_message_received_callback;
AzureIoTClient 39:e98d5df6dc74 1051 instance->on_message_received_context = context;
AzureIoTClient 39:e98d5df6dc74 1052 instance->receive_messages = true;
AzureIoTClient 39:e98d5df6dc74 1053
AzureIoTClient 39:e98d5df6dc74 1054 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_038: [If no failures occurr, amqp_messenger_subscribe_for_messages() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1055 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1056 }
AzureIoTClient 39:e98d5df6dc74 1057
AzureIoTClient 39:e98d5df6dc74 1058 return result;
AzureIoTClient 39:e98d5df6dc74 1059 }
AzureIoTClient 39:e98d5df6dc74 1060
AzureIoTClient 39:e98d5df6dc74 1061 int amqp_messenger_unsubscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1062 {
AzureIoTClient 39:e98d5df6dc74 1063 int result;
AzureIoTClient 39:e98d5df6dc74 1064
AzureIoTClient 39:e98d5df6dc74 1065 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_039: [If `messenger_handle` is NULL, amqp_messenger_unsubscribe_for_messages() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1066 if (messenger_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1067 {
AzureIoTClient 39:e98d5df6dc74 1068 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1069 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1070 }
AzureIoTClient 39:e98d5df6dc74 1071 else
AzureIoTClient 39:e98d5df6dc74 1072 {
AzureIoTClient 39:e98d5df6dc74 1073 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1074
AzureIoTClient 39:e98d5df6dc74 1075 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_040: [`instance->receive_messages` shall be saved to false]
AzureIoTClient 39:e98d5df6dc74 1076 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_041: [`instance->on_message_received_callback` and `instance->on_message_received_context` shall be set to NULL]
AzureIoTClient 39:e98d5df6dc74 1077 instance->receive_messages = false;
AzureIoTClient 39:e98d5df6dc74 1078 instance->on_message_received_callback = NULL;
AzureIoTClient 39:e98d5df6dc74 1079 instance->on_message_received_context = NULL;
AzureIoTClient 39:e98d5df6dc74 1080
AzureIoTClient 39:e98d5df6dc74 1081 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_042: [If no failures occurr, amqp_messenger_unsubscribe_for_messages() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1082 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1083 }
AzureIoTClient 39:e98d5df6dc74 1084
AzureIoTClient 39:e98d5df6dc74 1085 return result;
AzureIoTClient 39:e98d5df6dc74 1086 }
AzureIoTClient 39:e98d5df6dc74 1087
AzureIoTClient 39:e98d5df6dc74 1088 int amqp_messenger_send_message_disposition(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 39:e98d5df6dc74 1089 {
AzureIoTClient 39:e98d5df6dc74 1090 int result;
AzureIoTClient 39:e98d5df6dc74 1091
AzureIoTClient 39:e98d5df6dc74 1092 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_043: [If `messenger_handle` or `disposition_info` are NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1093 if (messenger_handle == NULL || disposition_info == NULL)
AzureIoTClient 39:e98d5df6dc74 1094 {
AzureIoTClient 39:e98d5df6dc74 1095 LogError("Invalid argument (messenger_handle=%p, disposition_info=%p)", messenger_handle, disposition_info);
AzureIoTClient 39:e98d5df6dc74 1096 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1097 }
AzureIoTClient 39:e98d5df6dc74 1098 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_044: [If `disposition_info->source` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1099 else if (disposition_info->source == NULL)
AzureIoTClient 39:e98d5df6dc74 1100 {
AzureIoTClient 39:e98d5df6dc74 1101 LogError("Failed sending message disposition (disposition_info->source is NULL)");
AzureIoTClient 39:e98d5df6dc74 1102 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1103 }
AzureIoTClient 39:e98d5df6dc74 1104 else
AzureIoTClient 39:e98d5df6dc74 1105 {
AzureIoTClient 39:e98d5df6dc74 1106 AMQP_MESSENGER_INSTANCE* messenger = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1107
AzureIoTClient 39:e98d5df6dc74 1108 if (messenger->message_receiver == NULL)
AzureIoTClient 39:e98d5df6dc74 1109 {
AzureIoTClient 39:e98d5df6dc74 1110 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_045: [If `messenger_handle->message_receiver` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1111 LogError("Failed sending message disposition (message_receiver is not created; check if it is subscribed)");
AzureIoTClient 39:e98d5df6dc74 1112 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1113 }
AzureIoTClient 39:e98d5df6dc74 1114 else
AzureIoTClient 39:e98d5df6dc74 1115 {
AzureIoTClient 39:e98d5df6dc74 1116 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 1117
AzureIoTClient 39:e98d5df6dc74 1118 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_046: [An AMQP_VALUE disposition result shall be created corresponding to the `disposition_result` provided]
AzureIoTClient 39:e98d5df6dc74 1119 if ((uamqp_disposition_result = create_uamqp_disposition_result_from(disposition_result)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1120 {
AzureIoTClient 39:e98d5df6dc74 1121 LogError("Failed sending message disposition (disposition result %d is not supported)", disposition_result);
AzureIoTClient 39:e98d5df6dc74 1122 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1123 }
AzureIoTClient 39:e98d5df6dc74 1124 else
AzureIoTClient 39:e98d5df6dc74 1125 {
AzureIoTClient 39:e98d5df6dc74 1126 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_047: [`messagereceiver_send_message_disposition()` shall be invoked passing `disposition_info->source`, `disposition_info->message_id` and the AMQP_VALUE disposition result]
AzureIoTClient 39:e98d5df6dc74 1127 if (messagereceiver_send_message_disposition(messenger->message_receiver, disposition_info->source, disposition_info->message_id, uamqp_disposition_result) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1128 {
AzureIoTClient 39:e98d5df6dc74 1129 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_048: [If `messagereceiver_send_message_disposition()` fails, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1130 LogError("Failed sending message disposition (messagereceiver_send_message_disposition failed)");
AzureIoTClient 39:e98d5df6dc74 1131 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1132 }
AzureIoTClient 39:e98d5df6dc74 1133 else
AzureIoTClient 39:e98d5df6dc74 1134 {
AzureIoTClient 39:e98d5df6dc74 1135 destroy_message_disposition_info(disposition_info);
AzureIoTClient 39:e98d5df6dc74 1136
AzureIoTClient 39:e98d5df6dc74 1137 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_050: [If no failures occurr, amqp_messenger_send_message_disposition() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1138 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1139 }
AzureIoTClient 39:e98d5df6dc74 1140
AzureIoTClient 39:e98d5df6dc74 1141 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_049: [amqp_messenger_send_message_disposition() shall destroy the AMQP_VALUE disposition result]
AzureIoTClient 39:e98d5df6dc74 1142 amqpvalue_destroy(uamqp_disposition_result);
AzureIoTClient 39:e98d5df6dc74 1143 }
AzureIoTClient 39:e98d5df6dc74 1144 }
AzureIoTClient 39:e98d5df6dc74 1145 }
AzureIoTClient 39:e98d5df6dc74 1146
AzureIoTClient 39:e98d5df6dc74 1147 return result;
AzureIoTClient 39:e98d5df6dc74 1148 }
AzureIoTClient 39:e98d5df6dc74 1149
AzureIoTClient 39:e98d5df6dc74 1150 int amqp_messenger_send_async(AMQP_MESSENGER_HANDLE messenger_handle, MESSAGE_HANDLE message, AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_user_defined_send_complete_callback, void* user_context)
AzureIoTClient 39:e98d5df6dc74 1151 {
AzureIoTClient 39:e98d5df6dc74 1152 int result;
AzureIoTClient 39:e98d5df6dc74 1153
AzureIoTClient 39:e98d5df6dc74 1154 if (messenger_handle == NULL || message == NULL || on_user_defined_send_complete_callback == NULL)
AzureIoTClient 39:e98d5df6dc74 1155 {
AzureIoTClient 39:e98d5df6dc74 1156 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_010: [If `messenger_handle`, `message` or `on_event_send_complete_callback` are NULL, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1157 LogError("Invalid argument (messenger_handle=%p, message=%p, on_user_defined_send_complete_callback=%p)", messenger_handle, message, on_user_defined_send_complete_callback);
AzureIoTClient 39:e98d5df6dc74 1158 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1159 }
AzureIoTClient 39:e98d5df6dc74 1160 else
AzureIoTClient 39:e98d5df6dc74 1161 {
AzureIoTClient 39:e98d5df6dc74 1162 MESSAGE_HANDLE cloned_message;
AzureIoTClient 39:e98d5df6dc74 1163
AzureIoTClient 39:e98d5df6dc74 1164 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_011: [`message` shall be cloned using message_clone()]
AzureIoTClient 39:e98d5df6dc74 1165 if ((cloned_message = message_clone(message)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1166 {
AzureIoTClient 39:e98d5df6dc74 1167 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_012: [If message_clone() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1168 LogError("Failed cloning AMQP message");
AzureIoTClient 39:e98d5df6dc74 1169 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1170 }
AzureIoTClient 39:e98d5df6dc74 1171 else
AzureIoTClient 39:e98d5df6dc74 1172 {
AzureIoTClient 39:e98d5df6dc74 1173 MESSAGE_SEND_CONTEXT* message_context;
AzureIoTClient 39:e98d5df6dc74 1174 AMQP_MESSENGER_INSTANCE *instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1175
AzureIoTClient 39:e98d5df6dc74 1176 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_013: [amqp_messenger_send_async() shall allocate memory for a MESSAGE_SEND_CONTEXT structure (aka `send_ctx`)]
AzureIoTClient 39:e98d5df6dc74 1177 if ((message_context = create_message_send_context()) == NULL)
AzureIoTClient 39:e98d5df6dc74 1178 {
AzureIoTClient 39:e98d5df6dc74 1179 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_014: [If malloc() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1180 LogError("Failed creating context for sending message");
AzureIoTClient 39:e98d5df6dc74 1181 message_destroy(cloned_message);
AzureIoTClient 39:e98d5df6dc74 1182 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1183 }
AzureIoTClient 39:e98d5df6dc74 1184 else
AzureIoTClient 39:e98d5df6dc74 1185 {
AzureIoTClient 39:e98d5df6dc74 1186 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_015: [The cloned `message`, callback and context shall be saved in ``send_ctx`]
AzureIoTClient 39:e98d5df6dc74 1187 message_context->message = cloned_message;
AzureIoTClient 39:e98d5df6dc74 1188 message_context->messenger = instance;
AzureIoTClient 39:e98d5df6dc74 1189 message_context->on_send_complete_callback = on_user_defined_send_complete_callback;
AzureIoTClient 39:e98d5df6dc74 1190 message_context->user_context = user_context;
AzureIoTClient 39:e98d5df6dc74 1191
AzureIoTClient 39:e98d5df6dc74 1192 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_016: [`send_ctx` shall be added to `instance->send_queue` using message_queue_add(), passing `on_message_processing_completed_callback`]
AzureIoTClient 39:e98d5df6dc74 1193 if (message_queue_add(instance->send_queue, (MQ_MESSAGE_HANDLE)cloned_message, on_message_processing_completed_callback, (void*)message_context) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1194 {
AzureIoTClient 39:e98d5df6dc74 1195 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_017: [If message_queue_add() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1196 LogError("Failed adding message to send queue");
AzureIoTClient 39:e98d5df6dc74 1197 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_018: [If any failure occurs, amqp_messenger_send_async() shall free any memory it has allocated]
AzureIoTClient 39:e98d5df6dc74 1198 destroy_message_send_context(message_context);
AzureIoTClient 39:e98d5df6dc74 1199 message_destroy(cloned_message);
AzureIoTClient 39:e98d5df6dc74 1200 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1201 }
AzureIoTClient 39:e98d5df6dc74 1202 else
AzureIoTClient 39:e98d5df6dc74 1203 {
AzureIoTClient 39:e98d5df6dc74 1204 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_019: [If no failures occur, amqp_messenger_send_async() shall return zero]
AzureIoTClient 39:e98d5df6dc74 1205 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1206 }
AzureIoTClient 39:e98d5df6dc74 1207 }
AzureIoTClient 39:e98d5df6dc74 1208 }
AzureIoTClient 39:e98d5df6dc74 1209 }
AzureIoTClient 39:e98d5df6dc74 1210
AzureIoTClient 39:e98d5df6dc74 1211 return result;
AzureIoTClient 39:e98d5df6dc74 1212 }
AzureIoTClient 39:e98d5df6dc74 1213
AzureIoTClient 39:e98d5df6dc74 1214 int amqp_messenger_get_send_status(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_SEND_STATUS* send_status)
AzureIoTClient 39:e98d5df6dc74 1215 {
AzureIoTClient 39:e98d5df6dc74 1216 int result;
AzureIoTClient 39:e98d5df6dc74 1217
AzureIoTClient 39:e98d5df6dc74 1218 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1219 if (messenger_handle == NULL || send_status == NULL)
AzureIoTClient 39:e98d5df6dc74 1220 {
AzureIoTClient 39:e98d5df6dc74 1221 LogError("Invalid argument (messenger_handle=%p, send_status=%p)", messenger_handle, send_status);
AzureIoTClient 39:e98d5df6dc74 1222 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1223 }
AzureIoTClient 39:e98d5df6dc74 1224 else
AzureIoTClient 39:e98d5df6dc74 1225 {
AzureIoTClient 39:e98d5df6dc74 1226 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1227 bool is_empty;
AzureIoTClient 39:e98d5df6dc74 1228
AzureIoTClient 39:e98d5df6dc74 1229 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_030: [message_queue_is_empty() shall be invoked for `instance->send_queue`]
AzureIoTClient 39:e98d5df6dc74 1230 if (message_queue_is_empty(instance->send_queue, &is_empty) != 0)
AzureIoTClient 39:e98d5df6dc74 1231 {
AzureIoTClient 39:e98d5df6dc74 1232 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_031: [If message_queue_is_empty() fails, amqp_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1233 LogError("Failed verifying if send queue is empty");
AzureIoTClient 39:e98d5df6dc74 1234 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1235 }
AzureIoTClient 39:e98d5df6dc74 1236 else
AzureIoTClient 39:e98d5df6dc74 1237 {
AzureIoTClient 39:e98d5df6dc74 1238 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_032: [If message_queue_is_empty() returns `true`, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_IDLE]
AzureIoTClient 39:e98d5df6dc74 1239 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_033: [Otherwise, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_BUSY]
AzureIoTClient 39:e98d5df6dc74 1240 *send_status = (is_empty ? AMQP_MESSENGER_SEND_STATUS_IDLE : AMQP_MESSENGER_SEND_STATUS_BUSY);
AzureIoTClient 39:e98d5df6dc74 1241 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_034: [If no failures occur, amqp_messenger_get_send_status() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1242 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1243 }
AzureIoTClient 39:e98d5df6dc74 1244 }
AzureIoTClient 39:e98d5df6dc74 1245
AzureIoTClient 39:e98d5df6dc74 1246 return result;
AzureIoTClient 39:e98d5df6dc74 1247 }
AzureIoTClient 39:e98d5df6dc74 1248
AzureIoTClient 39:e98d5df6dc74 1249 int amqp_messenger_start(AMQP_MESSENGER_HANDLE messenger_handle, SESSION_HANDLE session_handle)
AzureIoTClient 39:e98d5df6dc74 1250 {
AzureIoTClient 39:e98d5df6dc74 1251 int result;
AzureIoTClient 39:e98d5df6dc74 1252
AzureIoTClient 39:e98d5df6dc74 1253 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_051: [If `messenger_handle` or `session_handle` are NULL, amqp_messenger_start() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1254 if (messenger_handle == NULL || session_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1255 {
AzureIoTClient 39:e98d5df6dc74 1256 LogError("Invalid argument (session_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1257 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1258 }
AzureIoTClient 39:e98d5df6dc74 1259 else
AzureIoTClient 39:e98d5df6dc74 1260 {
AzureIoTClient 39:e98d5df6dc74 1261 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1262
AzureIoTClient 39:e98d5df6dc74 1263 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_052: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_start() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1264 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 39:e98d5df6dc74 1265 {
AzureIoTClient 39:e98d5df6dc74 1266 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1267 LogError("amqp_messenger_start failed (current state is %d; expected AMQP_MESSENGER_STATE_STOPPED)", instance->state);
AzureIoTClient 39:e98d5df6dc74 1268 }
AzureIoTClient 39:e98d5df6dc74 1269 else
AzureIoTClient 39:e98d5df6dc74 1270 {
AzureIoTClient 39:e98d5df6dc74 1271 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_053: [`session_handle` shall be saved on `instance->session_handle`]
AzureIoTClient 39:e98d5df6dc74 1272 instance->session_handle = session_handle;
AzureIoTClient 39:e98d5df6dc74 1273
AzureIoTClient 39:e98d5df6dc74 1274 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_054: [If no failures occurr, `instance->state` shall be set to AMQP_MESSENGER_STATE_STARTING, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 39:e98d5df6dc74 1275 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTING);
AzureIoTClient 39:e98d5df6dc74 1276
AzureIoTClient 39:e98d5df6dc74 1277 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_055: [If no failures occurr, amqp_messenger_start() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1278 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1279 }
AzureIoTClient 39:e98d5df6dc74 1280 }
AzureIoTClient 39:e98d5df6dc74 1281
AzureIoTClient 39:e98d5df6dc74 1282 return result;
AzureIoTClient 39:e98d5df6dc74 1283 }
AzureIoTClient 39:e98d5df6dc74 1284
AzureIoTClient 39:e98d5df6dc74 1285 int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1286 {
AzureIoTClient 39:e98d5df6dc74 1287 int result;
AzureIoTClient 39:e98d5df6dc74 1288
AzureIoTClient 39:e98d5df6dc74 1289 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_056: [If `messenger_handle` is NULL, amqp_messenger_stop() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1290 if (messenger_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1291 {
AzureIoTClient 39:e98d5df6dc74 1292 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1293 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1294 }
AzureIoTClient 39:e98d5df6dc74 1295 else
AzureIoTClient 39:e98d5df6dc74 1296 {
AzureIoTClient 39:e98d5df6dc74 1297 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1298
AzureIoTClient 39:e98d5df6dc74 1299 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_057: [If `instance->state` is AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_stop() shall fail and return non-zero value]
AzureIoTClient 39:e98d5df6dc74 1300 if (instance->state == AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 39:e98d5df6dc74 1301 {
AzureIoTClient 39:e98d5df6dc74 1302 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1303 LogError("amqp_messenger_stop failed (messenger is already stopped)");
AzureIoTClient 39:e98d5df6dc74 1304 }
AzureIoTClient 39:e98d5df6dc74 1305 else
AzureIoTClient 39:e98d5df6dc74 1306 {
AzureIoTClient 39:e98d5df6dc74 1307 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_058: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPING, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 39:e98d5df6dc74 1308 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPING);
AzureIoTClient 39:e98d5df6dc74 1309
AzureIoTClient 39:e98d5df6dc74 1310 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_059: [`instance->message_sender` and `instance->message_receiver` shall be destroyed along with all its links]
AzureIoTClient 39:e98d5df6dc74 1311 destroy_message_sender(instance);
AzureIoTClient 39:e98d5df6dc74 1312 destroy_message_receiver(instance);
AzureIoTClient 39:e98d5df6dc74 1313
AzureIoTClient 39:e98d5df6dc74 1314 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_060: [`instance->send_queue` items shall be rolled back using message_queue_move_all_back_to_pending()]
AzureIoTClient 39:e98d5df6dc74 1315 if (message_queue_move_all_back_to_pending(instance->send_queue) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1316 {
AzureIoTClient 39:e98d5df6dc74 1317 LogError("Messenger failed to move events in progress back to wait_to_send list");
AzureIoTClient 39:e98d5df6dc74 1318
AzureIoTClient 39:e98d5df6dc74 1319 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_061: [If message_queue_move_all_back_to_pending() fails, amqp_messenger_stop() shall change the messenger state to AMQP_MESSENGER_STATE_ERROR and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1320 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1321 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1322 }
AzureIoTClient 39:e98d5df6dc74 1323 else
AzureIoTClient 39:e98d5df6dc74 1324 {
AzureIoTClient 39:e98d5df6dc74 1325 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 39:e98d5df6dc74 1326 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED);
AzureIoTClient 39:e98d5df6dc74 1327 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occurr, amqp_messenger_stop() shall return 0]
AzureIoTClient 39:e98d5df6dc74 1328 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1329 }
AzureIoTClient 39:e98d5df6dc74 1330 }
AzureIoTClient 39:e98d5df6dc74 1331 }
AzureIoTClient 39:e98d5df6dc74 1332
AzureIoTClient 39:e98d5df6dc74 1333 return result;
AzureIoTClient 39:e98d5df6dc74 1334 }
AzureIoTClient 39:e98d5df6dc74 1335
AzureIoTClient 39:e98d5df6dc74 1336 // @brief
AzureIoTClient 39:e98d5df6dc74 1337 // Sets the messenger module state based on the state changes from messagesender and messagereceiver
AzureIoTClient 39:e98d5df6dc74 1338 static void process_state_changes(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 1339 {
AzureIoTClient 39:e98d5df6dc74 1340 // Note: messagesender and messagereceiver are still not created or already destroyed
AzureIoTClient 39:e98d5df6dc74 1341 // when state is AMQP_MESSENGER_STATE_STOPPED, so no checking is needed there.
AzureIoTClient 39:e98d5df6dc74 1342
AzureIoTClient 39:e98d5df6dc74 1343 if (instance->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 39:e98d5df6dc74 1344 {
AzureIoTClient 39:e98d5df6dc74 1345 if (instance->message_sender_current_state != MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 39:e98d5df6dc74 1346 {
AzureIoTClient 39:e98d5df6dc74 1347 LogError("messagesender reported unexpected state %d while messenger was started", instance->message_sender_current_state);
AzureIoTClient 39:e98d5df6dc74 1348 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1349 }
AzureIoTClient 39:e98d5df6dc74 1350 else if (instance->message_receiver != NULL && instance->message_receiver_current_state != MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 39:e98d5df6dc74 1351 {
AzureIoTClient 39:e98d5df6dc74 1352 if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_OPENING)
AzureIoTClient 39:e98d5df6dc74 1353 {
AzureIoTClient 39:e98d5df6dc74 1354 bool is_timed_out;
AzureIoTClient 39:e98d5df6dc74 1355 if (is_timeout_reached(instance->last_message_receiver_state_change_time, MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1356 {
AzureIoTClient 39:e98d5df6dc74 1357 LogError("messenger got an error (failed to verify messagereceiver start timeout)");
AzureIoTClient 39:e98d5df6dc74 1358 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1359 }
AzureIoTClient 39:e98d5df6dc74 1360 else if (is_timed_out)
AzureIoTClient 39:e98d5df6dc74 1361 {
AzureIoTClient 39:e98d5df6dc74 1362 LogError("messenger got an error (messagereceiver failed to start within expected timeout (%d secs))", MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 39:e98d5df6dc74 1363 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1364 }
AzureIoTClient 39:e98d5df6dc74 1365 }
AzureIoTClient 39:e98d5df6dc74 1366 else if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_ERROR ||
AzureIoTClient 39:e98d5df6dc74 1367 instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_IDLE)
AzureIoTClient 39:e98d5df6dc74 1368 {
AzureIoTClient 39:e98d5df6dc74 1369 LogError("messagereceiver reported unexpected state %d while messenger is starting", instance->message_receiver_current_state);
AzureIoTClient 39:e98d5df6dc74 1370 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1371 }
AzureIoTClient 39:e98d5df6dc74 1372 }
AzureIoTClient 39:e98d5df6dc74 1373 }
AzureIoTClient 39:e98d5df6dc74 1374 else
AzureIoTClient 39:e98d5df6dc74 1375 {
AzureIoTClient 39:e98d5df6dc74 1376 if (instance->state == AMQP_MESSENGER_STATE_STARTING)
AzureIoTClient 39:e98d5df6dc74 1377 {
AzureIoTClient 39:e98d5df6dc74 1378 if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 39:e98d5df6dc74 1379 {
AzureIoTClient 39:e98d5df6dc74 1380 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTED);
AzureIoTClient 39:e98d5df6dc74 1381 }
AzureIoTClient 39:e98d5df6dc74 1382 else if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPENING)
AzureIoTClient 39:e98d5df6dc74 1383 {
AzureIoTClient 39:e98d5df6dc74 1384 bool is_timed_out;
AzureIoTClient 39:e98d5df6dc74 1385 if (is_timeout_reached(instance->last_message_sender_state_change_time, MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1386 {
AzureIoTClient 39:e98d5df6dc74 1387 LogError("messenger failed to start (failed to verify messagesender start timeout)");
AzureIoTClient 39:e98d5df6dc74 1388 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1389 }
AzureIoTClient 39:e98d5df6dc74 1390 else if (is_timed_out)
AzureIoTClient 39:e98d5df6dc74 1391 {
AzureIoTClient 39:e98d5df6dc74 1392 LogError("messenger failed to start (messagesender failed to start within expected timeout (%d secs))", MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 39:e98d5df6dc74 1393 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1394 }
AzureIoTClient 39:e98d5df6dc74 1395 }
AzureIoTClient 39:e98d5df6dc74 1396 // For this module, the only valid scenario where messagesender state is IDLE is if
AzureIoTClient 39:e98d5df6dc74 1397 // the messagesender hasn't been created yet or already destroyed.
AzureIoTClient 39:e98d5df6dc74 1398 else if ((instance->message_sender_current_state == MESSAGE_SENDER_STATE_ERROR) ||
AzureIoTClient 39:e98d5df6dc74 1399 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_CLOSING) ||
AzureIoTClient 39:e98d5df6dc74 1400 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_IDLE && instance->message_sender != NULL))
AzureIoTClient 39:e98d5df6dc74 1401 {
AzureIoTClient 39:e98d5df6dc74 1402 LogError("messagesender reported unexpected state %d while messenger is starting", instance->message_sender_current_state);
AzureIoTClient 39:e98d5df6dc74 1403 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1404 }
AzureIoTClient 39:e98d5df6dc74 1405 }
AzureIoTClient 39:e98d5df6dc74 1406 // message sender and receiver are stopped/destroyed synchronously, so no need for state control.
AzureIoTClient 39:e98d5df6dc74 1407 }
AzureIoTClient 39:e98d5df6dc74 1408 }
AzureIoTClient 39:e98d5df6dc74 1409
AzureIoTClient 39:e98d5df6dc74 1410 static void manage_amqp_messengers(AMQP_MESSENGER_INSTANCE* msgr)
AzureIoTClient 39:e98d5df6dc74 1411 {
AzureIoTClient 39:e98d5df6dc74 1412 if (msgr->state == AMQP_MESSENGER_STATE_STARTING)
AzureIoTClient 39:e98d5df6dc74 1413 {
AzureIoTClient 39:e98d5df6dc74 1414 if (msgr->message_sender == NULL)
AzureIoTClient 39:e98d5df6dc74 1415 {
AzureIoTClient 39:e98d5df6dc74 1416 if (create_message_sender(msgr) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1417 {
AzureIoTClient 39:e98d5df6dc74 1418 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1419 }
AzureIoTClient 39:e98d5df6dc74 1420 }
AzureIoTClient 39:e98d5df6dc74 1421 }
AzureIoTClient 39:e98d5df6dc74 1422 else if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 39:e98d5df6dc74 1423 {
AzureIoTClient 39:e98d5df6dc74 1424 if (msgr->receive_messages == true &&
AzureIoTClient 39:e98d5df6dc74 1425 msgr->message_receiver == NULL &&
AzureIoTClient 39:e98d5df6dc74 1426 create_message_receiver(msgr) != RESULT_OK)
AzureIoTClient 39:e98d5df6dc74 1427 {
AzureIoTClient 39:e98d5df6dc74 1428 LogError("amqp_messenger_do_work warning (failed creating the message receiver [%s])", msgr->config->device_id);
AzureIoTClient 39:e98d5df6dc74 1429 }
AzureIoTClient 39:e98d5df6dc74 1430 else if (msgr->receive_messages == false && msgr->message_receiver != NULL)
AzureIoTClient 39:e98d5df6dc74 1431 {
AzureIoTClient 39:e98d5df6dc74 1432 destroy_message_receiver(msgr);
AzureIoTClient 39:e98d5df6dc74 1433 }
AzureIoTClient 39:e98d5df6dc74 1434 }
AzureIoTClient 39:e98d5df6dc74 1435 }
AzureIoTClient 39:e98d5df6dc74 1436
AzureIoTClient 39:e98d5df6dc74 1437 static void handle_errors_and_timeouts(AMQP_MESSENGER_INSTANCE* msgr)
AzureIoTClient 39:e98d5df6dc74 1438 {
AzureIoTClient 39:e98d5df6dc74 1439 if (msgr->send_error_count >= msgr->max_send_error_count)
AzureIoTClient 39:e98d5df6dc74 1440 {
AzureIoTClient 39:e98d5df6dc74 1441 LogError("Reached max number of consecutive send failures (%d, %d)", msgr->config->device_id, msgr->max_send_error_count);
AzureIoTClient 39:e98d5df6dc74 1442 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 39:e98d5df6dc74 1443 }
AzureIoTClient 39:e98d5df6dc74 1444 }
AzureIoTClient 39:e98d5df6dc74 1445
AzureIoTClient 39:e98d5df6dc74 1446 void amqp_messenger_do_work(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1447 {
AzureIoTClient 39:e98d5df6dc74 1448 if (messenger_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1449 {
AzureIoTClient 39:e98d5df6dc74 1450 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_064: [If `messenger_handle` is NULL, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 1451 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1452 }
AzureIoTClient 39:e98d5df6dc74 1453 else
AzureIoTClient 39:e98d5df6dc74 1454 {
AzureIoTClient 39:e98d5df6dc74 1455 AMQP_MESSENGER_INSTANCE* msgr = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1456
AzureIoTClient 39:e98d5df6dc74 1457 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_065: [amqp_messenger_do_work() shall update the current state according to the states of message sender and receiver]
AzureIoTClient 39:e98d5df6dc74 1458 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_068: [If the message sender state changes to MESSAGE_SENDER_STATE_OPEN, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_STARTED]
AzureIoTClient 39:e98d5df6dc74 1459 process_state_changes(msgr);
AzureIoTClient 39:e98d5df6dc74 1460
AzureIoTClient 39:e98d5df6dc74 1461 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_066: [If the current state is AMQP_MESSENGER_STARTING, amqp_messenger_do_work() shall create and start `instance->message_sender`]
AzureIoTClient 39:e98d5df6dc74 1462 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_067: [If the `instance->message_sender` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 1463 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_069: [If the `instance->receive_messages` is true, amqp_messenger_do_work() shall create and start `instance->message_receiver` if not done before]
AzureIoTClient 39:e98d5df6dc74 1464 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_070: [If the `instance->message_receiver` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 1465 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_071: [If the `instance->receive_messages` is false, amqp_messenger_do_work() shall stop and destroy `instance->message_receiver` if not done before]
AzureIoTClient 39:e98d5df6dc74 1466 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_072: [If the `instance->message_receiver` fails to be stopped/destroyed, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 1467 manage_amqp_messengers(msgr);
AzureIoTClient 39:e98d5df6dc74 1468
AzureIoTClient 39:e98d5df6dc74 1469 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_073: [amqp_messenger_do_work() shall invoke message_queue_do_work() on `instance->send_queue`]
AzureIoTClient 39:e98d5df6dc74 1470 if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 39:e98d5df6dc74 1471 {
AzureIoTClient 39:e98d5df6dc74 1472 message_queue_do_work(msgr->send_queue);
AzureIoTClient 39:e98d5df6dc74 1473 }
AzureIoTClient 39:e98d5df6dc74 1474
AzureIoTClient 39:e98d5df6dc74 1475 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_074: [If `instance->send_error_count` DEFAULT_MAX_SEND_ERROR_COUNT (10), amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 39:e98d5df6dc74 1476 handle_errors_and_timeouts(msgr);
AzureIoTClient 39:e98d5df6dc74 1477 }
AzureIoTClient 39:e98d5df6dc74 1478 }
AzureIoTClient 39:e98d5df6dc74 1479
AzureIoTClient 39:e98d5df6dc74 1480 void amqp_messenger_destroy(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1481 {
AzureIoTClient 39:e98d5df6dc74 1482 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_123: [If `messenger_handle` is NULL, amqp_messenger_destroy() shall fail and return]
AzureIoTClient 39:e98d5df6dc74 1483 if (messenger_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1484 {
AzureIoTClient 39:e98d5df6dc74 1485 LogError("invalid argument (messenger_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1486 }
AzureIoTClient 39:e98d5df6dc74 1487 else
AzureIoTClient 39:e98d5df6dc74 1488 {
AzureIoTClient 39:e98d5df6dc74 1489 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1490
AzureIoTClient 39:e98d5df6dc74 1491 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_124: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_destroy() shall invoke amqp_messenger_stop()]
AzureIoTClient 39:e98d5df6dc74 1492 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 39:e98d5df6dc74 1493 {
AzureIoTClient 39:e98d5df6dc74 1494 (void)amqp_messenger_stop(messenger_handle);
AzureIoTClient 39:e98d5df6dc74 1495 }
AzureIoTClient 39:e98d5df6dc74 1496
AzureIoTClient 39:e98d5df6dc74 1497 if (instance->send_queue != NULL)
AzureIoTClient 39:e98d5df6dc74 1498 {
AzureIoTClient 39:e98d5df6dc74 1499 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_125: [message_queue_destroy() shall be invoked for `instance->send_queue`]
AzureIoTClient 39:e98d5df6dc74 1500 message_queue_destroy(instance->send_queue);
AzureIoTClient 39:e98d5df6dc74 1501 }
AzureIoTClient 39:e98d5df6dc74 1502
AzureIoTClient 39:e98d5df6dc74 1503 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_126: [amqp_messenger_destroy() shall release all the memory allocated for `instance`]
AzureIoTClient 39:e98d5df6dc74 1504 destroy_configuration(instance->config);
AzureIoTClient 39:e98d5df6dc74 1505
AzureIoTClient 39:e98d5df6dc74 1506 free((void*)instance);
AzureIoTClient 39:e98d5df6dc74 1507 }
AzureIoTClient 39:e98d5df6dc74 1508 }
AzureIoTClient 39:e98d5df6dc74 1509
AzureIoTClient 39:e98d5df6dc74 1510 AMQP_MESSENGER_HANDLE amqp_messenger_create(const AMQP_MESSENGER_CONFIG* messenger_config)
AzureIoTClient 39:e98d5df6dc74 1511 {
AzureIoTClient 39:e98d5df6dc74 1512 AMQP_MESSENGER_HANDLE handle;
AzureIoTClient 39:e98d5df6dc74 1513
AzureIoTClient 39:e98d5df6dc74 1514 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_001: [If `messenger_config` is NULL, amqp_messenger_create() shall return NULL]
AzureIoTClient 39:e98d5df6dc74 1515 if (!is_valid_configuration(messenger_config))
AzureIoTClient 39:e98d5df6dc74 1516 {
AzureIoTClient 39:e98d5df6dc74 1517 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_002: [If `messenger_config`'s `device_id`, `iothub_host_fqdn`, `receive_link.source_suffix` or `send_link.target_suffix` are NULL, amqp_messenger_create() shall return NULL]
AzureIoTClient 39:e98d5df6dc74 1518 handle = NULL;
AzureIoTClient 39:e98d5df6dc74 1519 }
AzureIoTClient 39:e98d5df6dc74 1520 else
AzureIoTClient 39:e98d5df6dc74 1521 {
AzureIoTClient 39:e98d5df6dc74 1522 AMQP_MESSENGER_INSTANCE* instance;
AzureIoTClient 39:e98d5df6dc74 1523
AzureIoTClient 39:e98d5df6dc74 1524 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_003: [amqp_messenger_create() shall allocate memory for the messenger instance structure (aka `instance`)]
AzureIoTClient 39:e98d5df6dc74 1525 if ((instance = (AMQP_MESSENGER_INSTANCE*)malloc(sizeof(AMQP_MESSENGER_INSTANCE))) == NULL)
AzureIoTClient 39:e98d5df6dc74 1526 {
AzureIoTClient 39:e98d5df6dc74 1527 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_004: [If malloc() fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1528 LogError("Failed allocating AMQP_MESSENGER_INSTANCE");
AzureIoTClient 39:e98d5df6dc74 1529 handle = NULL;
AzureIoTClient 39:e98d5df6dc74 1530 }
AzureIoTClient 39:e98d5df6dc74 1531 else
AzureIoTClient 39:e98d5df6dc74 1532 {
AzureIoTClient 39:e98d5df6dc74 1533 memset(instance, 0, sizeof(AMQP_MESSENGER_INSTANCE));
AzureIoTClient 39:e98d5df6dc74 1534
AzureIoTClient 39:e98d5df6dc74 1535 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_005: [amqp_messenger_create() shall save a copy of `messenger_config` into `instance`]
AzureIoTClient 39:e98d5df6dc74 1536 if ((instance->config = clone_configuration(messenger_config)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1537 {
AzureIoTClient 39:e98d5df6dc74 1538 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_006: [If the copy fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1539 LogError("Failed copying AMQP messenger configuration");
AzureIoTClient 39:e98d5df6dc74 1540 handle = NULL;
AzureIoTClient 39:e98d5df6dc74 1541 }
AzureIoTClient 39:e98d5df6dc74 1542 else
AzureIoTClient 39:e98d5df6dc74 1543 {
AzureIoTClient 39:e98d5df6dc74 1544 MESSAGE_QUEUE_CONFIG mq_config;
AzureIoTClient 39:e98d5df6dc74 1545 mq_config.max_retry_count = DEFAULT_EVENT_SEND_RETRY_LIMIT;
AzureIoTClient 39:e98d5df6dc74 1546 mq_config.max_message_enqueued_time_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS;
AzureIoTClient 39:e98d5df6dc74 1547 mq_config.max_message_processing_time_secs = 0;
AzureIoTClient 39:e98d5df6dc74 1548 mq_config.on_process_message_callback = on_process_message_callback;
AzureIoTClient 39:e98d5df6dc74 1549
AzureIoTClient 39:e98d5df6dc74 1550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_007: [`instance->send_queue` shall be set using message_queue_create(), passing `on_process_message_callback`]
AzureIoTClient 39:e98d5df6dc74 1551 if ((instance->send_queue = message_queue_create(&mq_config)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1552 {
AzureIoTClient 39:e98d5df6dc74 1553 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_008: [If message_queue_create() fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1554 LogError("Failed creating message queue");
AzureIoTClient 39:e98d5df6dc74 1555 handle = NULL;
AzureIoTClient 39:e98d5df6dc74 1556 }
AzureIoTClient 39:e98d5df6dc74 1557 else
AzureIoTClient 39:e98d5df6dc74 1558 {
AzureIoTClient 39:e98d5df6dc74 1559 instance->state = AMQP_MESSENGER_STATE_STOPPED;
AzureIoTClient 39:e98d5df6dc74 1560 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 39:e98d5df6dc74 1561 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 39:e98d5df6dc74 1562 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 39:e98d5df6dc74 1563 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 39:e98d5df6dc74 1564 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 39:e98d5df6dc74 1565 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 39:e98d5df6dc74 1566 instance->max_send_error_count = DEFAULT_MAX_SEND_ERROR_COUNT;
AzureIoTClient 39:e98d5df6dc74 1567 instance->receive_messages = false;
AzureIoTClient 39:e98d5df6dc74 1568
AzureIoTClient 39:e98d5df6dc74 1569 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_009: [If no failures occurr, amqp_messenger_create() shall return a handle to `instance`]
AzureIoTClient 39:e98d5df6dc74 1570 handle = (AMQP_MESSENGER_HANDLE)instance;
AzureIoTClient 39:e98d5df6dc74 1571 }
AzureIoTClient 39:e98d5df6dc74 1572 }
AzureIoTClient 39:e98d5df6dc74 1573 }
AzureIoTClient 39:e98d5df6dc74 1574
AzureIoTClient 39:e98d5df6dc74 1575 if (handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1576 {
AzureIoTClient 39:e98d5df6dc74 1577 amqp_messenger_destroy((AMQP_MESSENGER_HANDLE)instance);
AzureIoTClient 39:e98d5df6dc74 1578 }
AzureIoTClient 39:e98d5df6dc74 1579 }
AzureIoTClient 39:e98d5df6dc74 1580
AzureIoTClient 39:e98d5df6dc74 1581 return handle;
AzureIoTClient 39:e98d5df6dc74 1582 }
AzureIoTClient 39:e98d5df6dc74 1583
AzureIoTClient 39:e98d5df6dc74 1584 int amqp_messenger_set_option(AMQP_MESSENGER_HANDLE messenger_handle, const char* name, void* value)
AzureIoTClient 39:e98d5df6dc74 1585 {
AzureIoTClient 39:e98d5df6dc74 1586 int result;
AzureIoTClient 39:e98d5df6dc74 1587
AzureIoTClient 39:e98d5df6dc74 1588 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_127: [If `messenger_handle` or `name` or `value` is NULL, amqp_messenger_set_option shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1589 if (messenger_handle == NULL || name == NULL || value == NULL)
AzureIoTClient 39:e98d5df6dc74 1590 {
AzureIoTClient 39:e98d5df6dc74 1591 LogError("Invalid argument (messenger_handle=%p, name=%p, value=%p)",
AzureIoTClient 39:e98d5df6dc74 1592 messenger_handle, name, value);
AzureIoTClient 39:e98d5df6dc74 1593 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1594 }
AzureIoTClient 39:e98d5df6dc74 1595 else
AzureIoTClient 39:e98d5df6dc74 1596 {
AzureIoTClient 39:e98d5df6dc74 1597 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1598
AzureIoTClient 51:269e65571b39 1599 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_128: [If name matches AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, `value` shall be set on `instance->send_queue` using message_queue_set_max_message_enqueued_time_secs()]
AzureIoTClient 51:269e65571b39 1600 if (strcmp(AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0)
AzureIoTClient 39:e98d5df6dc74 1601 {
AzureIoTClient 39:e98d5df6dc74 1602 if (message_queue_set_max_message_enqueued_time_secs(instance->send_queue, *(size_t*)value) != 0)
AzureIoTClient 39:e98d5df6dc74 1603 {
AzureIoTClient 39:e98d5df6dc74 1604 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_129: [If message_queue_set_max_message_enqueued_time_secs() fails, amqp_messenger_set_option() shall fail and return a non-zero value]
AzureIoTClient 51:269e65571b39 1605 LogError("Failed setting option %s", AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS);
AzureIoTClient 39:e98d5df6dc74 1606 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1607 }
AzureIoTClient 39:e98d5df6dc74 1608 else
AzureIoTClient 39:e98d5df6dc74 1609 {
AzureIoTClient 39:e98d5df6dc74 1610 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_131: [If no errors occur, amqp_messenger_set_option shall return 0]
AzureIoTClient 39:e98d5df6dc74 1611 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 1612 }
AzureIoTClient 39:e98d5df6dc74 1613 }
AzureIoTClient 39:e98d5df6dc74 1614 else
AzureIoTClient 39:e98d5df6dc74 1615 {
AzureIoTClient 39:e98d5df6dc74 1616 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_130: [If `name` does not match any supported option, amqp_messenger_set_option() shall fail and return a non-zero value]
AzureIoTClient 39:e98d5df6dc74 1617 LogError("Invalid argument (option '%s' is not valid)", name);
AzureIoTClient 39:e98d5df6dc74 1618 result = __FAILURE__;
AzureIoTClient 39:e98d5df6dc74 1619 }
AzureIoTClient 39:e98d5df6dc74 1620 }
AzureIoTClient 39:e98d5df6dc74 1621
AzureIoTClient 39:e98d5df6dc74 1622 return result;
AzureIoTClient 39:e98d5df6dc74 1623 }
AzureIoTClient 39:e98d5df6dc74 1624
AzureIoTClient 39:e98d5df6dc74 1625 OPTIONHANDLER_HANDLE amqp_messenger_retrieve_options(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1626 {
AzureIoTClient 39:e98d5df6dc74 1627 OPTIONHANDLER_HANDLE result;
AzureIoTClient 39:e98d5df6dc74 1628
AzureIoTClient 39:e98d5df6dc74 1629 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `messenger_handle` is NULL, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1630 if (messenger_handle == NULL)
AzureIoTClient 39:e98d5df6dc74 1631 {
AzureIoTClient 39:e98d5df6dc74 1632 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 39:e98d5df6dc74 1633 result = NULL;
AzureIoTClient 39:e98d5df6dc74 1634 }
AzureIoTClient 39:e98d5df6dc74 1635 else
AzureIoTClient 39:e98d5df6dc74 1636 {
AzureIoTClient 39:e98d5df6dc74 1637 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_133: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create]
AzureIoTClient 39:e98d5df6dc74 1638 result = OptionHandler_Create(amqp_messenger_clone_option, amqp_messenger_destroy_option, (pfSetOption)amqp_messenger_set_option);
AzureIoTClient 39:e98d5df6dc74 1639
AzureIoTClient 39:e98d5df6dc74 1640 if (result == NULL)
AzureIoTClient 39:e98d5df6dc74 1641 {
AzureIoTClient 39:e98d5df6dc74 1642 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_134: [If an OPTIONHANDLER_HANDLE instance fails to be created, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1643 LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed)");
AzureIoTClient 39:e98d5df6dc74 1644 }
AzureIoTClient 39:e98d5df6dc74 1645 else
AzureIoTClient 39:e98d5df6dc74 1646 {
AzureIoTClient 39:e98d5df6dc74 1647 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1648 OPTIONHANDLER_HANDLE mq_options;
AzureIoTClient 39:e98d5df6dc74 1649
AzureIoTClient 39:e98d5df6dc74 1650 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_135: [`instance->send_queue` options shall be retrieved using message_queue_retrieve_options()]
AzureIoTClient 39:e98d5df6dc74 1651 if ((mq_options = message_queue_retrieve_options(instance->send_queue)) == NULL)
AzureIoTClient 39:e98d5df6dc74 1652 {
AzureIoTClient 39:e98d5df6dc74 1653 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_136: [If message_queue_retrieve_options() fails, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1654 LogError("failed to retrieve options from send queue)");
AzureIoTClient 39:e98d5df6dc74 1655 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
AzureIoTClient 39:e98d5df6dc74 1656 OptionHandler_Destroy(result);
AzureIoTClient 39:e98d5df6dc74 1657 result = NULL;
AzureIoTClient 39:e98d5df6dc74 1658 }
AzureIoTClient 39:e98d5df6dc74 1659 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_137: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption]
AzureIoTClient 39:e98d5df6dc74 1660 else if (OptionHandler_AddOption(result, MESSENGER_SAVED_MQ_OPTIONS, (void*)mq_options) != OPTIONHANDLER_OK)
AzureIoTClient 39:e98d5df6dc74 1661 {
AzureIoTClient 39:e98d5df6dc74 1662 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_138: [If OptionHandler_AddOption fails, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 39:e98d5df6dc74 1663 LogError("failed adding option '%s'", MESSENGER_SAVED_MQ_OPTIONS);
AzureIoTClient 39:e98d5df6dc74 1664 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
AzureIoTClient 39:e98d5df6dc74 1665 OptionHandler_Destroy(mq_options);
AzureIoTClient 39:e98d5df6dc74 1666 OptionHandler_Destroy(result);
AzureIoTClient 39:e98d5df6dc74 1667 result = NULL;
AzureIoTClient 39:e98d5df6dc74 1668 }
AzureIoTClient 39:e98d5df6dc74 1669 }
AzureIoTClient 39:e98d5df6dc74 1670 }
AzureIoTClient 39:e98d5df6dc74 1671
AzureIoTClient 39:e98d5df6dc74 1672 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_140: [If no failures occur, amqp_messenger_retrieve_options shall return the OPTIONHANDLER_HANDLE instance]
AzureIoTClient 39:e98d5df6dc74 1673 return result;
AzureIoTClient 39:e98d5df6dc74 1674 }
AzureIoTClient 39:e98d5df6dc74 1675
AzureIoTClient 39:e98d5df6dc74 1676 void amqp_messenger_destroy_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
AzureIoTClient 39:e98d5df6dc74 1677 {
AzureIoTClient 39:e98d5df6dc74 1678 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `disposition_info` is NULL, amqp_messenger_destroy_disposition_info shall return immediately]
AzureIoTClient 39:e98d5df6dc74 1679 if (disposition_info == NULL)
AzureIoTClient 39:e98d5df6dc74 1680 {
AzureIoTClient 39:e98d5df6dc74 1681 LogError("Invalid argument (disposition_info is NULL)");
AzureIoTClient 39:e98d5df6dc74 1682 }
AzureIoTClient 39:e98d5df6dc74 1683 else
AzureIoTClient 39:e98d5df6dc74 1684 {
AzureIoTClient 39:e98d5df6dc74 1685 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [All memory allocated for `disposition_info` shall be released]
AzureIoTClient 39:e98d5df6dc74 1686 destroy_message_disposition_info(disposition_info);
AzureIoTClient 39:e98d5df6dc74 1687 }
AzureIoTClient 39:e98d5df6dc74 1688 }