A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 34:6be9c2058664
- Parent:
- 29:4a11413cf217
- Child:
- 36:8e1d94b0a70c
diff -r 08b53020ff0d -r 6be9c2058664 link.c --- a/link.c Mon Sep 25 13:38:40 2017 -0700 +++ b/link.c Sat Oct 21 20:12:19 2017 +0000 @@ -9,6 +9,7 @@ #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/xlogging.h" #include "azure_c_shared_utility/singlylinkedlist.h" +#include "azure_c_shared_utility/tickcounter.h" #include "azure_uamqp_c/link.h" #include "azure_uamqp_c/session.h" #include "azure_uamqp_c/amqpvalue.h" @@ -23,6 +24,8 @@ ON_DELIVERY_SETTLED on_delivery_settled; void* callback_context; void* link; + tickcounter_ms_t start_tick; + tickcounter_ms_t timeout; } DELIVERY_INSTANCE; typedef struct LINK_INSTANCE_TAG @@ -55,6 +58,7 @@ unsigned char* received_payload; uint32_t received_payload_size; delivery_number received_delivery_id; + TICK_COUNTER_HANDLE tick_counter; } LINK_INSTANCE; static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) @@ -639,37 +643,49 @@ result->received_payload_size = 0; result->received_delivery_id = 0; - result->pending_deliveries = singlylinkedlist_create(); - if (result->pending_deliveries == NULL) + result->tick_counter = tickcounter_create(); + if (result->tick_counter == NULL) { free(result); result = NULL; } else { - size_t name_length = strlen(name); - result->name = (char*)malloc(name_length + 1); - if (result->name == NULL) + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) { - singlylinkedlist_destroy(result->pending_deliveries); + tickcounter_destroy(result->tick_counter); free(result); result = NULL; } else { - result->on_link_state_changed = NULL; - result->callback_context = NULL; - set_link_state(result, LINK_STATE_DETACHED); - - (void)memcpy(result->name, name, name_length + 1); - result->link_endpoint = session_create_link_endpoint(session, name); - if (result->link_endpoint == NULL) + size_t name_length = strlen(name); + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) { + tickcounter_destroy(result->tick_counter); singlylinkedlist_destroy(result->pending_deliveries); - free(result->name); free(result); result = NULL; } + else + { + result->on_link_state_changed = NULL; + result->callback_context = NULL; + set_link_state(result, LINK_STATE_DETACHED); + + (void)memcpy(result->name, name, name_length + 1); + result->link_endpoint = session_create_link_endpoint(session, name); + if (result->link_endpoint == NULL) + { + tickcounter_destroy(result->tick_counter); + singlylinkedlist_destroy(result->pending_deliveries); + free(result->name); + free(result); + result = NULL; + } + } } } } @@ -709,28 +725,39 @@ result->role = role_sender; } - result->pending_deliveries = singlylinkedlist_create(); - if (result->pending_deliveries == NULL) + result->tick_counter = tickcounter_create(); + if (result->tick_counter == NULL) { free(result); result = NULL; } else { - size_t name_length = strlen(name); - result->name = (char*)malloc(name_length + 1); - if (result->name == NULL) + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) { - singlylinkedlist_destroy(result->pending_deliveries); + tickcounter_destroy(result->tick_counter); free(result); result = NULL; } else { - (void)memcpy(result->name, name, name_length + 1); - result->on_link_state_changed = NULL; - result->callback_context = NULL; - result->link_endpoint = link_endpoint; + size_t name_length = strlen(name); + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) + { + tickcounter_destroy(result->tick_counter); + singlylinkedlist_destroy(result->pending_deliveries); + free(result); + result = NULL; + } + else + { + (void)memcpy(result->name, name, name_length + 1); + result->on_link_state_changed = NULL; + result->callback_context = NULL; + result->link_endpoint = link_endpoint; + } } } } @@ -743,6 +770,7 @@ if (link != NULL) { remove_all_pending_deliveries((LINK_INSTANCE*)link, false); + tickcounter_destroy(link->tick_counter); link->on_link_state_changed = NULL; (void)link_detach(link, true); @@ -1063,7 +1091,7 @@ return result; } -LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context) +LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout) { LINK_TRANSFER_RESULT result; @@ -1133,41 +1161,50 @@ } else { - LIST_ITEM_HANDLE delivery_instance_list_item; - pending_delivery->on_delivery_settled = on_delivery_settled; - pending_delivery->callback_context = callback_context; - pending_delivery->link = link; - delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); - - if (delivery_instance_list_item == NULL) + if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0) { free(pending_delivery); result = LINK_TRANSFER_ERROR; } else { - /* here we should feed data to the transfer frame */ - switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) + LIST_ITEM_HANDLE delivery_instance_list_item; + pending_delivery->timeout = timeout; + pending_delivery->on_delivery_settled = on_delivery_settled; + pending_delivery->callback_context = callback_context; + pending_delivery->link = link; + delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); + + if (delivery_instance_list_item == NULL) { - default: - case SESSION_SEND_TRANSFER_ERROR: - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); free(pending_delivery); result = LINK_TRANSFER_ERROR; - break; + } + else + { + /* here we should feed data to the transfer frame */ + switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) + { + default: + case SESSION_SEND_TRANSFER_ERROR: + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + result = LINK_TRANSFER_ERROR; + break; - case SESSION_SEND_TRANSFER_BUSY: - /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); - free(pending_delivery); - result = LINK_TRANSFER_BUSY; - break; + case SESSION_SEND_TRANSFER_BUSY: + /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + result = LINK_TRANSFER_BUSY; + break; - case SESSION_SEND_TRANSFER_OK: - link->delivery_count = delivery_count; - link->link_credit--; - result = LINK_TRANSFER_OK; - break; + case SESSION_SEND_TRANSFER_OK: + link->delivery_count = delivery_count; + link->link_credit--; + result = LINK_TRANSFER_OK; + break; + } } } } @@ -1236,3 +1273,40 @@ } return result; } + +void link_dowork(LINK_HANDLE link) +{ + if (link != NULL) + { + tickcounter_ms_t current_tick; + + if (tickcounter_get_current_ms(link->tick_counter, ¤t_tick) != 0) + { + /* error */ + } + else + { + // go through all and find timed out deliveries + LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries); + while (item != NULL) + { + LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item); + + if ((delivery_instance->timeout != 0) && + (current_tick - delivery_instance->start_tick >= delivery_instance->timeout)) + { + if (delivery_instance->on_delivery_settled != NULL) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL); + } + + (void)singlylinkedlist_remove(link->pending_deliveries, item); + free(delivery_instance); + } + + item = next_item; + } + } + } +}