A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 36:8e1d94b0a70c
- Parent:
- 34:6be9c2058664
- Child:
- 37:c923ba7f6cf9
diff -r d0bed2404ee9 -r 8e1d94b0a70c link.c --- a/link.c Fri Nov 03 13:18:57 2017 -0700 +++ b/link.c Fri Nov 17 13:58:00 2017 -0800 @@ -15,6 +15,7 @@ #include "azure_uamqp_c/amqpvalue.h" #include "azure_uamqp_c/amqp_definitions.h" #include "azure_uamqp_c/amqp_frame_codec.h" +#include "azure_uamqp_c/async_operation.h" #define DEFAULT_LINK_CREDIT 10000 @@ -61,6 +62,8 @@ TICK_COUNTER_HANDLE tick_counter; } LINK_INSTANCE; +DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE); + static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) { link_instance->previous_link_state = link_instance->link_state; @@ -80,14 +83,16 @@ while (item != NULL) { LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item); - if (delivery_instance != NULL) + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item); + if (pending_delivery_operation != NULL) { + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); if (indicate_settled && (delivery_instance->on_delivery_settled != NULL)) { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, NULL); } - free(delivery_instance); + + async_operation_destroy(pending_delivery_operation); } item = next_item; @@ -478,14 +483,16 @@ while (pending_delivery != NULL) { LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery); - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery); - if (delivery_instance == NULL) + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery); + if (pending_delivery_operation == NULL) { /* error */ break; } else { + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); + if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last)) { AMQP_VALUE delivery_state; @@ -496,7 +503,7 @@ else { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state); - free(delivery_instance); + async_operation_destroy(pending_delivery_operation); if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0) { /* error */ @@ -585,13 +592,13 @@ } else if (new_session_state == SESSION_STATE_DISCARDING) { - remove_all_pending_deliveries(link_instance, true); - set_link_state(link_instance, LINK_STATE_DETACHED); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); } else if (new_session_state == SESSION_STATE_ERROR) { - remove_all_pending_deliveries(link_instance, true); - set_link_state(link_instance, LINK_STATE_ERROR); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_ERROR); } } @@ -607,14 +614,15 @@ static void on_send_complete(void* context, IO_SEND_RESULT send_result) { LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item); - LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link; + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(delivery_instance_list_item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); + LINK_HANDLE link = (LINK_HANDLE)delivery_instance->link; (void)send_result; - if (link_instance->snd_settle_mode == sender_settle_mode_settled) + if (link->snd_settle_mode == sender_settle_mode_settled) { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL); - free(delivery_instance); - (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item); + async_operation_destroy(pending_delivery_operation); + (void)singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); } } @@ -1091,129 +1099,186 @@ return result; } -LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout) +static bool remove_pending_delivery_condition_function(const void* item, const void* match_context, bool* continue_processing) { - LINK_TRANSFER_RESULT result; + bool result; + + if (item == match_context) + { + result = true; + *continue_processing = false; + } + else + { + result = false; + *continue_processing = true; + } + + return result; +} - if (link == NULL) +static void link_transfer_cancel_handler(ASYNC_OPERATION_HANDLE link_transfer_operation) +{ + DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, link_transfer_operation); + if (pending_delivery->on_delivery_settled != NULL) { - result = LINK_TRANSFER_ERROR; + pending_delivery->on_delivery_settled(pending_delivery->callback_context, pending_delivery->delivery_id, LINK_DELIVERY_SETTLE_REASON_CANCELLED, NULL); + } + free(pending_delivery); + + (void)singlylinkedlist_remove_if(((LINK_HANDLE)pending_delivery->link)->pending_deliveries, remove_pending_delivery_condition_function, pending_delivery); +} + +ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout) +{ + ASYNC_OPERATION_HANDLE result; + + if ((link == NULL) || + (link_transfer_error == NULL)) + { + if (link_transfer_error != NULL) + { + *link_transfer_error = LINK_TRANSFER_ERROR; + } + + result = NULL; } else { if ((link->role != role_sender) || (link->link_state != LINK_STATE_ATTACHED)) { - result = LINK_TRANSFER_ERROR; + *link_transfer_error = LINK_TRANSFER_ERROR; + result = NULL; } else if (link->link_credit == 0) { - result = LINK_TRANSFER_BUSY; + *link_transfer_error = LINK_TRANSFER_BUSY; + result = NULL; } else { - TRANSFER_HANDLE transfer = transfer_create(0); - if (transfer == NULL) - { - result = LINK_TRANSFER_ERROR; - } - else + result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler); + if (result != NULL) { - sequence_no delivery_count = link->delivery_count + 1; - unsigned char delivery_tag_bytes[sizeof(delivery_count)]; - delivery_tag delivery_tag; - bool settled; - - (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); - - delivery_tag.bytes = &delivery_tag_bytes; - delivery_tag.length = sizeof(delivery_tag_bytes); - - if (link->snd_settle_mode == sender_settle_mode_unsettled) + TRANSFER_HANDLE transfer = transfer_create(0); + if (transfer == NULL) { - settled = false; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; } else { - settled = true; - } + sequence_no delivery_count = link->delivery_count + 1; + unsigned char delivery_tag_bytes[sizeof(delivery_count)]; + delivery_tag delivery_tag; + bool settled; + + (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); + + delivery_tag.bytes = &delivery_tag_bytes; + delivery_tag.length = sizeof(delivery_tag_bytes); - if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || - (transfer_set_message_format(transfer, message_format) != 0) || - (transfer_set_settled(transfer, settled) != 0)) - { - result = LINK_TRANSFER_ERROR; - } - else - { - AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); + if (link->snd_settle_mode == sender_settle_mode_unsettled) + { + settled = false; + } + else + { + settled = true; + } - if (transfer_value == NULL) + if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || + (transfer_set_message_format(transfer, message_format) != 0) || + (transfer_set_settled(transfer, settled) != 0)) { - result = LINK_TRANSFER_ERROR; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; } else { - DELIVERY_INSTANCE* pending_delivery = (DELIVERY_INSTANCE*)malloc(sizeof(DELIVERY_INSTANCE)); - if (pending_delivery == NULL) + AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); + + if (transfer_value == NULL) { - result = LINK_TRANSFER_ERROR; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; } else { - if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0) + DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result); + if (pending_delivery == NULL) { - free(pending_delivery); - result = LINK_TRANSFER_ERROR; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; } else { - LIST_ITEM_HANDLE delivery_instance_list_item; - pending_delivery->timeout = timeout; - pending_delivery->on_delivery_settled = on_delivery_settled; - pending_delivery->callback_context = callback_context; - pending_delivery->link = link; - delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); - - if (delivery_instance_list_item == NULL) + if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0) { free(pending_delivery); - result = LINK_TRANSFER_ERROR; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; } else { - /* here we should feed data to the transfer frame */ - switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) - { - default: - case SESSION_SEND_TRANSFER_ERROR: - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); - free(pending_delivery); - result = LINK_TRANSFER_ERROR; - break; + LIST_ITEM_HANDLE delivery_instance_list_item; + pending_delivery->timeout = timeout; + pending_delivery->on_delivery_settled = on_delivery_settled; + pending_delivery->callback_context = callback_context; + pending_delivery->link = link; + delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result); - case SESSION_SEND_TRANSFER_BUSY: - /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + if (delivery_instance_list_item == NULL) + { free(pending_delivery); - result = LINK_TRANSFER_BUSY; - break; + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + /* here we should feed data to the transfer frame */ + switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) + { + default: + case SESSION_SEND_TRANSFER_ERROR: + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + break; - case SESSION_SEND_TRANSFER_OK: - link->delivery_count = delivery_count; - link->link_credit--; - result = LINK_TRANSFER_OK; - break; + case SESSION_SEND_TRANSFER_BUSY: + /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + *link_transfer_error = LINK_TRANSFER_BUSY; + async_operation_destroy(result); + result = NULL; + break; + + case SESSION_SEND_TRANSFER_OK: + link->delivery_count = delivery_count; + link->link_credit--; + break; + } } } } + + amqpvalue_destroy(transfer_value); } - - amqpvalue_destroy(transfer_value); } + + transfer_destroy(transfer); } - - transfer_destroy(transfer); } } } @@ -1291,7 +1356,8 @@ while (item != NULL) { LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item); + ASYNC_OPERATION_HANDLE delivery_instance_async_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, delivery_instance_async_operation); if ((delivery_instance->timeout != 0) && (current_tick - delivery_instance->start_tick >= delivery_instance->timeout)) @@ -1302,7 +1368,7 @@ } (void)singlylinkedlist_remove(link->pending_deliveries, item); - free(delivery_instance); + async_operation_destroy(delivery_instance_async_operation); } item = next_item;