Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: uamqp/src/link.c
- Revision:
- 0:f7f1f0d76dd6
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/uamqp/src/link.c Thu Aug 23 06:52:14 2018 +0000 @@ -0,0 +1,1655 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <stdbool.h> +#include "azure_c_shared_utility/gballoc.h" +#include "azure_c_shared_utility/optimize_size.h" +#include "azure_c_shared_utility/xlogging.h" +#include "azure_c_shared_utility/singlylinkedlist.h" +#include "azure_c_shared_utility/tickcounter.h" +#include "azure_uamqp_c/link.h" +#include "azure_uamqp_c/session.h" +#include "azure_uamqp_c/amqpvalue.h" +#include "azure_uamqp_c/amqp_definitions.h" +#include "azure_uamqp_c/amqp_frame_codec.h" +#include "azure_uamqp_c/async_operation.h" + +#define DEFAULT_LINK_CREDIT 10000 + +typedef struct DELIVERY_INSTANCE_TAG +{ + delivery_number delivery_id; + ON_DELIVERY_SETTLED on_delivery_settled; + void* callback_context; + void* link; + tickcounter_ms_t start_tick; + tickcounter_ms_t timeout; +} DELIVERY_INSTANCE; + +typedef struct ON_LINK_DETACH_EVENT_SUBSCRIPTION_TAG +{ + ON_LINK_DETACH_RECEIVED on_link_detach_received; + void* context; +} ON_LINK_DETACH_EVENT_SUBSCRIPTION; + +typedef struct LINK_INSTANCE_TAG +{ + SESSION_HANDLE session; + LINK_STATE link_state; + LINK_STATE previous_link_state; + AMQP_VALUE source; + AMQP_VALUE target; + handle handle; + LINK_ENDPOINT_HANDLE link_endpoint; + char* name; + SINGLYLINKEDLIST_HANDLE pending_deliveries; + sequence_no delivery_count; + role role; + ON_LINK_STATE_CHANGED on_link_state_changed; + ON_LINK_FLOW_ON on_link_flow_on; + ON_TRANSFER_RECEIVED on_transfer_received; + void* callback_context; + sender_settle_mode snd_settle_mode; + receiver_settle_mode rcv_settle_mode; + sequence_no initial_delivery_count; + uint64_t max_message_size; + uint64_t peer_max_message_size; + uint32_t current_link_credit; + uint32_t max_link_credit; + uint32_t available; + fields attach_properties; + bool is_underlying_session_begun; + bool is_closed; + unsigned char* received_payload; + uint32_t received_payload_size; + delivery_number received_delivery_id; + TICK_COUNTER_HANDLE tick_counter; + ON_LINK_DETACH_EVENT_SUBSCRIPTION on_link_detach_received_event_subscription; +} LINK_INSTANCE; + +DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE); + +static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) +{ + link_instance->previous_link_state = link_instance->link_state; + link_instance->link_state = link_state; + + if (link_instance->on_link_state_changed != NULL) + { + link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state); + } +} + +static void remove_all_pending_deliveries(LINK_INSTANCE* link, bool indicate_settled) +{ + if (link->pending_deliveries != NULL) + { + LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries); + while (item != NULL) + { + LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item); + if (pending_delivery_operation != NULL) + { + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); + if (indicate_settled && (delivery_instance->on_delivery_settled != NULL)) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, NULL); + } + + async_operation_destroy(pending_delivery_operation); + } + + item = next_item; + } + + singlylinkedlist_destroy(link->pending_deliveries); + link->pending_deliveries = NULL; + } +} + +static int send_flow(LINK_INSTANCE* link) +{ + int result; + FLOW_HANDLE flow = flow_create(0, 0, 0); + + if (flow == NULL) + { + LogError("NULL flow performative"); + result = __FAILURE__; + } + else + { + if (flow_set_link_credit(flow, link->current_link_credit) != 0) + { + LogError("Cannot set link credit on flow performative"); + result = __FAILURE__; + } + else if (flow_set_handle(flow, link->handle) != 0) + { + LogError("Cannot set handle on flow performative"); + result = __FAILURE__; + } + else if (flow_set_delivery_count(flow, link->delivery_count) != 0) + { + LogError("Cannot set delivery count on flow performative"); + result = __FAILURE__; + } + else + { + if (session_send_flow(link->link_endpoint, flow) != 0) + { + LogError("Sending flow frame failed in session send"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + flow_destroy(flow); + } + + return result; +} + +static int send_disposition(LINK_INSTANCE* link_instance, delivery_number delivery_number, AMQP_VALUE delivery_state) +{ + int result; + + DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number); + if (disposition == NULL) + { + LogError("NULL disposition performative"); + result = __FAILURE__; + } + else + { + if (disposition_set_last(disposition, delivery_number) != 0) + { + LogError("Failed setting last on disposition performative"); + result = __FAILURE__; + } + else if (disposition_set_settled(disposition, true) != 0) + { + LogError("Failed setting settled on disposition performative"); + result = __FAILURE__; + } + else if ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0)) + { + LogError("Failed setting state on disposition performative"); + result = __FAILURE__; + } + else + { + if (session_send_disposition(link_instance->link_endpoint, disposition) != 0) + { + LogError("Sending disposition failed in session send"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + disposition_destroy(disposition); + } + + return result; +} + +static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle) +{ + int result; + DETACH_HANDLE detach_performative; + + detach_performative = detach_create(0); + if (detach_performative == NULL) + { + LogError("NULL detach performative"); + result = __FAILURE__; + } + else + { + if ((error_handle != NULL) && + (detach_set_error(detach_performative, error_handle) != 0)) + { + LogError("Failed setting error on detach frame"); + result = __FAILURE__; + } + else if (close && + (detach_set_closed(detach_performative, true) != 0)) + { + LogError("Failed setting closed field on detach frame"); + result = __FAILURE__; + } + else + { + if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) + { + LogError("Sending detach frame failed in session send"); + result = __FAILURE__; + } + else + { + if (close) + { + /* Declare link to be closed */ + link_instance->is_closed = true; + } + + result = 0; + } + } + + detach_destroy(detach_performative); + } + + return result; +} + +static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role) +{ + int result; + ATTACH_HANDLE attach = attach_create(name, handle, role); + + if (attach == NULL) + { + LogError("NULL attach performative"); + result = __FAILURE__; + } + else + { + result = 0; + + link->delivery_count = link->initial_delivery_count; + + attach_set_snd_settle_mode(attach, link->snd_settle_mode); + attach_set_rcv_settle_mode(attach, link->rcv_settle_mode); + attach_set_role(attach, role); + attach_set_source(attach, link->source); + attach_set_target(attach, link->target); + if (link->attach_properties != NULL) + { + (void)attach_set_properties(attach, link->attach_properties); + } + + if (role == role_sender) + { + if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0) + { + LogError("Cannot set attach initial delivery count"); + result = __FAILURE__; + } + } + + if (result == 0) + { + if (attach_set_max_message_size(attach, link->max_message_size) != 0) + { + LogError("Cannot set max message size"); + result = __FAILURE__; + } + else if (session_send_attach(link->link_endpoint, attach) != 0) + { + LogError("Sending attach failed in session send"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + attach_destroy(attach); + } + + return result; +} + +static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes) +{ + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); + + if (is_attach_type_by_descriptor(descriptor)) + { + ATTACH_HANDLE attach_handle; + + if (amqpvalue_get_attach(performative, &attach_handle) != 0) + { + LogError("Cannot get attach performative"); + } + else + { + if ((link_instance->role == role_receiver) && + (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0)) + { + LogError("Cannot get initial delivery count"); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else + { + if (attach_get_max_message_size(attach_handle, &link_instance->peer_max_message_size) != 0) + { + LogError("Could not retrieve peer_max_message_size from attach frame"); + } + + if ((link_instance->link_state == LINK_STATE_DETACHED) || + (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT)) + { + if (link_instance->role == role_receiver) + { + link_instance->current_link_credit = link_instance->max_link_credit; + send_flow(link_instance); + } + else + { + link_instance->current_link_credit = 0; + } + + if (link_instance->link_state == LINK_STATE_DETACHED) + { + set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED); + } + else + { + set_link_state(link_instance, LINK_STATE_ATTACHED); + } + } + } + + attach_destroy(attach_handle); + } + } + else if (is_flow_type_by_descriptor(descriptor)) + { + FLOW_HANDLE flow_handle; + if (amqpvalue_get_flow(performative, &flow_handle) != 0) + { + LogError("Cannot get flow performative"); + } + else + { + if (link_instance->role == role_sender) + { + delivery_number rcv_delivery_count; + uint32_t rcv_link_credit; + + if (flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) + { + LogError("Cannot get link credit"); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else if (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0) + { + LogError("Cannot get delivery count"); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else + { + link_instance->current_link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count; + if (link_instance->current_link_credit > 0) + { + link_instance->on_link_flow_on(link_instance->callback_context); + } + } + } + } + + flow_destroy(flow_handle); + } + else if (is_transfer_type_by_descriptor(descriptor)) + { + if (link_instance->on_transfer_received != NULL) + { + TRANSFER_HANDLE transfer_handle; + if (amqpvalue_get_transfer(performative, &transfer_handle) != 0) + { + LogError("Cannot get transfer performative"); + } + else + { + AMQP_VALUE delivery_state; + bool more; + bool is_error; + + link_instance->current_link_credit--; + link_instance->delivery_count++; + if (link_instance->current_link_credit == 0) + { + link_instance->current_link_credit = link_instance->max_link_credit; + send_flow(link_instance); + } + + more = false; + /* Attempt to get more flag, default to false */ + (void)transfer_get_more(transfer_handle, &more); + is_error = false; + + if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) + { + /* is this not a continuation transfer? */ + if (link_instance->received_payload_size == 0) + { + LogError("Could not get the delivery Id from the transfer performative"); + is_error = true; + } + } + + if (!is_error) + { + /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */ + if ((link_instance->received_payload_size > 0) || more) + { + unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size); + if (new_received_payload == NULL) + { + LogError("Could not allocate memory for the received payload"); + } + else + { + link_instance->received_payload = new_received_payload; + (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size); + link_instance->received_payload_size += payload_size; + } + } + + if (!more) + { + const unsigned char* indicate_payload_bytes; + uint32_t indicate_payload_size; + + /* if no previously stored chunks then simply report the current payload */ + if (link_instance->received_payload_size > 0) + { + indicate_payload_size = link_instance->received_payload_size; + indicate_payload_bytes = link_instance->received_payload; + } + else + { + indicate_payload_size = payload_size; + indicate_payload_bytes = payload_bytes; + } + + delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes); + + if (link_instance->received_payload_size > 0) + { + free(link_instance->received_payload); + link_instance->received_payload = NULL; + link_instance->received_payload_size = 0; + } + + if (delivery_state != NULL) + { + if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0) + { + LogError("Cannot send disposition frame"); + } + + amqpvalue_destroy(delivery_state); + } + } + } + + transfer_destroy(transfer_handle); + } + } + } + else if (is_disposition_type_by_descriptor(descriptor)) + { + DISPOSITION_HANDLE disposition; + if (amqpvalue_get_disposition(performative, &disposition) != 0) + { + LogError("Cannot get disposition performative"); + } + else + { + delivery_number first; + delivery_number last; + + if (disposition_get_first(disposition, &first) != 0) + { + LogError("Cannot get first field"); + } + else + { + bool settled; + + if (disposition_get_last(disposition, &last) != 0) + { + last = first; + } + + if (disposition_get_settled(disposition, &settled) != 0) + { + settled = false; + } + + if (settled) + { + LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries); + while (pending_delivery != NULL) + { + LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery); + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery); + if (pending_delivery_operation == NULL) + { + LogError("Cannot obtain pending delivery"); + break; + } + else + { + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); + + if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last)) + { + AMQP_VALUE delivery_state; + if (disposition_get_state(disposition, &delivery_state) == 0) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state); + async_operation_destroy(pending_delivery_operation); + if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0) + { + LogError("Cannot remove pending delivery"); + break; + } + + pending_delivery = next_pending_delivery; + } + } + else + { + pending_delivery = next_pending_delivery; + } + } + } + } + } + + disposition_destroy(disposition); + } + } + else if (is_detach_type_by_descriptor(descriptor)) + { + DETACH_HANDLE detach; + + /* Set link state appropriately based on whether we received detach condition */ + if (amqpvalue_get_detach(performative, &detach) != 0) + { + LogError("Cannot get detach performative"); + } + else + { + bool closed = false; + ERROR_HANDLE error; + + (void)detach_get_closed(detach, &closed); + + /* Received a detach while attached */ + if (link_instance->link_state == LINK_STATE_ATTACHED) + { + /* Respond with ack */ + if (send_detach(link_instance, closed, NULL) != 0) + { + LogError("Failed sending detach frame"); + } + } + /* Received a closing detach after we sent a non-closing detach. */ + else if (closed && + ((link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT) || (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) && + !link_instance->is_closed) + { + + /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/ + if (send_attach(link_instance, link_instance->name, 0, link_instance->role) != 0) + { + LogError("Failed sending attach frame"); + } + + if (send_detach(link_instance, true, NULL) != 0) + { + LogError("Failed sending detach frame"); + } + } + + if (detach_get_error(detach, &error) != 0) + { + error = NULL; + } + remove_all_pending_deliveries(link_instance, true); + // signal link detach received in order to handle cases like redirect + if (link_instance->on_link_detach_received_event_subscription.on_link_detach_received != NULL) + { + link_instance->on_link_detach_received_event_subscription.on_link_detach_received(link_instance->on_link_detach_received_event_subscription.context, error); + } + + if (error != NULL) + { + set_link_state(link_instance, LINK_STATE_ERROR); + error_destroy(error); + } + else + { + set_link_state(link_instance, LINK_STATE_DETACHED); + } + + detach_destroy(detach); + } + } +} + +static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state) +{ + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + (void)previous_session_state; + + if (new_session_state == SESSION_STATE_MAPPED) + { + if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed)) + { + if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0) + { + set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_SENT); + } + } + } + else if (new_session_state == SESSION_STATE_DISCARDING) + { + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else if (new_session_state == SESSION_STATE_ERROR) + { + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_ERROR); + } +} + +static void on_session_flow_on(void* context) +{ + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + if (link_instance->role == role_sender) + { + link_instance->on_link_flow_on(link_instance->callback_context); + } +} + +static void on_send_complete(void* context, IO_SEND_RESULT send_result) +{ + LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; + ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(delivery_instance_list_item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation); + LINK_HANDLE link = (LINK_HANDLE)delivery_instance->link; + (void)send_result; + if (link->snd_settle_mode == sender_settle_mode_settled) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL); + async_operation_destroy(pending_delivery_operation); + (void)singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + } +} + +LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) +{ + LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); + if (result == NULL) + { + LogError("Cannot create link"); + } + else + { + result->link_state = LINK_STATE_DETACHED; + result->previous_link_state = LINK_STATE_DETACHED; + result->role = role; + result->source = amqpvalue_clone(source); + result->target = amqpvalue_clone(target); + result->session = session; + result->handle = 0; + result->snd_settle_mode = sender_settle_mode_unsettled; + result->rcv_settle_mode = receiver_settle_mode_first; + result->delivery_count = 0; + result->initial_delivery_count = 0; + result->max_message_size = 0; + result->max_link_credit = DEFAULT_LINK_CREDIT; + result->peer_max_message_size = 0; + result->is_underlying_session_begun = false; + result->is_closed = false; + result->attach_properties = NULL; + result->received_payload = NULL; + result->received_payload_size = 0; + result->received_delivery_id = 0; + result->on_link_detach_received_event_subscription.on_link_detach_received = NULL; + result->on_link_detach_received_event_subscription.context = NULL; + + result->tick_counter = tickcounter_create(); + if (result->tick_counter == NULL) + { + LogError("Cannot create tick counter for link"); + free(result); + result = NULL; + } + else + { + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) + { + LogError("Cannot create pending deliveries list"); + tickcounter_destroy(result->tick_counter); + free(result); + result = NULL; + } + else + { + size_t name_length = strlen(name); + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) + { + LogError("Cannot allocate memory for link name"); + tickcounter_destroy(result->tick_counter); + singlylinkedlist_destroy(result->pending_deliveries); + free(result); + result = NULL; + } + else + { + result->on_link_state_changed = NULL; + result->callback_context = NULL; + set_link_state(result, LINK_STATE_DETACHED); + + (void)memcpy(result->name, name, name_length + 1); + result->link_endpoint = session_create_link_endpoint(session, name); + if (result->link_endpoint == NULL) + { + LogError("Cannot create link endpoint"); + tickcounter_destroy(result->tick_counter); + singlylinkedlist_destroy(result->pending_deliveries); + free(result->name); + free(result); + result = NULL; + } + } + } + } + } + + return result; +} + +LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) +{ + LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); + if (result == NULL) + { + LogError("Cannot create link"); + } + else + { + result->link_state = LINK_STATE_DETACHED; + result->previous_link_state = LINK_STATE_DETACHED; + result->session = session; + result->handle = 0; + result->snd_settle_mode = sender_settle_mode_unsettled; + result->rcv_settle_mode = receiver_settle_mode_first; + result->delivery_count = 0; + result->initial_delivery_count = 0; + result->max_message_size = 0; + result->max_link_credit = DEFAULT_LINK_CREDIT; + result->peer_max_message_size = 0; + result->is_underlying_session_begun = false; + result->is_closed = false; + result->attach_properties = NULL; + result->received_payload = NULL; + result->received_payload_size = 0; + result->received_delivery_id = 0; + result->source = amqpvalue_clone(target); + result->target = amqpvalue_clone(source); + result->on_link_detach_received_event_subscription.on_link_detach_received = NULL; + result->on_link_detach_received_event_subscription.context = NULL; + + if (role == role_sender) + { + result->role = role_receiver; + } + else + { + result->role = role_sender; + } + + result->tick_counter = tickcounter_create(); + if (result->tick_counter == NULL) + { + LogError("Cannot create tick counter for link"); + free(result); + result = NULL; + } + else + { + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) + { + LogError("Cannot create pending deliveries list"); + tickcounter_destroy(result->tick_counter); + free(result); + result = NULL; + } + else + { + size_t name_length = strlen(name); + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) + { + LogError("Cannot allocate memory for link name"); + tickcounter_destroy(result->tick_counter); + singlylinkedlist_destroy(result->pending_deliveries); + free(result); + result = NULL; + } + else + { + (void)memcpy(result->name, name, name_length + 1); + result->on_link_state_changed = NULL; + result->callback_context = NULL; + result->link_endpoint = link_endpoint; + } + } + } + } + + return result; +} + +void link_destroy(LINK_HANDLE link) +{ + if (link == NULL) + { + LogError("NULL link"); + } + else + { + remove_all_pending_deliveries((LINK_INSTANCE*)link, false); + tickcounter_destroy(link->tick_counter); + + link->on_link_state_changed = NULL; + (void)link_detach(link, true, NULL, NULL, NULL); + session_destroy_link_endpoint(link->link_endpoint); + amqpvalue_destroy(link->source); + amqpvalue_destroy(link->target); + + if (link->name != NULL) + { + free(link->name); + } + + if (link->attach_properties != NULL) + { + amqpvalue_destroy(link->attach_properties); + } + + if (link->received_payload != NULL) + { + free(link->received_payload); + } + + free(link); + } +} + +int link_set_snd_settle_mode(LINK_HANDLE link, sender_settle_mode snd_settle_mode) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->snd_settle_mode = snd_settle_mode; + result = 0; + } + + return result; +} + +int link_get_snd_settle_mode(LINK_HANDLE link, sender_settle_mode* snd_settle_mode) +{ + int result; + + if ((link == NULL) || + (snd_settle_mode == NULL)) + { + LogError("Bad arguments: link = %p, snd_settle_mode = %p", + link, snd_settle_mode); + result = __FAILURE__; + } + else + { + *snd_settle_mode = link->snd_settle_mode; + + result = 0; + } + + return result; +} + +int link_set_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode rcv_settle_mode) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->rcv_settle_mode = rcv_settle_mode; + result = 0; + } + + return result; +} + +int link_get_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode* rcv_settle_mode) +{ + int result; + + if ((link == NULL) || + (rcv_settle_mode == NULL)) + { + LogError("Bad arguments: link = %p, rcv_settle_mode = %p", + link, rcv_settle_mode); + result = __FAILURE__; + } + else + { + *rcv_settle_mode = link->rcv_settle_mode; + result = 0; + } + + return result; +} + +int link_set_initial_delivery_count(LINK_HANDLE link, sequence_no initial_delivery_count) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->initial_delivery_count = initial_delivery_count; + result = 0; + } + + return result; +} + +int link_get_initial_delivery_count(LINK_HANDLE link, sequence_no* initial_delivery_count) +{ + int result; + + if ((link == NULL) || + (initial_delivery_count == NULL)) + { + LogError("Bad arguments: link = %p, initial_delivery_count = %p", + link, initial_delivery_count); + result = __FAILURE__; + } + else + { + *initial_delivery_count = link->initial_delivery_count; + result = 0; + } + + return result; +} + +int link_set_max_message_size(LINK_HANDLE link, uint64_t max_message_size) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->max_message_size = max_message_size; + result = 0; + } + + return result; +} + +int link_get_max_message_size(LINK_HANDLE link, uint64_t* max_message_size) +{ + int result; + + if ((link == NULL) || + (max_message_size == NULL)) + { + LogError("Bad arguments: link = %p, max_message_size = %p", + link, max_message_size); + result = __FAILURE__; + } + else + { + *max_message_size = link->max_message_size; + result = 0; + } + + return result; +} + +int link_get_peer_max_message_size(LINK_HANDLE link, uint64_t* peer_max_message_size) +{ + int result; + + if ((link == NULL) || + (peer_max_message_size == NULL)) + { + LogError("Bad arguments: link = %p, peer_max_message_size = %p", + link, peer_max_message_size); + result = __FAILURE__; + } + else if ((link->link_state != LINK_STATE_ATTACHED) && + (link->link_state != LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) + { + LogError("Attempting to read peer max message size before it was received"); + result = __FAILURE__; + } + else + { + *peer_max_message_size = link->peer_max_message_size; + result = 0; + } + + return result; +} + +int link_set_attach_properties(LINK_HANDLE link, fields attach_properties) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->attach_properties = amqpvalue_clone(attach_properties); + if (link->attach_properties == NULL) + { + LogError("Failed cloning attach properties"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + return result; +} + +int link_set_max_link_credit(LINK_HANDLE link, uint32_t max_link_credit) +{ + int result; + + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->max_link_credit = max_link_credit; + result = 0; + } + + return result; +} + +int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else if (link->is_closed) + { + LogError("Already attached"); + result = __FAILURE__; + } + else + { + if (!link->is_underlying_session_begun) + { + link->on_link_state_changed = on_link_state_changed; + link->on_transfer_received = on_transfer_received; + link->on_link_flow_on = on_link_flow_on; + link->callback_context = callback_context; + + if (session_begin(link->session) != 0) + { + LogError("Begin session failed"); + result = __FAILURE__; + } + else + { + link->is_underlying_session_begun = true; + + if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) + { + LogError("Binding link endpoint to session failed"); + result = __FAILURE__; + } + else + { + link->received_payload_size = 0; + + result = 0; + } + } + } + else + { + result = 0; + } + } + + return result; +} + +int link_detach(LINK_HANDLE link, bool close, const char* error_condition, const char* error_description, AMQP_VALUE info) +{ + int result; + + (void)error_condition; + (void)error_description; + (void)info; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else if (link->is_closed) + { + result = 0; + } + else + { + ERROR_HANDLE error; + + if (error_condition != NULL) + { + error = error_create(error_condition); + if (error == NULL) + { + LogInfo("Cannot create error for detach, detaching without error anyhow"); + } + else + { + if (error_description != NULL) + { + if (error_set_description(error, error_description) != 0) + { + LogInfo("Cannot set error description on detach error, detaching anyhow"); + } + } + + if (info != NULL) + { + if (error_set_info(error, info) != 0) + { + LogInfo("Cannot set info map on detach error, detaching anyhow"); + } + } + } + } + else + { + error = NULL; + } + + switch (link->link_state) + { + case LINK_STATE_HALF_ATTACHED_ATTACH_SENT: + case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED: + /* Sending detach when remote is not yet attached */ + if (send_detach(link, close, error) != 0) + { + LogError("Sending detach frame failed"); + result = __FAILURE__; + } + else + { + set_link_state(link, LINK_STATE_DETACHED); + result = 0; + } + break; + + case LINK_STATE_ATTACHED: + /* Send detach and wait for remote to respond */ + if (send_detach(link, close, error) != 0) + { + LogError("Sending detach frame failed"); + result = __FAILURE__; + } + else + { + set_link_state(link, LINK_STATE_HALF_ATTACHED_ATTACH_SENT); + result = 0; + } + break; + + case LINK_STATE_DETACHED: + /* Already detached */ + result = 0; + break; + + default: + case LINK_STATE_ERROR: + /* Already detached and in error state */ + result = __FAILURE__; + break; + } + + if (error != NULL) + { + error_destroy(error); + } + } + + return result; +} + +static bool remove_pending_delivery_condition_function(const void* item, const void* match_context, bool* continue_processing) +{ + bool result; + + if (item == match_context) + { + result = true; + *continue_processing = false; + } + else + { + result = false; + *continue_processing = true; + } + + return result; +} + +static void link_transfer_cancel_handler(ASYNC_OPERATION_HANDLE link_transfer_operation) +{ + DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, link_transfer_operation); + if (pending_delivery->on_delivery_settled != NULL) + { + pending_delivery->on_delivery_settled(pending_delivery->callback_context, pending_delivery->delivery_id, LINK_DELIVERY_SETTLE_REASON_CANCELLED, NULL); + } + + (void)singlylinkedlist_remove_if(((LINK_HANDLE)pending_delivery->link)->pending_deliveries, remove_pending_delivery_condition_function, pending_delivery); + + async_operation_destroy(link_transfer_operation); +} + +ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout) +{ + ASYNC_OPERATION_HANDLE result; + + if ((link == NULL) || + (link_transfer_error == NULL)) + { + if (link_transfer_error != NULL) + { + *link_transfer_error = LINK_TRANSFER_ERROR; + } + + LogError("Invalid arguments: link = %p, link_transfer_error = %p", + link, link_transfer_error); + result = NULL; + } + else + { + if (link->role != role_sender) + { + LogError("Link is not a sender link"); + *link_transfer_error = LINK_TRANSFER_ERROR; + result = NULL; + } + else if (link->link_state != LINK_STATE_ATTACHED) + { + LogError("Link is not attached"); + *link_transfer_error = LINK_TRANSFER_ERROR; + result = NULL; + } + else if (link->current_link_credit == 0) + { + *link_transfer_error = LINK_TRANSFER_BUSY; + result = NULL; + } + else + { + result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler); + if (result == NULL) + { + LogError("Error creating async operation"); + *link_transfer_error = LINK_TRANSFER_ERROR; + } + else + { + TRANSFER_HANDLE transfer = transfer_create(0); + if (transfer == NULL) + { + LogError("Error creating transfer"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + sequence_no delivery_count = link->delivery_count + 1; + unsigned char delivery_tag_bytes[sizeof(delivery_count)]; + delivery_tag delivery_tag; + bool settled; + + (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); + + delivery_tag.bytes = &delivery_tag_bytes; + delivery_tag.length = sizeof(delivery_tag_bytes); + + if (link->snd_settle_mode == sender_settle_mode_unsettled) + { + settled = false; + } + else + { + settled = true; + } + + if (transfer_set_delivery_tag(transfer, delivery_tag) != 0) + { + LogError("Failed setting delivery tag"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else if (transfer_set_message_format(transfer, message_format) != 0) + { + LogError("Failed setting message format"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else if (transfer_set_settled(transfer, settled) != 0) + { + LogError("Failed setting settled flag"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); + if (transfer_value == NULL) + { + LogError("Failed creating transfer performative AMQP value"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result); + if (pending_delivery == NULL) + { + LogError("Failed getting pending delivery"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0) + { + LogError("Failed getting current tick"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + LIST_ITEM_HANDLE delivery_instance_list_item; + pending_delivery->timeout = timeout; + pending_delivery->on_delivery_settled = on_delivery_settled; + pending_delivery->callback_context = callback_context; + pending_delivery->link = link; + delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result); + + if (delivery_instance_list_item == NULL) + { + LogError("Failed adding delivery to list"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else + { + /* here we should feed data to the transfer frame */ + switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) + { + default: + case SESSION_SEND_TRANSFER_ERROR: + LogError("Failed session send transfer"); + if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0) + { + LogError("Error removing pending delivery from the list"); + } + + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + break; + + case SESSION_SEND_TRANSFER_BUSY: + /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ + LogError("Failed session send transfer"); + if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0) + { + LogError("Error removing pending delivery from the list"); + } + + *link_transfer_error = LINK_TRANSFER_BUSY; + async_operation_destroy(result); + result = NULL; + break; + + case SESSION_SEND_TRANSFER_OK: + link->delivery_count = delivery_count; + link->current_link_credit--; + break; + } + } + } + } + + amqpvalue_destroy(transfer_value); + } + } + + transfer_destroy(transfer); + } + } + } + } + + return result; +} + +int link_get_name(LINK_HANDLE link, const char** link_name) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + *link_name = link->name; + result = 0; + } + + return result; +} + +int link_get_received_message_id(LINK_HANDLE link, delivery_number* message_id) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + *message_id = link->received_delivery_id; + result = 0; + } + + return result; +} + +int link_send_disposition(LINK_HANDLE link, delivery_number message_id, AMQP_VALUE delivery_state) +{ + int result; + + if (delivery_state == NULL) + { + result = 0; + } + else + { + result = send_disposition(link, message_id, delivery_state); + if (result != 0) + { + LogError("Cannot send disposition frame"); + result = __FAILURE__; + } + } + + return result; +} + +void link_dowork(LINK_HANDLE link) +{ + if (link == NULL) + { + LogError("NULL link"); + } + else + { + tickcounter_ms_t current_tick; + + if (tickcounter_get_current_ms(link->tick_counter, ¤t_tick) != 0) + { + LogError("Cannot get tick counter value"); + } + else + { + // go through all and find timed out deliveries + LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries); + while (item != NULL) + { + LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); + ASYNC_OPERATION_HANDLE delivery_instance_async_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item); + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, delivery_instance_async_operation); + + if ((delivery_instance->timeout != 0) && + (current_tick - delivery_instance->start_tick >= delivery_instance->timeout)) + { + if (delivery_instance->on_delivery_settled != NULL) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL); + } + + if (singlylinkedlist_remove(link->pending_deliveries, item) != 0) + { + LogError("Cannot remove item from list"); + } + + async_operation_destroy(delivery_instance_async_operation); + } + + item = next_item; + } + } + } +} + +ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE link_subscribe_on_link_detach_received(LINK_HANDLE link, ON_LINK_DETACH_RECEIVED on_link_detach_received, void* context) +{ + ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE result; + + if ((link == NULL) || + (on_link_detach_received == NULL)) + { + LogError("Invalid arguments: link = %p, on_link_detach_received = %p, context = %p", + link, on_link_detach_received, context); + result = NULL; + } + else + { + if (link->on_link_detach_received_event_subscription.on_link_detach_received != NULL) + { + LogError("Already subscribed for on_link_detach_received events"); + result = NULL; + } + else + { + link->on_link_detach_received_event_subscription.on_link_detach_received = on_link_detach_received; + link->on_link_detach_received_event_subscription.context = context; + + result = &link->on_link_detach_received_event_subscription; + } + } + + return result; +} + +void link_unsubscribe_on_link_detach_received(ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE event_subscription) +{ + if (event_subscription == NULL) + { + LogError("NULL event_subscription"); + } + else + { + event_subscription->on_link_detach_received = NULL; + event_subscription->context = NULL; + } +}