A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: amqp_management.c
- Revision:
- 22:524bded3f7a8
- Parent:
- 21:f9c433d8e6ca
- Child:
- 23:1111ee8bcba4
--- a/amqp_management.c Fri Mar 24 16:35:45 2017 -0700 +++ b/amqp_management.c Thu Apr 06 14:11:27 2017 -0700 @@ -24,11 +24,19 @@ { MESSAGE_HANDLE message; OPERATION_STATE operation_state; - ON_OPERATION_COMPLETE on_operation_complete; + ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete; void* callback_context; unsigned long message_id; } OPERATION_MESSAGE_INSTANCE; +typedef enum AMQP_MANAGEMENT_STATE_TAG +{ + AMQP_MANAGEMENT_STATE_IDLE, + AMQP_MANAGEMENT_STATE_OPENING, + AMQP_MANAGEMENT_STATE_OPEN, + AMQP_MANAGEMENT_STATE_ERROR +} AMQP_MANAGEMENT_STATE; + typedef struct AMQP_MANAGEMENT_INSTANCE_TAG { SESSION_HANDLE session; @@ -39,25 +47,15 @@ OPERATION_MESSAGE_INSTANCE** operation_messages; size_t operation_message_count; unsigned long next_message_id; - ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed; - void* callback_context; + ON_AMQP_MANAGEMENT_OPEN_COMPLETE on_amqp_management_open_complete; + void* on_amqp_management_open_complete_context; + ON_AMQP_MANAGEMENT_ERROR on_amqp_management_error; + void* on_amqp_management_error_context; AMQP_MANAGEMENT_STATE amqp_management_state; - AMQP_MANAGEMENT_STATE previous_amqp_management_state; int sender_connected : 1; int receiver_connected : 1; } AMQP_MANAGEMENT_INSTANCE; -static void amqpmanagement_set_state(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, AMQP_MANAGEMENT_STATE amqp_management_state) -{ - amqp_management_instance->previous_amqp_management_state = amqp_management_instance->amqp_management_state; - amqp_management_instance->amqp_management_state = amqp_management_state; - - if (amqp_management_instance->on_amqp_management_state_changed != NULL) - { - amqp_management_instance->on_amqp_management_state_changed(amqp_management_instance->callback_context, amqp_management_instance->amqp_management_state, amqp_management_instance->previous_amqp_management_state); - } -} - static void remove_operation_message_by_index(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, size_t index) { message_destroy(amqp_management_instance->operation_messages[index]->message); @@ -166,7 +164,7 @@ if (amqp_management_instance->operation_messages[i]->operation_state == OPERATION_STATE_AWAIT_REPLY) { AMQP_VALUE expected_message_id = amqpvalue_create_ulong(amqp_management_instance->operation_messages[i]->message_id); - OPERATION_RESULT operation_result; + AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT execute_operation_result; if (expected_message_id == NULL) { @@ -179,14 +177,14 @@ /* 202 is not mentioned in the draft in any way, this is a workaround for an EH bug for now */ if ((status_code != 200) && (status_code != 202)) { - operation_result = OPERATION_RESULT_OPERATION_FAILED; + execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS; } else { - operation_result = OPERATION_RESULT_OK; + execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_OK; } - amqp_management_instance->operation_messages[i]->on_operation_complete(amqp_management_instance->operation_messages[i]->callback_context, operation_result, status_code, status_description); + amqp_management_instance->operation_messages[i]->on_execute_operation_complete(amqp_management_instance->operation_messages[i]->callback_context, execute_operation_result, status_code, status_description); remove_operation_message_by_index(amqp_management_instance, i); @@ -281,14 +279,52 @@ break; case MESSAGE_SENDER_STATE_OPEN: - amqp_management_instance->sender_connected = 1; - (void)send_operation_messages(amqp_management_instance); + switch (amqp_management_instance->amqp_management_state) + { + default: + case AMQP_MANAGEMENT_STATE_ERROR: + /* do nothing */ + break; + + case AMQP_MANAGEMENT_STATE_OPENING: + amqp_management_instance->sender_connected = -1; + if (amqp_management_instance->receiver_connected != 0) + { + (void)send_operation_messages(amqp_management_instance); + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_OPEN; + amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_OK); + } + break; + + case AMQP_MANAGEMENT_STATE_OPEN: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR; + amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context); + break; + } + break; case MESSAGE_SENDER_STATE_CLOSING: + case MESSAGE_SENDER_STATE_IDLE: case MESSAGE_SENDER_STATE_ERROR: amqp_management_instance->sender_connected = 0; - amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR); + switch (amqp_management_instance->amqp_management_state) + { + default: + case AMQP_MANAGEMENT_STATE_ERROR: + /* do nothing */ + break; + + case AMQP_MANAGEMENT_STATE_OPENING: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; + amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_ERROR); + break; + + case AMQP_MANAGEMENT_STATE_OPEN: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR; + amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context); + break; + } break; } } @@ -303,14 +339,53 @@ break; case MESSAGE_RECEIVER_STATE_OPEN: - amqp_management_instance->receiver_connected = 1; - (void)send_operation_messages(amqp_management_instance); + switch (amqp_management_instance->amqp_management_state) + { + default: + case AMQP_MANAGEMENT_STATE_ERROR: + /* do nothing */ + break; + + case AMQP_MANAGEMENT_STATE_OPENING: + amqp_management_instance->receiver_connected = -1; + + if (amqp_management_instance->sender_connected != 0) + { + (void)send_operation_messages(amqp_management_instance); + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_OPEN; + amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_OK); + } + break; + + case AMQP_MANAGEMENT_STATE_OPEN: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR; + amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context); + break; + } + break; case MESSAGE_RECEIVER_STATE_CLOSING: + case MESSAGE_RECEIVER_STATE_IDLE: case MESSAGE_RECEIVER_STATE_ERROR: - amqp_management_instance->receiver_connected = 0; - amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR); + amqp_management_instance->sender_connected = 0; + switch (amqp_management_instance->amqp_management_state) + { + default: + case AMQP_MANAGEMENT_STATE_ERROR: + /* do nothing */ + break; + + case AMQP_MANAGEMENT_STATE_OPENING: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; + amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_ERROR); + break; + + case AMQP_MANAGEMENT_STATE_OPEN: + amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR; + amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context); + break; + } break; } } @@ -400,7 +475,7 @@ return result; } -AMQP_MANAGEMENT_HANDLE amqpmanagement_create(SESSION_HANDLE session, const char* management_node, ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed, void* callback_context) +AMQP_MANAGEMENT_HANDLE amqp_management_create(SESSION_HANDLE session, const char* management_node) { AMQP_MANAGEMENT_INSTANCE* result; @@ -418,8 +493,10 @@ result->receiver_connected = 0; result->operation_message_count = 0; result->operation_messages = NULL; - result->on_amqp_management_state_changed = on_amqp_management_state_changed; - result->callback_context = callback_context; + result->on_amqp_management_open_complete = NULL; + result->on_amqp_management_open_complete_context = NULL; + result->on_amqp_management_error = NULL; + result->on_amqp_management_error_context = NULL; result->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; AMQP_VALUE source = messaging_create_source(management_node); @@ -534,11 +611,11 @@ return result; } -void amqpmanagement_destroy(AMQP_MANAGEMENT_HANDLE amqp_management) +void amqp_management_destroy(AMQP_MANAGEMENT_HANDLE amqp_management) { if (amqp_management != NULL) { - (void)amqpmanagement_close(amqp_management); + (void)amqp_management_close(amqp_management); if (amqp_management->operation_message_count > 0) { @@ -560,7 +637,7 @@ } } -int amqpmanagement_open(AMQP_MANAGEMENT_HANDLE amqp_management) +int amqp_management_open_async(AMQP_MANAGEMENT_HANDLE amqp_management, ON_AMQP_MANAGEMENT_OPEN_COMPLETE on_amqp_management_open_complete, void* on_amqp_management_open_complete_context, ON_AMQP_MANAGEMENT_ERROR on_amqp_management_error, void* on_amqp_management_error_context) { int result; @@ -570,14 +647,22 @@ } else { + amqp_management->on_amqp_management_open_complete = on_amqp_management_open_complete; + amqp_management->on_amqp_management_open_complete_context = on_amqp_management_open_complete_context; + amqp_management->on_amqp_management_error = on_amqp_management_error; + amqp_management->on_amqp_management_error_context = on_amqp_management_error_context; + amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_OPENING; + if (messagereceiver_open(amqp_management->message_receiver, on_message_received, amqp_management) != 0) { + amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; result = __FAILURE__; } else { if (messagesender_open(amqp_management->message_sender) != 0) { + amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; messagereceiver_close(amqp_management->message_receiver); result = __FAILURE__; } @@ -591,7 +676,7 @@ return result; } -int amqpmanagement_close(AMQP_MANAGEMENT_HANDLE amqp_management) +int amqp_management_close(AMQP_MANAGEMENT_HANDLE amqp_management) { int result; @@ -608,6 +693,7 @@ } else { + amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE; result = 0; } } @@ -615,7 +701,7 @@ return result; } -int amqpmanagement_start_operation(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_OPERATION_COMPLETE on_operation_complete, void* context) +int amqp_management_execute_operation_async(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete, void* on_execute_operation_complete_context) { int result; @@ -656,8 +742,8 @@ else { pending_operation_message->message = message_clone(message); - pending_operation_message->callback_context = context; - pending_operation_message->on_operation_complete = on_operation_complete; + pending_operation_message->callback_context = on_execute_operation_complete_context; + pending_operation_message->on_execute_operation_complete = on_execute_operation_complete; pending_operation_message->operation_state = OPERATION_STATE_NOT_SENT; pending_operation_message->message_id = amqp_management->next_message_id; @@ -678,9 +764,9 @@ if (send_operation_messages(amqp_management) != 0) { - if (on_operation_complete != NULL) + if (on_execute_operation_complete != NULL) { - on_operation_complete(context, OPERATION_RESULT_ERROR, 0, NULL); + on_execute_operation_complete(on_execute_operation_complete_context, AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, 0, NULL); } result = __FAILURE__; @@ -700,7 +786,7 @@ return result; } -void amqpmanagement_set_trace(AMQP_MANAGEMENT_HANDLE amqp_management, bool traceOn) +void amqp_management_set_trace(AMQP_MANAGEMENT_HANDLE amqp_management, bool traceOn) { if (amqp_management != NULL) {