Demo using MBED TLS

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Committer:
markrad
Date:
Thu Jan 05 00:20:03 2017 +0000
Revision:
58:f50b97b08851
Sample using MBED TLS

Who changed what in which revision?

User | Revision | Line number | New contents of line
markrad 58:f50b97b08851 1 // Copyright (c) Microsoft. All rights reserved.
markrad 58:f50b97b08851 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
markrad 58:f50b97b08851 3
markrad 58:f50b97b08851 4 #include <stdlib.h>
markrad 58:f50b97b08851 5 #ifdef _CRTDBG_MAP_ALLOC
markrad 58:f50b97b08851 6 #include <crtdbg.h>
markrad 58:f50b97b08851 7 #endif
markrad 58:f50b97b08851 8 #include <stdbool.h>
markrad 58:f50b97b08851 9 #include <string.h>
markrad 58:f50b97b08851 10 #include "azure_c_shared_utility/xlogging.h"
markrad 58:f50b97b08851 11 #include "azure_uamqp_c/message_sender.h"
markrad 58:f50b97b08851 12 #include "azure_uamqp_c/amqpalloc.h"
markrad 58:f50b97b08851 13 #include "azure_uamqp_c/amqpvalue_to_string.h"
markrad 58:f50b97b08851 14
/* Per-message transmission state tracked while a message sits in the
 * sender's pending queue. */
typedef enum MESSAGE_SEND_STATE_TAG
{
    /* Queued, but not (or no longer) handed to the link. */
    MESSAGE_SEND_STATE_NOT_SENT = 0,
    /* Handed to the link; the settlement callback has not run yet. */
    MESSAGE_SEND_STATE_PENDING = 1
} MESSAGE_SEND_STATE;
markrad 58:f50b97b08851 20
/* Outcome of a single attempt to encode and transfer one message.
 * The numeric values are spelled out because the success value is
 * compared against a literal 0 elsewhere in this file. */
