A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 41:0e723f9cbd89
- Parent:
- 37:c923ba7f6cf9
- Child:
- 43:4c1e4e94cdd3
diff -r f0ceafa8d570 -r 0e723f9cbd89 link.c --- a/link.c Mon Mar 05 17:41:28 2018 -0800 +++ b/link.c Tue Mar 20 10:29:38 2018 -0700 @@ -51,7 +51,8 @@ sequence_no initial_delivery_count; uint64_t max_message_size; uint64_t peer_max_message_size; - uint32_t link_credit; + uint32_t current_link_credit; + uint32_t max_link_credit; uint32_t available; fields attach_properties; bool is_underlying_session_begun; @@ -110,20 +111,31 @@ if (flow == NULL) { + LogError("NULL flow performative"); result = __FAILURE__; } else { - if ((flow_set_link_credit(flow, link->link_credit) != 0) || - (flow_set_handle(flow, link->handle) != 0) || - (flow_set_delivery_count(flow, link->delivery_count) != 0)) + if (flow_set_link_credit(flow, link->current_link_credit) != 0) + { + LogError("Cannot set link credit on flow performative"); + result = __FAILURE__; + } + else if (flow_set_handle(flow, link->handle) != 0) { + LogError("Cannot set handle on flow performative"); + result = __FAILURE__; + } + else if (flow_set_delivery_count(flow, link->delivery_count) != 0) + { + LogError("Cannot set delivery count on flow performative"); result = __FAILURE__; } else { if (session_send_flow(link->link_endpoint, flow) != 0) { + LogError("Sending flow frame failed in session send"); result = __FAILURE__; } else @@ -145,20 +157,31 @@ DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number); if (disposition == NULL) { + LogError("NULL disposition performative"); result = __FAILURE__; } else { - if ((disposition_set_last(disposition, delivery_number) != 0) || - (disposition_set_settled(disposition, true) != 0) || - ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0))) + if (disposition_set_last(disposition, delivery_number) != 0) + { + LogError("Failed setting last on disposition performative"); + result = __FAILURE__; + } + else if (disposition_set_settled(disposition, true) != 0) { + LogError("Failed setting settled on disposition performative"); + result = __FAILURE__; + } + else if ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0)) + { + LogError("Failed setting state on disposition performative"); result = __FAILURE__; } else { if (session_send_disposition(link_instance->link_endpoint, disposition) != 0) { + LogError("Sending disposition failed in session send"); result = __FAILURE__; } else @@ -181,6 +204,7 @@ detach_performative = detach_create(0); if (detach_performative == NULL) { + LogError("NULL detach performative"); result = __FAILURE__; } else @@ -188,17 +212,20 @@ if ((error_handle != NULL) && (detach_set_error(detach_performative, error_handle) != 0)) { + LogError("Failed setting error on detach frame"); result = __FAILURE__; } else if (close && (detach_set_closed(detach_performative, true) != 0)) { + LogError("Failed setting closed field on detach frame"); result = __FAILURE__; } else { if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) { + LogError("Sending detach frame failed in session send"); result = __FAILURE__; } else @@ -226,6 +253,7 @@ if (attach == NULL) { + LogError("NULL attach performative"); result = __FAILURE__; } else @@ -241,22 +269,28 @@ attach_set_target(attach, link->target); if (link->attach_properties != NULL) { - attach_set_properties(attach, link->attach_properties); + (void)attach_set_properties(attach, link->attach_properties); } if (role == role_sender) { if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0) { + LogError("Cannot set attach initial delivery count"); result = __FAILURE__; } } if (result == 0) { - if ((attach_set_max_message_size(attach, link->max_message_size) != 0) || - (session_send_attach(link->link_endpoint, attach) != 0)) + if (attach_set_max_message_size(attach, link->max_message_size) != 0) { + LogError("Cannot set max message size"); + result = __FAILURE__; + } + else if (session_send_attach(link->link_endpoint, attach) != 0) + { + LogError("Sending attach failed in session send"); result = __FAILURE__; } else @@ -280,12 +314,16 @@ { ATTACH_HANDLE attach_handle; - if (amqpvalue_get_attach(performative, &attach_handle) == 0) + if (amqpvalue_get_attach(performative, &attach_handle) != 0) + { + LogError("Cannot get attach performative"); + } + else { if ((link_instance->role == role_receiver) && (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0)) { - /* error */ + LogError("Cannot get initial delivery count"); remove_all_pending_deliveries(link_instance, true); set_link_state(link_instance, LINK_STATE_DETACHED); } @@ -301,12 +339,12 @@ { if (link_instance->role == role_receiver) { - link_instance->link_credit = DEFAULT_LINK_CREDIT; + link_instance->current_link_credit = link_instance->max_link_credit; send_flow(link_instance); } else { - link_instance->link_credit = 0; + link_instance->current_link_credit = 0; } if (link_instance->link_state == LINK_STATE_DETACHED) @@ -326,24 +364,33 @@ else if (is_flow_type_by_descriptor(descriptor)) { FLOW_HANDLE flow_handle; - if (amqpvalue_get_flow(performative, &flow_handle) == 0) + if (amqpvalue_get_flow(performative, &flow_handle) != 0) + { + LogError("Cannot get flow performative"); + } + else { if (link_instance->role == role_sender) { delivery_number rcv_delivery_count; uint32_t rcv_link_credit; - if ((flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) || - (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)) + if (flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) { - /* error */ + LogError("Cannot get link credit"); + remove_all_pending_deliveries(link_instance, true); + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else if (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0) + { + LogError("Cannot get delivery count"); remove_all_pending_deliveries(link_instance, true); set_link_state(link_instance, LINK_STATE_DETACHED); } else { - link_instance->link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count; - if (link_instance->link_credit > 0) + link_instance->current_link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count; + if (link_instance->current_link_credit > 0) { link_instance->on_link_flow_on(link_instance->callback_context); } @@ -358,17 +405,21 @@ if (link_instance->on_transfer_received != NULL) { TRANSFER_HANDLE transfer_handle; - if (amqpvalue_get_transfer(performative, &transfer_handle) == 0) + if (amqpvalue_get_transfer(performative, &transfer_handle) != 0) + { + LogError("Cannot get transfer performative"); + } + else { AMQP_VALUE delivery_state; bool more; bool is_error; - link_instance->link_credit--; + link_instance->current_link_credit--; link_instance->delivery_count++; - if (link_instance->link_credit == 0) + if (link_instance->current_link_credit == 0) { - link_instance->link_credit = DEFAULT_LINK_CREDIT; + link_instance->current_link_credit = link_instance->max_link_credit; send_flow(link_instance); } @@ -437,6 +488,7 @@ { LogError("Cannot send disposition frame"); } + amqpvalue_destroy(delivery_state); } } @@ -451,7 +503,7 @@ DISPOSITION_HANDLE disposition; if (amqpvalue_get_disposition(performative, &disposition) != 0) { - /* error */ + LogError("Cannot get disposition performative"); } else { @@ -460,7 +512,7 @@ if (disposition_get_first(disposition, &first) != 0) { - /* error */ + LogError("Cannot get first field"); } else { @@ -473,7 +525,6 @@ if (disposition_get_settled(disposition, &settled) != 0) { - /* Error */ settled = false; } @@ -486,7 +537,7 @@ ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery); if (pending_delivery_operation == NULL) { - /* error */ + LogError("Cannot obtain pending delivery"); break; } else @@ -496,23 +547,17 @@ if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last)) { AMQP_VALUE delivery_state; - if (disposition_get_state(disposition, &delivery_state) != 0) - { - /* error */ - } - else + if (disposition_get_state(disposition, &delivery_state) == 0) { delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state); async_operation_destroy(pending_delivery_operation); if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0) { - /* error */ + LogError("Cannot remove pending delivery"); break; } - else - { - pending_delivery = next_pending_delivery; - } + + pending_delivery = next_pending_delivery; } } else @@ -532,7 +577,11 @@ DETACH_HANDLE detach; /* Set link state appropriately based on whether we received detach condition */ - if (amqpvalue_get_detach(performative, &detach) == 0) + if (amqpvalue_get_detach(performative, &detach) != 0) + { + LogError("Cannot get detach performative"); + } + else { bool closed = false; ERROR_HANDLE error; @@ -541,7 +590,10 @@ if (link_instance->link_state == LINK_STATE_ATTACHED) { /* Respond with ack */ - (void)send_detach(link_instance, closed, NULL); + if (send_detach(link_instance, closed, NULL) != 0) + { + LogError("Failed sending detach frame"); + } } /* Received a closing detach after we sent a non-closing detach. */ @@ -551,8 +603,15 @@ { /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/ - (void)send_attach(link_instance, link_instance->name, 0, link_instance->role); - (void)send_detach(link_instance, true, NULL); + if (send_attach(link_instance, link_instance->name, 0, link_instance->role) != 0) + { + LogError("Failed sending attach frame"); + } + + if (send_detach(link_instance, true, NULL) != 0) + { + LogError("Failed sending detach frame"); + } } if (detach_get_error(detach, &error) == 0) @@ -629,7 +688,11 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) { LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); - if (result != NULL) + if (result == NULL) + { + LogError("Cannot create link"); + } + else { result->link_state = LINK_STATE_DETACHED; result->previous_link_state = LINK_STATE_DETACHED; @@ -643,6 +706,7 @@ result->delivery_count = 0; result->initial_delivery_count = 0; result->max_message_size = 0; + result->max_link_credit = DEFAULT_LINK_CREDIT; result->peer_max_message_size = 0; result->is_underlying_session_begun = false; result->is_closed = false; @@ -654,6 +718,7 @@ result->tick_counter = tickcounter_create(); if (result->tick_counter == NULL) { + LogError("Cannot create tick counter for link"); free(result); result = NULL; } @@ -662,6 +727,7 @@ result->pending_deliveries = singlylinkedlist_create(); if (result->pending_deliveries == NULL) { + LogError("Cannot create pending deliveries list"); tickcounter_destroy(result->tick_counter); free(result); result = NULL; @@ -672,6 +738,7 @@ result->name = (char*)malloc(name_length + 1); if (result->name == NULL) { + LogError("Cannot allocate memory for link name"); tickcounter_destroy(result->tick_counter); singlylinkedlist_destroy(result->pending_deliveries); free(result); @@ -687,6 +754,7 @@ result->link_endpoint = session_create_link_endpoint(session, name); if (result->link_endpoint == NULL) { + LogError("Cannot create link endpoint"); tickcounter_destroy(result->tick_counter); singlylinkedlist_destroy(result->pending_deliveries); free(result->name); @@ -704,7 +772,11 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) { LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); - if (result != NULL) + if (result == NULL) + { + LogError("Cannot create link"); + } + else { result->link_state = LINK_STATE_DETACHED; result->previous_link_state = LINK_STATE_DETACHED; @@ -715,6 +787,7 @@ result->delivery_count = 0; result->initial_delivery_count = 0; result->max_message_size = 0; + result->max_link_credit = DEFAULT_LINK_CREDIT; result->peer_max_message_size = 0; result->is_underlying_session_begun = false; result->is_closed = false; @@ -724,6 +797,7 @@ result->received_delivery_id = 0; result->source = amqpvalue_clone(target); result->target = amqpvalue_clone(source); + if (role == role_sender) { result->role = role_receiver; @@ -736,6 +810,7 @@ result->tick_counter = tickcounter_create(); if (result->tick_counter == NULL) { + LogError("Cannot create tick counter for link"); free(result); result = NULL; } @@ -744,6 +819,7 @@ result->pending_deliveries = singlylinkedlist_create(); if (result->pending_deliveries == NULL) { + LogError("Cannot create pending deliveries list"); tickcounter_destroy(result->tick_counter); free(result); result = NULL; @@ -754,6 +830,7 @@ result->name = (char*)malloc(name_length + 1); if (result->name == NULL) { + LogError("Cannot allocate memory for link name"); tickcounter_destroy(result->tick_counter); singlylinkedlist_destroy(result->pending_deliveries); free(result); @@ -775,7 +852,11 @@ void link_destroy(LINK_HANDLE link) { - if (link != NULL) + if (link == NULL) + { + LogError("NULL link"); + } + else { remove_all_pending_deliveries((LINK_INSTANCE*)link, false); tickcounter_destroy(link->tick_counter); @@ -811,6 +892,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -829,6 +911,8 @@ if ((link == NULL) || (snd_settle_mode == NULL)) { + LogError("Bad arguments: link = %p, snd_settle_mode = %p", + link, snd_settle_mode); result = __FAILURE__; } else @@ -847,6 +931,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -865,6 +950,8 @@ if ((link == NULL) || (rcv_settle_mode == NULL)) { + LogError("Bad arguments: link = %p, rcv_settle_mode = %p", + link, rcv_settle_mode); result = __FAILURE__; } else @@ -882,6 +969,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -900,6 +988,8 @@ if ((link == NULL) || (initial_delivery_count == NULL)) { + LogError("Bad arguments: link = %p, initial_delivery_count = %p", + link, initial_delivery_count); result = __FAILURE__; } else @@ -917,6 +1007,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -935,6 +1026,8 @@ if ((link == NULL) || (max_message_size == NULL)) { + LogError("Bad arguments: link = %p, max_message_size = %p", + link, max_message_size); result = __FAILURE__; } else @@ -978,6 +1071,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -985,6 +1079,7 @@ link->attach_properties = amqpvalue_clone(attach_properties); if (link->attach_properties == NULL) { + LogError("Failed cloning attach properties"); result = __FAILURE__; } else @@ -996,13 +1091,35 @@ return result; } +int link_set_max_link_credit(LINK_HANDLE link, uint32_t max_link_credit) +{ + int result; + + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->max_link_credit = max_link_credit; + result = 0; + } + + return result; +} + int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context) { int result; - if ((link == NULL) || - (link->is_closed)) + if (link == NULL) { + LogError("NULL link"); + result = __FAILURE__; + } + else if (link->is_closed) + { + LogError("Already attached"); result = __FAILURE__; } else @@ -1016,6 +1133,7 @@ if (session_begin(link->session) != 0) { + LogError("Begin session failed"); result = __FAILURE__; } else @@ -1024,6 +1142,7 @@ if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) { + LogError("Binding link endpoint to session failed"); result = __FAILURE__; } else @@ -1047,11 +1166,15 @@ { int result; - if ((link == NULL) || - (link->is_closed)) + if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } + else if (link->is_closed) + { + result = 0; + } else { switch (link->link_state) @@ -1061,6 +1184,7 @@ /* Sending detach when remote is not yet attached */ if (send_detach(link, close, NULL) != 0) { + LogError("Sending detach frame failed"); result = __FAILURE__; } else @@ -1074,6 +1198,7 @@ /* Send detach and wait for remote to respond */ if (send_detach(link, close, NULL) != 0) { + LogError("Sending detach frame failed"); result = __FAILURE__; } else @@ -1142,17 +1267,25 @@ *link_transfer_error = LINK_TRANSFER_ERROR; } + LogError("Invalid arguments: link = %p, link_transfer_error = %p", + link, link_transfer_error); result = NULL; } else { - if ((link->role != role_sender) || - (link->link_state != LINK_STATE_ATTACHED)) + if (link->role != role_sender) { + LogError("Link is not a sender link"); *link_transfer_error = LINK_TRANSFER_ERROR; result = NULL; } - else if (link->link_credit == 0) + else if (link->link_state != LINK_STATE_ATTACHED) + { + LogError("Link is not attached"); + *link_transfer_error = LINK_TRANSFER_ERROR; + result = NULL; + } + else if (link->current_link_credit == 0) { *link_transfer_error = LINK_TRANSFER_BUSY; result = NULL; @@ -1160,11 +1293,17 @@ else { result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler); - if (result != NULL) + if (result == NULL) + { + LogError("Error creating async operation"); + *link_transfer_error = LINK_TRANSFER_ERROR; + } + else { TRANSFER_HANDLE transfer = transfer_create(0); if (transfer == NULL) { + LogError("Error creating transfer"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1190,10 +1329,23 @@ settled = true; } - if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || - (transfer_set_message_format(transfer, message_format) != 0) || - (transfer_set_settled(transfer, settled) != 0)) + if (transfer_set_delivery_tag(transfer, delivery_tag) != 0) + { + LogError("Failed setting delivery tag"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else if (transfer_set_message_format(transfer, message_format) != 0) { + LogError("Failed setting message format"); + *link_transfer_error = LINK_TRANSFER_ERROR; + async_operation_destroy(result); + result = NULL; + } + else if (transfer_set_settled(transfer, settled) != 0) + { + LogError("Failed setting settled flag"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1201,9 +1353,9 @@ else { AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); - if (transfer_value == NULL) { + LogError("Failed creating transfer performative AMQP value"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1213,6 +1365,7 @@ DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result); if (pending_delivery == NULL) { + LogError("Failed getting pending delivery"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1221,6 +1374,7 @@ { if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0) { + LogError("Failed getting current tick"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1236,6 +1390,7 @@ if (delivery_instance_list_item == NULL) { + LogError("Failed adding delivery to list"); *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1247,7 +1402,12 @@ { default: case SESSION_SEND_TRANSFER_ERROR: - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + LogError("Failed session send transfer"); + if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0) + { + LogError("Error removing pending delivery from the list"); + } + *link_transfer_error = LINK_TRANSFER_ERROR; async_operation_destroy(result); result = NULL; @@ -1255,7 +1415,12 @@ case SESSION_SEND_TRANSFER_BUSY: /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + LogError("Failed session send transfer"); + if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0) + { + LogError("Error removing pending delivery from the list"); + } + *link_transfer_error = LINK_TRANSFER_BUSY; async_operation_destroy(result); result = NULL; @@ -1263,7 +1428,7 @@ case SESSION_SEND_TRANSFER_OK: link->delivery_count = delivery_count; - link->link_credit--; + link->current_link_credit--; break; } } @@ -1289,6 +1454,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -1306,6 +1472,7 @@ if (link == NULL) { + LogError("NULL link"); result = __FAILURE__; } else @@ -1320,6 +1487,7 @@ int link_send_disposition(LINK_HANDLE link, delivery_number message_id, AMQP_VALUE delivery_state) { int result; + if (delivery_state == NULL) { result = 0; @@ -1327,24 +1495,29 @@ else { result = send_disposition(link, message_id, delivery_state); - if ( result != 0) + if (result != 0) { LogError("Cannot send disposition frame"); result = __FAILURE__; } } + return result; } void link_dowork(LINK_HANDLE link) { - if (link != NULL) + if (link == NULL) + { + LogError("NULL link"); + } + else { tickcounter_ms_t current_tick; if (tickcounter_get_current_ms(link->tick_counter, ¤t_tick) != 0) { - /* error */ + LogError("Cannot get tick counter value"); } else { @@ -1364,7 +1537,11 @@ delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL); } - (void)singlylinkedlist_remove(link->pending_deliveries, item); + if (singlylinkedlist_remove(link->pending_deliveries, item) != 0) + { + LogError("Cannot remove item from list"); + } + async_operation_destroy(delivery_instance_async_operation); }