A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 43:4c1e4e94cdd3
- Parent:
- 41:0e723f9cbd89
- Child:
- 44:9dd558f13109
diff -r e2c1c1b77f13 -r 4c1e4e94cdd3 link.c --- a/link.c Fri May 04 13:24:52 2018 -0700 +++ b/link.c Mon Jun 11 15:39:52 2018 -0700 @@ -29,6 +29,12 @@ tickcounter_ms_t timeout; } DELIVERY_INSTANCE; +typedef struct ON_LINK_DETACH_EVENT_SUBSCRIPTION_TAG +{ + ON_LINK_DETACH_RECEIVED on_link_detach_received; + void* context; +} ON_LINK_DETACH_EVENT_SUBSCRIPTION; + typedef struct LINK_INSTANCE_TAG { SESSION_HANDLE session; @@ -61,6 +67,7 @@ uint32_t received_payload_size; delivery_number received_delivery_id; TICK_COUNTER_HANDLE tick_counter; + ON_LINK_DETACH_EVENT_SUBSCRIPTION on_link_detach_received_event_subscription; } LINK_INSTANCE; DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE); @@ -595,7 +602,6 @@ LogError("Failed sending detach frame"); } } - /* Received a closing detach after we sent a non-closing detach. */ else if (closed && ((link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT) || (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) && @@ -616,10 +622,16 @@ if (detach_get_error(detach, &error) == 0) { - error_destroy(error); - remove_all_pending_deliveries(link_instance, true); set_link_state(link_instance, LINK_STATE_ERROR); + + // signal link detach received in order to handle cases like redirect + if (link_instance->on_link_detach_received_event_subscription.on_link_detach_received != NULL) + { + link_instance->on_link_detach_received_event_subscription.on_link_detach_received(link_instance->on_link_detach_received_event_subscription.context, error); + } + + error_destroy(error); } else { @@ -714,6 +726,8 @@ result->received_payload = NULL; result->received_payload_size = 0; result->received_delivery_id = 0; + result->on_link_detach_received_event_subscription.on_link_detach_received = NULL; + result->on_link_detach_received_event_subscription.context = NULL; result->tick_counter = tickcounter_create(); if (result->tick_counter == NULL) @@ -797,6 +811,8 @@ result->received_delivery_id = 0; result->source = amqpvalue_clone(target); result->target = amqpvalue_clone(source); + result->on_link_detach_received_event_subscription.on_link_detach_received = NULL; + result->on_link_detach_received_event_subscription.context = NULL; if (role == role_sender) { @@ -862,7 +878,7 @@ tickcounter_destroy(link->tick_counter); link->on_link_state_changed = NULL; - (void)link_detach(link, true); + (void)link_detach(link, true, NULL, NULL, NULL); session_destroy_link_endpoint(link->link_endpoint); amqpvalue_destroy(link->source); amqpvalue_destroy(link->target); @@ -1162,10 +1178,14 @@ return result; } -int link_detach(LINK_HANDLE link, bool close) +int link_detach(LINK_HANDLE link, bool close, const char* error_condition, const char* error_description, AMQP_VALUE info) { int result; + (void)error_condition; + (void)error_description; + (void)info; + if (link == NULL) { LogError("NULL link"); @@ -1177,12 +1197,45 @@ } else { + ERROR_HANDLE error; + + if (error_condition != NULL) + { + error = error_create(error_condition); + if (error == NULL) + { + LogInfo("Cannot create error for detach, detaching without error anyhow"); + } + else + { + if (error_description != NULL) + { + if (error_set_description(error, error_description) != 0) + { + LogInfo("Cannot set error description on detach error, detaching anyhow"); + } + } + + if (info != NULL) + { + if (error_set_info(error, info) != 0) + { + LogInfo("Cannot set info map on detach error, detaching anyhow"); + } + } + } + } + else + { + error = NULL; + } + switch (link->link_state) { case LINK_STATE_HALF_ATTACHED_ATTACH_SENT: case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED: /* Sending detach when remote is not yet attached */ - if (send_detach(link, close, NULL) != 0) + if (send_detach(link, close, error) != 0) { LogError("Sending detach frame failed"); result = __FAILURE__; @@ -1196,7 +1249,7 @@ case LINK_STATE_ATTACHED: /* Send detach and wait for remote to respond */ - if (send_detach(link, close, NULL) != 0) + if (send_detach(link, close, error) != 0) { LogError("Sending detach frame failed"); result = __FAILURE__; @@ -1219,6 +1272,11 @@ result = __FAILURE__; break; } + + if (error != NULL) + { + error_destroy(error); + } } return result; @@ -1550,3 +1608,46 @@ } } } + +ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE link_subscribe_on_link_detach_received(LINK_HANDLE link, ON_LINK_DETACH_RECEIVED on_link_detach_received, void* context) +{ + ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE result; + + if ((link == NULL) || + (on_link_detach_received == NULL)) + { + LogError("Invalid arguments: link = %p, on_link_detach_received = %p, context = %p", + link, on_link_detach_received, context); + result = NULL; + } + else + { + if (link->on_link_detach_received_event_subscription.on_link_detach_received != NULL) + { + LogError("Already subscribed for on_link_detach_received events"); + result = NULL; + } + else + { + link->on_link_detach_received_event_subscription.on_link_detach_received = on_link_detach_received; + link->on_link_detach_received_event_subscription.context = context; + + result = &link->on_link_detach_received_event_subscription; + } + } + + return result; +} + +void link_unsubscribe_on_link_detach_received(ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE event_subscription) +{ + if (event_subscription == NULL) + { + LogError("NULL event_subscription"); + } + else + { + event_subscription->on_link_detach_received = NULL; + event_subscription->context = NULL; + } +}