A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: amqp_management.c
- Revision:
- 41:0e723f9cbd89
- Parent:
- 40:f0ceafa8d570
- Child:
- 43:4c1e4e94cdd3
diff -r f0ceafa8d570 -r 0e723f9cbd89 amqp_management.c --- a/amqp_management.c Mon Mar 05 17:41:28 2018 -0800 +++ b/amqp_management.c Tue Mar 20 10:29:38 2018 -0700 @@ -27,12 +27,14 @@ ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete; void* callback_context; uint64_t message_id; + AMQP_MANAGEMENT_HANDLE amqp_management; } OPERATION_MESSAGE_INSTANCE; typedef enum AMQP_MANAGEMENT_STATE_TAG { AMQP_MANAGEMENT_STATE_IDLE, AMQP_MANAGEMENT_STATE_OPENING, + AMQP_MANAGEMENT_STATE_CLOSING, AMQP_MANAGEMENT_STATE_OPEN, AMQP_MANAGEMENT_STATE_ERROR } AMQP_MANAGEMENT_STATE; @@ -330,6 +332,46 @@ return result; } +static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result) +{ + if (context == NULL) + { + /* Codes_SRS_AMQP_MANAGEMENT_01_167: [ When `on_message_send_complete` is called with a NULL context it shall return. ]*/ + LogError("NULL context"); + } + else + { + if (send_result == MESSAGE_SEND_OK) + { + /* Codes_SRS_AMQP_MANAGEMENT_01_170: [ If `send_result` is `MESSAGE_SEND_OK`, `on_message_send_complete` shall return. ]*/ + } + else + { + /* Codes_SRS_AMQP_MANAGEMENT_01_172: [ If `send_result` is different then `MESSAGE_SEND_OK`: ]*/ + /* Codes_SRS_AMQP_MANAGEMENT_01_168: [ - `context` shall be used as a LIST_ITEM_HANDLE containing the pending operation. ]*/ + LIST_ITEM_HANDLE pending_operation_list_item_handle = (LIST_ITEM_HANDLE)context; + + /* Codes_SRS_AMQP_MANAGEMENT_01_169: [ - `on_message_send_complete` shall obtain the pending operation by calling `singlylinkedlist_item_get_value`. ]*/ + OPERATION_MESSAGE_INSTANCE* pending_operation_message = (OPERATION_MESSAGE_INSTANCE*)singlylinkedlist_item_get_value(pending_operation_list_item_handle); + AMQP_MANAGEMENT_HANDLE amqp_management = pending_operation_message->amqp_management; + + /* Codes_SRS_AMQP_MANAGEMENT_01_171: [ - `on_message_send_complete` shall removed the pending operation from the pending operations list. ]*/ + if (singlylinkedlist_remove(amqp_management->pending_operations, pending_operation_list_item_handle) != 0) + { + /* Tests_SRS_AMQP_MANAGEMENT_01_174: [ If any error occurs in removing the pending operation from the list `on_amqp_management_error` callback shall be invoked while passing the `on_amqp_management_error_context` as argument. ]*/ + amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context); + LogError("Cannot remove pending operation"); + } + else + { + /* Codes_SRS_AMQP_MANAGEMENT_01_173: [ - The callback associated with the pending operation shall be called with `AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR`. ]*/ + pending_operation_message->on_execute_operation_complete(pending_operation_message->callback_context, AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, 0, NULL, NULL); + free(pending_operation_message); + } + } + } +} + static void on_message_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) { if (context == NULL) @@ -400,6 +442,26 @@ } break; } + /* Codes_SRS_AMQP_MANAGEMENT_09_001: [ For the current state of AMQP management being `CLOSING`: ]*/ + case AMQP_MANAGEMENT_STATE_CLOSING: + { + switch (new_state) + { + default: + /* Codes_SRS_AMQP_MANAGEMENT_09_002: [ - If `new_state` is `MESSAGE_SENDER_STATE_OPEN`, `MESSAGE_SENDER_STATE_OPENING`, `MESSAGE_SENDER_STATE_ERROR` the `on_amqp_management_error` callback shall be invoked while passing the `on_amqp_management_error_context` as argument. ]*/ + case MESSAGE_SENDER_STATE_OPEN: + case MESSAGE_SENDER_STATE_OPENING: + case MESSAGE_SENDER_STATE_ERROR: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR; + amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context); + break; + case MESSAGE_SENDER_STATE_IDLE: + case MESSAGE_SENDER_STATE_CLOSING: + /* Codes_SRS_AMQP_MANAGEMENT_09_003: [ - If `new_state` is `MESSAGE_SENDER_STATE_CLOSING` or `MESSAGE_SENDER_STATE_IDLE`, `on_message_sender_state_changed` shall do nothing. ]*/ + break; + } + break; + } /* Codes_SRS_AMQP_MANAGEMENT_01_146: [ For the current state of AMQP management being `ERROR`: ]*/ case AMQP_MANAGEMENT_STATE_ERROR: /* Codes_SRS_AMQP_MANAGEMENT_01_147: [ - All state transitions shall be ignored. ]*/ @@ -958,6 +1020,17 @@ } else { + AMQP_MANAGEMENT_STATE previous_state = amqp_management->amqp_management_state; + + amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_CLOSING; + + if (previous_state == AMQP_MANAGEMENT_STATE_OPENING) + { + /* Codes_SRS_AMQP_MANAGEMENT_01_048: [ `amqp_management_close` on an AMQP management instance that is OPENING shall trigger the `on_amqp_management_open_complete` callback with `AMQP_MANAGEMENT_OPEN_CANCELLED`, while also passing the context passed in `amqp_management_open_async`. ]*/ + amqp_management->on_amqp_management_open_complete(amqp_management->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_CANCELLED); + } + + /* Codes_SRS_AMQP_MANAGEMENT_01_045: [ `amqp_management_close` shall close the AMQP management instance. ]*/ /* Codes_SRS_AMQP_MANAGEMENT_01_050: [ `amqp_management_close` shall close the message sender by calling `messagesender_close`. ]*/ if (messagesender_close(amqp_management->message_sender) != 0) @@ -998,12 +1071,6 @@ list_item_handle = singlylinkedlist_get_head_item(amqp_management->pending_operations); } - if (amqp_management->amqp_management_state == AMQP_MANAGEMENT_STATE_OPENING) - { - /* Codes_SRS_AMQP_MANAGEMENT_01_048: [ `amqp_management_close` on an AMQP management instance that is OPENING shall trigger the `on_amqp_management_open_complete` callback with `AMQP_MANAGEMENT_OPEN_CANCELLED`, while also passing the context passed in `amqp_management_open_async`. ]*/ - amqp_management->on_amqp_management_open_complete(amqp_management->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_CANCELLED); - } - amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; /* Codes_SRS_AMQP_MANAGEMENT_01_046: [ On success it shall return 0. ]*/ @@ -1126,6 +1193,7 @@ pending_operation_message->callback_context = on_execute_operation_complete_context; pending_operation_message->on_execute_operation_complete = on_execute_operation_complete; pending_operation_message->message_id = amqp_management->next_message_id; + pending_operation_message->amqp_management = amqp_management; /* Codes_SRS_AMQP_MANAGEMENT_01_091: [ Once the request message has been sent, an entry shall be stored in the pending operations list by calling `singlylinkedlist_add`. ]*/ added_item = singlylinkedlist_add(amqp_management->pending_operations, pending_operation_message); @@ -1138,8 +1206,9 @@ } else { - /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send`. ]*/ - if (messagesender_send_async(amqp_management->message_sender, cloned_message, NULL, NULL, 0) == NULL) + /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send_async`. ]*/ + /* Codes_SRS_AMQP_MANAGEMENT_01_166: [ The `on_message_send_complete` callback shall be passed to the `messagesender_send_async` call. ]*/ + if (messagesender_send_async(amqp_management->message_sender, cloned_message, on_message_send_complete, added_item, 0) == NULL) { /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send_async` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/ LogError("Could not send request message");