Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

UserRevisionLine numberNew contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5 #include <stdbool.h>
XinZhangMS 0:f7f1f0d76dd6 6 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 7 #include "azure_c_shared_utility/crt_abstractions.h"
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_c_shared_utility/agenttime.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_c_shared_utility/uniqueid.h"
XinZhangMS 0:f7f1f0d76dd6 12 #include "azure_uamqp_c/link.h"
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_uamqp_c/messaging.h"
XinZhangMS 0:f7f1f0d76dd6 14 #include "azure_uamqp_c/message_sender.h"
XinZhangMS 0:f7f1f0d76dd6 15 #include "azure_uamqp_c/message_receiver.h"
XinZhangMS 0:f7f1f0d76dd6 16 #include "internal/message_queue.h"
XinZhangMS 0:f7f1f0d76dd6 17 #include "internal/iothub_client_retry_control.h"
XinZhangMS 0:f7f1f0d76dd6 18 #include "internal/iothubtransport_amqp_messenger.h"
XinZhangMS 0:f7f1f0d76dd6 19
XinZhangMS 0:f7f1f0d76dd6 20 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_STATUS, AMQP_MESSENGER_SEND_STATUS_VALUES);
XinZhangMS 0:f7f1f0d76dd6 21 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_RESULT, AMQP_MESSENGER_SEND_RESULT_VALUES);
XinZhangMS 0:f7f1f0d76dd6 22 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_REASON, AMQP_MESSENGER_REASON_VALUES);
XinZhangMS 0:f7f1f0d76dd6 23 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_DISPOSITION_RESULT, AMQP_MESSENGER_DISPOSITION_RESULT_VALUES);
XinZhangMS 0:f7f1f0d76dd6 24 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_STATE, AMQP_MESSENGER_STATE_VALUES);
XinZhangMS 0:f7f1f0d76dd6 25
XinZhangMS 0:f7f1f0d76dd6 26
XinZhangMS 0:f7f1f0d76dd6 27 #define RESULT_OK 0
XinZhangMS 0:f7f1f0d76dd6 28 #define INDEFINITE_TIME ((time_t)(-1))
XinZhangMS 0:f7f1f0d76dd6 29
XinZhangMS 0:f7f1f0d76dd6 30 // AMQP Link address format: "amqps://<iot hub fqdn>/devices/<device-id>/<suffix>"
XinZhangMS 0:f7f1f0d76dd6 31 #define LINK_ADDRESS_FORMAT "amqps://%s/devices/%s/%s"
XinZhangMS 0:f7f1f0d76dd6 32 #define LINK_ADDRESS_MODULE_FORMAT "amqps://%s/devices/%s/modules/%s/%s"
XinZhangMS 0:f7f1f0d76dd6 33 #define SEND_LINK_NAME_PREFIX "link-snd"
XinZhangMS 0:f7f1f0d76dd6 34 #define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX
XinZhangMS 0:f7f1f0d76dd6 35 #define RECEIVE_LINK_NAME_PREFIX "link-rcv"
XinZhangMS 0:f7f1f0d76dd6 36 #define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536
XinZhangMS 0:f7f1f0d76dd6 37 #define DEFAULT_EVENT_SEND_RETRY_LIMIT 0
XinZhangMS 0:f7f1f0d76dd6 38 #define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
XinZhangMS 0:f7f1f0d76dd6 39 #define DEFAULT_MAX_SEND_ERROR_COUNT 10
XinZhangMS 0:f7f1f0d76dd6 40 #define MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS 300
XinZhangMS 0:f7f1f0d76dd6 41 #define MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS 300
XinZhangMS 0:f7f1f0d76dd6 42 #define UNIQUE_ID_BUFFER_SIZE 37
XinZhangMS 0:f7f1f0d76dd6 43
XinZhangMS 0:f7f1f0d76dd6 44 static const char* MESSENGER_SAVED_MQ_OPTIONS = "amqp_message_queue_options";
XinZhangMS 0:f7f1f0d76dd6 45
XinZhangMS 0:f7f1f0d76dd6 46 typedef struct AMQP_MESSENGER_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 47 {
XinZhangMS 0:f7f1f0d76dd6 48 AMQP_MESSENGER_CONFIG* config;
XinZhangMS 0:f7f1f0d76dd6 49
XinZhangMS 0:f7f1f0d76dd6 50 bool receive_messages;
XinZhangMS 0:f7f1f0d76dd6 51 ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback;
XinZhangMS 0:f7f1f0d76dd6 52 void* on_message_received_context;
XinZhangMS 0:f7f1f0d76dd6 53
XinZhangMS 0:f7f1f0d76dd6 54 MESSAGE_QUEUE_HANDLE send_queue;
XinZhangMS 0:f7f1f0d76dd6 55 AMQP_MESSENGER_STATE state;
XinZhangMS 0:f7f1f0d76dd6 56
XinZhangMS 0:f7f1f0d76dd6 57 SESSION_HANDLE session_handle;
XinZhangMS 0:f7f1f0d76dd6 58
XinZhangMS 0:f7f1f0d76dd6 59 LINK_HANDLE sender_link;
XinZhangMS 0:f7f1f0d76dd6 60 MESSAGE_SENDER_HANDLE message_sender;
XinZhangMS 0:f7f1f0d76dd6 61 MESSAGE_SENDER_STATE message_sender_current_state;
XinZhangMS 0:f7f1f0d76dd6 62 MESSAGE_SENDER_STATE message_sender_previous_state;
XinZhangMS 0:f7f1f0d76dd6 63
XinZhangMS 0:f7f1f0d76dd6 64 LINK_HANDLE receiver_link;
XinZhangMS 0:f7f1f0d76dd6 65 MESSAGE_RECEIVER_HANDLE message_receiver;
XinZhangMS 0:f7f1f0d76dd6 66 MESSAGE_RECEIVER_STATE message_receiver_current_state;
XinZhangMS 0:f7f1f0d76dd6 67 MESSAGE_RECEIVER_STATE message_receiver_previous_state;
XinZhangMS 0:f7f1f0d76dd6 68
XinZhangMS 0:f7f1f0d76dd6 69 size_t send_error_count;
XinZhangMS 0:f7f1f0d76dd6 70 size_t max_send_error_count;
XinZhangMS 0:f7f1f0d76dd6 71
XinZhangMS 0:f7f1f0d76dd6 72 time_t last_message_sender_state_change_time;
XinZhangMS 0:f7f1f0d76dd6 73 time_t last_message_receiver_state_change_time;
XinZhangMS 0:f7f1f0d76dd6 74 } AMQP_MESSENGER_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 75
XinZhangMS 0:f7f1f0d76dd6 76 typedef struct MESSAGE_SEND_CONTEXT_TAG
XinZhangMS 0:f7f1f0d76dd6 77 {
XinZhangMS 0:f7f1f0d76dd6 78 MESSAGE_HANDLE message;
XinZhangMS 0:f7f1f0d76dd6 79 bool is_destroyed;
XinZhangMS 0:f7f1f0d76dd6 80
XinZhangMS 0:f7f1f0d76dd6 81 AMQP_MESSENGER_INSTANCE* messenger;
XinZhangMS 0:f7f1f0d76dd6 82
XinZhangMS 0:f7f1f0d76dd6 83 AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_send_complete_callback;
XinZhangMS 0:f7f1f0d76dd6 84 void* user_context;
XinZhangMS 0:f7f1f0d76dd6 85
XinZhangMS 0:f7f1f0d76dd6 86 PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback;
XinZhangMS 0:f7f1f0d76dd6 87 } MESSAGE_SEND_CONTEXT;
XinZhangMS 0:f7f1f0d76dd6 88
XinZhangMS 0:f7f1f0d76dd6 89
XinZhangMS 0:f7f1f0d76dd6 90
XinZhangMS 0:f7f1f0d76dd6 91 static MESSAGE_SEND_CONTEXT* create_message_send_context()
XinZhangMS 0:f7f1f0d76dd6 92 {
XinZhangMS 0:f7f1f0d76dd6 93 MESSAGE_SEND_CONTEXT* result;
XinZhangMS 0:f7f1f0d76dd6 94
XinZhangMS 0:f7f1f0d76dd6 95 if ((result = (MESSAGE_SEND_CONTEXT*)malloc(sizeof(MESSAGE_SEND_CONTEXT))) == NULL)
XinZhangMS 0:f7f1f0d76dd6 96 {
XinZhangMS 0:f7f1f0d76dd6 97 LogError("Failed creating the message send context");
XinZhangMS 0:f7f1f0d76dd6 98 }
XinZhangMS 0:f7f1f0d76dd6 99 else
XinZhangMS 0:f7f1f0d76dd6 100 {
XinZhangMS 0:f7f1f0d76dd6 101 memset(result, 0, sizeof(MESSAGE_SEND_CONTEXT));
XinZhangMS 0:f7f1f0d76dd6 102 }
XinZhangMS 0:f7f1f0d76dd6 103
XinZhangMS 0:f7f1f0d76dd6 104 return result;
XinZhangMS 0:f7f1f0d76dd6 105 }
XinZhangMS 0:f7f1f0d76dd6 106
XinZhangMS 0:f7f1f0d76dd6 107 static bool is_valid_configuration(const AMQP_MESSENGER_CONFIG* config)
XinZhangMS 0:f7f1f0d76dd6 108 {
XinZhangMS 0:f7f1f0d76dd6 109 bool result;
XinZhangMS 0:f7f1f0d76dd6 110
XinZhangMS 0:f7f1f0d76dd6 111 if (config == NULL)
XinZhangMS 0:f7f1f0d76dd6 112 {
XinZhangMS 0:f7f1f0d76dd6 113 LogError("Invalid configuration (NULL)");
XinZhangMS 0:f7f1f0d76dd6 114 result = false;
XinZhangMS 0:f7f1f0d76dd6 115 }
XinZhangMS 0:f7f1f0d76dd6 116 else if (config->client_version == NULL ||
XinZhangMS 0:f7f1f0d76dd6 117 config->device_id == NULL ||
XinZhangMS 0:f7f1f0d76dd6 118 config->iothub_host_fqdn == NULL ||
XinZhangMS 0:f7f1f0d76dd6 119 config->receive_link.source_suffix == NULL ||
XinZhangMS 0:f7f1f0d76dd6 120 config->send_link.target_suffix == NULL)
XinZhangMS 0:f7f1f0d76dd6 121 {
XinZhangMS 0:f7f1f0d76dd6 122 LogError("Invalid configuration (client_version=%p, device_id=%p, iothub_host_fqdn=%p, receive_link (source_suffix=%p), send_link (target_suffix=%p))",
XinZhangMS 0:f7f1f0d76dd6 123 config->client_version, config->device_id, config->iothub_host_fqdn,
XinZhangMS 0:f7f1f0d76dd6 124 config->receive_link.source_suffix, config->send_link.target_suffix);
XinZhangMS 0:f7f1f0d76dd6 125 result = false;
XinZhangMS 0:f7f1f0d76dd6 126 }
XinZhangMS 0:f7f1f0d76dd6 127 else
XinZhangMS 0:f7f1f0d76dd6 128 {
XinZhangMS 0:f7f1f0d76dd6 129 result = true;
XinZhangMS 0:f7f1f0d76dd6 130 }
XinZhangMS 0:f7f1f0d76dd6 131
XinZhangMS 0:f7f1f0d76dd6 132 return result;
XinZhangMS 0:f7f1f0d76dd6 133 }
XinZhangMS 0:f7f1f0d76dd6 134
XinZhangMS 0:f7f1f0d76dd6 135 static void destroy_link_configuration(AMQP_MESSENGER_LINK_CONFIG* link_config)
XinZhangMS 0:f7f1f0d76dd6 136 {
XinZhangMS 0:f7f1f0d76dd6 137 if (link_config->target_suffix != NULL)
XinZhangMS 0:f7f1f0d76dd6 138 {
XinZhangMS 0:f7f1f0d76dd6 139 free((void*)link_config->target_suffix);
XinZhangMS 0:f7f1f0d76dd6 140 link_config->target_suffix = NULL;
XinZhangMS 0:f7f1f0d76dd6 141 }
XinZhangMS 0:f7f1f0d76dd6 142
XinZhangMS 0:f7f1f0d76dd6 143 if (link_config->source_suffix != NULL)
XinZhangMS 0:f7f1f0d76dd6 144 {
XinZhangMS 0:f7f1f0d76dd6 145 free((void*)link_config->source_suffix);
XinZhangMS 0:f7f1f0d76dd6 146 link_config->source_suffix = NULL;
XinZhangMS 0:f7f1f0d76dd6 147 }
XinZhangMS 0:f7f1f0d76dd6 148
XinZhangMS 0:f7f1f0d76dd6 149 if (link_config->attach_properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 150 {
XinZhangMS 0:f7f1f0d76dd6 151 Map_Destroy(link_config->attach_properties);
XinZhangMS 0:f7f1f0d76dd6 152 link_config->attach_properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 153 }
XinZhangMS 0:f7f1f0d76dd6 154 }
XinZhangMS 0:f7f1f0d76dd6 155
XinZhangMS 0:f7f1f0d76dd6 156 static void destroy_configuration(AMQP_MESSENGER_CONFIG* config)
XinZhangMS 0:f7f1f0d76dd6 157 {
XinZhangMS 0:f7f1f0d76dd6 158 if (config != NULL)
XinZhangMS 0:f7f1f0d76dd6 159 {
XinZhangMS 0:f7f1f0d76dd6 160 if (config->client_version != NULL)
XinZhangMS 0:f7f1f0d76dd6 161 {
XinZhangMS 0:f7f1f0d76dd6 162 free((void*)config->client_version);
XinZhangMS 0:f7f1f0d76dd6 163 }
XinZhangMS 0:f7f1f0d76dd6 164
XinZhangMS 0:f7f1f0d76dd6 165 if (config->device_id != NULL)
XinZhangMS 0:f7f1f0d76dd6 166 {
XinZhangMS 0:f7f1f0d76dd6 167 free((void*)config->device_id);
XinZhangMS 0:f7f1f0d76dd6 168 }
XinZhangMS 0:f7f1f0d76dd6 169
XinZhangMS 0:f7f1f0d76dd6 170 if (config->module_id != NULL)
XinZhangMS 0:f7f1f0d76dd6 171 {
XinZhangMS 0:f7f1f0d76dd6 172 free((void*)config->module_id);
XinZhangMS 0:f7f1f0d76dd6 173 }
XinZhangMS 0:f7f1f0d76dd6 174
XinZhangMS 0:f7f1f0d76dd6 175 if (config->iothub_host_fqdn != NULL)
XinZhangMS 0:f7f1f0d76dd6 176 {
XinZhangMS 0:f7f1f0d76dd6 177 free((void*)config->iothub_host_fqdn);
XinZhangMS 0:f7f1f0d76dd6 178 }
XinZhangMS 0:f7f1f0d76dd6 179
XinZhangMS 0:f7f1f0d76dd6 180 destroy_link_configuration(&config->send_link);
XinZhangMS 0:f7f1f0d76dd6 181 destroy_link_configuration(&config->receive_link);
XinZhangMS 0:f7f1f0d76dd6 182
XinZhangMS 0:f7f1f0d76dd6 183 free(config);
XinZhangMS 0:f7f1f0d76dd6 184 }
XinZhangMS 0:f7f1f0d76dd6 185 }
XinZhangMS 0:f7f1f0d76dd6 186
XinZhangMS 0:f7f1f0d76dd6 187 static int clone_link_configuration(role link_role, AMQP_MESSENGER_LINK_CONFIG* dst_config, const AMQP_MESSENGER_LINK_CONFIG* src_config)
XinZhangMS 0:f7f1f0d76dd6 188 {
XinZhangMS 0:f7f1f0d76dd6 189 int result;
XinZhangMS 0:f7f1f0d76dd6 190
XinZhangMS 0:f7f1f0d76dd6 191 if (link_role == role_sender &&
XinZhangMS 0:f7f1f0d76dd6 192 mallocAndStrcpy_s(&dst_config->target_suffix, src_config->target_suffix) != 0)
XinZhangMS 0:f7f1f0d76dd6 193 {
XinZhangMS 0:f7f1f0d76dd6 194 LogError("Failed copying send_link_target_suffix");
XinZhangMS 0:f7f1f0d76dd6 195 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 196 }
XinZhangMS 0:f7f1f0d76dd6 197 else if (link_role == role_receiver &&
XinZhangMS 0:f7f1f0d76dd6 198 mallocAndStrcpy_s(&dst_config->source_suffix, src_config->source_suffix) != 0)
XinZhangMS 0:f7f1f0d76dd6 199 {
XinZhangMS 0:f7f1f0d76dd6 200 LogError("Failed copying receive_link_source_suffix");
XinZhangMS 0:f7f1f0d76dd6 201 destroy_link_configuration(dst_config);
XinZhangMS 0:f7f1f0d76dd6 202 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 203 }
XinZhangMS 0:f7f1f0d76dd6 204 else if (src_config->attach_properties != NULL &&
XinZhangMS 0:f7f1f0d76dd6 205 (dst_config->attach_properties = Map_Clone(src_config->attach_properties)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 206 {
XinZhangMS 0:f7f1f0d76dd6 207 LogError("Failed copying link attach properties");
XinZhangMS 0:f7f1f0d76dd6 208 destroy_link_configuration(dst_config);
XinZhangMS 0:f7f1f0d76dd6 209 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 210 }
XinZhangMS 0:f7f1f0d76dd6 211 else
XinZhangMS 0:f7f1f0d76dd6 212 {
XinZhangMS 0:f7f1f0d76dd6 213 dst_config->snd_settle_mode = src_config->snd_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 214 dst_config->rcv_settle_mode = src_config->rcv_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 215
XinZhangMS 0:f7f1f0d76dd6 216 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 217 }
XinZhangMS 0:f7f1f0d76dd6 218
XinZhangMS 0:f7f1f0d76dd6 219 return result;
XinZhangMS 0:f7f1f0d76dd6 220 }
XinZhangMS 0:f7f1f0d76dd6 221
XinZhangMS 0:f7f1f0d76dd6 222 static AMQP_MESSENGER_CONFIG* clone_configuration(const AMQP_MESSENGER_CONFIG* config)
XinZhangMS 0:f7f1f0d76dd6 223 {
XinZhangMS 0:f7f1f0d76dd6 224 AMQP_MESSENGER_CONFIG* result;
XinZhangMS 0:f7f1f0d76dd6 225
XinZhangMS 0:f7f1f0d76dd6 226 if ((result = (AMQP_MESSENGER_CONFIG*)malloc(sizeof(AMQP_MESSENGER_CONFIG))) == NULL)
XinZhangMS 0:f7f1f0d76dd6 227 {
XinZhangMS 0:f7f1f0d76dd6 228 LogError("Failed allocating AMQP_MESSENGER_CONFIG");
XinZhangMS 0:f7f1f0d76dd6 229 }
XinZhangMS 0:f7f1f0d76dd6 230 else
XinZhangMS 0:f7f1f0d76dd6 231 {
XinZhangMS 0:f7f1f0d76dd6 232 memset(result, 0, sizeof(AMQP_MESSENGER_CONFIG));
XinZhangMS 0:f7f1f0d76dd6 233
XinZhangMS 0:f7f1f0d76dd6 234 if (mallocAndStrcpy_s(&result->client_version, config->client_version) != 0)
XinZhangMS 0:f7f1f0d76dd6 235 {
XinZhangMS 0:f7f1f0d76dd6 236 LogError("Failed copying device_id");
XinZhangMS 0:f7f1f0d76dd6 237 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 238 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 239 }
XinZhangMS 0:f7f1f0d76dd6 240 else if (mallocAndStrcpy_s(&result->device_id, config->device_id) != 0)
XinZhangMS 0:f7f1f0d76dd6 241 {
XinZhangMS 0:f7f1f0d76dd6 242 LogError("Failed copying device_id");
XinZhangMS 0:f7f1f0d76dd6 243 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 244 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 245 }
XinZhangMS 0:f7f1f0d76dd6 246 else if ((config->module_id != NULL) && (mallocAndStrcpy_s(&result->module_id, config->module_id) != 0))
XinZhangMS 0:f7f1f0d76dd6 247 {
XinZhangMS 0:f7f1f0d76dd6 248 LogError("Failed copying module_id");
XinZhangMS 0:f7f1f0d76dd6 249 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 250 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 251 }
XinZhangMS 0:f7f1f0d76dd6 252 else if (mallocAndStrcpy_s(&result->iothub_host_fqdn, config->iothub_host_fqdn) != 0)
XinZhangMS 0:f7f1f0d76dd6 253 {
XinZhangMS 0:f7f1f0d76dd6 254 LogError("Failed copying iothub_host_fqdn");
XinZhangMS 0:f7f1f0d76dd6 255 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 256 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 257 }
XinZhangMS 0:f7f1f0d76dd6 258 else if (clone_link_configuration(role_sender, &result->send_link, &config->send_link) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 259 {
XinZhangMS 0:f7f1f0d76dd6 260 LogError("Failed copying send link configuration");
XinZhangMS 0:f7f1f0d76dd6 261 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 262 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 263 }
XinZhangMS 0:f7f1f0d76dd6 264 else if (clone_link_configuration(role_receiver, &result->receive_link, &config->receive_link) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 265 {
XinZhangMS 0:f7f1f0d76dd6 266 LogError("Failed copying receive link configuration");
XinZhangMS 0:f7f1f0d76dd6 267 destroy_configuration(result);
XinZhangMS 0:f7f1f0d76dd6 268 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 269 }
XinZhangMS 0:f7f1f0d76dd6 270 else
XinZhangMS 0:f7f1f0d76dd6 271 {
XinZhangMS 0:f7f1f0d76dd6 272 result->on_state_changed_callback = config->on_state_changed_callback;
XinZhangMS 0:f7f1f0d76dd6 273 result->on_state_changed_context = config->on_state_changed_context;
XinZhangMS 0:f7f1f0d76dd6 274 result->on_subscription_changed_callback = config->on_subscription_changed_callback;
XinZhangMS 0:f7f1f0d76dd6 275 result->on_subscription_changed_context = config->on_subscription_changed_context;
XinZhangMS 0:f7f1f0d76dd6 276 }
XinZhangMS 0:f7f1f0d76dd6 277 }
XinZhangMS 0:f7f1f0d76dd6 278
XinZhangMS 0:f7f1f0d76dd6 279 return result;
XinZhangMS 0:f7f1f0d76dd6 280 }
XinZhangMS 0:f7f1f0d76dd6 281
XinZhangMS 0:f7f1f0d76dd6 282 static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context)
XinZhangMS 0:f7f1f0d76dd6 283 {
XinZhangMS 0:f7f1f0d76dd6 284 free(context);
XinZhangMS 0:f7f1f0d76dd6 285 }
XinZhangMS 0:f7f1f0d76dd6 286
XinZhangMS 0:f7f1f0d76dd6 287 static STRING_HANDLE create_link_address(const char* host_fqdn, const char* device_id, const char* module_id, const char* address_suffix)
XinZhangMS 0:f7f1f0d76dd6 288 {
XinZhangMS 0:f7f1f0d76dd6 289 STRING_HANDLE link_address;
XinZhangMS 0:f7f1f0d76dd6 290
XinZhangMS 0:f7f1f0d76dd6 291 if ((link_address = STRING_new()) == NULL)
XinZhangMS 0:f7f1f0d76dd6 292 {
XinZhangMS 0:f7f1f0d76dd6 293 LogError("failed creating link_address (STRING_new failed)");
XinZhangMS 0:f7f1f0d76dd6 294 }
XinZhangMS 0:f7f1f0d76dd6 295 else
XinZhangMS 0:f7f1f0d76dd6 296 {
XinZhangMS 0:f7f1f0d76dd6 297 if (module_id != NULL)
XinZhangMS 0:f7f1f0d76dd6 298 {
XinZhangMS 0:f7f1f0d76dd6 299 if (STRING_sprintf(link_address, LINK_ADDRESS_MODULE_FORMAT, host_fqdn, device_id, module_id, address_suffix) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 300 {
XinZhangMS 0:f7f1f0d76dd6 301 LogError("Failed creating the link_address for a module (STRING_sprintf failed)");
XinZhangMS 0:f7f1f0d76dd6 302 STRING_delete(link_address);
XinZhangMS 0:f7f1f0d76dd6 303 link_address = NULL;
XinZhangMS 0:f7f1f0d76dd6 304 }
XinZhangMS 0:f7f1f0d76dd6 305 }
XinZhangMS 0:f7f1f0d76dd6 306 else
XinZhangMS 0:f7f1f0d76dd6 307 {
XinZhangMS 0:f7f1f0d76dd6 308 if (STRING_sprintf(link_address, LINK_ADDRESS_FORMAT, host_fqdn, device_id, address_suffix) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 309 {
XinZhangMS 0:f7f1f0d76dd6 310 LogError("Failed creating the link_address (STRING_sprintf failed)");
XinZhangMS 0:f7f1f0d76dd6 311 STRING_delete(link_address);
XinZhangMS 0:f7f1f0d76dd6 312 link_address = NULL;
XinZhangMS 0:f7f1f0d76dd6 313 }
XinZhangMS 0:f7f1f0d76dd6 314 }
XinZhangMS 0:f7f1f0d76dd6 315 }
XinZhangMS 0:f7f1f0d76dd6 316 return link_address;
XinZhangMS 0:f7f1f0d76dd6 317 }
XinZhangMS 0:f7f1f0d76dd6 318
XinZhangMS 0:f7f1f0d76dd6 319 static STRING_HANDLE create_link_terminus_name(STRING_HANDLE link_name, const char* suffix)
XinZhangMS 0:f7f1f0d76dd6 320 {
XinZhangMS 0:f7f1f0d76dd6 321 STRING_HANDLE terminus_name;
XinZhangMS 0:f7f1f0d76dd6 322
XinZhangMS 0:f7f1f0d76dd6 323 if ((terminus_name = STRING_new()) == NULL)
XinZhangMS 0:f7f1f0d76dd6 324 {
XinZhangMS 0:f7f1f0d76dd6 325 LogError("Failed creating the terminus name (STRING_new failed; %s)", suffix);
XinZhangMS 0:f7f1f0d76dd6 326 }
XinZhangMS 0:f7f1f0d76dd6 327 else
XinZhangMS 0:f7f1f0d76dd6 328 {
XinZhangMS 0:f7f1f0d76dd6 329 const char* link_name_char_ptr = STRING_c_str(link_name);
XinZhangMS 0:f7f1f0d76dd6 330
XinZhangMS 0:f7f1f0d76dd6 331 if (STRING_sprintf(terminus_name, "%s-%s", link_name_char_ptr, suffix) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 332 {
XinZhangMS 0:f7f1f0d76dd6 333 STRING_delete(terminus_name);
XinZhangMS 0:f7f1f0d76dd6 334 terminus_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 335 LogError("Failed creating the terminus name (STRING_sprintf failed; %s)", suffix);
XinZhangMS 0:f7f1f0d76dd6 336 }
XinZhangMS 0:f7f1f0d76dd6 337 }
XinZhangMS 0:f7f1f0d76dd6 338
XinZhangMS 0:f7f1f0d76dd6 339 return terminus_name;
XinZhangMS 0:f7f1f0d76dd6 340 }
XinZhangMS 0:f7f1f0d76dd6 341
XinZhangMS 0:f7f1f0d76dd6 342 static STRING_HANDLE create_link_name(role link_role, const char* device_id)
XinZhangMS 0:f7f1f0d76dd6 343 {
XinZhangMS 0:f7f1f0d76dd6 344 char* unique_id;
XinZhangMS 0:f7f1f0d76dd6 345 STRING_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 346
XinZhangMS 0:f7f1f0d76dd6 347 if ((unique_id = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 348 {
XinZhangMS 0:f7f1f0d76dd6 349 LogError("Failed generating an unique tag (malloc failed)");
XinZhangMS 0:f7f1f0d76dd6 350 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 351 }
XinZhangMS 0:f7f1f0d76dd6 352 else
XinZhangMS 0:f7f1f0d76dd6 353 {
XinZhangMS 0:f7f1f0d76dd6 354 memset(unique_id, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1);
XinZhangMS 0:f7f1f0d76dd6 355
XinZhangMS 0:f7f1f0d76dd6 356 if (UniqueId_Generate(unique_id, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK)
XinZhangMS 0:f7f1f0d76dd6 357 {
XinZhangMS 0:f7f1f0d76dd6 358 LogError("Failed generating an unique tag (UniqueId_Generate failed)");
XinZhangMS 0:f7f1f0d76dd6 359 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 360 }
XinZhangMS 0:f7f1f0d76dd6 361 else if ((result = STRING_new()) == NULL)
XinZhangMS 0:f7f1f0d76dd6 362 {
XinZhangMS 0:f7f1f0d76dd6 363 LogError("Failed generating an unique tag (STRING_new failed)");
XinZhangMS 0:f7f1f0d76dd6 364 }
XinZhangMS 0:f7f1f0d76dd6 365 else if (STRING_sprintf(result, "%s-%s-%s", (link_role == role_sender ? SEND_LINK_NAME_PREFIX : RECEIVE_LINK_NAME_PREFIX), device_id, unique_id) != 0)
XinZhangMS 0:f7f1f0d76dd6 366 {
XinZhangMS 0:f7f1f0d76dd6 367 LogError("Failed generating an unique tag (STRING_sprintf failed)");
XinZhangMS 0:f7f1f0d76dd6 368 STRING_delete(result);
XinZhangMS 0:f7f1f0d76dd6 369 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 370 }
XinZhangMS 0:f7f1f0d76dd6 371
XinZhangMS 0:f7f1f0d76dd6 372 free(unique_id);
XinZhangMS 0:f7f1f0d76dd6 373 }
XinZhangMS 0:f7f1f0d76dd6 374
XinZhangMS 0:f7f1f0d76dd6 375 return result;
XinZhangMS 0:f7f1f0d76dd6 376 }
XinZhangMS 0:f7f1f0d76dd6 377
XinZhangMS 0:f7f1f0d76dd6 378 static void update_messenger_state(AMQP_MESSENGER_INSTANCE* instance, AMQP_MESSENGER_STATE new_state)
XinZhangMS 0:f7f1f0d76dd6 379 {
XinZhangMS 0:f7f1f0d76dd6 380 if (new_state != instance->state)
XinZhangMS 0:f7f1f0d76dd6 381 {
XinZhangMS 0:f7f1f0d76dd6 382 AMQP_MESSENGER_STATE previous_state = instance->state;
XinZhangMS 0:f7f1f0d76dd6 383 instance->state = new_state;
XinZhangMS 0:f7f1f0d76dd6 384
XinZhangMS 0:f7f1f0d76dd6 385 if (instance->config != NULL && instance->config->on_state_changed_callback != NULL)
XinZhangMS 0:f7f1f0d76dd6 386 {
XinZhangMS 0:f7f1f0d76dd6 387 instance->config->on_state_changed_callback(instance->config->on_state_changed_context, previous_state, new_state);
XinZhangMS 0:f7f1f0d76dd6 388 }
XinZhangMS 0:f7f1f0d76dd6 389 }
XinZhangMS 0:f7f1f0d76dd6 390 }
XinZhangMS 0:f7f1f0d76dd6 391
XinZhangMS 0:f7f1f0d76dd6 392 static int add_link_attach_properties(LINK_HANDLE link, MAP_HANDLE user_defined_properties)
XinZhangMS 0:f7f1f0d76dd6 393 {
XinZhangMS 0:f7f1f0d76dd6 394 int result;
XinZhangMS 0:f7f1f0d76dd6 395 fields attach_properties;
XinZhangMS 0:f7f1f0d76dd6 396
XinZhangMS 0:f7f1f0d76dd6 397 if ((attach_properties = amqpvalue_create_map()) == NULL)
XinZhangMS 0:f7f1f0d76dd6 398 {
XinZhangMS 0:f7f1f0d76dd6 399 LogError("Failed to create the map for attach properties.");
XinZhangMS 0:f7f1f0d76dd6 400 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 401 }
XinZhangMS 0:f7f1f0d76dd6 402 else
XinZhangMS 0:f7f1f0d76dd6 403 {
XinZhangMS 0:f7f1f0d76dd6 404 const char* const* keys;
XinZhangMS 0:f7f1f0d76dd6 405 const char* const* values;
XinZhangMS 0:f7f1f0d76dd6 406 size_t count;
XinZhangMS 0:f7f1f0d76dd6 407
XinZhangMS 0:f7f1f0d76dd6 408 if (Map_GetInternals(user_defined_properties, &keys, &values, &count) != MAP_OK)
XinZhangMS 0:f7f1f0d76dd6 409 {
XinZhangMS 0:f7f1f0d76dd6 410 LogError("failed getting user defined properties details.");
XinZhangMS 0:f7f1f0d76dd6 411 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 412 }
XinZhangMS 0:f7f1f0d76dd6 413 else
XinZhangMS 0:f7f1f0d76dd6 414 {
XinZhangMS 0:f7f1f0d76dd6 415 size_t i;
XinZhangMS 0:f7f1f0d76dd6 416 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 417
XinZhangMS 0:f7f1f0d76dd6 418 for (i = 0; i < count && result == RESULT_OK; i++)
XinZhangMS 0:f7f1f0d76dd6 419 {
XinZhangMS 0:f7f1f0d76dd6 420 AMQP_VALUE key;
XinZhangMS 0:f7f1f0d76dd6 421 AMQP_VALUE value;
XinZhangMS 0:f7f1f0d76dd6 422
XinZhangMS 0:f7f1f0d76dd6 423 if ((key = amqpvalue_create_symbol(keys[i])) == NULL)
XinZhangMS 0:f7f1f0d76dd6 424 {
XinZhangMS 0:f7f1f0d76dd6 425 LogError("Failed creating AMQP_VALUE For key %s.", keys[i]);
XinZhangMS 0:f7f1f0d76dd6 426 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 427 }
XinZhangMS 0:f7f1f0d76dd6 428 else
XinZhangMS 0:f7f1f0d76dd6 429 {
XinZhangMS 0:f7f1f0d76dd6 430 if ((value = amqpvalue_create_string(values[i])) == NULL)
XinZhangMS 0:f7f1f0d76dd6 431 {
XinZhangMS 0:f7f1f0d76dd6 432 LogError("Failed creating AMQP_VALUE For key %s value", keys[i]);
XinZhangMS 0:f7f1f0d76dd6 433 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 434 }
XinZhangMS 0:f7f1f0d76dd6 435 else
XinZhangMS 0:f7f1f0d76dd6 436 {
XinZhangMS 0:f7f1f0d76dd6 437 if (amqpvalue_set_map_value(attach_properties, key, value) != 0)
XinZhangMS 0:f7f1f0d76dd6 438 {
XinZhangMS 0:f7f1f0d76dd6 439 LogError("Failed adding property %s to map", keys[i]);
XinZhangMS 0:f7f1f0d76dd6 440 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 441 }
XinZhangMS 0:f7f1f0d76dd6 442
XinZhangMS 0:f7f1f0d76dd6 443 amqpvalue_destroy(value);
XinZhangMS 0:f7f1f0d76dd6 444 }
XinZhangMS 0:f7f1f0d76dd6 445
XinZhangMS 0:f7f1f0d76dd6 446 amqpvalue_destroy(key);
XinZhangMS 0:f7f1f0d76dd6 447 }
XinZhangMS 0:f7f1f0d76dd6 448 }
XinZhangMS 0:f7f1f0d76dd6 449
XinZhangMS 0:f7f1f0d76dd6 450 if (result == RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 451 {
XinZhangMS 0:f7f1f0d76dd6 452 if (link_set_attach_properties(link, attach_properties) != 0)
XinZhangMS 0:f7f1f0d76dd6 453 {
XinZhangMS 0:f7f1f0d76dd6 454 LogError("Failed attaching properties to link");
XinZhangMS 0:f7f1f0d76dd6 455 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 456 }
XinZhangMS 0:f7f1f0d76dd6 457 else
XinZhangMS 0:f7f1f0d76dd6 458 {
XinZhangMS 0:f7f1f0d76dd6 459 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 460 }
XinZhangMS 0:f7f1f0d76dd6 461 }
XinZhangMS 0:f7f1f0d76dd6 462 }
XinZhangMS 0:f7f1f0d76dd6 463
XinZhangMS 0:f7f1f0d76dd6 464 amqpvalue_destroy(attach_properties);
XinZhangMS 0:f7f1f0d76dd6 465 }
XinZhangMS 0:f7f1f0d76dd6 466
XinZhangMS 0:f7f1f0d76dd6 467 return result;
XinZhangMS 0:f7f1f0d76dd6 468 }
XinZhangMS 0:f7f1f0d76dd6 469
XinZhangMS 0:f7f1f0d76dd6 470 static int create_link_terminus(role link_role, STRING_HANDLE link_name, STRING_HANDLE link_address, AMQP_VALUE* source, AMQP_VALUE* target)
XinZhangMS 0:f7f1f0d76dd6 471 {
XinZhangMS 0:f7f1f0d76dd6 472 int result;
XinZhangMS 0:f7f1f0d76dd6 473 STRING_HANDLE terminus_name;
XinZhangMS 0:f7f1f0d76dd6 474 const char* source_name;
XinZhangMS 0:f7f1f0d76dd6 475 const char* target_name;
XinZhangMS 0:f7f1f0d76dd6 476
XinZhangMS 0:f7f1f0d76dd6 477 if (link_role == role_sender)
XinZhangMS 0:f7f1f0d76dd6 478 {
XinZhangMS 0:f7f1f0d76dd6 479 if ((terminus_name = create_link_terminus_name(link_name, "source")) == NULL)
XinZhangMS 0:f7f1f0d76dd6 480 {
XinZhangMS 0:f7f1f0d76dd6 481 LogError("Failed creating terminus name");
XinZhangMS 0:f7f1f0d76dd6 482 source_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 483 target_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 484 }
XinZhangMS 0:f7f1f0d76dd6 485 else
XinZhangMS 0:f7f1f0d76dd6 486 {
XinZhangMS 0:f7f1f0d76dd6 487 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_077: [The AMQP link source shall be defined as "<link name>-source"]
XinZhangMS 0:f7f1f0d76dd6 488 source_name = STRING_c_str(terminus_name);
XinZhangMS 0:f7f1f0d76dd6 489 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_094: [The AMQP link source shall be defined as <link address>]
XinZhangMS 0:f7f1f0d76dd6 490 target_name = STRING_c_str(link_address);
XinZhangMS 0:f7f1f0d76dd6 491 }
XinZhangMS 0:f7f1f0d76dd6 492 }
XinZhangMS 0:f7f1f0d76dd6 493 else
XinZhangMS 0:f7f1f0d76dd6 494 {
XinZhangMS 0:f7f1f0d76dd6 495 if ((terminus_name = create_link_terminus_name(link_name, "target")) == NULL)
XinZhangMS 0:f7f1f0d76dd6 496 {
XinZhangMS 0:f7f1f0d76dd6 497 LogError("Failed creating terminus name");
XinZhangMS 0:f7f1f0d76dd6 498 source_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 499 target_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 500 }
XinZhangMS 0:f7f1f0d76dd6 501 else
XinZhangMS 0:f7f1f0d76dd6 502 {
XinZhangMS 0:f7f1f0d76dd6 503 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_078: [The AMQP link target shall be defined as <link address>]
XinZhangMS 0:f7f1f0d76dd6 504 source_name = STRING_c_str(link_address);
XinZhangMS 0:f7f1f0d76dd6 505 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_095: [The AMQP link target shall be defined as "<link name>-target"]
XinZhangMS 0:f7f1f0d76dd6 506 target_name = STRING_c_str(terminus_name);
XinZhangMS 0:f7f1f0d76dd6 507 }
XinZhangMS 0:f7f1f0d76dd6 508 }
XinZhangMS 0:f7f1f0d76dd6 509
XinZhangMS 0:f7f1f0d76dd6 510 if (source_name == NULL || target_name == NULL)
XinZhangMS 0:f7f1f0d76dd6 511 {
XinZhangMS 0:f7f1f0d76dd6 512 LogError("Failed creating link source and/or target name (source=%p, target=%p)", source_name, target_name);
XinZhangMS 0:f7f1f0d76dd6 513 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 514 }
XinZhangMS 0:f7f1f0d76dd6 515 else
XinZhangMS 0:f7f1f0d76dd6 516 {
XinZhangMS 0:f7f1f0d76dd6 517 if ((*source = messaging_create_source(source_name)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 518 {
XinZhangMS 0:f7f1f0d76dd6 519 LogError("Failed creating link source");
XinZhangMS 0:f7f1f0d76dd6 520 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 521 }
XinZhangMS 0:f7f1f0d76dd6 522 else
XinZhangMS 0:f7f1f0d76dd6 523 {
XinZhangMS 0:f7f1f0d76dd6 524 if ((*target = messaging_create_target(target_name)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 525 {
XinZhangMS 0:f7f1f0d76dd6 526 LogError("Failed creating link target");
XinZhangMS 0:f7f1f0d76dd6 527 amqpvalue_destroy(*source);
XinZhangMS 0:f7f1f0d76dd6 528 *source = NULL;
XinZhangMS 0:f7f1f0d76dd6 529 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 530 }
XinZhangMS 0:f7f1f0d76dd6 531 else
XinZhangMS 0:f7f1f0d76dd6 532 {
XinZhangMS 0:f7f1f0d76dd6 533 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 534 }
XinZhangMS 0:f7f1f0d76dd6 535 }
XinZhangMS 0:f7f1f0d76dd6 536 }
XinZhangMS 0:f7f1f0d76dd6 537
XinZhangMS 0:f7f1f0d76dd6 538
XinZhangMS 0:f7f1f0d76dd6 539 STRING_delete(terminus_name);
XinZhangMS 0:f7f1f0d76dd6 540
XinZhangMS 0:f7f1f0d76dd6 541 return result;
XinZhangMS 0:f7f1f0d76dd6 542 }
XinZhangMS 0:f7f1f0d76dd6 543
XinZhangMS 0:f7f1f0d76dd6 544 static LINK_HANDLE create_link(role link_role, SESSION_HANDLE session_handle, AMQP_MESSENGER_LINK_CONFIG* link_config, const char* iothub_host_fqdn, const char* device_id, const char* module_id)
XinZhangMS 0:f7f1f0d76dd6 545 {
XinZhangMS 0:f7f1f0d76dd6 546 LINK_HANDLE result = NULL;
XinZhangMS 0:f7f1f0d76dd6 547 STRING_HANDLE link_address;
XinZhangMS 0:f7f1f0d76dd6 548
XinZhangMS 0:f7f1f0d76dd6 549 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_075: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->send_link.source_suffix`>"]
XinZhangMS 0:f7f1f0d76dd6 550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_092: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->receive_link.target_suffix`>"]
XinZhangMS 0:f7f1f0d76dd6 551 if ((link_address = create_link_address(iothub_host_fqdn, device_id, module_id, (link_role == role_sender ? link_config->target_suffix : link_config->source_suffix))) == NULL)
XinZhangMS 0:f7f1f0d76dd6 552 {
XinZhangMS 0:f7f1f0d76dd6 553 LogError("Failed creating the message sender (failed creating the 'link_address')");
XinZhangMS 0:f7f1f0d76dd6 554 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 555 }
XinZhangMS 0:f7f1f0d76dd6 556 else
XinZhangMS 0:f7f1f0d76dd6 557 {
XinZhangMS 0:f7f1f0d76dd6 558 STRING_HANDLE link_name;
XinZhangMS 0:f7f1f0d76dd6 559
XinZhangMS 0:f7f1f0d76dd6 560 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_076: [The AMQP link name shall be defined as "link-snd-<`device_id`>-<locally generated UUID>"]
XinZhangMS 0:f7f1f0d76dd6 561 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_093: [The AMQP link name shall be defined as "link-rcv-<`device_id`>-<locally generated UUID>"]
XinZhangMS 0:f7f1f0d76dd6 562 if ((link_name = create_link_name(link_role, device_id)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 563 {
XinZhangMS 0:f7f1f0d76dd6 564 LogError("Failed creating the link name");
XinZhangMS 0:f7f1f0d76dd6 565 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 566 }
XinZhangMS 0:f7f1f0d76dd6 567 else
XinZhangMS 0:f7f1f0d76dd6 568 {
XinZhangMS 0:f7f1f0d76dd6 569 AMQP_VALUE source = NULL;
XinZhangMS 0:f7f1f0d76dd6 570 AMQP_VALUE target = NULL;
XinZhangMS 0:f7f1f0d76dd6 571
XinZhangMS 0:f7f1f0d76dd6 572 if (create_link_terminus(link_role, link_name, link_address, &source, &target) == RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 573 {
XinZhangMS 0:f7f1f0d76dd6 574 if ((result = link_create(session_handle, STRING_c_str(link_name), link_role, source, target)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 575 {
XinZhangMS 0:f7f1f0d76dd6 576 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_079: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 577 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_096: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 578 LogError("Failed creating the AMQP link");
XinZhangMS 0:f7f1f0d76dd6 579 }
XinZhangMS 0:f7f1f0d76dd6 580 else
XinZhangMS 0:f7f1f0d76dd6 581 {
XinZhangMS 0:f7f1f0d76dd6 582 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_082: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
XinZhangMS 0:f7f1f0d76dd6 583 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_099: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
XinZhangMS 0:f7f1f0d76dd6 584 if (link_set_max_message_size(result, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 585 {
XinZhangMS 0:f7f1f0d76dd6 586 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_083: [If link_set_max_message_size() fails, it shall be logged and ignored.]
XinZhangMS 0:f7f1f0d76dd6 587 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_100: [If link_set_max_message_size() fails, it shall be logged and ignored.]
XinZhangMS 0:f7f1f0d76dd6 588 LogError("Failed setting link max message size.");
XinZhangMS 0:f7f1f0d76dd6 589 }
XinZhangMS 0:f7f1f0d76dd6 590
XinZhangMS 0:f7f1f0d76dd6 591 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_080: [The AMQP link shall have its ATTACH properties set using `instance->config->send_link.attach_properties`]
XinZhangMS 0:f7f1f0d76dd6 592 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_097: [The AMQP link shall have its ATTACH properties set using `instance->config->receive_link.attach_properties`]
XinZhangMS 0:f7f1f0d76dd6 593 if (link_config->attach_properties != NULL &&
XinZhangMS 0:f7f1f0d76dd6 594 add_link_attach_properties(result, link_config->attach_properties) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 595 {
XinZhangMS 0:f7f1f0d76dd6 596 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_081: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 597 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_098: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 598 LogError("Failed setting link attach properties");
XinZhangMS 0:f7f1f0d76dd6 599 link_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 600 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 601 }
XinZhangMS 0:f7f1f0d76dd6 602 }
XinZhangMS 0:f7f1f0d76dd6 603
XinZhangMS 0:f7f1f0d76dd6 604 amqpvalue_destroy(source);
XinZhangMS 0:f7f1f0d76dd6 605 amqpvalue_destroy(target);
XinZhangMS 0:f7f1f0d76dd6 606 }
XinZhangMS 0:f7f1f0d76dd6 607
XinZhangMS 0:f7f1f0d76dd6 608 STRING_delete(link_name);
XinZhangMS 0:f7f1f0d76dd6 609 }
XinZhangMS 0:f7f1f0d76dd6 610
XinZhangMS 0:f7f1f0d76dd6 611 STRING_delete(link_address);
XinZhangMS 0:f7f1f0d76dd6 612 }
XinZhangMS 0:f7f1f0d76dd6 613
XinZhangMS 0:f7f1f0d76dd6 614 return result;
XinZhangMS 0:f7f1f0d76dd6 615 }
XinZhangMS 0:f7f1f0d76dd6 616
XinZhangMS 0:f7f1f0d76dd6 617 static void destroy_message_sender(AMQP_MESSENGER_INSTANCE* instance)
XinZhangMS 0:f7f1f0d76dd6 618 {
XinZhangMS 0:f7f1f0d76dd6 619 if (instance->message_sender != NULL)
XinZhangMS 0:f7f1f0d76dd6 620 {
XinZhangMS 0:f7f1f0d76dd6 621 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_090: [`instance->message_sender` shall be destroyed using messagesender_destroy()]
XinZhangMS 0:f7f1f0d76dd6 622 messagesender_destroy(instance->message_sender);
XinZhangMS 0:f7f1f0d76dd6 623 instance->message_sender = NULL;
XinZhangMS 0:f7f1f0d76dd6 624 }
XinZhangMS 0:f7f1f0d76dd6 625
XinZhangMS 0:f7f1f0d76dd6 626 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 627 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 628 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
XinZhangMS 0:f7f1f0d76dd6 629
XinZhangMS 0:f7f1f0d76dd6 630 if (instance->sender_link != NULL)
XinZhangMS 0:f7f1f0d76dd6 631 {
XinZhangMS 0:f7f1f0d76dd6 632 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_091: [`instance->sender_link` shall be destroyed using link_destroy()]
XinZhangMS 0:f7f1f0d76dd6 633 link_destroy(instance->sender_link);
XinZhangMS 0:f7f1f0d76dd6 634 instance->sender_link = NULL;
XinZhangMS 0:f7f1f0d76dd6 635 }
XinZhangMS 0:f7f1f0d76dd6 636 }
XinZhangMS 0:f7f1f0d76dd6 637
XinZhangMS 0:f7f1f0d76dd6 638 static void on_message_sender_state_changed_callback(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
XinZhangMS 0:f7f1f0d76dd6 639 {
XinZhangMS 0:f7f1f0d76dd6 640 if (context == NULL)
XinZhangMS 0:f7f1f0d76dd6 641 {
XinZhangMS 0:f7f1f0d76dd6 642 LogError("on_message_sender_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
XinZhangMS 0:f7f1f0d76dd6 643 }
XinZhangMS 0:f7f1f0d76dd6 644 else if (new_state != previous_state)
XinZhangMS 0:f7f1f0d76dd6 645 {
XinZhangMS 0:f7f1f0d76dd6 646 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 647
XinZhangMS 0:f7f1f0d76dd6 648 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_088: [`new_state`, `previous_state` shall be saved into `instance->message_sender_previous_state` and `instance->message_sender_current_state`]
XinZhangMS 0:f7f1f0d76dd6 649 instance->message_sender_current_state = new_state;
XinZhangMS 0:f7f1f0d76dd6 650 instance->message_sender_previous_state = previous_state;
XinZhangMS 0:f7f1f0d76dd6 651 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_089: [`instance->last_message_sender_state_change_time` shall be set using get_time()]
XinZhangMS 0:f7f1f0d76dd6 652 instance->last_message_sender_state_change_time = get_time(NULL);
XinZhangMS 0:f7f1f0d76dd6 653 }
XinZhangMS 0:f7f1f0d76dd6 654 }
XinZhangMS 0:f7f1f0d76dd6 655
XinZhangMS 0:f7f1f0d76dd6 656 static int create_message_sender(AMQP_MESSENGER_INSTANCE* instance)
XinZhangMS 0:f7f1f0d76dd6 657 {
XinZhangMS 0:f7f1f0d76dd6 658 int result;
XinZhangMS 0:f7f1f0d76dd6 659
XinZhangMS 0:f7f1f0d76dd6 660 if ((instance->sender_link = create_link(role_sender,
XinZhangMS 0:f7f1f0d76dd6 661 instance->session_handle, &instance->config->send_link, instance->config->iothub_host_fqdn, instance->config->device_id, instance->config->module_id)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 662 {
XinZhangMS 0:f7f1f0d76dd6 663 LogError("Failed creating the message sender link");
XinZhangMS 0:f7f1f0d76dd6 664 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 665 }
XinZhangMS 0:f7f1f0d76dd6 666 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_084: [`instance->message_sender` shall be created using messagesender_create(), passing the `instance->sender_link` and `on_message_sender_state_changed_callback`]
XinZhangMS 0:f7f1f0d76dd6 667 else if ((instance->message_sender = messagesender_create(instance->sender_link, on_message_sender_state_changed_callback, (void*)instance)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 668 {
XinZhangMS 0:f7f1f0d76dd6 669 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_085: [If messagesender_create() fails, amqp_messenger_do_work() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 670 LogError("Failed creating the message sender (messagesender_create failed)");
XinZhangMS 0:f7f1f0d76dd6 671 destroy_message_sender(instance);
XinZhangMS 0:f7f1f0d76dd6 672 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 673 }
XinZhangMS 0:f7f1f0d76dd6 674 else
XinZhangMS 0:f7f1f0d76dd6 675 {
XinZhangMS 0:f7f1f0d76dd6 676 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_086: [`instance->message_sender` shall be opened using messagesender_open()]
XinZhangMS 0:f7f1f0d76dd6 677 if (messagesender_open(instance->message_sender) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 678 {
XinZhangMS 0:f7f1f0d76dd6 679 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_087: [If messagesender_open() fails, amqp_messenger_do_work() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 680 LogError("Failed opening the AMQP message sender.");
XinZhangMS 0:f7f1f0d76dd6 681 destroy_message_sender(instance);
XinZhangMS 0:f7f1f0d76dd6 682 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 683 }
XinZhangMS 0:f7f1f0d76dd6 684 else
XinZhangMS 0:f7f1f0d76dd6 685 {
XinZhangMS 0:f7f1f0d76dd6 686 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 687 }
XinZhangMS 0:f7f1f0d76dd6 688 }
XinZhangMS 0:f7f1f0d76dd6 689
XinZhangMS 0:f7f1f0d76dd6 690 return result;
XinZhangMS 0:f7f1f0d76dd6 691 }
XinZhangMS 0:f7f1f0d76dd6 692
XinZhangMS 0:f7f1f0d76dd6 693 static void destroy_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
XinZhangMS 0:f7f1f0d76dd6 694 {
XinZhangMS 0:f7f1f0d76dd6 695 if (instance->message_receiver != NULL)
XinZhangMS 0:f7f1f0d76dd6 696 {
XinZhangMS 0:f7f1f0d76dd6 697 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_113: [`instance->message_receiver` shall be closed using messagereceiver_close()]
XinZhangMS 0:f7f1f0d76dd6 698 if (messagereceiver_close(instance->message_receiver) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 699 {
XinZhangMS 0:f7f1f0d76dd6 700 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_114: [If messagereceiver_close() fails, it shall be logged and ignored]
XinZhangMS 0:f7f1f0d76dd6 701 LogError("Failed closing the AMQP message receiver (this failure will be ignored).");
XinZhangMS 0:f7f1f0d76dd6 702 }
XinZhangMS 0:f7f1f0d76dd6 703
XinZhangMS 0:f7f1f0d76dd6 704 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_115: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()]
XinZhangMS 0:f7f1f0d76dd6 705 messagereceiver_destroy(instance->message_receiver);
XinZhangMS 0:f7f1f0d76dd6 706
XinZhangMS 0:f7f1f0d76dd6 707 instance->message_receiver = NULL;
XinZhangMS 0:f7f1f0d76dd6 708 }
XinZhangMS 0:f7f1f0d76dd6 709
XinZhangMS 0:f7f1f0d76dd6 710 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 711 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 712 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
XinZhangMS 0:f7f1f0d76dd6 713
XinZhangMS 0:f7f1f0d76dd6 714 if (instance->receiver_link != NULL)
XinZhangMS 0:f7f1f0d76dd6 715 {
XinZhangMS 0:f7f1f0d76dd6 716 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_116: [`instance->receiver_link` shall be destroyed using link_destroy()]
XinZhangMS 0:f7f1f0d76dd6 717 link_destroy(instance->receiver_link);
XinZhangMS 0:f7f1f0d76dd6 718 instance->receiver_link = NULL;
XinZhangMS 0:f7f1f0d76dd6 719 }
XinZhangMS 0:f7f1f0d76dd6 720 }
XinZhangMS 0:f7f1f0d76dd6 721
XinZhangMS 0:f7f1f0d76dd6 722 static void on_message_receiver_state_changed_callback(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
XinZhangMS 0:f7f1f0d76dd6 723 {
XinZhangMS 0:f7f1f0d76dd6 724 if (context == NULL)
XinZhangMS 0:f7f1f0d76dd6 725 {
XinZhangMS 0:f7f1f0d76dd6 726 LogError("on_message_receiver_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
XinZhangMS 0:f7f1f0d76dd6 727 }
XinZhangMS 0:f7f1f0d76dd6 728 else
XinZhangMS 0:f7f1f0d76dd6 729 {
XinZhangMS 0:f7f1f0d76dd6 730 if (new_state != previous_state)
XinZhangMS 0:f7f1f0d76dd6 731 {
XinZhangMS 0:f7f1f0d76dd6 732 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 733
XinZhangMS 0:f7f1f0d76dd6 734 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_105: [`new_state`, `previous_state` shall be saved into `instance->message_receiver_previous_state` and `instance->message_receiver_current_state`]
XinZhangMS 0:f7f1f0d76dd6 735 instance->message_receiver_current_state = new_state;
XinZhangMS 0:f7f1f0d76dd6 736 instance->message_receiver_previous_state = previous_state;
XinZhangMS 0:f7f1f0d76dd6 737 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_106: [`instance->last_message_receiver_state_change_time` shall be set using get_time()]
XinZhangMS 0:f7f1f0d76dd6 738 instance->last_message_receiver_state_change_time = get_time(NULL);
XinZhangMS 0:f7f1f0d76dd6 739
XinZhangMS 0:f7f1f0d76dd6 740 if (new_state == MESSAGE_RECEIVER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 741 {
XinZhangMS 0:f7f1f0d76dd6 742 if (instance->config->on_subscription_changed_callback != NULL)
XinZhangMS 0:f7f1f0d76dd6 743 {
XinZhangMS 0:f7f1f0d76dd6 744 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, true);
XinZhangMS 0:f7f1f0d76dd6 745 }
XinZhangMS 0:f7f1f0d76dd6 746 }
XinZhangMS 0:f7f1f0d76dd6 747 else if (previous_state == MESSAGE_RECEIVER_STATE_OPEN && new_state != MESSAGE_RECEIVER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 748 {
XinZhangMS 0:f7f1f0d76dd6 749 if (instance->config->on_subscription_changed_callback != NULL)
XinZhangMS 0:f7f1f0d76dd6 750 {
XinZhangMS 0:f7f1f0d76dd6 751 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, false);
XinZhangMS 0:f7f1f0d76dd6 752 }
XinZhangMS 0:f7f1f0d76dd6 753 }
XinZhangMS 0:f7f1f0d76dd6 754 }
XinZhangMS 0:f7f1f0d76dd6 755 }
XinZhangMS 0:f7f1f0d76dd6 756 }
XinZhangMS 0:f7f1f0d76dd6 757
XinZhangMS 0:f7f1f0d76dd6 758 static AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* create_message_disposition_info(AMQP_MESSENGER_INSTANCE* messenger)
XinZhangMS 0:f7f1f0d76dd6 759 {
XinZhangMS 0:f7f1f0d76dd6 760 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* result;
XinZhangMS 0:f7f1f0d76dd6 761
XinZhangMS 0:f7f1f0d76dd6 762 if ((result = (AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO))) == NULL)
XinZhangMS 0:f7f1f0d76dd6 763 {
XinZhangMS 0:f7f1f0d76dd6 764 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (malloc failed)");
XinZhangMS 0:f7f1f0d76dd6 765 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 766 }
XinZhangMS 0:f7f1f0d76dd6 767 else
XinZhangMS 0:f7f1f0d76dd6 768 {
XinZhangMS 0:f7f1f0d76dd6 769 delivery_number message_id;
XinZhangMS 0:f7f1f0d76dd6 770
XinZhangMS 0:f7f1f0d76dd6 771 if (messagereceiver_get_received_message_id(messenger->message_receiver, &message_id) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 772 {
XinZhangMS 0:f7f1f0d76dd6 773 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_received_message_id failed)");
XinZhangMS 0:f7f1f0d76dd6 774 free(result);
XinZhangMS 0:f7f1f0d76dd6 775 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 776 }
XinZhangMS 0:f7f1f0d76dd6 777 else
XinZhangMS 0:f7f1f0d76dd6 778 {
XinZhangMS 0:f7f1f0d76dd6 779 const char* link_name;
XinZhangMS 0:f7f1f0d76dd6 780
XinZhangMS 0:f7f1f0d76dd6 781 if (messagereceiver_get_link_name(messenger->message_receiver, &link_name) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 782 {
XinZhangMS 0:f7f1f0d76dd6 783 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_link_name failed)");
XinZhangMS 0:f7f1f0d76dd6 784 free(result);
XinZhangMS 0:f7f1f0d76dd6 785 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 786 }
XinZhangMS 0:f7f1f0d76dd6 787 else if (mallocAndStrcpy_s(&result->source, link_name) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 788 {
XinZhangMS 0:f7f1f0d76dd6 789 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (failed copying link name)");
XinZhangMS 0:f7f1f0d76dd6 790 free(result);
XinZhangMS 0:f7f1f0d76dd6 791 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 792 }
XinZhangMS 0:f7f1f0d76dd6 793 else
XinZhangMS 0:f7f1f0d76dd6 794 {
XinZhangMS 0:f7f1f0d76dd6 795 result->message_id = message_id;
XinZhangMS 0:f7f1f0d76dd6 796 }
XinZhangMS 0:f7f1f0d76dd6 797 }
XinZhangMS 0:f7f1f0d76dd6 798 }
XinZhangMS 0:f7f1f0d76dd6 799
XinZhangMS 0:f7f1f0d76dd6 800 return result;
XinZhangMS 0:f7f1f0d76dd6 801 }
XinZhangMS 0:f7f1f0d76dd6 802
XinZhangMS 0:f7f1f0d76dd6 803 static void destroy_message_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
XinZhangMS 0:f7f1f0d76dd6 804 {
XinZhangMS 0:f7f1f0d76dd6 805 free(disposition_info->source);
XinZhangMS 0:f7f1f0d76dd6 806 free(disposition_info);
XinZhangMS 0:f7f1f0d76dd6 807 }
XinZhangMS 0:f7f1f0d76dd6 808
XinZhangMS 0:f7f1f0d76dd6 809 static AMQP_VALUE create_uamqp_disposition_result_from(AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
XinZhangMS 0:f7f1f0d76dd6 810 {
XinZhangMS 0:f7f1f0d76dd6 811 AMQP_VALUE uamqp_disposition_result;
XinZhangMS 0:f7f1f0d76dd6 812
XinZhangMS 0:f7f1f0d76dd6 813 if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_NONE)
XinZhangMS 0:f7f1f0d76dd6 814 {
XinZhangMS 0:f7f1f0d76dd6 815 uamqp_disposition_result = NULL; // intentionally not sending an answer.
XinZhangMS 0:f7f1f0d76dd6 816 }
XinZhangMS 0:f7f1f0d76dd6 817 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED)
XinZhangMS 0:f7f1f0d76dd6 818 {
XinZhangMS 0:f7f1f0d76dd6 819 uamqp_disposition_result = messaging_delivery_accepted();
XinZhangMS 0:f7f1f0d76dd6 820 }
XinZhangMS 0:f7f1f0d76dd6 821 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED)
XinZhangMS 0:f7f1f0d76dd6 822 {
XinZhangMS 0:f7f1f0d76dd6 823 uamqp_disposition_result = messaging_delivery_released();
XinZhangMS 0:f7f1f0d76dd6 824 }
XinZhangMS 0:f7f1f0d76dd6 825 else // id est, if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED)
XinZhangMS 0:f7f1f0d76dd6 826 {
XinZhangMS 0:f7f1f0d76dd6 827 uamqp_disposition_result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
XinZhangMS 0:f7f1f0d76dd6 828 }
XinZhangMS 0:f7f1f0d76dd6 829
XinZhangMS 0:f7f1f0d76dd6 830 return uamqp_disposition_result;
XinZhangMS 0:f7f1f0d76dd6 831 }
XinZhangMS 0:f7f1f0d76dd6 832
XinZhangMS 0:f7f1f0d76dd6 833 static AMQP_VALUE on_message_received_internal_callback(const void* context, MESSAGE_HANDLE message)
XinZhangMS 0:f7f1f0d76dd6 834 {
XinZhangMS 0:f7f1f0d76dd6 835 AMQP_VALUE result;
XinZhangMS 0:f7f1f0d76dd6 836 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 837 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* message_disposition_info;
XinZhangMS 0:f7f1f0d76dd6 838
XinZhangMS 0:f7f1f0d76dd6 839 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_107: [A AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be created containing the source link name and message delivery ID]
XinZhangMS 0:f7f1f0d76dd6 840 if ((message_disposition_info = create_message_disposition_info(instance)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 841 {
XinZhangMS 0:f7f1f0d76dd6 842 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_108: [If the AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance fails to be created, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
XinZhangMS 0:f7f1f0d76dd6 843 LogError("on_message_received_internal_callback failed (failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO).");
XinZhangMS 0:f7f1f0d76dd6 844 result = messaging_delivery_released();
XinZhangMS 0:f7f1f0d76dd6 845 }
XinZhangMS 0:f7f1f0d76dd6 846 else
XinZhangMS 0:f7f1f0d76dd6 847 {
XinZhangMS 0:f7f1f0d76dd6 848 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_109: [`instance->on_message_received_callback` shall be invoked passing the `message` and AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance]
XinZhangMS 0:f7f1f0d76dd6 849 AMQP_MESSENGER_DISPOSITION_RESULT disposition_result = instance->on_message_received_callback(message, message_disposition_info, instance->on_message_received_context);
XinZhangMS 0:f7f1f0d76dd6 850
XinZhangMS 0:f7f1f0d76dd6 851 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_110: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED, on_message_received_internal_callback shall return the result of messaging_delivery_accepted()]
XinZhangMS 0:f7f1f0d76dd6 852 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_111: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
XinZhangMS 0:f7f1f0d76dd6 853 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_112: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()]
XinZhangMS 0:f7f1f0d76dd6 854 result = create_uamqp_disposition_result_from(disposition_result);
XinZhangMS 0:f7f1f0d76dd6 855 }
XinZhangMS 0:f7f1f0d76dd6 856
XinZhangMS 0:f7f1f0d76dd6 857 return result;
XinZhangMS 0:f7f1f0d76dd6 858 }
XinZhangMS 0:f7f1f0d76dd6 859
XinZhangMS 0:f7f1f0d76dd6 860 static int create_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
XinZhangMS 0:f7f1f0d76dd6 861 {
XinZhangMS 0:f7f1f0d76dd6 862 int result;
XinZhangMS 0:f7f1f0d76dd6 863
XinZhangMS 0:f7f1f0d76dd6 864 if ((instance->receiver_link = create_link(role_receiver,
XinZhangMS 0:f7f1f0d76dd6 865 instance->session_handle, &instance->config->receive_link, instance->config->iothub_host_fqdn, instance->config->device_id, instance->config->module_id)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 866 {
XinZhangMS 0:f7f1f0d76dd6 867 LogError("Failed creating the message receiver link");
XinZhangMS 0:f7f1f0d76dd6 868 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 869 }
XinZhangMS 0:f7f1f0d76dd6 870 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_101: [`instance->message_receiver` shall be created using messagereceiver_create(), passing the `instance->receiver_link` and `on_message_receiver_state_changed_callback`]
XinZhangMS 0:f7f1f0d76dd6 871 else if ((instance->message_receiver = messagereceiver_create(instance->receiver_link, on_message_receiver_state_changed_callback, (void*)instance)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 872 {
XinZhangMS 0:f7f1f0d76dd6 873 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_102: [If messagereceiver_create() fails, amqp_messenger_do_work() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 874 LogError("Failed creating the message receiver (messagereceiver_create failed)");
XinZhangMS 0:f7f1f0d76dd6 875 destroy_message_receiver(instance);
XinZhangMS 0:f7f1f0d76dd6 876 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 877 }
XinZhangMS 0:f7f1f0d76dd6 878 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_103: [`instance->message_receiver` shall be opened using messagereceiver_open() passing `on_message_received_internal_callback`]
XinZhangMS 0:f7f1f0d76dd6 879 else if (messagereceiver_open(instance->message_receiver, on_message_received_internal_callback, (void*)instance) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 880 {
XinZhangMS 0:f7f1f0d76dd6 881 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_104: [If messagereceiver_open() fails, amqp_messenger_do_work() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 882 LogError("Failed opening the AMQP message receiver.");
XinZhangMS 0:f7f1f0d76dd6 883 destroy_message_receiver(instance);
XinZhangMS 0:f7f1f0d76dd6 884 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 885 }
XinZhangMS 0:f7f1f0d76dd6 886 else
XinZhangMS 0:f7f1f0d76dd6 887 {
XinZhangMS 0:f7f1f0d76dd6 888 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 889 }
XinZhangMS 0:f7f1f0d76dd6 890
XinZhangMS 0:f7f1f0d76dd6 891 return result;
XinZhangMS 0:f7f1f0d76dd6 892 }
XinZhangMS 0:f7f1f0d76dd6 893
XinZhangMS 0:f7f1f0d76dd6 894 static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result)
XinZhangMS 0:f7f1f0d76dd6 895 {
XinZhangMS 0:f7f1f0d76dd6 896 if (context != NULL)
XinZhangMS 0:f7f1f0d76dd6 897 {
XinZhangMS 0:f7f1f0d76dd6 898 MESSAGE_QUEUE_RESULT mq_result;
XinZhangMS 0:f7f1f0d76dd6 899 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)context;
XinZhangMS 0:f7f1f0d76dd6 900
XinZhangMS 0:f7f1f0d76dd6 901 if (send_result == MESSAGE_SEND_OK)
XinZhangMS 0:f7f1f0d76dd6 902 {
XinZhangMS 0:f7f1f0d76dd6 903 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_121: [If no failure occurs, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_SUCCESS]
XinZhangMS 0:f7f1f0d76dd6 904 mq_result = MESSAGE_QUEUE_SUCCESS;
XinZhangMS 0:f7f1f0d76dd6 905 }
XinZhangMS 0:f7f1f0d76dd6 906 else
XinZhangMS 0:f7f1f0d76dd6 907 {
XinZhangMS 0:f7f1f0d76dd6 908 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_122: [If a failure occurred, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 909 mq_result = MESSAGE_QUEUE_ERROR;
XinZhangMS 0:f7f1f0d76dd6 910 }
XinZhangMS 0:f7f1f0d76dd6 911
XinZhangMS 0:f7f1f0d76dd6 912 msg_ctx->on_process_message_completed_callback(msg_ctx->messenger->send_queue, (MQ_MESSAGE_HANDLE)msg_ctx->message, mq_result, NULL);
XinZhangMS 0:f7f1f0d76dd6 913 }
XinZhangMS 0:f7f1f0d76dd6 914 }
XinZhangMS 0:f7f1f0d76dd6 915
XinZhangMS 0:f7f1f0d76dd6 916 static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* context)
XinZhangMS 0:f7f1f0d76dd6 917 {
XinZhangMS 0:f7f1f0d76dd6 918 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_117: [If any argument is NULL, `on_process_message_callback` shall return immediatelly]
XinZhangMS 0:f7f1f0d76dd6 919 if (message_queue == NULL || message == NULL || on_process_message_completed_callback == NULL || context == NULL)
XinZhangMS 0:f7f1f0d76dd6 920 {
XinZhangMS 0:f7f1f0d76dd6 921 LogError("Invalid argument (message_queue=%p, message=%p, on_process_message_completed_callback=%p, context=%p)", message_queue, message, on_process_message_completed_callback, context);
XinZhangMS 0:f7f1f0d76dd6 922 }
XinZhangMS 0:f7f1f0d76dd6 923 else
XinZhangMS 0:f7f1f0d76dd6 924 {
XinZhangMS 0:f7f1f0d76dd6 925 MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context;
XinZhangMS 0:f7f1f0d76dd6 926 message_context->on_process_message_completed_callback = on_process_message_completed_callback;
XinZhangMS 0:f7f1f0d76dd6 927
XinZhangMS 0:f7f1f0d76dd6 928 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_118: [The MESSAGE_HANDLE shall be submitted for sending using messagesender_send(), passing `on_send_complete_callback`]
XinZhangMS 0:f7f1f0d76dd6 929 if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL)
XinZhangMS 0:f7f1f0d76dd6 930 {
XinZhangMS 0:f7f1f0d76dd6 931 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_119: [If messagesender_send() fails, `on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 932 LogError("Failed sending AMQP message");
XinZhangMS 0:f7f1f0d76dd6 933 on_process_message_completed_callback(message_queue, message, MESSAGE_QUEUE_ERROR, NULL);
XinZhangMS 0:f7f1f0d76dd6 934 }
XinZhangMS 0:f7f1f0d76dd6 935
XinZhangMS 0:f7f1f0d76dd6 936 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_120: [The MESSAGE_HANDLE shall be destroyed using message_destroy() and marked as destroyed in the context provided]
XinZhangMS 0:f7f1f0d76dd6 937 message_destroy((MESSAGE_HANDLE)message);
XinZhangMS 0:f7f1f0d76dd6 938 message_context->is_destroyed = true;
XinZhangMS 0:f7f1f0d76dd6 939 }
XinZhangMS 0:f7f1f0d76dd6 940 }
XinZhangMS 0:f7f1f0d76dd6 941
XinZhangMS 0:f7f1f0d76dd6 942 static void on_message_processing_completed_callback(MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason, void* message_context)
XinZhangMS 0:f7f1f0d76dd6 943 {
XinZhangMS 0:f7f1f0d76dd6 944 (void)reason;
XinZhangMS 0:f7f1f0d76dd6 945
XinZhangMS 0:f7f1f0d76dd6 946 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_020: [If `messenger_context` is NULL, `on_message_processing_completed_callback` shall return immediately]
XinZhangMS 0:f7f1f0d76dd6 947 if (message_context == NULL)
XinZhangMS 0:f7f1f0d76dd6 948 {
XinZhangMS 0:f7f1f0d76dd6 949 LogError("on_message_processing_completed_callback invoked with NULL context");
XinZhangMS 0:f7f1f0d76dd6 950 }
XinZhangMS 0:f7f1f0d76dd6 951 else
XinZhangMS 0:f7f1f0d76dd6 952 {
XinZhangMS 0:f7f1f0d76dd6 953 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)message_context;
XinZhangMS 0:f7f1f0d76dd6 954 AMQP_MESSENGER_SEND_RESULT messenger_send_result;
XinZhangMS 0:f7f1f0d76dd6 955 AMQP_MESSENGER_REASON messenger_send_reason;
XinZhangMS 0:f7f1f0d76dd6 956
XinZhangMS 0:f7f1f0d76dd6 957 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_021: [If `result` is MESSAGE_QUEUE_SUCCESS, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_SUCCESS and AMQP_MESSENGER_REASON_NONE]
XinZhangMS 0:f7f1f0d76dd6 958 if (result == MESSAGE_QUEUE_SUCCESS)
XinZhangMS 0:f7f1f0d76dd6 959 {
XinZhangMS 0:f7f1f0d76dd6 960 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_SUCCESS;
XinZhangMS 0:f7f1f0d76dd6 961 messenger_send_reason = AMQP_MESSENGER_REASON_NONE;
XinZhangMS 0:f7f1f0d76dd6 962
XinZhangMS 0:f7f1f0d76dd6 963 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
XinZhangMS 0:f7f1f0d76dd6 964 msg_ctx->messenger->send_error_count = 0;
XinZhangMS 0:f7f1f0d76dd6 965 }
XinZhangMS 0:f7f1f0d76dd6 966 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_022: [If `result` is MESSAGE_QUEUE_TIMEOUT, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_TIMEOUT]
XinZhangMS 0:f7f1f0d76dd6 967 else if (result == MESSAGE_QUEUE_TIMEOUT)
XinZhangMS 0:f7f1f0d76dd6 968 {
XinZhangMS 0:f7f1f0d76dd6 969 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
XinZhangMS 0:f7f1f0d76dd6 970 messenger_send_reason = AMQP_MESSENGER_REASON_TIMEOUT;
XinZhangMS 0:f7f1f0d76dd6 971
XinZhangMS 0:f7f1f0d76dd6 972 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
XinZhangMS 0:f7f1f0d76dd6 973 msg_ctx->messenger->send_error_count++;
XinZhangMS 0:f7f1f0d76dd6 974 }
XinZhangMS 0:f7f1f0d76dd6 975 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_023: [If `result` is MESSAGE_QUEUE_CANCELLED and the messenger is STOPPED, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_CANCELLED and AMQP_MESSENGER_REASON_MESSENGER_DESTROYED]
XinZhangMS 0:f7f1f0d76dd6 976 else if (result == MESSAGE_QUEUE_CANCELLED && msg_ctx->messenger->state == AMQP_MESSENGER_STATE_STOPPED)
XinZhangMS 0:f7f1f0d76dd6 977 {
XinZhangMS 0:f7f1f0d76dd6 978 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_CANCELLED;
XinZhangMS 0:f7f1f0d76dd6 979 messenger_send_reason = AMQP_MESSENGER_REASON_MESSENGER_DESTROYED;
XinZhangMS 0:f7f1f0d76dd6 980
XinZhangMS 0:f7f1f0d76dd6 981 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
XinZhangMS 0:f7f1f0d76dd6 982 msg_ctx->messenger->send_error_count = 0;
XinZhangMS 0:f7f1f0d76dd6 983 }
XinZhangMS 0:f7f1f0d76dd6 984 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_024: [Otherwise `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_FAIL_SENDING]
XinZhangMS 0:f7f1f0d76dd6 985 else
XinZhangMS 0:f7f1f0d76dd6 986 {
XinZhangMS 0:f7f1f0d76dd6 987 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
XinZhangMS 0:f7f1f0d76dd6 988 messenger_send_reason = AMQP_MESSENGER_REASON_FAIL_SENDING;
XinZhangMS 0:f7f1f0d76dd6 989
XinZhangMS 0:f7f1f0d76dd6 990 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
XinZhangMS 0:f7f1f0d76dd6 991 msg_ctx->messenger->send_error_count++;
XinZhangMS 0:f7f1f0d76dd6 992 }
XinZhangMS 0:f7f1f0d76dd6 993
XinZhangMS 0:f7f1f0d76dd6 994 if (msg_ctx->on_send_complete_callback != NULL)
XinZhangMS 0:f7f1f0d76dd6 995 {
XinZhangMS 0:f7f1f0d76dd6 996 msg_ctx->on_send_complete_callback(messenger_send_result, messenger_send_reason, msg_ctx->user_context);
XinZhangMS 0:f7f1f0d76dd6 997 }
XinZhangMS 0:f7f1f0d76dd6 998
XinZhangMS 0:f7f1f0d76dd6 999 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_027: [If `message` has not been destroyed, it shall be destroyed using message_destroy()]
XinZhangMS 0:f7f1f0d76dd6 1000 if (!msg_ctx->is_destroyed)
XinZhangMS 0:f7f1f0d76dd6 1001 {
XinZhangMS 0:f7f1f0d76dd6 1002 message_destroy((MESSAGE_HANDLE)message);
XinZhangMS 0:f7f1f0d76dd6 1003 }
XinZhangMS 0:f7f1f0d76dd6 1004
XinZhangMS 0:f7f1f0d76dd6 1005 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_028: [`send_ctx` shall be destroyed]
XinZhangMS 0:f7f1f0d76dd6 1006 destroy_message_send_context(msg_ctx);
XinZhangMS 0:f7f1f0d76dd6 1007 }
XinZhangMS 0:f7f1f0d76dd6 1008 }
XinZhangMS 0:f7f1f0d76dd6 1009
XinZhangMS 0:f7f1f0d76dd6 1010
XinZhangMS 0:f7f1f0d76dd6 1011 // ---------- Set/Retrieve Options Helpers ----------//
XinZhangMS 0:f7f1f0d76dd6 1012
XinZhangMS 0:f7f1f0d76dd6 1013 static void* amqp_messenger_clone_option(const char* name, const void* value)
XinZhangMS 0:f7f1f0d76dd6 1014 {
XinZhangMS 0:f7f1f0d76dd6 1015 void* result;
XinZhangMS 0:f7f1f0d76dd6 1016
XinZhangMS 0:f7f1f0d76dd6 1017 if (name == NULL || value == NULL)
XinZhangMS 0:f7f1f0d76dd6 1018 {
XinZhangMS 0:f7f1f0d76dd6 1019 LogError("invalid argument (name=%p, value=%p)", name, value);
XinZhangMS 0:f7f1f0d76dd6 1020 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1021 }
XinZhangMS 0:f7f1f0d76dd6 1022 else
XinZhangMS 0:f7f1f0d76dd6 1023 {
XinZhangMS 0:f7f1f0d76dd6 1024 if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
XinZhangMS 0:f7f1f0d76dd6 1025 {
XinZhangMS 0:f7f1f0d76dd6 1026 if ((result = (void*)OptionHandler_Clone((OPTIONHANDLER_HANDLE)value)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1027 {
XinZhangMS 0:f7f1f0d76dd6 1028 LogError("failed cloning option '%s'", name);
XinZhangMS 0:f7f1f0d76dd6 1029 }
XinZhangMS 0:f7f1f0d76dd6 1030 }
XinZhangMS 0:f7f1f0d76dd6 1031 else
XinZhangMS 0:f7f1f0d76dd6 1032 {
XinZhangMS 0:f7f1f0d76dd6 1033 LogError("Failed to clone messenger option (option with name '%s' is not suppported)", name);
XinZhangMS 0:f7f1f0d76dd6 1034 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1035 }
XinZhangMS 0:f7f1f0d76dd6 1036 }
XinZhangMS 0:f7f1f0d76dd6 1037
XinZhangMS 0:f7f1f0d76dd6 1038 return result;
XinZhangMS 0:f7f1f0d76dd6 1039 }
XinZhangMS 0:f7f1f0d76dd6 1040
XinZhangMS 0:f7f1f0d76dd6 1041 static void amqp_messenger_destroy_option(const char* name, const void* value)
XinZhangMS 0:f7f1f0d76dd6 1042 {
XinZhangMS 0:f7f1f0d76dd6 1043 if (name == NULL || value == NULL)
XinZhangMS 0:f7f1f0d76dd6 1044 {
XinZhangMS 0:f7f1f0d76dd6 1045 LogError("invalid argument (name=%p, value=%p)", name, value);
XinZhangMS 0:f7f1f0d76dd6 1046 }
XinZhangMS 0:f7f1f0d76dd6 1047 else if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
XinZhangMS 0:f7f1f0d76dd6 1048 {
XinZhangMS 0:f7f1f0d76dd6 1049 OptionHandler_Destroy((OPTIONHANDLER_HANDLE)value);
XinZhangMS 0:f7f1f0d76dd6 1050 }
XinZhangMS 0:f7f1f0d76dd6 1051 else
XinZhangMS 0:f7f1f0d76dd6 1052 {
XinZhangMS 0:f7f1f0d76dd6 1053 LogError("invalid argument (option '%s' is not suppported)", name);
XinZhangMS 0:f7f1f0d76dd6 1054 }
XinZhangMS 0:f7f1f0d76dd6 1055 }
XinZhangMS 0:f7f1f0d76dd6 1056
XinZhangMS 0:f7f1f0d76dd6 1057
XinZhangMS 0:f7f1f0d76dd6 1058 // Public API:
XinZhangMS 0:f7f1f0d76dd6 1059
XinZhangMS 0:f7f1f0d76dd6 1060 int amqp_messenger_subscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle, ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback, void* context)
XinZhangMS 0:f7f1f0d76dd6 1061 {
XinZhangMS 0:f7f1f0d76dd6 1062 int result;
XinZhangMS 0:f7f1f0d76dd6 1063
XinZhangMS 0:f7f1f0d76dd6 1064 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_035: [If `messenger_handle` or `on_message_received_callback` are NULL, amqp_messenger_subscribe_for_messages() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1065 if (messenger_handle == NULL || on_message_received_callback == NULL || context == NULL)
XinZhangMS 0:f7f1f0d76dd6 1066 {
XinZhangMS 0:f7f1f0d76dd6 1067 LogError("Invalid argument (messenger_handle=%p, on_message_received_callback=%p, context=%p)", messenger_handle, on_message_received_callback, context);
XinZhangMS 0:f7f1f0d76dd6 1068 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1069 }
XinZhangMS 0:f7f1f0d76dd6 1070 else
XinZhangMS 0:f7f1f0d76dd6 1071 {
XinZhangMS 0:f7f1f0d76dd6 1072 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1073
XinZhangMS 0:f7f1f0d76dd6 1074 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_036: [`on_message_received_callback` and `context` shall be saved on `instance->on_message_received_callback`]
XinZhangMS 0:f7f1f0d76dd6 1075 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_037: [amqp_messenger_subscribe_for_messages() shall set `instance->receive_messages` to true]
XinZhangMS 0:f7f1f0d76dd6 1076 instance->on_message_received_callback = on_message_received_callback;
XinZhangMS 0:f7f1f0d76dd6 1077 instance->on_message_received_context = context;
XinZhangMS 0:f7f1f0d76dd6 1078 instance->receive_messages = true;
XinZhangMS 0:f7f1f0d76dd6 1079
XinZhangMS 0:f7f1f0d76dd6 1080 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_038: [If no failures occurr, amqp_messenger_subscribe_for_messages() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1081 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1082 }
XinZhangMS 0:f7f1f0d76dd6 1083
XinZhangMS 0:f7f1f0d76dd6 1084 return result;
XinZhangMS 0:f7f1f0d76dd6 1085 }
XinZhangMS 0:f7f1f0d76dd6 1086
XinZhangMS 0:f7f1f0d76dd6 1087 int amqp_messenger_unsubscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle)
XinZhangMS 0:f7f1f0d76dd6 1088 {
XinZhangMS 0:f7f1f0d76dd6 1089 int result;
XinZhangMS 0:f7f1f0d76dd6 1090
XinZhangMS 0:f7f1f0d76dd6 1091 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_039: [If `messenger_handle` is NULL, amqp_messenger_unsubscribe_for_messages() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1092 if (messenger_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1093 {
XinZhangMS 0:f7f1f0d76dd6 1094 LogError("Invalid argument (messenger_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1095 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1096 }
XinZhangMS 0:f7f1f0d76dd6 1097 else
XinZhangMS 0:f7f1f0d76dd6 1098 {
XinZhangMS 0:f7f1f0d76dd6 1099 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1100
XinZhangMS 0:f7f1f0d76dd6 1101 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_040: [`instance->receive_messages` shall be saved to false]
XinZhangMS 0:f7f1f0d76dd6 1102 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_041: [`instance->on_message_received_callback` and `instance->on_message_received_context` shall be set to NULL]
XinZhangMS 0:f7f1f0d76dd6 1103 instance->receive_messages = false;
XinZhangMS 0:f7f1f0d76dd6 1104 instance->on_message_received_callback = NULL;
XinZhangMS 0:f7f1f0d76dd6 1105 instance->on_message_received_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 1106
XinZhangMS 0:f7f1f0d76dd6 1107 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_042: [If no failures occurr, amqp_messenger_unsubscribe_for_messages() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1108 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1109 }
XinZhangMS 0:f7f1f0d76dd6 1110
XinZhangMS 0:f7f1f0d76dd6 1111 return result;
XinZhangMS 0:f7f1f0d76dd6 1112 }
XinZhangMS 0:f7f1f0d76dd6 1113
XinZhangMS 0:f7f1f0d76dd6 1114 int amqp_messenger_send_message_disposition(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
XinZhangMS 0:f7f1f0d76dd6 1115 {
XinZhangMS 0:f7f1f0d76dd6 1116 int result;
XinZhangMS 0:f7f1f0d76dd6 1117
XinZhangMS 0:f7f1f0d76dd6 1118 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_043: [If `messenger_handle` or `disposition_info` are NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1119 if (messenger_handle == NULL || disposition_info == NULL)
XinZhangMS 0:f7f1f0d76dd6 1120 {
XinZhangMS 0:f7f1f0d76dd6 1121 LogError("Invalid argument (messenger_handle=%p, disposition_info=%p)", messenger_handle, disposition_info);
XinZhangMS 0:f7f1f0d76dd6 1122 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1123 }
XinZhangMS 0:f7f1f0d76dd6 1124 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_044: [If `disposition_info->source` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1125 else if (disposition_info->source == NULL)
XinZhangMS 0:f7f1f0d76dd6 1126 {
XinZhangMS 0:f7f1f0d76dd6 1127 LogError("Failed sending message disposition (disposition_info->source is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1128 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1129 }
XinZhangMS 0:f7f1f0d76dd6 1130 else
XinZhangMS 0:f7f1f0d76dd6 1131 {
XinZhangMS 0:f7f1f0d76dd6 1132 AMQP_MESSENGER_INSTANCE* messenger = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1133
XinZhangMS 0:f7f1f0d76dd6 1134 if (messenger->message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 1135 {
XinZhangMS 0:f7f1f0d76dd6 1136 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_045: [If `messenger_handle->message_receiver` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1137 LogError("Failed sending message disposition (message_receiver is not created; check if it is subscribed)");
XinZhangMS 0:f7f1f0d76dd6 1138 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1139 }
XinZhangMS 0:f7f1f0d76dd6 1140 else
XinZhangMS 0:f7f1f0d76dd6 1141 {
XinZhangMS 0:f7f1f0d76dd6 1142 AMQP_VALUE uamqp_disposition_result;
XinZhangMS 0:f7f1f0d76dd6 1143
XinZhangMS 0:f7f1f0d76dd6 1144 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_046: [An AMQP_VALUE disposition result shall be created corresponding to the `disposition_result` provided]
XinZhangMS 0:f7f1f0d76dd6 1145 if ((uamqp_disposition_result = create_uamqp_disposition_result_from(disposition_result)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1146 {
XinZhangMS 0:f7f1f0d76dd6 1147 LogError("Failed sending message disposition (disposition result %d is not supported)", disposition_result);
XinZhangMS 0:f7f1f0d76dd6 1148 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1149 }
XinZhangMS 0:f7f1f0d76dd6 1150 else
XinZhangMS 0:f7f1f0d76dd6 1151 {
XinZhangMS 0:f7f1f0d76dd6 1152 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_047: [`messagereceiver_send_message_disposition()` shall be invoked passing `disposition_info->source`, `disposition_info->message_id` and the AMQP_VALUE disposition result]
XinZhangMS 0:f7f1f0d76dd6 1153 if (messagereceiver_send_message_disposition(messenger->message_receiver, disposition_info->source, disposition_info->message_id, uamqp_disposition_result) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1154 {
XinZhangMS 0:f7f1f0d76dd6 1155 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_048: [If `messagereceiver_send_message_disposition()` fails, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1156 LogError("Failed sending message disposition (messagereceiver_send_message_disposition failed)");
XinZhangMS 0:f7f1f0d76dd6 1157 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1158 }
XinZhangMS 0:f7f1f0d76dd6 1159 else
XinZhangMS 0:f7f1f0d76dd6 1160 {
XinZhangMS 0:f7f1f0d76dd6 1161 destroy_message_disposition_info(disposition_info);
XinZhangMS 0:f7f1f0d76dd6 1162
XinZhangMS 0:f7f1f0d76dd6 1163 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_050: [If no failures occurr, amqp_messenger_send_message_disposition() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1164 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1165 }
XinZhangMS 0:f7f1f0d76dd6 1166
XinZhangMS 0:f7f1f0d76dd6 1167 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_049: [amqp_messenger_send_message_disposition() shall destroy the AMQP_VALUE disposition result]
XinZhangMS 0:f7f1f0d76dd6 1168 amqpvalue_destroy(uamqp_disposition_result);
XinZhangMS 0:f7f1f0d76dd6 1169 }
XinZhangMS 0:f7f1f0d76dd6 1170 }
XinZhangMS 0:f7f1f0d76dd6 1171 }
XinZhangMS 0:f7f1f0d76dd6 1172
XinZhangMS 0:f7f1f0d76dd6 1173 return result;
XinZhangMS 0:f7f1f0d76dd6 1174 }
XinZhangMS 0:f7f1f0d76dd6 1175
XinZhangMS 0:f7f1f0d76dd6 1176 int amqp_messenger_send_async(AMQP_MESSENGER_HANDLE messenger_handle, MESSAGE_HANDLE message, AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_user_defined_send_complete_callback, void* user_context)
XinZhangMS 0:f7f1f0d76dd6 1177 {
XinZhangMS 0:f7f1f0d76dd6 1178 int result;
XinZhangMS 0:f7f1f0d76dd6 1179
XinZhangMS 0:f7f1f0d76dd6 1180 if (messenger_handle == NULL || message == NULL || on_user_defined_send_complete_callback == NULL)
XinZhangMS 0:f7f1f0d76dd6 1181 {
XinZhangMS 0:f7f1f0d76dd6 1182 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_010: [If `messenger_handle`, `message` or `on_event_send_complete_callback` are NULL, amqp_messenger_send_async() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1183 LogError("Invalid argument (messenger_handle=%p, message=%p, on_user_defined_send_complete_callback=%p)", messenger_handle, message, on_user_defined_send_complete_callback);
XinZhangMS 0:f7f1f0d76dd6 1184 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1185 }
XinZhangMS 0:f7f1f0d76dd6 1186 else
XinZhangMS 0:f7f1f0d76dd6 1187 {
XinZhangMS 0:f7f1f0d76dd6 1188 MESSAGE_HANDLE cloned_message;
XinZhangMS 0:f7f1f0d76dd6 1189
XinZhangMS 0:f7f1f0d76dd6 1190 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_011: [`message` shall be cloned using message_clone()]
XinZhangMS 0:f7f1f0d76dd6 1191 if ((cloned_message = message_clone(message)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1192 {
XinZhangMS 0:f7f1f0d76dd6 1193 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_012: [If message_clone() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1194 LogError("Failed cloning AMQP message");
XinZhangMS 0:f7f1f0d76dd6 1195 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1196 }
XinZhangMS 0:f7f1f0d76dd6 1197 else
XinZhangMS 0:f7f1f0d76dd6 1198 {
XinZhangMS 0:f7f1f0d76dd6 1199 MESSAGE_SEND_CONTEXT* message_context;
XinZhangMS 0:f7f1f0d76dd6 1200 AMQP_MESSENGER_INSTANCE *instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1201
XinZhangMS 0:f7f1f0d76dd6 1202 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_013: [amqp_messenger_send_async() shall allocate memory for a MESSAGE_SEND_CONTEXT structure (aka `send_ctx`)]
XinZhangMS 0:f7f1f0d76dd6 1203 if ((message_context = create_message_send_context()) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1204 {
XinZhangMS 0:f7f1f0d76dd6 1205 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_014: [If malloc() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1206 LogError("Failed creating context for sending message");
XinZhangMS 0:f7f1f0d76dd6 1207 message_destroy(cloned_message);
XinZhangMS 0:f7f1f0d76dd6 1208 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1209 }
XinZhangMS 0:f7f1f0d76dd6 1210 else
XinZhangMS 0:f7f1f0d76dd6 1211 {
XinZhangMS 0:f7f1f0d76dd6 1212 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_015: [The cloned `message`, callback and context shall be saved in ``send_ctx`]
XinZhangMS 0:f7f1f0d76dd6 1213 message_context->message = cloned_message;
XinZhangMS 0:f7f1f0d76dd6 1214 message_context->messenger = instance;
XinZhangMS 0:f7f1f0d76dd6 1215 message_context->on_send_complete_callback = on_user_defined_send_complete_callback;
XinZhangMS 0:f7f1f0d76dd6 1216 message_context->user_context = user_context;
XinZhangMS 0:f7f1f0d76dd6 1217
XinZhangMS 0:f7f1f0d76dd6 1218 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_016: [`send_ctx` shall be added to `instance->send_queue` using message_queue_add(), passing `on_message_processing_completed_callback`]
XinZhangMS 0:f7f1f0d76dd6 1219 if (message_queue_add(instance->send_queue, (MQ_MESSAGE_HANDLE)cloned_message, on_message_processing_completed_callback, (void*)message_context) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1220 {
XinZhangMS 0:f7f1f0d76dd6 1221 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_017: [If message_queue_add() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1222 LogError("Failed adding message to send queue");
XinZhangMS 0:f7f1f0d76dd6 1223 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_018: [If any failure occurs, amqp_messenger_send_async() shall free any memory it has allocated]
XinZhangMS 0:f7f1f0d76dd6 1224 destroy_message_send_context(message_context);
XinZhangMS 0:f7f1f0d76dd6 1225 message_destroy(cloned_message);
XinZhangMS 0:f7f1f0d76dd6 1226 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1227 }
XinZhangMS 0:f7f1f0d76dd6 1228 else
XinZhangMS 0:f7f1f0d76dd6 1229 {
XinZhangMS 0:f7f1f0d76dd6 1230 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_019: [If no failures occur, amqp_messenger_send_async() shall return zero]
XinZhangMS 0:f7f1f0d76dd6 1231 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1232 }
XinZhangMS 0:f7f1f0d76dd6 1233 }
XinZhangMS 0:f7f1f0d76dd6 1234 }
XinZhangMS 0:f7f1f0d76dd6 1235 }
XinZhangMS 0:f7f1f0d76dd6 1236
XinZhangMS 0:f7f1f0d76dd6 1237 return result;
XinZhangMS 0:f7f1f0d76dd6 1238 }
XinZhangMS 0:f7f1f0d76dd6 1239
XinZhangMS 0:f7f1f0d76dd6 1240 int amqp_messenger_get_send_status(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_SEND_STATUS* send_status)
XinZhangMS 0:f7f1f0d76dd6 1241 {
XinZhangMS 0:f7f1f0d76dd6 1242 int result;
XinZhangMS 0:f7f1f0d76dd6 1243
XinZhangMS 0:f7f1f0d76dd6 1244 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1245 if (messenger_handle == NULL || send_status == NULL)
XinZhangMS 0:f7f1f0d76dd6 1246 {
XinZhangMS 0:f7f1f0d76dd6 1247 LogError("Invalid argument (messenger_handle=%p, send_status=%p)", messenger_handle, send_status);
XinZhangMS 0:f7f1f0d76dd6 1248 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1249 }
XinZhangMS 0:f7f1f0d76dd6 1250 else
XinZhangMS 0:f7f1f0d76dd6 1251 {
XinZhangMS 0:f7f1f0d76dd6 1252 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1253 bool is_empty;
XinZhangMS 0:f7f1f0d76dd6 1254
XinZhangMS 0:f7f1f0d76dd6 1255 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_030: [message_queue_is_empty() shall be invoked for `instance->send_queue`]
XinZhangMS 0:f7f1f0d76dd6 1256 if (message_queue_is_empty(instance->send_queue, &is_empty) != 0)
XinZhangMS 0:f7f1f0d76dd6 1257 {
XinZhangMS 0:f7f1f0d76dd6 1258 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_031: [If message_queue_is_empty() fails, amqp_messenger_get_send_status() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1259 LogError("Failed verifying if send queue is empty");
XinZhangMS 0:f7f1f0d76dd6 1260 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1261 }
XinZhangMS 0:f7f1f0d76dd6 1262 else
XinZhangMS 0:f7f1f0d76dd6 1263 {
XinZhangMS 0:f7f1f0d76dd6 1264 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_032: [If message_queue_is_empty() returns `true`, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_IDLE]
XinZhangMS 0:f7f1f0d76dd6 1265 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_033: [Otherwise, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_BUSY]
XinZhangMS 0:f7f1f0d76dd6 1266 *send_status = (is_empty ? AMQP_MESSENGER_SEND_STATUS_IDLE : AMQP_MESSENGER_SEND_STATUS_BUSY);
XinZhangMS 0:f7f1f0d76dd6 1267 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_034: [If no failures occur, amqp_messenger_get_send_status() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1268 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1269 }
XinZhangMS 0:f7f1f0d76dd6 1270 }
XinZhangMS 0:f7f1f0d76dd6 1271
XinZhangMS 0:f7f1f0d76dd6 1272 return result;
XinZhangMS 0:f7f1f0d76dd6 1273 }
XinZhangMS 0:f7f1f0d76dd6 1274
XinZhangMS 0:f7f1f0d76dd6 1275 int amqp_messenger_start(AMQP_MESSENGER_HANDLE messenger_handle, SESSION_HANDLE session_handle)
XinZhangMS 0:f7f1f0d76dd6 1276 {
XinZhangMS 0:f7f1f0d76dd6 1277 int result;
XinZhangMS 0:f7f1f0d76dd6 1278
XinZhangMS 0:f7f1f0d76dd6 1279 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_051: [If `messenger_handle` or `session_handle` are NULL, amqp_messenger_start() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1280 if (messenger_handle == NULL || session_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1281 {
XinZhangMS 0:f7f1f0d76dd6 1282 LogError("Invalid argument (session_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1283 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1284 }
XinZhangMS 0:f7f1f0d76dd6 1285 else
XinZhangMS 0:f7f1f0d76dd6 1286 {
XinZhangMS 0:f7f1f0d76dd6 1287 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1288
XinZhangMS 0:f7f1f0d76dd6 1289 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_052: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_start() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1290 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
XinZhangMS 0:f7f1f0d76dd6 1291 {
XinZhangMS 0:f7f1f0d76dd6 1292 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1293 LogError("amqp_messenger_start failed (current state is %d; expected AMQP_MESSENGER_STATE_STOPPED)", instance->state);
XinZhangMS 0:f7f1f0d76dd6 1294 }
XinZhangMS 0:f7f1f0d76dd6 1295 else
XinZhangMS 0:f7f1f0d76dd6 1296 {
XinZhangMS 0:f7f1f0d76dd6 1297 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_053: [`session_handle` shall be saved on `instance->session_handle`]
XinZhangMS 0:f7f1f0d76dd6 1298 instance->session_handle = session_handle;
XinZhangMS 0:f7f1f0d76dd6 1299
XinZhangMS 0:f7f1f0d76dd6 1300 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_054: [If no failures occurr, `instance->state` shall be set to AMQP_MESSENGER_STATE_STARTING, and `instance->on_state_changed_callback` invoked if provided]
XinZhangMS 0:f7f1f0d76dd6 1301 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTING);
XinZhangMS 0:f7f1f0d76dd6 1302
XinZhangMS 0:f7f1f0d76dd6 1303 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_055: [If no failures occurr, amqp_messenger_start() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1304 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1305 }
XinZhangMS 0:f7f1f0d76dd6 1306 }
XinZhangMS 0:f7f1f0d76dd6 1307
XinZhangMS 0:f7f1f0d76dd6 1308 return result;
XinZhangMS 0:f7f1f0d76dd6 1309 }
XinZhangMS 0:f7f1f0d76dd6 1310
XinZhangMS 0:f7f1f0d76dd6 1311 int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle)
XinZhangMS 0:f7f1f0d76dd6 1312 {
XinZhangMS 0:f7f1f0d76dd6 1313 int result;
XinZhangMS 0:f7f1f0d76dd6 1314
XinZhangMS 0:f7f1f0d76dd6 1315 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_056: [If `messenger_handle` is NULL, amqp_messenger_stop() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1316 if (messenger_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1317 {
XinZhangMS 0:f7f1f0d76dd6 1318 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1319 LogError("Invalid argument (messenger_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1320 }
XinZhangMS 0:f7f1f0d76dd6 1321 else
XinZhangMS 0:f7f1f0d76dd6 1322 {
XinZhangMS 0:f7f1f0d76dd6 1323 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1324
XinZhangMS 0:f7f1f0d76dd6 1325 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_057: [If `instance->state` is AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_stop() shall fail and return non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1326 if (instance->state == AMQP_MESSENGER_STATE_STOPPED)
XinZhangMS 0:f7f1f0d76dd6 1327 {
XinZhangMS 0:f7f1f0d76dd6 1328 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1329 LogError("amqp_messenger_stop failed (messenger is already stopped)");
XinZhangMS 0:f7f1f0d76dd6 1330 }
XinZhangMS 0:f7f1f0d76dd6 1331 else
XinZhangMS 0:f7f1f0d76dd6 1332 {
XinZhangMS 0:f7f1f0d76dd6 1333 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_058: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPING, and `instance->on_state_changed_callback` invoked if provided]
XinZhangMS 0:f7f1f0d76dd6 1334 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPING);
XinZhangMS 0:f7f1f0d76dd6 1335
XinZhangMS 0:f7f1f0d76dd6 1336 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_059: [`instance->message_sender` and `instance->message_receiver` shall be destroyed along with all its links]
XinZhangMS 0:f7f1f0d76dd6 1337 destroy_message_sender(instance);
XinZhangMS 0:f7f1f0d76dd6 1338 destroy_message_receiver(instance);
XinZhangMS 0:f7f1f0d76dd6 1339
XinZhangMS 0:f7f1f0d76dd6 1340 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_060: [`instance->send_queue` items shall be rolled back using message_queue_move_all_back_to_pending()]
XinZhangMS 0:f7f1f0d76dd6 1341 if (message_queue_move_all_back_to_pending(instance->send_queue) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1342 {
XinZhangMS 0:f7f1f0d76dd6 1343 LogError("Messenger failed to move events in progress back to wait_to_send list");
XinZhangMS 0:f7f1f0d76dd6 1344
XinZhangMS 0:f7f1f0d76dd6 1345 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_061: [If message_queue_move_all_back_to_pending() fails, amqp_messenger_stop() shall change the messenger state to AMQP_MESSENGER_STATE_ERROR and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1346 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1347 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1348 }
XinZhangMS 0:f7f1f0d76dd6 1349 else
XinZhangMS 0:f7f1f0d76dd6 1350 {
XinZhangMS 0:f7f1f0d76dd6 1351 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided]
XinZhangMS 0:f7f1f0d76dd6 1352 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED);
XinZhangMS 0:f7f1f0d76dd6 1353 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occurr, amqp_messenger_stop() shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1354 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1355 }
XinZhangMS 0:f7f1f0d76dd6 1356 }
XinZhangMS 0:f7f1f0d76dd6 1357 }
XinZhangMS 0:f7f1f0d76dd6 1358
XinZhangMS 0:f7f1f0d76dd6 1359 return result;
XinZhangMS 0:f7f1f0d76dd6 1360 }
XinZhangMS 0:f7f1f0d76dd6 1361
XinZhangMS 0:f7f1f0d76dd6 1362 // @brief
XinZhangMS 0:f7f1f0d76dd6 1363 // Sets the messenger module state based on the state changes from messagesender and messagereceiver
XinZhangMS 0:f7f1f0d76dd6 1364 static void process_state_changes(AMQP_MESSENGER_INSTANCE* instance)
XinZhangMS 0:f7f1f0d76dd6 1365 {
XinZhangMS 0:f7f1f0d76dd6 1366 // Note: messagesender and messagereceiver are still not created or already destroyed
XinZhangMS 0:f7f1f0d76dd6 1367 // when state is AMQP_MESSENGER_STATE_STOPPED, so no checking is needed there.
XinZhangMS 0:f7f1f0d76dd6 1368
XinZhangMS 0:f7f1f0d76dd6 1369 if (instance->state == AMQP_MESSENGER_STATE_STARTED)
XinZhangMS 0:f7f1f0d76dd6 1370 {
XinZhangMS 0:f7f1f0d76dd6 1371 if (instance->message_sender_current_state != MESSAGE_SENDER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 1372 {
XinZhangMS 0:f7f1f0d76dd6 1373 LogError("messagesender reported unexpected state %d while messenger was started", instance->message_sender_current_state);
XinZhangMS 0:f7f1f0d76dd6 1374 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1375 }
XinZhangMS 0:f7f1f0d76dd6 1376 else if (instance->message_receiver != NULL && instance->message_receiver_current_state != MESSAGE_RECEIVER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 1377 {
XinZhangMS 0:f7f1f0d76dd6 1378 if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_OPENING)
XinZhangMS 0:f7f1f0d76dd6 1379 {
XinZhangMS 0:f7f1f0d76dd6 1380 bool is_timed_out;
XinZhangMS 0:f7f1f0d76dd6 1381 if (is_timeout_reached(instance->last_message_receiver_state_change_time, MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1382 {
XinZhangMS 0:f7f1f0d76dd6 1383 LogError("messenger got an error (failed to verify messagereceiver start timeout)");
XinZhangMS 0:f7f1f0d76dd6 1384 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1385 }
XinZhangMS 0:f7f1f0d76dd6 1386 else if (is_timed_out)
XinZhangMS 0:f7f1f0d76dd6 1387 {
XinZhangMS 0:f7f1f0d76dd6 1388 LogError("messenger got an error (messagereceiver failed to start within expected timeout (%d secs))", MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS);
XinZhangMS 0:f7f1f0d76dd6 1389 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1390 }
XinZhangMS 0:f7f1f0d76dd6 1391 }
XinZhangMS 0:f7f1f0d76dd6 1392 else if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_ERROR ||
XinZhangMS 0:f7f1f0d76dd6 1393 instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_IDLE)
XinZhangMS 0:f7f1f0d76dd6 1394 {
XinZhangMS 0:f7f1f0d76dd6 1395 LogError("messagereceiver reported unexpected state %d while messenger is starting", instance->message_receiver_current_state);
XinZhangMS 0:f7f1f0d76dd6 1396 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1397 }
XinZhangMS 0:f7f1f0d76dd6 1398 }
XinZhangMS 0:f7f1f0d76dd6 1399 }
XinZhangMS 0:f7f1f0d76dd6 1400 else
XinZhangMS 0:f7f1f0d76dd6 1401 {
XinZhangMS 0:f7f1f0d76dd6 1402 if (instance->state == AMQP_MESSENGER_STATE_STARTING)
XinZhangMS 0:f7f1f0d76dd6 1403 {
XinZhangMS 0:f7f1f0d76dd6 1404 if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 1405 {
XinZhangMS 0:f7f1f0d76dd6 1406 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTED);
XinZhangMS 0:f7f1f0d76dd6 1407 }
XinZhangMS 0:f7f1f0d76dd6 1408 else if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPENING)
XinZhangMS 0:f7f1f0d76dd6 1409 {
XinZhangMS 0:f7f1f0d76dd6 1410 bool is_timed_out;
XinZhangMS 0:f7f1f0d76dd6 1411 if (is_timeout_reached(instance->last_message_sender_state_change_time, MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1412 {
XinZhangMS 0:f7f1f0d76dd6 1413 LogError("messenger failed to start (failed to verify messagesender start timeout)");
XinZhangMS 0:f7f1f0d76dd6 1414 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1415 }
XinZhangMS 0:f7f1f0d76dd6 1416 else if (is_timed_out)
XinZhangMS 0:f7f1f0d76dd6 1417 {
XinZhangMS 0:f7f1f0d76dd6 1418 LogError("messenger failed to start (messagesender failed to start within expected timeout (%d secs))", MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS);
XinZhangMS 0:f7f1f0d76dd6 1419 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1420 }
XinZhangMS 0:f7f1f0d76dd6 1421 }
XinZhangMS 0:f7f1f0d76dd6 1422 // For this module, the only valid scenario where messagesender state is IDLE is if
XinZhangMS 0:f7f1f0d76dd6 1423 // the messagesender hasn't been created yet or already destroyed.
XinZhangMS 0:f7f1f0d76dd6 1424 else if ((instance->message_sender_current_state == MESSAGE_SENDER_STATE_ERROR) ||
XinZhangMS 0:f7f1f0d76dd6 1425 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_CLOSING) ||
XinZhangMS 0:f7f1f0d76dd6 1426 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_IDLE && instance->message_sender != NULL))
XinZhangMS 0:f7f1f0d76dd6 1427 {
XinZhangMS 0:f7f1f0d76dd6 1428 LogError("messagesender reported unexpected state %d while messenger is starting", instance->message_sender_current_state);
XinZhangMS 0:f7f1f0d76dd6 1429 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1430 }
XinZhangMS 0:f7f1f0d76dd6 1431 }
XinZhangMS 0:f7f1f0d76dd6 1432 // message sender and receiver are stopped/destroyed synchronously, so no need for state control.
XinZhangMS 0:f7f1f0d76dd6 1433 }
XinZhangMS 0:f7f1f0d76dd6 1434 }
XinZhangMS 0:f7f1f0d76dd6 1435
XinZhangMS 0:f7f1f0d76dd6 1436 static void manage_amqp_messengers(AMQP_MESSENGER_INSTANCE* msgr)
XinZhangMS 0:f7f1f0d76dd6 1437 {
XinZhangMS 0:f7f1f0d76dd6 1438 if (msgr->state == AMQP_MESSENGER_STATE_STARTING)
XinZhangMS 0:f7f1f0d76dd6 1439 {
XinZhangMS 0:f7f1f0d76dd6 1440 if (msgr->message_sender == NULL)
XinZhangMS 0:f7f1f0d76dd6 1441 {
XinZhangMS 0:f7f1f0d76dd6 1442 if (create_message_sender(msgr) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1443 {
XinZhangMS 0:f7f1f0d76dd6 1444 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1445 }
XinZhangMS 0:f7f1f0d76dd6 1446 }
XinZhangMS 0:f7f1f0d76dd6 1447 }
XinZhangMS 0:f7f1f0d76dd6 1448 else if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
XinZhangMS 0:f7f1f0d76dd6 1449 {
XinZhangMS 0:f7f1f0d76dd6 1450 if (msgr->receive_messages == true &&
XinZhangMS 0:f7f1f0d76dd6 1451 msgr->message_receiver == NULL &&
XinZhangMS 0:f7f1f0d76dd6 1452 create_message_receiver(msgr) != RESULT_OK)
XinZhangMS 0:f7f1f0d76dd6 1453 {
XinZhangMS 0:f7f1f0d76dd6 1454 LogError("amqp_messenger_do_work warning (failed creating the message receiver [%s])", msgr->config->device_id);
XinZhangMS 0:f7f1f0d76dd6 1455 }
XinZhangMS 0:f7f1f0d76dd6 1456 else if (msgr->receive_messages == false && msgr->message_receiver != NULL)
XinZhangMS 0:f7f1f0d76dd6 1457 {
XinZhangMS 0:f7f1f0d76dd6 1458 destroy_message_receiver(msgr);
XinZhangMS 0:f7f1f0d76dd6 1459 }
XinZhangMS 0:f7f1f0d76dd6 1460 }
XinZhangMS 0:f7f1f0d76dd6 1461 }
XinZhangMS 0:f7f1f0d76dd6 1462
XinZhangMS 0:f7f1f0d76dd6 1463 static void handle_errors_and_timeouts(AMQP_MESSENGER_INSTANCE* msgr)
XinZhangMS 0:f7f1f0d76dd6 1464 {
XinZhangMS 0:f7f1f0d76dd6 1465 if (msgr->send_error_count >= msgr->max_send_error_count)
XinZhangMS 0:f7f1f0d76dd6 1466 {
XinZhangMS 0:f7f1f0d76dd6 1467 LogError("Reached max number of consecutive send failures (%d, %d)", msgr->config->device_id, msgr->max_send_error_count);
XinZhangMS 0:f7f1f0d76dd6 1468 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 1469 }
XinZhangMS 0:f7f1f0d76dd6 1470 }
XinZhangMS 0:f7f1f0d76dd6 1471
XinZhangMS 0:f7f1f0d76dd6 1472 void amqp_messenger_do_work(AMQP_MESSENGER_HANDLE messenger_handle)
XinZhangMS 0:f7f1f0d76dd6 1473 {
XinZhangMS 0:f7f1f0d76dd6 1474 if (messenger_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1475 {
XinZhangMS 0:f7f1f0d76dd6 1476 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_064: [If `messenger_handle` is NULL, amqp_messenger_do_work() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 1477 LogError("Invalid argument (messenger_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1478 }
XinZhangMS 0:f7f1f0d76dd6 1479 else
XinZhangMS 0:f7f1f0d76dd6 1480 {
XinZhangMS 0:f7f1f0d76dd6 1481 AMQP_MESSENGER_INSTANCE* msgr = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1482
XinZhangMS 0:f7f1f0d76dd6 1483 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_065: [amqp_messenger_do_work() shall update the current state according to the states of message sender and receiver]
XinZhangMS 0:f7f1f0d76dd6 1484 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_068: [If the message sender state changes to MESSAGE_SENDER_STATE_OPEN, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_STARTED]
XinZhangMS 0:f7f1f0d76dd6 1485 process_state_changes(msgr);
XinZhangMS 0:f7f1f0d76dd6 1486
XinZhangMS 0:f7f1f0d76dd6 1487 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_066: [If the current state is AMQP_MESSENGER_STARTING, amqp_messenger_do_work() shall create and start `instance->message_sender`]
XinZhangMS 0:f7f1f0d76dd6 1488 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_067: [If the `instance->message_sender` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 1489 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_069: [If the `instance->receive_messages` is true, amqp_messenger_do_work() shall create and start `instance->message_receiver` if not done before]
XinZhangMS 0:f7f1f0d76dd6 1490 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_070: [If the `instance->message_receiver` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 1491 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_071: [If the `instance->receive_messages` is false, amqp_messenger_do_work() shall stop and destroy `instance->message_receiver` if not done before]
XinZhangMS 0:f7f1f0d76dd6 1492 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_072: [If the `instance->message_receiver` fails to be stopped/destroyed, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 1493 manage_amqp_messengers(msgr);
XinZhangMS 0:f7f1f0d76dd6 1494
XinZhangMS 0:f7f1f0d76dd6 1495 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_073: [amqp_messenger_do_work() shall invoke message_queue_do_work() on `instance->send_queue`]
XinZhangMS 0:f7f1f0d76dd6 1496 if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
XinZhangMS 0:f7f1f0d76dd6 1497 {
XinZhangMS 0:f7f1f0d76dd6 1498 message_queue_do_work(msgr->send_queue);
XinZhangMS 0:f7f1f0d76dd6 1499 }
XinZhangMS 0:f7f1f0d76dd6 1500
XinZhangMS 0:f7f1f0d76dd6 1501 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_074: [If `instance->send_error_count` DEFAULT_MAX_SEND_ERROR_COUNT (10), amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
XinZhangMS 0:f7f1f0d76dd6 1502 handle_errors_and_timeouts(msgr);
XinZhangMS 0:f7f1f0d76dd6 1503 }
XinZhangMS 0:f7f1f0d76dd6 1504 }
XinZhangMS 0:f7f1f0d76dd6 1505
XinZhangMS 0:f7f1f0d76dd6 1506 void amqp_messenger_destroy(AMQP_MESSENGER_HANDLE messenger_handle)
XinZhangMS 0:f7f1f0d76dd6 1507 {
XinZhangMS 0:f7f1f0d76dd6 1508 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_123: [If `messenger_handle` is NULL, amqp_messenger_destroy() shall fail and return]
XinZhangMS 0:f7f1f0d76dd6 1509 if (messenger_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1510 {
XinZhangMS 0:f7f1f0d76dd6 1511 LogError("invalid argument (messenger_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1512 }
XinZhangMS 0:f7f1f0d76dd6 1513 else
XinZhangMS 0:f7f1f0d76dd6 1514 {
XinZhangMS 0:f7f1f0d76dd6 1515 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1516
XinZhangMS 0:f7f1f0d76dd6 1517 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_124: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_destroy() shall invoke amqp_messenger_stop()]
XinZhangMS 0:f7f1f0d76dd6 1518 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
XinZhangMS 0:f7f1f0d76dd6 1519 {
XinZhangMS 0:f7f1f0d76dd6 1520 (void)amqp_messenger_stop(messenger_handle);
XinZhangMS 0:f7f1f0d76dd6 1521 }
XinZhangMS 0:f7f1f0d76dd6 1522
XinZhangMS 0:f7f1f0d76dd6 1523 if (instance->send_queue != NULL)
XinZhangMS 0:f7f1f0d76dd6 1524 {
XinZhangMS 0:f7f1f0d76dd6 1525 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_125: [message_queue_destroy() shall be invoked for `instance->send_queue`]
XinZhangMS 0:f7f1f0d76dd6 1526 message_queue_destroy(instance->send_queue);
XinZhangMS 0:f7f1f0d76dd6 1527 }
XinZhangMS 0:f7f1f0d76dd6 1528
XinZhangMS 0:f7f1f0d76dd6 1529 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_126: [amqp_messenger_destroy() shall release all the memory allocated for `instance`]
XinZhangMS 0:f7f1f0d76dd6 1530 destroy_configuration(instance->config);
XinZhangMS 0:f7f1f0d76dd6 1531
XinZhangMS 0:f7f1f0d76dd6 1532 free((void*)instance);
XinZhangMS 0:f7f1f0d76dd6 1533 }
XinZhangMS 0:f7f1f0d76dd6 1534 }
XinZhangMS 0:f7f1f0d76dd6 1535
XinZhangMS 0:f7f1f0d76dd6 1536 AMQP_MESSENGER_HANDLE amqp_messenger_create(const AMQP_MESSENGER_CONFIG* messenger_config)
XinZhangMS 0:f7f1f0d76dd6 1537 {
XinZhangMS 0:f7f1f0d76dd6 1538 AMQP_MESSENGER_HANDLE handle;
XinZhangMS 0:f7f1f0d76dd6 1539
XinZhangMS 0:f7f1f0d76dd6 1540 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_001: [If `messenger_config` is NULL, amqp_messenger_create() shall return NULL]
XinZhangMS 0:f7f1f0d76dd6 1541 if (!is_valid_configuration(messenger_config))
XinZhangMS 0:f7f1f0d76dd6 1542 {
XinZhangMS 0:f7f1f0d76dd6 1543 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_002: [If `messenger_config`'s `device_id`, `iothub_host_fqdn`, `receive_link.source_suffix` or `send_link.target_suffix` are NULL, amqp_messenger_create() shall return NULL]
XinZhangMS 0:f7f1f0d76dd6 1544 handle = NULL;
XinZhangMS 0:f7f1f0d76dd6 1545 }
XinZhangMS 0:f7f1f0d76dd6 1546 else
XinZhangMS 0:f7f1f0d76dd6 1547 {
XinZhangMS 0:f7f1f0d76dd6 1548 AMQP_MESSENGER_INSTANCE* instance;
XinZhangMS 0:f7f1f0d76dd6 1549
XinZhangMS 0:f7f1f0d76dd6 1550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_003: [amqp_messenger_create() shall allocate memory for the messenger instance structure (aka `instance`)]
XinZhangMS 0:f7f1f0d76dd6 1551 if ((instance = (AMQP_MESSENGER_INSTANCE*)malloc(sizeof(AMQP_MESSENGER_INSTANCE))) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1552 {
XinZhangMS 0:f7f1f0d76dd6 1553 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_004: [If malloc() fails, amqp_messenger_create() shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1554 LogError("Failed allocating AMQP_MESSENGER_INSTANCE");
XinZhangMS 0:f7f1f0d76dd6 1555 handle = NULL;
XinZhangMS 0:f7f1f0d76dd6 1556 }
XinZhangMS 0:f7f1f0d76dd6 1557 else
XinZhangMS 0:f7f1f0d76dd6 1558 {
XinZhangMS 0:f7f1f0d76dd6 1559 memset(instance, 0, sizeof(AMQP_MESSENGER_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 1560
XinZhangMS 0:f7f1f0d76dd6 1561 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_005: [amqp_messenger_create() shall save a copy of `messenger_config` into `instance`]
XinZhangMS 0:f7f1f0d76dd6 1562 if ((instance->config = clone_configuration(messenger_config)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1563 {
XinZhangMS 0:f7f1f0d76dd6 1564 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_006: [If the copy fails, amqp_messenger_create() shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1565 LogError("Failed copying AMQP messenger configuration");
XinZhangMS 0:f7f1f0d76dd6 1566 handle = NULL;
XinZhangMS 0:f7f1f0d76dd6 1567 }
XinZhangMS 0:f7f1f0d76dd6 1568 else
XinZhangMS 0:f7f1f0d76dd6 1569 {
XinZhangMS 0:f7f1f0d76dd6 1570 MESSAGE_QUEUE_CONFIG mq_config;
XinZhangMS 0:f7f1f0d76dd6 1571 mq_config.max_retry_count = DEFAULT_EVENT_SEND_RETRY_LIMIT;
XinZhangMS 0:f7f1f0d76dd6 1572 mq_config.max_message_enqueued_time_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS;
XinZhangMS 0:f7f1f0d76dd6 1573 mq_config.max_message_processing_time_secs = 0;
XinZhangMS 0:f7f1f0d76dd6 1574 mq_config.on_process_message_callback = on_process_message_callback;
XinZhangMS 0:f7f1f0d76dd6 1575
XinZhangMS 0:f7f1f0d76dd6 1576 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_007: [`instance->send_queue` shall be set using message_queue_create(), passing `on_process_message_callback`]
XinZhangMS 0:f7f1f0d76dd6 1577 if ((instance->send_queue = message_queue_create(&mq_config)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1578 {
XinZhangMS 0:f7f1f0d76dd6 1579 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_008: [If message_queue_create() fails, amqp_messenger_create() shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1580 LogError("Failed creating message queue");
XinZhangMS 0:f7f1f0d76dd6 1581 handle = NULL;
XinZhangMS 0:f7f1f0d76dd6 1582 }
XinZhangMS 0:f7f1f0d76dd6 1583 else
XinZhangMS 0:f7f1f0d76dd6 1584 {
XinZhangMS 0:f7f1f0d76dd6 1585 instance->state = AMQP_MESSENGER_STATE_STOPPED;
XinZhangMS 0:f7f1f0d76dd6 1586 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 1587 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 1588 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 1589 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 1590 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
XinZhangMS 0:f7f1f0d76dd6 1591 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
XinZhangMS 0:f7f1f0d76dd6 1592 instance->max_send_error_count = DEFAULT_MAX_SEND_ERROR_COUNT;
XinZhangMS 0:f7f1f0d76dd6 1593 instance->receive_messages = false;
XinZhangMS 0:f7f1f0d76dd6 1594
XinZhangMS 0:f7f1f0d76dd6 1595 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_009: [If no failures occurr, amqp_messenger_create() shall return a handle to `instance`]
XinZhangMS 0:f7f1f0d76dd6 1596 handle = (AMQP_MESSENGER_HANDLE)instance;
XinZhangMS 0:f7f1f0d76dd6 1597 }
XinZhangMS 0:f7f1f0d76dd6 1598 }
XinZhangMS 0:f7f1f0d76dd6 1599 }
XinZhangMS 0:f7f1f0d76dd6 1600
XinZhangMS 0:f7f1f0d76dd6 1601 if (handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1602 {
XinZhangMS 0:f7f1f0d76dd6 1603 amqp_messenger_destroy((AMQP_MESSENGER_HANDLE)instance);
XinZhangMS 0:f7f1f0d76dd6 1604 }
XinZhangMS 0:f7f1f0d76dd6 1605 }
XinZhangMS 0:f7f1f0d76dd6 1606
XinZhangMS 0:f7f1f0d76dd6 1607 return handle;
XinZhangMS 0:f7f1f0d76dd6 1608 }
XinZhangMS 0:f7f1f0d76dd6 1609
XinZhangMS 0:f7f1f0d76dd6 1610 int amqp_messenger_set_option(AMQP_MESSENGER_HANDLE messenger_handle, const char* name, void* value)
XinZhangMS 0:f7f1f0d76dd6 1611 {
XinZhangMS 0:f7f1f0d76dd6 1612 int result;
XinZhangMS 0:f7f1f0d76dd6 1613
XinZhangMS 0:f7f1f0d76dd6 1614 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_127: [If `messenger_handle` or `name` or `value` is NULL, amqp_messenger_set_option shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1615 if (messenger_handle == NULL || name == NULL || value == NULL)
XinZhangMS 0:f7f1f0d76dd6 1616 {
XinZhangMS 0:f7f1f0d76dd6 1617 LogError("Invalid argument (messenger_handle=%p, name=%p, value=%p)",
XinZhangMS 0:f7f1f0d76dd6 1618 messenger_handle, name, value);
XinZhangMS 0:f7f1f0d76dd6 1619 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1620 }
XinZhangMS 0:f7f1f0d76dd6 1621 else
XinZhangMS 0:f7f1f0d76dd6 1622 {
XinZhangMS 0:f7f1f0d76dd6 1623 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1624
XinZhangMS 0:f7f1f0d76dd6 1625 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_128: [If name matches AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, `value` shall be set on `instance->send_queue` using message_queue_set_max_message_enqueued_time_secs()]
XinZhangMS 0:f7f1f0d76dd6 1626 if (strcmp(AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0)
XinZhangMS 0:f7f1f0d76dd6 1627 {
XinZhangMS 0:f7f1f0d76dd6 1628 if (message_queue_set_max_message_enqueued_time_secs(instance->send_queue, *(size_t*)value) != 0)
XinZhangMS 0:f7f1f0d76dd6 1629 {
XinZhangMS 0:f7f1f0d76dd6 1630 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_129: [If message_queue_set_max_message_enqueued_time_secs() fails, amqp_messenger_set_option() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1631 LogError("Failed setting option %s", AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS);
XinZhangMS 0:f7f1f0d76dd6 1632 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1633 }
XinZhangMS 0:f7f1f0d76dd6 1634 else
XinZhangMS 0:f7f1f0d76dd6 1635 {
XinZhangMS 0:f7f1f0d76dd6 1636 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_131: [If no errors occur, amqp_messenger_set_option shall return 0]
XinZhangMS 0:f7f1f0d76dd6 1637 result = RESULT_OK;
XinZhangMS 0:f7f1f0d76dd6 1638 }
XinZhangMS 0:f7f1f0d76dd6 1639 }
XinZhangMS 0:f7f1f0d76dd6 1640 else
XinZhangMS 0:f7f1f0d76dd6 1641 {
XinZhangMS 0:f7f1f0d76dd6 1642 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_130: [If `name` does not match any supported option, amqp_messenger_set_option() shall fail and return a non-zero value]
XinZhangMS 0:f7f1f0d76dd6 1643 LogError("Invalid argument (option '%s' is not valid)", name);
XinZhangMS 0:f7f1f0d76dd6 1644 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1645 }
XinZhangMS 0:f7f1f0d76dd6 1646 }
XinZhangMS 0:f7f1f0d76dd6 1647
XinZhangMS 0:f7f1f0d76dd6 1648 return result;
XinZhangMS 0:f7f1f0d76dd6 1649 }
XinZhangMS 0:f7f1f0d76dd6 1650
XinZhangMS 0:f7f1f0d76dd6 1651 OPTIONHANDLER_HANDLE amqp_messenger_retrieve_options(AMQP_MESSENGER_HANDLE messenger_handle)
XinZhangMS 0:f7f1f0d76dd6 1652 {
XinZhangMS 0:f7f1f0d76dd6 1653 OPTIONHANDLER_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 1654
XinZhangMS 0:f7f1f0d76dd6 1655 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `messenger_handle` is NULL, amqp_messenger_retrieve_options shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1656 if (messenger_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 1657 {
XinZhangMS 0:f7f1f0d76dd6 1658 LogError("Invalid argument (messenger_handle is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1659 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1660 }
XinZhangMS 0:f7f1f0d76dd6 1661 else
XinZhangMS 0:f7f1f0d76dd6 1662 {
XinZhangMS 0:f7f1f0d76dd6 1663 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_133: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create]
XinZhangMS 0:f7f1f0d76dd6 1664 result = OptionHandler_Create(amqp_messenger_clone_option, amqp_messenger_destroy_option, (pfSetOption)amqp_messenger_set_option);
XinZhangMS 0:f7f1f0d76dd6 1665
XinZhangMS 0:f7f1f0d76dd6 1666 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 1667 {
XinZhangMS 0:f7f1f0d76dd6 1668 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_134: [If an OPTIONHANDLER_HANDLE instance fails to be created, amqp_messenger_retrieve_options shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1669 LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed)");
XinZhangMS 0:f7f1f0d76dd6 1670 }
XinZhangMS 0:f7f1f0d76dd6 1671 else
XinZhangMS 0:f7f1f0d76dd6 1672 {
XinZhangMS 0:f7f1f0d76dd6 1673 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
XinZhangMS 0:f7f1f0d76dd6 1674 OPTIONHANDLER_HANDLE mq_options;
XinZhangMS 0:f7f1f0d76dd6 1675
XinZhangMS 0:f7f1f0d76dd6 1676 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_135: [`instance->send_queue` options shall be retrieved using message_queue_retrieve_options()]
XinZhangMS 0:f7f1f0d76dd6 1677 if ((mq_options = message_queue_retrieve_options(instance->send_queue)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 1678 {
XinZhangMS 0:f7f1f0d76dd6 1679 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_136: [If message_queue_retrieve_options() fails, amqp_messenger_retrieve_options shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1680 LogError("failed to retrieve options from send queue)");
XinZhangMS 0:f7f1f0d76dd6 1681 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
XinZhangMS 0:f7f1f0d76dd6 1682 OptionHandler_Destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1683 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1684 }
XinZhangMS 0:f7f1f0d76dd6 1685 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_137: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption]
XinZhangMS 0:f7f1f0d76dd6 1686 else if (OptionHandler_AddOption(result, MESSENGER_SAVED_MQ_OPTIONS, (void*)mq_options) != OPTIONHANDLER_OK)
XinZhangMS 0:f7f1f0d76dd6 1687 {
XinZhangMS 0:f7f1f0d76dd6 1688 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_138: [If OptionHandler_AddOption fails, amqp_messenger_retrieve_options shall fail and return NULL]
XinZhangMS 0:f7f1f0d76dd6 1689 LogError("failed adding option '%s'", MESSENGER_SAVED_MQ_OPTIONS);
XinZhangMS 0:f7f1f0d76dd6 1690 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
XinZhangMS 0:f7f1f0d76dd6 1691 OptionHandler_Destroy(mq_options);
XinZhangMS 0:f7f1f0d76dd6 1692 OptionHandler_Destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1693 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1694 }
XinZhangMS 0:f7f1f0d76dd6 1695 }
XinZhangMS 0:f7f1f0d76dd6 1696 }
XinZhangMS 0:f7f1f0d76dd6 1697
XinZhangMS 0:f7f1f0d76dd6 1698 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_140: [If no failures occur, amqp_messenger_retrieve_options shall return the OPTIONHANDLER_HANDLE instance]
XinZhangMS 0:f7f1f0d76dd6 1699 return result;
XinZhangMS 0:f7f1f0d76dd6 1700 }
XinZhangMS 0:f7f1f0d76dd6 1701
XinZhangMS 0:f7f1f0d76dd6 1702 void amqp_messenger_destroy_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
XinZhangMS 0:f7f1f0d76dd6 1703 {
XinZhangMS 0:f7f1f0d76dd6 1704 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `disposition_info` is NULL, amqp_messenger_destroy_disposition_info shall return immediately]
XinZhangMS 0:f7f1f0d76dd6 1705 if (disposition_info == NULL)
XinZhangMS 0:f7f1f0d76dd6 1706 {
XinZhangMS 0:f7f1f0d76dd6 1707 LogError("Invalid argument (disposition_info is NULL)");
XinZhangMS 0:f7f1f0d76dd6 1708 }
XinZhangMS 0:f7f1f0d76dd6 1709 else
XinZhangMS 0:f7f1f0d76dd6 1710 {
XinZhangMS 0:f7f1f0d76dd6 1711 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [All memory allocated for `disposition_info` shall be released]
XinZhangMS 0:f7f1f0d76dd6 1712 destroy_message_disposition_info(disposition_info);
XinZhangMS 0:f7f1f0d76dd6 1713 }
XinZhangMS 0:f7f1f0d76dd6 1714 }