A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 12:b30dacf113f2
- Parent:
- 8:20dc883fb313
- Child:
- 13:9abd748f4e78
diff -r b9de84324501 -r b30dacf113f2 link.c --- a/link.c Thu Sep 22 18:16:30 2016 -0700 +++ b/link.c Thu Oct 20 17:07:44 2016 -0700 @@ -15,7 +15,7 @@ #include "azure_uamqp_c/amqpalloc.h" #include "azure_uamqp_c/amqp_frame_codec.h" #include "azure_c_shared_utility/xlogging.h" -#include "azure_c_shared_utility/list.h" +#include "azure_c_shared_utility/singlylinkedlist.h" #define DEFAULT_LINK_CREDIT 10000 @@ -37,7 +37,7 @@ handle handle; LINK_ENDPOINT_HANDLE link_endpoint; char* name; - LIST_HANDLE pending_deliveries; + SINGLYLINKEDLIST_HANDLE pending_deliveries; sequence_no delivery_count; role role; ON_LINK_STATE_CHANGED on_link_state_changed; @@ -51,7 +51,11 @@ uint32_t link_credit; uint32_t available; fields attach_properties; - int is_underlying_session_begun : 1; + bool is_underlying_session_begun; + bool is_closed; + unsigned char* received_payload; + uint32_t received_payload_size; + delivery_number received_delivery_id; } LINK_INSTANCE; static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) @@ -135,7 +139,7 @@ return result; } -static int send_detach_frame(LINK_INSTANCE* link_instance, ERROR_HANDLE error_handle) +static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle) { int result; DETACH_HANDLE detach_performative; @@ -152,7 +156,12 @@ { result = __LINE__; } - else + else if (close && + (detach_set_closed(detach_performative, true) != 0)) + { + result = __LINE__; + } + else { if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) { @@ -160,6 +169,12 @@ } else { + if (close) + { + /* Declare link to be closed */ + link_instance->is_closed = true; + } + result = 0; } } @@ -170,6 +185,55 @@ return result; } +static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role) +{ + int result; + ATTACH_HANDLE attach = attach_create(name, handle, role); + + if (attach == NULL) + { + result = __LINE__; + } + else + { + result = 0; + + link->delivery_count = link->initial_delivery_count; + + attach_set_snd_settle_mode(attach, link->snd_settle_mode); + attach_set_rcv_settle_mode(attach, link->rcv_settle_mode); + attach_set_role(attach, role); + attach_set_source(attach, link->source); + attach_set_target(attach, link->target); + attach_set_properties(attach, link->attach_properties); + + if (role == role_sender) + { + if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0) + { + result = __LINE__; + } + } + + if (result == 0) + { + if ((attach_set_max_message_size(attach, link->max_message_size) != 0) || + (session_send_attach(link->link_endpoint, attach) != 0)) + { + result = __LINE__; + } + else + { + result = 0; + } + } + + attach_destroy(attach); + } + + return result; +} + static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes) { LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; @@ -244,7 +308,7 @@ if (amqpvalue_get_transfer(performative, &transfer_handle) == 0) { AMQP_VALUE delivery_state; - delivery_number received_delivery_id; + bool more; link_instance->link_credit--; link_instance->delivery_count++; @@ -254,24 +318,80 @@ send_flow(link_instance); } - if (transfer_get_delivery_id(transfer_handle, &received_delivery_id) != 0) - { - /* error */ - } - else - { - delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, payload_size, payload_bytes); + if (transfer_get_more(transfer_handle, &more) != 0) + { + LogError("Could not get the more field from the transfer performative"); + } + else + { + bool is_error = false; + + if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) + { + /* is this not a continuation transfer? */ + if (link_instance->received_payload_size == 0) + { + LogError("Could not get the delivery Id from the transfer performative"); + is_error = true; + } + } + + if (!is_error) + { + /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */ + if ((link_instance->received_payload_size > 0) || more) + { + unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size); + if (new_received_payload == NULL) + { + LogError("Could not allocate memory for the received payload"); + } + else + { + link_instance->received_payload = new_received_payload; + (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size); + link_instance->received_payload_size += payload_size; + } + } - if (send_disposition(link_instance, received_delivery_id, delivery_state) != 0) - { - /* error */ - } + if (!more) + { + const unsigned char* indicate_payload_bytes; + uint32_t indicate_payload_size; + + /* if no previously stored chunks then simply report the current payload */ + if (link_instance->received_payload_size > 0) + { + indicate_payload_size = link_instance->received_payload_size; + indicate_payload_bytes = link_instance->received_payload; + } + else + { + indicate_payload_size = payload_size; + indicate_payload_bytes = payload_bytes; + } + + delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes); - if (delivery_state != NULL) - { - amqpvalue_destroy(delivery_state); - } - } + if (link_instance->received_payload_size > 0) + { + free(link_instance->received_payload); + link_instance->received_payload = NULL; + link_instance->received_payload_size = 0; + } + + if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0) + { + LogError("Cannot send disposition frame"); + } + + if (delivery_state != NULL) + { + amqpvalue_destroy(delivery_state); + } + } + } + } transfer_destroy(transfer_handle); } @@ -310,11 +430,11 @@ if (settled) { - LIST_ITEM_HANDLE pending_delivery = list_get_head_item(link_instance->pending_deliveries); + LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries); while (pending_delivery != NULL) { - LIST_ITEM_HANDLE next_pending_delivery = list_get_next_item(pending_delivery); - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(pending_delivery); + LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery); if (delivery_instance == NULL) { /* error */ @@ -333,7 +453,7 @@ { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, delivery_state); amqpalloc_free(delivery_instance); - if (list_remove(link_instance->pending_deliveries, pending_delivery) != 0) + if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0) { /* error */ break; @@ -360,12 +480,10 @@ { DETACH_HANDLE detach; - /* Respond with ack */ - (void)send_detach_frame(link_instance, NULL); - /* Set link state appropriately based on whether we received detach condition */ if (amqpvalue_get_detach(performative, &detach) == 0) { + bool closed = false; ERROR_HANDLE error; if (detach_get_error(detach, &error) == 0) { @@ -373,90 +491,36 @@ set_link_state(link_instance, LINK_STATE_ERROR); } - else + else { + (void)detach_get_closed(detach, &closed); + set_link_state(link_instance, LINK_STATE_DETACHED); } + + /* Received a detach while attached */ + if (link_instance->previous_link_state == LINK_STATE_ATTACHED) + { + /* Respond with ack */ + (void)send_detach(link_instance, closed, NULL); + } + + /* Received a closing detach after we sent a non-closing detach. */ + else if (closed && + (link_instance->previous_link_state == LINK_STATE_HALF_ATTACHED) && + !link_instance->is_closed) + { + + /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/ + (void)send_attach(link_instance, link_instance->name, 0, link_instance->role); + (void)send_detach(link_instance, true, NULL); + } + + detach_destroy(detach); } } } -static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role) -{ - int result; - ATTACH_HANDLE attach = attach_create(name, handle, role); - - if (attach == NULL) - { - result = __LINE__; - } - else - { - result = 0; - - link->delivery_count = link->initial_delivery_count; - - attach_set_snd_settle_mode(attach, link->snd_settle_mode); - attach_set_rcv_settle_mode(attach, link->rcv_settle_mode); - attach_set_role(attach, role); - attach_set_source(attach, link->source); - attach_set_target(attach, link->target); - attach_set_properties(attach, link->attach_properties); - - if (role == role_sender) - { - if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0) - { - result = __LINE__; - } - } - - if (result == 0) - { - if ((attach_set_max_message_size(attach, link->max_message_size) != 0) || - (session_send_attach(link->link_endpoint, attach) != 0)) - { - result = __LINE__; - } - else - { - result = 0; - } - } - - attach_destroy(attach); - } - - return result; -} - -static int send_detach(LINK_INSTANCE* link_instance, ERROR_HANDLE error) -{ - int result; - - DETACH_HANDLE detach = detach_create(0); - if (detach == NULL) - { - result = __LINE__; - } - else - { - if (((error != NULL) && (detach_set_error(detach, error) != 0)) || - (session_send_detach(link_instance->link_endpoint, detach) != 0)) - { - result = __LINE__; - } - else - { - result = 0; - } - - detach_destroy(detach); - } - - return result; -} - static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state) { LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; @@ -464,7 +528,7 @@ if (new_session_state == SESSION_STATE_MAPPED) { - if (link_instance->link_state == LINK_STATE_DETACHED) + if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed)) { if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0) { @@ -494,14 +558,14 @@ static void on_send_complete(void* context, IO_SEND_RESULT send_result) { LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(delivery_instance_list_item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item); LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link; (void)send_result; if (link_instance->snd_settle_mode == sender_settle_mode_settled) { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, NULL); amqpalloc_free(delivery_instance); - (void)list_remove(link_instance->pending_deliveries, delivery_instance_list_item); + (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item); } } @@ -522,10 +586,14 @@ result->delivery_count = 0; result->initial_delivery_count = 0; result->max_message_size = 0; - result->is_underlying_session_begun = 0; + result->is_underlying_session_begun = false; + result->is_closed = false; result->attach_properties = NULL; + result->received_payload = NULL; + result->received_payload_size = 0; + result->received_delivery_id = 0; - result->pending_deliveries = list_create(); + result->pending_deliveries = singlylinkedlist_create(); if (result->pending_deliveries == NULL) { amqpalloc_free(result); @@ -536,7 +604,7 @@ result->name = amqpalloc_malloc(strlen(name) + 1); if (result->name == NULL) { - list_destroy(result->pending_deliveries); + singlylinkedlist_destroy(result->pending_deliveries); amqpalloc_free(result); result = NULL; } @@ -550,7 +618,7 @@ result->link_endpoint = session_create_link_endpoint(session, name); if (result->link_endpoint == NULL) { - list_destroy(result->pending_deliveries); + singlylinkedlist_destroy(result->pending_deliveries); amqpalloc_free(result->name); amqpalloc_free(result); result = NULL; @@ -576,8 +644,12 @@ result->delivery_count = 0; result->initial_delivery_count = 0; result->max_message_size = 0; - result->is_underlying_session_begun = 0; + result->is_underlying_session_begun = false; + result->is_closed = false; result->attach_properties = NULL; + result->received_payload = NULL; + result->received_payload_size = 0; + result->received_delivery_id = 0; result->source = amqpvalue_clone(target); result->target = amqpvalue_clone(source); if (role == role_sender) @@ -589,7 +661,7 @@ result->role = role_sender; } - result->pending_deliveries = list_create(); + result->pending_deliveries = singlylinkedlist_create(); if (result->pending_deliveries == NULL) { amqpalloc_free(result); @@ -600,7 +672,7 @@ result->name = amqpalloc_malloc(strlen(name) + 1); if (result->name == NULL) { - list_destroy(result->pending_deliveries); + singlylinkedlist_destroy(result->pending_deliveries); amqpalloc_free(result); result = NULL; } @@ -621,19 +693,18 @@ { if (link != NULL) { - link->on_link_state_changed = NULL; - link_detach(link); - - session_destroy_link_endpoint(link->link_endpoint); + link->on_link_state_changed = NULL; + (void)link_detach(link, true); + session_destroy_link_endpoint(link->link_endpoint); amqpvalue_destroy(link->source); amqpvalue_destroy(link->target); if (link->pending_deliveries != NULL) { - LIST_ITEM_HANDLE item = list_get_head_item(link->pending_deliveries); + LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries); while (item != NULL) { - LIST_ITEM_HANDLE next_item = list_get_next_item(item); - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(item); + LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item); if (delivery_instance != NULL) { amqpalloc_free(delivery_instance); @@ -642,7 +713,7 @@ item = next_item; } - list_destroy(link->pending_deliveries); + singlylinkedlist_destroy(link->pending_deliveries); } if (link->name != NULL) @@ -655,6 +726,11 @@ amqpvalue_destroy(link->attach_properties); } + if (link->received_payload != NULL) + { + free(link->received_payload); + } + amqpalloc_free(link); } } @@ -828,7 +904,8 @@ { int result; - if (link == NULL) + if ((link == NULL) || + (link->is_closed)) { result = __LINE__; } @@ -847,7 +924,7 @@ } else { - link->is_underlying_session_begun = 1; + link->is_underlying_session_begun = true; if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) { @@ -855,6 +932,8 @@ } else { + link->received_payload_size = 0; + result = 0; } } @@ -868,40 +947,57 @@ return result; } -int link_detach(LINK_HANDLE link) +int link_detach(LINK_HANDLE link, bool close) { int result; - if (link == NULL) - { + if ((link == NULL) || + (link->is_closed)) + { result = __LINE__; } else { - if (link->link_state == LINK_STATE_ERROR) - { - result = __LINE__; - } - else if ((link->link_state == LINK_STATE_HALF_ATTACHED) || - (link->link_state == LINK_STATE_ATTACHED)) - { - if (send_detach(link, NULL) != 0) - { - result = __LINE__; - } - else - { - set_link_state(link, LINK_STATE_DETACHED); - link->on_link_state_changed = NULL; - result = 0; - } - } - else - { - set_link_state(link, LINK_STATE_DETACHED); - link->on_link_state_changed = NULL; - result = 0; - } + switch (link->link_state) + { + + case LINK_STATE_HALF_ATTACHED: + /* Sending detach when remote is not yet attached */ + if (send_detach(link, close, NULL) != 0) + { + result = __LINE__; + } + else + { + set_link_state(link, LINK_STATE_DETACHED); + result = 0; + } + break; + + case LINK_STATE_ATTACHED: + /* Send detach and wait for remote to respond */ + if (send_detach(link, close, NULL) != 0) + { + result = __LINE__; + } + else + { + set_link_state(link, LINK_STATE_HALF_ATTACHED); + result = 0; + } + break; + + case LINK_STATE_DETACHED: + /* Already detached */ + result = 0; + break; + + default: + case LINK_STATE_ERROR: + /* Already detached and in error state */ + result = __LINE__; + break; + } } return result; @@ -981,7 +1077,7 @@ pending_delivery->on_delivery_settled = on_delivery_settled; pending_delivery->callback_context = callback_context; pending_delivery->link = link; - delivery_instance_list_item = list_add(link->pending_deliveries, pending_delivery); + delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); if (delivery_instance_list_item == NULL) { @@ -995,14 +1091,14 @@ { default: case SESSION_SEND_TRANSFER_ERROR: - list_remove(link->pending_deliveries, delivery_instance_list_item); + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); amqpalloc_free(pending_delivery); result = LINK_TRANSFER_ERROR; break; case SESSION_SEND_TRANSFER_BUSY: /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ - list_remove(link->pending_deliveries, delivery_instance_list_item); + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); amqpalloc_free(pending_delivery); result = LINK_TRANSFER_BUSY; break;