A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: message_sender.c
- Revision:
- 34:6be9c2058664
- Parent:
- 33:08b53020ff0d
- Child:
- 35:d0bed2404ee9
diff -r 08b53020ff0d -r 6be9c2058664 message_sender.c --- a/message_sender.c Mon Sep 25 13:38:40 2017 -0700 +++ b/message_sender.c Sat Oct 21 20:12:19 2017 +0000 @@ -7,8 +7,10 @@ #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/xlogging.h" +#include "azure_c_shared_utility/tickcounter.h" #include "azure_uamqp_c/message_sender.h" #include "azure_uamqp_c/amqpvalue_to_string.h" +#include "azure_uamqp_c/async_operation.h" typedef enum MESSAGE_SEND_STATE_TAG { @@ -30,62 +32,66 @@ void* context; MESSAGE_SENDER_HANDLE message_sender; MESSAGE_SEND_STATE message_send_state; + tickcounter_ms_t timeout; } MESSAGE_WITH_CALLBACK; +DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK); + typedef struct MESSAGE_SENDER_INSTANCE_TAG { LINK_HANDLE link; size_t message_count; - MESSAGE_WITH_CALLBACK** messages; + ASYNC_OPERATION_HANDLE* messages; MESSAGE_SENDER_STATE message_sender_state; ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed; void* on_message_sender_state_changed_context; unsigned int is_trace_on : 1; } MESSAGE_SENDER_INSTANCE; -static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index) +static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index) { - MESSAGE_WITH_CALLBACK** new_messages; + ASYNC_OPERATION_HANDLE* new_messages; + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]); - if (message_sender_instance->messages[index]->message != NULL) + if (message_with_callback->message != NULL) { - message_destroy(message_sender_instance->messages[index]->message); - message_sender_instance->messages[index]->message = NULL; + message_destroy(message_with_callback->message); + message_with_callback->message = NULL; } - free(message_sender_instance->messages[index]); + async_operation_destroy(message_sender->messages[index]); - if (message_sender_instance->message_count - index > 1) + if (message_sender->message_count - index > 1) { - (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1)); + (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1)); } - message_sender_instance->message_count--; + message_sender->message_count--; - if (message_sender_instance->message_count > 0) + if (message_sender->message_count > 0) { - new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count)); + new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count)); if (new_messages != NULL) { - message_sender_instance->messages = new_messages; + message_sender->messages = new_messages; } } else { - free(message_sender_instance->messages); - message_sender_instance->messages = NULL; + free(message_sender->messages); + message_sender->messages = NULL; } } -static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback) +static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send) { size_t i; - for (i = 0; i < message_sender_instance->message_count; i++) + for (i = 0; i < message_sender->message_count; i++) { - if (message_sender_instance->messages[i] == message_with_callback) + if (message_sender->messages[i] == pending_send) { - remove_pending_message_by_index(message_sender_instance, i); + remove_pending_message_by_index(message_sender, i); break; } } @@ -93,8 +99,9 @@ static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state) { - MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context; - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender; + ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context; + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send); + MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender; (void)delivery_no; if (message_with_callback->on_message_send_complete != NULL) @@ -128,6 +135,9 @@ case LINK_DELIVERY_SETTLE_REASON_SETTLED: message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK); break; + case LINK_DELIVERY_SETTLE_REASON_TIMEOUT: + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT); + break; case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED: default: message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); @@ -135,7 +145,7 @@ } } - remove_pending_message(message_sender_instance, message_with_callback); + remove_pending_message(message_sender, pending_send); } static int encode_bytes(void* context, const unsigned char* bytes, size_t length) @@ -146,18 +156,18 @@ return 0; } -static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value) +static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value) { #ifdef NO_LOGGING - UNUSED(message_sender_instance); + UNUSED(message_sender); UNUSED(name); UNUSED(value); #else - if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1) + if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1) { char* value_as_string = NULL; - LOG(AZ_LOG_TRACE, 0, "%s", name); - LOG(AZ_LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value))); + LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name)); + LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL((value_as_string = amqpvalue_to_string(value)))); if (value_as_string != NULL) { free(value_as_string); @@ -166,7 +176,7 @@ #endif } -static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message) +static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message) { SEND_ONE_MESSAGE_RESULT result; @@ -193,51 +203,112 @@ AMQP_VALUE body_amqp_value = NULL; size_t body_data_count = 0; AMQP_VALUE msg_annotations = NULL; + bool is_error = false; - message_get_header(message, &header); - header_amqp_value = amqpvalue_create_header(header); - if (header != NULL) + // message header + if ((message_get_header(message, &header) == 0) && + (header != NULL)) { - amqpvalue_get_encoded_size(header_amqp_value, &encoded_size); - total_encoded_size += encoded_size; + header_amqp_value = amqpvalue_create_header(header); + if (header_amqp_value == NULL) + { + LogError("Cannot create header AMQP value"); + is_error = true; + } + else + { + if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0) + { + LogError("Cannot obtain header encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } + } + } // message annotations - if ((message_get_message_annotations(message, &msg_annotations) == 0) && + if ((!is_error) && + (message_get_message_annotations(message, &msg_annotations) == 0) && (msg_annotations != NULL)) { - amqpvalue_get_encoded_size(msg_annotations, &encoded_size); - total_encoded_size += encoded_size; + if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0) + { + LogError("Cannot obtain message annotations encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } } // properties - message_get_properties(message, &properties); - properties_amqp_value = amqpvalue_create_properties(properties); - if (properties != NULL) + if ((!is_error) && + (message_get_properties(message, &properties) == 0) && + (properties != NULL)) { - amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size); - total_encoded_size += encoded_size; + properties_amqp_value = amqpvalue_create_properties(properties); + if (properties_amqp_value == NULL) + { + LogError("Cannot create message properties AMQP value"); + is_error = true; + } + else + { + if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0) + { + LogError("Cannot obtain message properties encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } + } } // application properties - message_get_application_properties(message, &application_properties); - if (application_properties != NULL) + if ((!is_error) && + (message_get_application_properties(message, &application_properties) == 0) && + (application_properties != NULL)) { application_properties_value = amqpvalue_create_application_properties(application_properties); - amqpvalue_get_encoded_size(application_properties_value, &encoded_size); - total_encoded_size += encoded_size; + if (application_properties_value == NULL) + { + LogError("Cannot create application properties AMQP value"); + is_error = true; + } + else + { + if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0) + { + LogError("Cannot obtain application properties encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } + } + } + + if (is_error) + { + result = SEND_ONE_MESSAGE_ERROR; } else { - application_properties_value = NULL; - } - - result = SEND_ONE_MESSAGE_OK; + result = SEND_ONE_MESSAGE_OK; - // body - amqp data - switch (message_body_type) - { + // body - amqp data + switch (message_body_type) + { default: + LogError("Unknown body type"); result = SEND_ONE_MESSAGE_ERROR; break; @@ -246,19 +317,28 @@ AMQP_VALUE message_body_amqp_value; if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0) { + LogError("Cannot obtain AMQP value from body"); result = SEND_ONE_MESSAGE_ERROR; } else { body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value); - if ((body_amqp_value == NULL) || - (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)) + if (body_amqp_value == NULL) { + LogError("Cannot create body AMQP value"); result = SEND_ONE_MESSAGE_ERROR; } else { - total_encoded_size += encoded_size; + if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0) + { + LogError("Cannot get body AMQP value encoded size"); + result = SEND_ONE_MESSAGE_ERROR; + } + else + { + total_encoded_size += encoded_size; + } } } @@ -272,6 +352,7 @@ if (message_get_body_amqp_data_count(message, &body_data_count) != 0) { + LogError("Cannot get body AMQP data count"); result = SEND_ONE_MESSAGE_ERROR; } else @@ -280,6 +361,7 @@ { if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0) { + LogError("Cannot get body AMQP data %u", (unsigned int)i); result = SEND_ONE_MESSAGE_ERROR; } else @@ -291,12 +373,14 @@ body_amqp_data = amqpvalue_create_data(binary_value); if (body_amqp_data == NULL) { + LogError("Cannot create body AMQP data"); result = SEND_ONE_MESSAGE_ERROR; } else { if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0) { + LogError("Cannot get body AMQP data encoded size"); result = SEND_ONE_MESSAGE_ERROR; } else @@ -311,139 +395,151 @@ } break; } - } - - if (result == 0) - { - void* data_bytes = malloc(total_encoded_size); - PAYLOAD payload; - payload.bytes = (const unsigned char*)data_bytes; - payload.length = 0; - result = SEND_ONE_MESSAGE_OK; - - if (header != NULL) - { - if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0) - { - result = SEND_ONE_MESSAGE_ERROR; - } - - log_message_chunk(message_sender_instance, "Header:", header_amqp_value); - } - - if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL)) - { - if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0) - { - result = SEND_ONE_MESSAGE_ERROR; - } - - log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations); } - if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL)) + if (result == 0) { - if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0) + void* data_bytes = malloc(total_encoded_size); + PAYLOAD payload; + payload.bytes = (const unsigned char*)data_bytes; + payload.length = 0; + result = SEND_ONE_MESSAGE_OK; + + if (header != NULL) { - result = SEND_ONE_MESSAGE_ERROR; + if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0) + { + LogError("Cannot encode header value"); + result = SEND_ONE_MESSAGE_ERROR; + } + + log_message_chunk(message_sender, "Header:", header_amqp_value); } - log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value); - } + if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL)) + { + if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0) + { + LogError("Cannot encode message annotations value"); + result = SEND_ONE_MESSAGE_ERROR; + } - if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL)) - { - if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0) - { - result = SEND_ONE_MESSAGE_ERROR; + log_message_chunk(message_sender, "Message Annotations:", msg_annotations); } - log_message_chunk(message_sender_instance, "Application properties:", application_properties_value); - } - - if (result == SEND_ONE_MESSAGE_OK) - { - switch (message_body_type) + if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL)) { - default: - result = SEND_ONE_MESSAGE_ERROR; - break; - - case MESSAGE_BODY_TYPE_VALUE: - { - if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0) + if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0) { + LogError("Cannot encode message properties value"); result = SEND_ONE_MESSAGE_ERROR; } - log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value); - break; + log_message_chunk(message_sender, "Properties:", properties_amqp_value); } - case MESSAGE_BODY_TYPE_DATA: + + if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL)) { - BINARY_DATA binary_data; - size_t i; + if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0) + { + LogError("Cannot encode application properties value"); + result = SEND_ONE_MESSAGE_ERROR; + } + + log_message_chunk(message_sender, "Application properties:", application_properties_value); + } - for (i = 0; i < body_data_count; i++) + if (result == SEND_ONE_MESSAGE_OK) + { + switch (message_body_type) { - if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0) + default: + LogError("Unknown message type"); + result = SEND_ONE_MESSAGE_ERROR; + break; + + case MESSAGE_BODY_TYPE_VALUE: + { + if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0) { + LogError("Cannot encode body AMQP value"); result = SEND_ONE_MESSAGE_ERROR; } - else + + log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value); + break; + } + case MESSAGE_BODY_TYPE_DATA: + { + BINARY_DATA binary_data; + size_t i; + + for (i = 0; i < body_data_count; i++) { - AMQP_VALUE body_amqp_data; - amqp_binary binary_value; - binary_value.bytes = binary_data.bytes; - binary_value.length = (uint32_t)binary_data.length; - body_amqp_data = amqpvalue_create_data(binary_value); - if (body_amqp_data == NULL) + if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0) { + LogError("Cannot get AMQP data %u", (unsigned int)i); result = SEND_ONE_MESSAGE_ERROR; } else { - if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0) + AMQP_VALUE body_amqp_data; + amqp_binary binary_value; + binary_value.bytes = binary_data.bytes; + binary_value.length = (uint32_t)binary_data.length; + body_amqp_data = amqpvalue_create_data(binary_value); + if (body_amqp_data == NULL) { + LogError("Cannot create body AMQP data %u", (unsigned int)i); result = SEND_ONE_MESSAGE_ERROR; - break; } + else + { + if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0) + { + LogError("Cannot encode body AMQP data %u", (unsigned int)i); + result = SEND_ONE_MESSAGE_ERROR; + break; + } - amqpvalue_destroy(body_amqp_data); + amqpvalue_destroy(body_amqp_data); + } } } + break; } - break; + } } - } - } - if (result == SEND_ONE_MESSAGE_OK) - { - message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING; - switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback)) + if (result == SEND_ONE_MESSAGE_OK) { - default: - case LINK_TRANSFER_ERROR: - result = SEND_ONE_MESSAGE_ERROR; - break; + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send); + message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING; + switch (link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, message_with_callback->timeout)) + { + default: + case LINK_TRANSFER_ERROR: + LogError("Error in link transfer"); + result = SEND_ONE_MESSAGE_ERROR; + break; - case LINK_TRANSFER_BUSY: - message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT; - result = SEND_ONE_MESSAGE_BUSY; - break; + case LINK_TRANSFER_BUSY: + message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT; + result = SEND_ONE_MESSAGE_BUSY; + break; - case LINK_TRANSFER_OK: - result = SEND_ONE_MESSAGE_OK; - break; + case LINK_TRANSFER_OK: + result = SEND_ONE_MESSAGE_OK; + break; + } } - } - free(data_bytes); + free(data_bytes); - if (body_amqp_value != NULL) - { - amqpvalue_destroy(body_amqp_value); + if (body_amqp_value != NULL) + { + amqpvalue_destroy(body_amqp_value); + } } } @@ -466,14 +562,17 @@ { amqpvalue_destroy(application_properties); } + if (application_properties_value != NULL) { amqpvalue_destroy(application_properties_value); } + if (properties_amqp_value != NULL) { amqpvalue_destroy(properties_amqp_value); } + if (properties != NULL) { properties_destroy(properties); @@ -483,33 +582,37 @@ return result; } -static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance) +static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender) { size_t i; - for (i = 0; i < message_sender_instance->message_count; i++) + for (i = 0; i < message_sender->message_count; i++) { - if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT) + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]); + if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT) { - switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message)) + switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message)) { default: + LogError("Invalid send one message result"); + break; + case SEND_ONE_MESSAGE_ERROR: { - ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete; - void* context = message_sender_instance->messages[i]->context; - remove_pending_message_by_index(message_sender_instance, i); + ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete; + void* context = message_with_callback->context; + remove_pending_message_by_index(message_sender, i); if (on_message_send_complete != NULL) { on_message_send_complete(context, MESSAGE_SEND_ERROR); } - i = message_sender_instance->message_count; + i = message_sender->message_count; break; } case SEND_ONE_MESSAGE_BUSY: - i = message_sender_instance->message_count + 1; + i = message_sender->message_count + 1; break; case SEND_ONE_MESSAGE_OK: @@ -521,46 +624,47 @@ } } -static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state) +static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state) { - MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state; - message_sender_instance->message_sender_state = new_state; - if (message_sender_instance->on_message_sender_state_changed != NULL) + MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state; + message_sender->message_sender_state = new_state; + if (message_sender->on_message_sender_state_changed != NULL) { - message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state); + message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state); } } -static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance) +static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender) { size_t i; - for (i = 0; i < message_sender_instance->message_count; i++) + for (i = 0; i < message_sender->message_count; i++) { - if (message_sender_instance->messages[i]->on_message_send_complete != NULL) + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]); + if (message_with_callback->on_message_send_complete != NULL) { - message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); } - if (message_sender_instance->messages[i]->message != NULL) + if (message_with_callback->message != NULL) { - message_destroy(message_sender_instance->messages[i]->message); + message_destroy(message_with_callback->message); } - free(message_sender_instance->messages[i]); + async_operation_destroy(message_sender->messages[i]); } - if (message_sender_instance->messages != NULL) + if (message_sender->messages != NULL) { - message_sender_instance->message_count = 0; + message_sender->message_count = 0; - free(message_sender_instance->messages); - message_sender_instance->messages = NULL; + free(message_sender->messages); + message_sender->messages = NULL; } } static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state) { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context; + MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context; (void)previous_link_state; switch (new_link_state) @@ -569,30 +673,30 @@ break; case LINK_STATE_ATTACHED: - if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) + if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) { - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN); } break; case LINK_STATE_DETACHED: - if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) || - (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING)) + if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) || + (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING)) { /* User initiated transition, we should be good */ - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE); - indicate_all_messages_as_error(message_sender_instance); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE); + indicate_all_messages_as_error(message_sender); } - else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE) + else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE) { /* Any other transition must be an error */ - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR); } break; case LINK_STATE_ERROR: - if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR) + if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR) { - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); - indicate_all_messages_as_error(message_sender_instance); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR); + indicate_all_messages_as_error(message_sender); } break; } @@ -600,36 +704,41 @@ static void on_link_flow_on(void* context) { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context; - send_all_pending_messages(message_sender_instance); + MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context; + send_all_pending_messages(message_sender); } MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context) { - MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE)); - if (result != NULL) + MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE)); + if (message_sender == NULL) + { + LogError("Failed allocating message sender"); + } + else { - result->messages = NULL; - result->message_count = 0; - result->link = link; - result->on_message_sender_state_changed = on_message_sender_state_changed; - result->on_message_sender_state_changed_context = context; - result->message_sender_state = MESSAGE_SENDER_STATE_IDLE; - result->is_trace_on = 0; + message_sender->messages = NULL; + message_sender->message_count = 0; + message_sender->link = link; + message_sender->on_message_sender_state_changed = on_message_sender_state_changed; + message_sender->on_message_sender_state_changed_context = context; + message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE; + message_sender->is_trace_on = 0; } - return result; + return message_sender; } void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender) { - if (message_sender != NULL) + if (message_sender == NULL) { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender; - - messagesender_close(message_sender_instance); - - indicate_all_messages_as_error(message_sender_instance); + LogError("NULL message_sender"); + } + else + { + (void)messagesender_close(message_sender); + indicate_all_messages_as_error(message_sender); free(message_sender); } @@ -641,19 +750,19 @@ if (message_sender == NULL) { + LogError("NULL message_sender"); result = __FAILURE__; } else { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender; - - if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE) + if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE) { - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING); - if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0) + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING); + if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0) { + LogError("attach link failed"); result = __FAILURE__; - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR); } else { @@ -675,20 +784,20 @@ if (message_sender == NULL) { + LogError("NULL message_sender"); result = __FAILURE__; } else { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender; - - if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) || - (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)) + if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) || + (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)) { - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING); - if (link_detach(message_sender_instance->link, true) != 0) + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING); + if (link_detach(message_sender->link, true) != 0) { + LogError("Detaching link failed"); result = __FAILURE__; - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); + set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR); } else { @@ -704,49 +813,63 @@ return result; } -int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context) +static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation) { - int result; + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation); + if (message_with_callback->on_message_send_complete != NULL) + { + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED); + } + + remove_pending_message(message_with_callback->message_sender, send_operation); +} + +ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout) +{ + ASYNC_OPERATION_HANDLE result; if ((message_sender == NULL) || (message == NULL)) { - result = __FAILURE__; + LogError("Bad parameters: message_sender = %p, message = %p"); + result = NULL; } else { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender; - if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR) + if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR) { - result = __FAILURE__; + LogError("Message sender in ERROR state"); + result = NULL; } else { - MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)malloc(sizeof(MESSAGE_WITH_CALLBACK)); - if (message_with_callback == NULL) + result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler); + if (result == NULL) { - result = __FAILURE__; + LogError("Failed allocating context for send"); } else { - MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1)); + MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result); + ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1)); if (new_messages == NULL) { - free(message_with_callback); - result = __FAILURE__; + LogError("Failed allocating memory for pending sends"); + async_operation_destroy(result); + result = NULL; } else { - result = 0; - - message_sender_instance->messages = new_messages; - if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN) + message_with_callback->timeout = timeout; + message_sender->messages = new_messages; + if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN) { message_with_callback->message = message_clone(message); if (message_with_callback->message == NULL) { - free(message_with_callback); - result = __FAILURE__; + LogError("Cannot clone message for placing it in the pending sends list"); + async_operation_destroy(result); + result = NULL; } message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT; @@ -757,42 +880,42 @@ message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING; } - if (result == 0) + if (result != NULL) { message_with_callback->on_message_send_complete = on_message_send_complete; message_with_callback->context = callback_context; - message_with_callback->message_sender = message_sender_instance; + message_with_callback->message_sender = message_sender; - message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback; - message_sender_instance->message_count++; + message_sender->messages[message_sender->message_count] = result; + message_sender->message_count++; - if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) + if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) { - switch (send_one_message(message_sender_instance, message_with_callback, message)) + switch (send_one_message(message_sender, result, message)) { default: case SEND_ONE_MESSAGE_ERROR: - - remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1); - result = __FAILURE__; + LogError("Error sending message"); + remove_pending_message_by_index(message_sender, message_sender->message_count - 1); + async_operation_destroy(result); + result = NULL; break; case SEND_ONE_MESSAGE_BUSY: message_with_callback->message = message_clone(message); if (message_with_callback->message == NULL) { - free(message_with_callback); - result = __FAILURE__; + LogError("Error cloning message for placing it in the pending sends list"); + async_operation_destroy(result); + result = NULL; } else { message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT; - result = 0; } break; case SEND_ONE_MESSAGE_OK: - result = 0; break; } } @@ -801,14 +924,18 @@ } } } + return result; } void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn) { - MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender; - if (message_sender_instance != NULL) + if (message_sender == NULL) { - message_sender_instance->is_trace_on = traceOn ? 1 : 0; + LogError("NULL message_sender"); + } + else + { + message_sender->is_trace_on = traceOn ? 1 : 0; } }