typedef enum SEND_ONE_MESSAGE_RESULT_TAG
{
    SEND_ONE_MESSAGE_OK = 0,
    SEND_ONE_MESSAGE_ERROR = 1,
    SEND_ONE_MESSAGE_BUSY = 2
} SEND_ONE_MESSAGE_RESULT;
markrad 58:f50b97b08851 27
markrad 58:f50b97b08851 28 typedef struct MESSAGE_WITH_CALLBACK_TAG
markrad 58:f50b97b08851 29 {
markrad 58:f50b97b08851 30 MESSAGE_HANDLE message;
markrad 58:f50b97b08851 31 ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
markrad 58:f50b97b08851 32 void* context;
markrad 58:f50b97b08851 33 MESSAGE_SENDER_HANDLE message_sender;
markrad 58:f50b97b08851 34 MESSAGE_SEND_STATE message_send_state;
markrad 58:f50b97b08851 35 } MESSAGE_WITH_CALLBACK;
markrad 58:f50b97b08851 36
markrad 58:f50b97b08851 37 typedef struct MESSAGE_SENDER_INSTANCE_TAG
markrad 58:f50b97b08851 38 {
markrad 58:f50b97b08851 39 LINK_HANDLE link;
markrad 58:f50b97b08851 40 size_t message_count;
markrad 58:f50b97b08851 41 MESSAGE_WITH_CALLBACK** messages;
markrad 58:f50b97b08851 42 MESSAGE_SENDER_STATE message_sender_state;
markrad 58:f50b97b08851 43 ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
markrad 58:f50b97b08851 44 void* on_message_sender_state_changed_context;
markrad 58:f50b97b08851 45 unsigned int is_trace_on : 1;
markrad 58:f50b97b08851 46 } MESSAGE_SENDER_INSTANCE;
markrad 58:f50b97b08851 47
markrad 58:f50b97b08851 48 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
markrad 58:f50b97b08851 49 {
markrad 58:f50b97b08851 50 MESSAGE_WITH_CALLBACK** new_messages;
markrad 58:f50b97b08851 51
markrad 58:f50b97b08851 52 if (message_sender_instance->messages[index]->message != NULL)
markrad 58:f50b97b08851 53 {
markrad 58:f50b97b08851 54 message_destroy(message_sender_instance->messages[index]->message);
markrad 58:f50b97b08851 55 message_sender_instance->messages[index]->message = NULL;
markrad 58:f50b97b08851 56 }
markrad 58:f50b97b08851 57
markrad 58:f50b97b08851 58 amqpalloc_free(message_sender_instance->messages[index]);
markrad 58:f50b97b08851 59
markrad 58:f50b97b08851 60 if (message_sender_instance->message_count - index > 1)
markrad 58:f50b97b08851 61 {
markrad 58:f50b97b08851 62 (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
markrad 58:f50b97b08851 63 }
markrad 58:f50b97b08851 64
markrad 58:f50b97b08851 65 message_sender_instance->message_count--;
markrad 58:f50b97b08851 66
markrad 58:f50b97b08851 67 if (message_sender_instance->message_count > 0)
markrad 58:f50b97b08851 68 {
markrad 58:f50b97b08851 69 new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
markrad 58:f50b97b08851 70 if (new_messages != NULL)
markrad 58:f50b97b08851 71 {
markrad 58:f50b97b08851 72 message_sender_instance->messages = new_messages;
markrad 58:f50b97b08851 73 }
markrad 58:f50b97b08851 74 }
markrad 58:f50b97b08851 75 else
markrad 58:f50b97b08851 76 {
markrad 58:f50b97b08851 77 amqpalloc_free(message_sender_instance->messages);
markrad 58:f50b97b08851 78 message_sender_instance->messages = NULL;
markrad 58:f50b97b08851 79 }
markrad 58:f50b97b08851 80 }
markrad 58:f50b97b08851 81
markrad 58:f50b97b08851 82 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
markrad 58:f50b97b08851 83 {
markrad 58:f50b97b08851 84 size_t i;
markrad 58:f50b97b08851 85
markrad 58:f50b97b08851 86 for (i = 0; i < message_sender_instance->message_count; i++)
markrad 58:f50b97b08851 87 {
markrad 58:f50b97b08851 88 if (message_sender_instance->messages[i] == message_with_callback)
markrad 58:f50b97b08851 89 {
markrad 58:f50b97b08851 90 remove_pending_message_by_index(message_sender_instance, i);
markrad 58:f50b97b08851 91 break;
markrad 58:f50b97b08851 92 }
markrad 58:f50b97b08851 93 }
markrad 58:f50b97b08851 94 }
markrad 58:f50b97b08851 95
markrad 58:f50b97b08851 96 static void on_delivery_settled(void* context, delivery_number delivery_no, AMQP_VALUE delivery_state)
markrad 58:f50b97b08851 97 {
markrad 58:f50b97b08851 98 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
markrad 58:f50b97b08851 99 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
markrad 58:f50b97b08851 100 (void)delivery_no;
markrad 58:f50b97b08851 101
markrad 58:f50b97b08851 102 if (message_with_callback->on_message_send_complete != NULL)
markrad 58:f50b97b08851 103 {
markrad 58:f50b97b08851 104 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
markrad 58:f50b97b08851 105 if (descriptor == NULL)
markrad 58:f50b97b08851 106 {
markrad 58:f50b97b08851 107 LogError("Error getting descriptor for delivery state");
markrad 58:f50b97b08851 108 }
markrad 58:f50b97b08851 109 else
markrad 58:f50b97b08851 110 {
markrad 58:f50b97b08851 111 MESSAGE_SEND_RESULT message_send_result;
markrad 58:f50b97b08851 112
markrad 58:f50b97b08851 113 if ((delivery_state == NULL) ||
markrad 58:f50b97b08851 114 (is_accepted_type_by_descriptor(descriptor)))
markrad 58:f50b97b08851 115 {
markrad 58:f50b97b08851 116 message_send_result = MESSAGE_SEND_OK;
markrad 58:f50b97b08851 117 }
markrad 58:f50b97b08851 118 else
markrad 58:f50b97b08851 119 {
markrad 58:f50b97b08851 120 message_send_result = MESSAGE_SEND_ERROR;
markrad 58:f50b97b08851 121 }
markrad 58:f50b97b08851 122
markrad 58:f50b97b08851 123 message_with_callback->on_message_send_complete(message_with_callback->context, message_send_result);
markrad 58:f50b97b08851 124 }
markrad 58:f50b97b08851 125 }
markrad 58:f50b97b08851 126
markrad 58:f50b97b08851 127 remove_pending_message(message_sender_instance, message_with_callback);
markrad 58:f50b97b08851 128 }
markrad 58:f50b97b08851 129
markrad 58:f50b97b08851 130 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
markrad 58:f50b97b08851 131 {
markrad 58:f50b97b08851 132 PAYLOAD* payload = (PAYLOAD*)context;
markrad 58:f50b97b08851 133 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
markrad 58:f50b97b08851 134 payload->length += length;
markrad 58:f50b97b08851 135 return 0;
markrad 58:f50b97b08851 136 }
markrad 58:f50b97b08851 137
markrad 58:f50b97b08851 138 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
markrad 58:f50b97b08851 139 {
markrad 58:f50b97b08851 140 #ifdef NO_LOGGING
markrad 58:f50b97b08851 141 UNUSED(message_sender_instance);
markrad 58:f50b97b08851 142 UNUSED(name);
markrad 58:f50b97b08851 143 UNUSED(value);
markrad 58:f50b97b08851 144 #else
markrad 58:f50b97b08851 145 if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
markrad 58:f50b97b08851 146 {
markrad 58:f50b97b08851 147 char* value_as_string = NULL;
markrad 58:f50b97b08851 148 LOG(LOG_TRACE, 0, "%s", name);
markrad 58:f50b97b08851 149 LOG(LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
markrad 58:f50b97b08851 150 if (value_as_string != NULL)
markrad 58:f50b97b08851 151 {
markrad 58:f50b97b08851 152 amqpalloc_free(value_as_string);
markrad 58:f50b97b08851 153 }
markrad 58:f50b97b08851 154 }
markrad 58:f50b97b08851 155 #endif
markrad 58:f50b97b08851 156 }
markrad 58:f50b97b08851 157
markrad 58:f50b97b08851 158 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
markrad 58:f50b97b08851 159 {
markrad 58:f50b97b08851 160 SEND_ONE_MESSAGE_RESULT result;
markrad 58:f50b97b08851 161
markrad 58:f50b97b08851 162 size_t encoded_size;
markrad 58:f50b97b08851 163 size_t total_encoded_size = 0;
markrad 58:f50b97b08851 164 MESSAGE_BODY_TYPE message_body_type;
markrad 58:f50b97b08851 165 message_format message_format;
markrad 58:f50b97b08851 166
markrad 58:f50b97b08851 167 if ((message_get_body_type(message, &message_body_type) != 0) ||
markrad 58:f50b97b08851 168 (message_get_message_format(message, &message_format) != 0))
markrad 58:f50b97b08851 169 {
markrad 58:f50b97b08851 170 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 171 }
markrad 58:f50b97b08851 172 else
markrad 58:f50b97b08851 173 {
markrad 58:f50b97b08851 174 // header
markrad 58:f50b97b08851 175 HEADER_HANDLE header;
markrad 58:f50b97b08851 176 AMQP_VALUE header_amqp_value;
markrad 58:f50b97b08851 177 PROPERTIES_HANDLE properties;
markrad 58:f50b97b08851 178 AMQP_VALUE properties_amqp_value;
markrad 58:f50b97b08851 179 AMQP_VALUE application_properties;
markrad 58:f50b97b08851 180 AMQP_VALUE application_properties_value;
markrad 58:f50b97b08851 181 AMQP_VALUE body_amqp_value = NULL;
markrad 58:f50b97b08851 182 size_t body_data_count = 0;
markrad 58:f50b97b08851 183 AMQP_VALUE msg_annotations = NULL;
markrad 58:f50b97b08851 184
markrad 58:f50b97b08851 185 message_get_header(message, &header);
markrad 58:f50b97b08851 186 header_amqp_value = amqpvalue_create_header(header);
markrad 58:f50b97b08851 187 if (header != NULL)
markrad 58:f50b97b08851 188 {
markrad 58:f50b97b08851 189 amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
markrad 58:f50b97b08851 190 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 191 }
markrad 58:f50b97b08851 192
markrad 58:f50b97b08851 193 // message annotations
markrad 58:f50b97b08851 194 if ((message_get_message_annotations(message, &msg_annotations) == 0) &&
markrad 58:f50b97b08851 195 (msg_annotations != NULL))
markrad 58:f50b97b08851 196 {
markrad 58:f50b97b08851 197 amqpvalue_get_encoded_size(msg_annotations, &encoded_size);
markrad 58:f50b97b08851 198 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 199 }
markrad 58:f50b97b08851 200
markrad 58:f50b97b08851 201 // properties
markrad 58:f50b97b08851 202 message_get_properties(message, &properties);
markrad 58:f50b97b08851 203 properties_amqp_value = amqpvalue_create_properties(properties);
markrad 58:f50b97b08851 204 if (properties != NULL)
markrad 58:f50b97b08851 205 {
markrad 58:f50b97b08851 206 amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
markrad 58:f50b97b08851 207 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 208 }
markrad 58:f50b97b08851 209
markrad 58:f50b97b08851 210 // application properties
markrad 58:f50b97b08851 211 message_get_application_properties(message, &application_properties);
markrad 58:f50b97b08851 212 application_properties_value = amqpvalue_create_application_properties(application_properties);
markrad 58:f50b97b08851 213 if (application_properties != NULL)
markrad 58:f50b97b08851 214 {
markrad 58:f50b97b08851 215 amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
markrad 58:f50b97b08851 216 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 217 }
markrad 58:f50b97b08851 218
markrad 58:f50b97b08851 219 result = SEND_ONE_MESSAGE_OK;
markrad 58:f50b97b08851 220
markrad 58:f50b97b08851 221 // body - amqp data
markrad 58:f50b97b08851 222 switch (message_body_type)
markrad 58:f50b97b08851 223 {
markrad 58:f50b97b08851 224 default:
markrad 58:f50b97b08851 225 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 226 break;
markrad 58:f50b97b08851 227
markrad 58:f50b97b08851 228 case MESSAGE_BODY_TYPE_VALUE:
markrad 58:f50b97b08851 229 {
markrad 58:f50b97b08851 230 AMQP_VALUE message_body_amqp_value;
markrad 58:f50b97b08851 231 if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
markrad 58:f50b97b08851 232 {
markrad 58:f50b97b08851 233 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 234 }
markrad 58:f50b97b08851 235 else
markrad 58:f50b97b08851 236 {
markrad 58:f50b97b08851 237 body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
markrad 58:f50b97b08851 238 if ((body_amqp_value == NULL) ||
markrad 58:f50b97b08851 239 (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
markrad 58:f50b97b08851 240 {
markrad 58:f50b97b08851 241 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 242 }
markrad 58:f50b97b08851 243 else
markrad 58:f50b97b08851 244 {
markrad 58:f50b97b08851 245 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 246 }
markrad 58:f50b97b08851 247 }
markrad 58:f50b97b08851 248
markrad 58:f50b97b08851 249 break;
markrad 58:f50b97b08851 250 }
markrad 58:f50b97b08851 251
markrad 58:f50b97b08851 252 case MESSAGE_BODY_TYPE_DATA:
markrad 58:f50b97b08851 253 {
markrad 58:f50b97b08851 254 BINARY_DATA binary_data;
markrad 58:f50b97b08851 255 size_t i;
markrad 58:f50b97b08851 256
markrad 58:f50b97b08851 257 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
markrad 58:f50b97b08851 258 {
markrad 58:f50b97b08851 259 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 260 }
markrad 58:f50b97b08851 261 else
markrad 58:f50b97b08851 262 {
markrad 58:f50b97b08851 263 for (i = 0; i < body_data_count; i++)
markrad 58:f50b97b08851 264 {
markrad 58:f50b97b08851 265 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
markrad 58:f50b97b08851 266 {
markrad 58:f50b97b08851 267 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 268 }
markrad 58:f50b97b08851 269 else
markrad 58:f50b97b08851 270 {
markrad 58:f50b97b08851 271 amqp_binary binary_value;
markrad 58:f50b97b08851 272 binary_value.bytes = binary_data.bytes;
markrad 58:f50b97b08851 273 binary_value.length = (uint32_t)binary_data.length;
markrad 58:f50b97b08851 274 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
markrad 58:f50b97b08851 275 if (body_amqp_data == NULL)
markrad 58:f50b97b08851 276 {
markrad 58:f50b97b08851 277 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 278 }
markrad 58:f50b97b08851 279 else
markrad 58:f50b97b08851 280 {
markrad 58:f50b97b08851 281 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
markrad 58:f50b97b08851 282 {
markrad 58:f50b97b08851 283 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 284 }
markrad 58:f50b97b08851 285 else
markrad 58:f50b97b08851 286 {
markrad 58:f50b97b08851 287 total_encoded_size += encoded_size;
markrad 58:f50b97b08851 288 }
markrad 58:f50b97b08851 289
markrad 58:f50b97b08851 290 amqpvalue_destroy(body_amqp_data);
markrad 58:f50b97b08851 291 }
markrad 58:f50b97b08851 292 }
markrad 58:f50b97b08851 293 }
markrad 58:f50b97b08851 294 }
markrad 58:f50b97b08851 295 break;
markrad 58:f50b97b08851 296 }
markrad 58:f50b97b08851 297 }
markrad 58:f50b97b08851 298
markrad 58:f50b97b08851 299 if (result == 0)
markrad 58:f50b97b08851 300 {
markrad 58:f50b97b08851 301 void* data_bytes = amqpalloc_malloc(total_encoded_size);
markrad 58:f50b97b08851 302 PAYLOAD payload;
markrad 58:f50b97b08851 303 payload.bytes = data_bytes;
markrad 58:f50b97b08851 304 payload.length = 0;
markrad 58:f50b97b08851 305 result = SEND_ONE_MESSAGE_OK;
markrad 58:f50b97b08851 306
markrad 58:f50b97b08851 307 if (header != NULL)
markrad 58:f50b97b08851 308 {
markrad 58:f50b97b08851 309 if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 310 {
markrad 58:f50b97b08851 311 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 312 }
markrad 58:f50b97b08851 313
markrad 58:f50b97b08851 314 log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
markrad 58:f50b97b08851 315 }
markrad 58:f50b97b08851 316
markrad 58:f50b97b08851 317 if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
markrad 58:f50b97b08851 318 {
markrad 58:f50b97b08851 319 if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 320 {
markrad 58:f50b97b08851 321 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 322 }
markrad 58:f50b97b08851 323
markrad 58:f50b97b08851 324 log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations);
markrad 58:f50b97b08851 325 }
markrad 58:f50b97b08851 326
markrad 58:f50b97b08851 327 if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
markrad 58:f50b97b08851 328 {
markrad 58:f50b97b08851 329 if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 330 {
markrad 58:f50b97b08851 331 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 332 }
markrad 58:f50b97b08851 333
markrad 58:f50b97b08851 334 log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
markrad 58:f50b97b08851 335 }
markrad 58:f50b97b08851 336
markrad 58:f50b97b08851 337 if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
markrad 58:f50b97b08851 338 {
markrad 58:f50b97b08851 339 if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 340 {
markrad 58:f50b97b08851 341 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 342 }
markrad 58:f50b97b08851 343
markrad 58:f50b97b08851 344 log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
markrad 58:f50b97b08851 345 }
markrad 58:f50b97b08851 346
markrad 58:f50b97b08851 347 if (result == SEND_ONE_MESSAGE_OK)
markrad 58:f50b97b08851 348 {
markrad 58:f50b97b08851 349 switch (message_body_type)
markrad 58:f50b97b08851 350 {
markrad 58:f50b97b08851 351 case MESSAGE_BODY_TYPE_VALUE:
markrad 58:f50b97b08851 352 {
markrad 58:f50b97b08851 353 if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 354 {
markrad 58:f50b97b08851 355 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 356 }
markrad 58:f50b97b08851 357
markrad 58:f50b97b08851 358 log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
markrad 58:f50b97b08851 359 break;
markrad 58:f50b97b08851 360 }
markrad 58:f50b97b08851 361 case MESSAGE_BODY_TYPE_DATA:
markrad 58:f50b97b08851 362 {
markrad 58:f50b97b08851 363 BINARY_DATA binary_data;
markrad 58:f50b97b08851 364 size_t i;
markrad 58:f50b97b08851 365
markrad 58:f50b97b08851 366 for (i = 0; i < body_data_count; i++)
markrad 58:f50b97b08851 367 {
markrad 58:f50b97b08851 368 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
markrad 58:f50b97b08851 369 {
markrad 58:f50b97b08851 370 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 371 }
markrad 58:f50b97b08851 372 else
markrad 58:f50b97b08851 373 {
markrad 58:f50b97b08851 374 amqp_binary binary_value;
markrad 58:f50b97b08851 375 binary_value.bytes = binary_data.bytes;
markrad 58:f50b97b08851 376 binary_value.length = (uint32_t)binary_data.length;
markrad 58:f50b97b08851 377 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
markrad 58:f50b97b08851 378 if (body_amqp_data == NULL)
markrad 58:f50b97b08851 379 {
markrad 58:f50b97b08851 380 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 381 }
markrad 58:f50b97b08851 382 else
markrad 58:f50b97b08851 383 {
markrad 58:f50b97b08851 384 if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
markrad 58:f50b97b08851 385 {
markrad 58:f50b97b08851 386 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 387 break;
markrad 58:f50b97b08851 388 }
markrad 58:f50b97b08851 389
markrad 58:f50b97b08851 390 amqpvalue_destroy(body_amqp_data);
markrad 58:f50b97b08851 391 }
markrad 58:f50b97b08851 392 }
markrad 58:f50b97b08851 393 }
markrad 58:f50b97b08851 394 break;
markrad 58:f50b97b08851 395 }
markrad 58:f50b97b08851 396 }
markrad 58:f50b97b08851 397 }
markrad 58:f50b97b08851 398
markrad 58:f50b97b08851 399 if (result == SEND_ONE_MESSAGE_OK)
markrad 58:f50b97b08851 400 {
markrad 58:f50b97b08851 401 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
markrad 58:f50b97b08851 402 switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
markrad 58:f50b97b08851 403 {
markrad 58:f50b97b08851 404 default:
markrad 58:f50b97b08851 405 case LINK_TRANSFER_ERROR:
markrad 58:f50b97b08851 406 if (message_with_callback->on_message_send_complete != NULL)
markrad 58:f50b97b08851 407 {
markrad 58:f50b97b08851 408 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
markrad 58:f50b97b08851 409 }
markrad 58:f50b97b08851 410
markrad 58:f50b97b08851 411 result = SEND_ONE_MESSAGE_ERROR;
markrad 58:f50b97b08851 412 break;
markrad 58:f50b97b08851 413
markrad 58:f50b97b08851 414 case LINK_TRANSFER_BUSY:
markrad 58:f50b97b08851 415 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
markrad 58:f50b97b08851 416 result = SEND_ONE_MESSAGE_BUSY;
markrad 58:f50b97b08851 417 break;
markrad 58:f50b97b08851 418
markrad 58:f50b97b08851 419 case LINK_TRANSFER_OK:
markrad 58:f50b97b08851 420 result = SEND_ONE_MESSAGE_OK;
markrad 58:f50b97b08851 421 break;
markrad 58:f50b97b08851 422 }
markrad 58:f50b97b08851 423 }
markrad 58:f50b97b08851 424
markrad 58:f50b97b08851 425 amqpalloc_free(data_bytes);
markrad 58:f50b97b08851 426
markrad 58:f50b97b08851 427 if (body_amqp_value != NULL)
markrad 58:f50b97b08851 428 {
markrad 58:f50b97b08851 429 amqpvalue_destroy(body_amqp_value);
markrad 58:f50b97b08851 430 }
markrad 58:f50b97b08851 431
markrad 58:f50b97b08851 432 amqpvalue_destroy(application_properties);
markrad 58:f50b97b08851 433 amqpvalue_destroy(application_properties_value);
markrad 58:f50b97b08851 434 amqpvalue_destroy(properties_amqp_value);
markrad 58:f50b97b08851 435 properties_destroy(properties);
markrad 58:f50b97b08851 436 }
markrad 58:f50b97b08851 437 }
markrad 58:f50b97b08851 438
markrad 58:f50b97b08851 439 return result;
markrad 58:f50b97b08851 440 }
markrad 58:f50b97b08851 441
markrad 58:f50b97b08851 442 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
markrad 58:f50b97b08851 443 {
markrad 58:f50b97b08851 444 size_t i;
markrad 58:f50b97b08851 445
markrad 58:f50b97b08851 446 for (i = 0; i < message_sender_instance->message_count; i++)
markrad 58:f50b97b08851 447 {
markrad 58:f50b97b08851 448 if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
markrad 58:f50b97b08851 449 {
markrad 58:f50b97b08851 450 switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
markrad 58:f50b97b08851 451 {
markrad 58:f50b97b08851 452 default:
markrad 58:f50b97b08851 453 case SEND_ONE_MESSAGE_ERROR:
markrad 58:f50b97b08851 454 {
markrad 58:f50b97b08851 455 ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
markrad 58:f50b97b08851 456 void* context = message_sender_instance->messages[i]->context;
markrad 58:f50b97b08851 457 remove_pending_message_by_index(message_sender_instance, i);
markrad 58:f50b97b08851 458
markrad 58:f50b97b08851 459 on_message_send_complete(context, MESSAGE_SEND_ERROR);
markrad 58:f50b97b08851 460 i = message_sender_instance->message_count;
markrad 58:f50b97b08851 461 break;
markrad 58:f50b97b08851 462 }
markrad 58:f50b97b08851 463 case SEND_ONE_MESSAGE_BUSY:
markrad 58:f50b97b08851 464 i = message_sender_instance->message_count + 1;
markrad 58:f50b97b08851 465 break;
markrad 58:f50b97b08851 466
markrad 58:f50b97b08851 467 case SEND_ONE_MESSAGE_OK:
markrad 58:f50b97b08851 468 break;
markrad 58:f50b97b08851 469 }
markrad 58:f50b97b08851 470
markrad 58:f50b97b08851 471 i--;
markrad 58:f50b97b08851 472 }
markrad 58:f50b97b08851 473 }
markrad 58:f50b97b08851 474 }
markrad 58:f50b97b08851 475
markrad 58:f50b97b08851 476 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
markrad 58:f50b97b08851 477 {
markrad 58:f50b97b08851 478 MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
markrad 58:f50b97b08851 479 message_sender_instance->message_sender_state = new_state;
markrad 58:f50b97b08851 480 if (message_sender_instance->on_message_sender_state_changed != NULL)
markrad 58:f50b97b08851 481 {
markrad 58:f50b97b08851 482 message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
markrad 58:f50b97b08851 483 }
markrad 58:f50b97b08851 484 }
markrad 58:f50b97b08851 485
markrad 58:f50b97b08851 486 static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
markrad 58:f50b97b08851 487 {
markrad 58:f50b97b08851 488 size_t i;
markrad 58:f50b97b08851 489
markrad 58:f50b97b08851 490 for (i = 0; i < message_sender_instance->message_count; i++)
markrad 58:f50b97b08851 491 {
markrad 58:f50b97b08851 492 if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
markrad 58:f50b97b08851 493 {
markrad 58:f50b97b08851 494 message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
markrad 58:f50b97b08851 495 }
markrad 58:f50b97b08851 496
markrad 58:f50b97b08851 497 message_destroy(message_sender_instance->messages[i]->message);
markrad 58:f50b97b08851 498 amqpalloc_free(message_sender_instance->messages[i]);
markrad 58:f50b97b08851 499 }
markrad 58:f50b97b08851 500
markrad 58:f50b97b08851 501 if (message_sender_instance->messages != NULL)
markrad 58:f50b97b08851 502 {
markrad 58:f50b97b08851 503 message_sender_instance->message_count = 0;
markrad 58:f50b97b08851 504
markrad 58:f50b97b08851 505 amqpalloc_free(message_sender_instance->messages);
markrad 58:f50b97b08851 506 message_sender_instance->messages = NULL;
markrad 58:f50b97b08851 507 }
markrad 58:f50b97b08851 508 }
markrad 58:f50b97b08851 509
markrad 58:f50b97b08851 510 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
markrad 58:f50b97b08851 511 {
markrad 58:f50b97b08851 512 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
markrad 58:f50b97b08851 513 (void)previous_link_state;
markrad 58:f50b97b08851 514
markrad 58:f50b97b08851 515 switch (new_link_state)
markrad 58:f50b97b08851 516 {
markrad 58:f50b97b08851 517 case LINK_STATE_ATTACHED:
markrad 58:f50b97b08851 518 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
markrad 58:f50b97b08851 519 {
markrad 58:f50b97b08851 520 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
markrad 58:f50b97b08851 521 }
markrad 58:f50b97b08851 522 break;
markrad 58:f50b97b08851 523 case LINK_STATE_DETACHED:
markrad 58:f50b97b08851 524 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
markrad 58:f50b97b08851 525 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
markrad 58:f50b97b08851 526 {
markrad 58:f50b97b08851 527 /* User initiated transition, we should be good */
markrad 58:f50b97b08851 528 indicate_all_messages_as_error(message_sender_instance);
markrad 58:f50b97b08851 529 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
markrad 58:f50b97b08851 530 }
markrad 58:f50b97b08851 531 else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
markrad 58:f50b97b08851 532 {
markrad 58:f50b97b08851 533 /* Any other transition must be an error */
markrad 58:f50b97b08851 534 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
markrad 58:f50b97b08851 535 }
markrad 58:f50b97b08851 536 break;
markrad 58:f50b97b08851 537 case LINK_STATE_ERROR:
markrad 58:f50b97b08851 538 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
markrad 58:f50b97b08851 539 {
markrad 58:f50b97b08851 540 indicate_all_messages_as_error(message_sender_instance);
markrad 58:f50b97b08851 541 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
markrad 58:f50b97b08851 542 }
markrad 58:f50b97b08851 543 break;
markrad 58:f50b97b08851 544 }
markrad 58:f50b97b08851 545 }
markrad 58:f50b97b08851 546
markrad 58:f50b97b08851 547 static void on_link_flow_on(void* context)
markrad 58:f50b97b08851 548 {
markrad 58:f50b97b08851 549 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
markrad 58:f50b97b08851 550 send_all_pending_messages(message_sender_instance);
markrad 58:f50b97b08851 551 }
markrad 58:f50b97b08851 552
markrad 58:f50b97b08851 553 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
markrad 58:f50b97b08851 554 {
markrad 58:f50b97b08851 555 MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
markrad 58:f50b97b08851 556 if (result != NULL)
markrad 58:f50b97b08851 557 {
markrad 58:f50b97b08851 558 result->messages = NULL;
markrad 58:f50b97b08851 559 result->message_count = 0;
markrad 58:f50b97b08851 560 result->link = link;
markrad 58:f50b97b08851 561 result->on_message_sender_state_changed = on_message_sender_state_changed;
markrad 58:f50b97b08851 562 result->on_message_sender_state_changed_context = context;
markrad 58:f50b97b08851 563 result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
markrad 58:f50b97b08851 564 result->is_trace_on = 0;
markrad 58:f50b97b08851 565 }
markrad 58:f50b97b08851 566
markrad 58:f50b97b08851 567 return result;
markrad 58:f50b97b08851 568 }
markrad 58:f50b97b08851 569
markrad 58:f50b97b08851 570 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
markrad 58:f50b97b08851 571 {
markrad 58:f50b97b08851 572 if (message_sender != NULL)
markrad 58:f50b97b08851 573 {
markrad 58:f50b97b08851 574 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
markrad 58:f50b97b08851 575
markrad 58:f50b97b08851 576 messagesender_close(message_sender_instance);
markrad 58:f50b97b08851 577
markrad 58:f50b97b08851 578 indicate_all_messages_as_error(message_sender_instance);
markrad 58:f50b97b08851 579
markrad 58:f50b97b08851 580 amqpalloc_free(message_sender);
markrad 58:f50b97b08851 581 }
markrad 58:f50b97b08851 582 }
markrad 58:f50b97b08851 583
markrad 58:f50b97b08851 584 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
markrad 58:f50b97b08851 585 {
markrad 58:f50b97b08851 586 int result;
markrad 58:f50b97b08851 587
markrad 58:f50b97b08851 588 if (message_sender == NULL)
markrad 58:f50b97b08851 589 {
markrad 58:f50b97b08851 590 result = __LINE__;
markrad 58:f50b97b08851 591 }
markrad 58:f50b97b08851 592 else
markrad 58:f50b97b08851 593 {
markrad 58:f50b97b08851 594 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
markrad 58:f50b97b08851 595
markrad 58:f50b97b08851 596 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
markrad 58:f50b97b08851 597 {
markrad 58:f50b97b08851 598 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
markrad 58:f50b97b08851 599 if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
markrad 58:f50b97b08851 600 {
markrad 58:f50b97b08851 601 result = __LINE__;
markrad 58:f50b97b08851 602 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
markrad 58:f50b97b08851 603 }
markrad 58:f50b97b08851 604 else
markrad 58:f50b97b08851 605 {
markrad 58:f50b97b08851 606 result = 0;
markrad 58:f50b97b08851 607 }
markrad 58:f50b97b08851 608 }
markrad 58:f50b97b08851 609 else
markrad 58:f50b97b08851 610 {
markrad 58:f50b97b08851 611 result = 0;
markrad 58:f50b97b08851 612 }
markrad 58:f50b97b08851 613 }
markrad 58:f50b97b08851 614
markrad 58:f50b97b08851 615 return result;
markrad 58:f50b97b08851 616 }
markrad 58:f50b97b08851 617
markrad 58:f50b97b08851 618 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
markrad 58:f50b97b08851 619 {
markrad 58:f50b97b08851 620 int result;
markrad 58:f50b97b08851 621
markrad 58:f50b97b08851 622 if (message_sender == NULL)
markrad 58:f50b97b08851 623 {
markrad 58:f50b97b08851 624 result = __LINE__;
markrad 58:f50b97b08851 625 }
markrad 58:f50b97b08851 626 else
markrad 58:f50b97b08851 627 {
markrad 58:f50b97b08851 628 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
markrad 58:f50b97b08851 629
markrad 58:f50b97b08851 630 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
markrad 58:f50b97b08851 631 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
markrad 58:f50b97b08851 632 {
markrad 58:f50b97b08851 633 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
markrad 58:f50b97b08851 634 if (link_detach(message_sender_instance->link, true) != 0)
markrad 58:f50b97b08851 635 {
markrad 58:f50b97b08851 636 result = __LINE__;
markrad 58:f50b97b08851 637 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
markrad 58:f50b97b08851 638 }
markrad 58:f50b97b08851 639 else
markrad 58:f50b97b08851 640 {
markrad 58:f50b97b08851 641 result = 0;
markrad 58:f50b97b08851 642 }
markrad 58:f50b97b08851 643 }
markrad 58:f50b97b08851 644 else
markrad 58:f50b97b08851 645 {
markrad 58:f50b97b08851 646 result = 0;
markrad 58:f50b97b08851 647 }
markrad 58:f50b97b08851 648 }
markrad 58:f50b97b08851 649
markrad 58:f50b97b08851 650 return result;
markrad 58:f50b97b08851 651 }
markrad 58:f50b97b08851 652
markrad 58:f50b97b08851 653 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
markrad 58:f50b97b08851 654 {
markrad 58:f50b97b08851 655 int result;
markrad 58:f50b97b08851 656
markrad 58:f50b97b08851 657 if ((message_sender == NULL) ||
markrad 58:f50b97b08851 658 (message == NULL))
markrad 58:f50b97b08851 659 {
markrad 58:f50b97b08851 660 result = __LINE__;
markrad 58:f50b97b08851 661 }
markrad 58:f50b97b08851 662 else
markrad 58:f50b97b08851 663 {
markrad 58:f50b97b08851 664 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
markrad 58:f50b97b08851 665 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
markrad 58:f50b97b08851 666 {
markrad 58:f50b97b08851 667 result = __LINE__;
markrad 58:f50b97b08851 668 }
markrad 58:f50b97b08851 669 else
markrad 58:f50b97b08851 670 {
markrad 58:f50b97b08851 671 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
markrad 58:f50b97b08851 672 if (message_with_callback == NULL)
markrad 58:f50b97b08851 673 {
markrad 58:f50b97b08851 674 result = __LINE__;
markrad 58:f50b97b08851 675 }
markrad 58:f50b97b08851 676 else
markrad 58:f50b97b08851 677 {
markrad 58:f50b97b08851 678 MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
markrad 58:f50b97b08851 679 if (new_messages == NULL)
markrad 58:f50b97b08851 680 {
markrad 58:f50b97b08851 681 amqpalloc_free(message_with_callback);
markrad 58:f50b97b08851 682 result = __LINE__;
markrad 58:f50b97b08851 683 }
markrad 58:f50b97b08851 684 else
markrad 58:f50b97b08851 685 {
markrad 58:f50b97b08851 686 result = 0;
markrad 58:f50b97b08851 687
markrad 58:f50b97b08851 688 message_sender_instance->messages = new_messages;
markrad 58:f50b97b08851 689 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
markrad 58:f50b97b08851 690 {
markrad 58:f50b97b08851 691 message_with_callback->message = message_clone(message);
markrad 58:f50b97b08851 692 if (message_with_callback->message == NULL)
markrad 58:f50b97b08851 693 {
markrad 58:f50b97b08851 694 amqpalloc_free(message_with_callback);
markrad 58:f50b97b08851 695 result = __LINE__;
markrad 58:f50b97b08851 696 }
markrad 58:f50b97b08851 697
markrad 58:f50b97b08851 698 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
markrad 58:f50b97b08851 699 }
markrad 58:f50b97b08851 700 else
markrad 58:f50b97b08851 701 {
markrad 58:f50b97b08851 702 message_with_callback->message = NULL;
markrad 58:f50b97b08851 703 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
markrad 58:f50b97b08851 704 }
markrad 58:f50b97b08851 705
markrad 58:f50b97b08851 706 if (result == 0)
markrad 58:f50b97b08851 707 {
markrad 58:f50b97b08851 708 message_with_callback->on_message_send_complete = on_message_send_complete;
markrad 58:f50b97b08851 709 message_with_callback->context = callback_context;
markrad 58:f50b97b08851 710 message_with_callback->message_sender = message_sender_instance;
markrad 58:f50b97b08851 711
markrad 58:f50b97b08851 712 message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
markrad 58:f50b97b08851 713 message_sender_instance->message_count++;
markrad 58:f50b97b08851 714
markrad 58:f50b97b08851 715 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
markrad 58:f50b97b08851 716 {
markrad 58:f50b97b08851 717 switch (send_one_message(message_sender_instance, message_with_callback, message))
markrad 58:f50b97b08851 718 {
markrad 58:f50b97b08851 719 default:
markrad 58:f50b97b08851 720 case SEND_ONE_MESSAGE_ERROR:
markrad 58:f50b97b08851 721 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
markrad 58:f50b97b08851 722 result = __LINE__;
markrad 58:f50b97b08851 723 break;
markrad 58:f50b97b08851 724
markrad 58:f50b97b08851 725 case SEND_ONE_MESSAGE_BUSY:
markrad 58:f50b97b08851 726 message_with_callback->message = message_clone(message);
markrad 58:f50b97b08851 727 if (message_with_callback->message == NULL)
markrad 58:f50b97b08851 728 {
markrad 58:f50b97b08851 729 amqpalloc_free(message_with_callback);
markrad 58:f50b97b08851 730 result = __LINE__;
markrad 58:f50b97b08851 731 }
markrad 58:f50b97b08851 732 else
markrad 58:f50b97b08851 733 {
markrad 58:f50b97b08851 734 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
markrad 58:f50b97b08851 735 result = 0;
markrad 58:f50b97b08851 736 }
markrad 58:f50b97b08851 737 break;
markrad 58:f50b97b08851 738
markrad 58:f50b97b08851 739 case SEND_ONE_MESSAGE_OK:
markrad 58:f50b97b08851 740 result = 0;
markrad 58:f50b97b08851 741 break;
markrad 58:f50b97b08851 742 }
markrad 58:f50b97b08851 743 }
markrad 58:f50b97b08851 744 }
markrad 58:f50b97b08851 745 }
markrad 58:f50b97b08851 746 }
markrad 58:f50b97b08851 747 }
markrad 58:f50b97b08851 748 }
markrad 58:f50b97b08851 749 return result;
markrad 58:f50b97b08851 750 }
markrad 58:f50b97b08851 751
markrad 58:f50b97b08851 752 void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
markrad 58:f50b97b08851 753 {
markrad 58:f50b97b08851 754 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
markrad 58:f50b97b08851 755 if (message_sender_instance != NULL)
markrad 58:f50b97b08851 756 {
markrad 58:f50b97b08851 757 message_sender_instance->is_trace_on = traceOn ? 1 : 0;
markrad 58:f50b97b08851 758 }
markrad 58:f50b97b08851 759 }