A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: session.c
- Revision:
- 6:641a9672db08
- Parent:
- 5:ae49385aff34
- Child:
- 7:9e9ab3b0efef
--- a/session.c Fri Jul 01 10:42:48 2016 -0700 +++ b/session.c Fri Jul 29 15:58:39 2016 -0700 @@ -46,9 +46,12 @@ handle handle_max; uint32_t remote_incoming_window; uint32_t remote_outgoing_window; - unsigned char is_underlying_connection_open : 1; + int is_underlying_connection_open : 1; } SESSION_INSTANCE; +#define UNDERLYING_CONNECTION_NOT_OPEN 0 +#define UNDERLYING_CONNECTION_OPEN -1 + static void session_set_state(SESSION_INSTANCE* session_instance, SESSION_STATE session_state) { uint64_t i; @@ -664,7 +667,7 @@ result->remote_incoming_window = 0; result->remote_outgoing_window = 0; result->previous_session_state = SESSION_STATE_UNMAPPED; - result->is_underlying_connection_open = 0; + result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; result->session_state = SESSION_STATE_UNMAPPED; result->on_link_attached = on_link_attached; result->on_link_attached_callback_context = callback_context; @@ -713,7 +716,7 @@ result->remote_incoming_window = 0; result->remote_outgoing_window = 0; result->previous_session_state = SESSION_STATE_UNMAPPED; - result->is_underlying_connection_open = 0; + result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; result->session_state = SESSION_STATE_UNMAPPED; result->on_link_attached = on_link_attached; result->on_link_attached_callback_context = callback_context; @@ -769,12 +772,12 @@ { if (connection_open(session_instance->connection) != 0) { - session_instance->is_underlying_connection_open = 0; + session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; result = __LINE__; } else { - session_instance->is_underlying_connection_open = 1; + session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_OPEN; result = 0; } } @@ -1268,7 +1271,6 @@ } else { - LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; AMQP_VALUE disposition_performative_value = amqpvalue_create_disposition(disposition); if (disposition_performative_value == NULL) { @@ -1358,207 +1360,220 @@ } else { - uint32_t payload_size = 0; + size_t payload_size = 0; size_t i; for (i = 0; i < payload_count; i++) { - payload_size += payloads[i].length; - } + if ((payloads[i].length > UINT32_MAX) || + (payload_size + payloads[i].length < payload_size)) + { + break; + } + + payload_size += payloads[i].length; + } - if (session_instance->remote_incoming_window == 0) - { - result = SESSION_SEND_TRANSFER_BUSY; - } - else - { - /* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */ - /* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */ - *delivery_id = session_instance->next_outgoing_id; - if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) || - (transfer_set_delivery_id(transfer, *delivery_id) != 0) || - (transfer_set_more(transfer, false) != 0)) - { - /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ - result = SESSION_SEND_TRANSFER_ERROR; - } - else - { - AMQP_VALUE transfer_value; + if ((i < payload_count) || + (payload_size > UINT32_MAX)) + { + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + if (session_instance->remote_incoming_window == 0) + { + result = SESSION_SEND_TRANSFER_BUSY; + } + else + { + /* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */ + /* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */ + *delivery_id = session_instance->next_outgoing_id; + if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) || + (transfer_set_delivery_id(transfer, *delivery_id) != 0) || + (transfer_set_more(transfer, false) != 0)) + { + /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + AMQP_VALUE transfer_value; - transfer_value = amqpvalue_create_transfer(transfer); - if (transfer_value == NULL) - { - /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ - result = SESSION_SEND_TRANSFER_ERROR; - } - else - { - uint32_t available_frame_size; - size_t encoded_size; - - if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) || - (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0)) - { - result = SESSION_SEND_TRANSFER_ERROR; - } - else - { - uint32_t payload_size = 0; - size_t i; + transfer_value = amqpvalue_create_transfer(transfer); + if (transfer_value == NULL) + { + /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + uint32_t available_frame_size; + size_t encoded_size; - for (i = 0; i < payload_count; i++) - { - payload_size += payloads[i].length; - } - - available_frame_size -= encoded_size; - available_frame_size -= 8; + if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) || + (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0)) + { + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + payload_size = 0; - if (available_frame_size >= payload_size) - { - /* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */ - if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0) - { - /* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */ - result = SESSION_SEND_TRANSFER_ERROR; - } - else - { - /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ - session_instance->next_outgoing_id++; - session_instance->remote_incoming_window--; - session_instance->outgoing_window--; + for (i = 0; i < payload_count; i++) + { + payload_size += payloads[i].length; + } + + available_frame_size -= (uint32_t)encoded_size; + available_frame_size -= 8; - /* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */ - result = SESSION_SEND_TRANSFER_OK; - } - } - else - { - size_t current_payload_index = 0; - uint32_t current_payload_pos = 0; + if (available_frame_size >= payload_size) + { + /* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */ + if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0) + { + /* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */ + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ + session_instance->next_outgoing_id++; + session_instance->remote_incoming_window--; + session_instance->outgoing_window--; - /* break it down into different deliveries */ - while (payload_size > 0) - { - uint32_t transfer_frame_payload_count = 0; - uint32_t current_transfer_frame_payload_size = payload_size; - uint32_t byte_counter; - size_t temp_current_payload_index = current_payload_index; - uint32_t temp_current_payload_pos = current_payload_pos; - AMQP_VALUE multi_transfer_amqp_value; - bool more; + /* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */ + result = SESSION_SEND_TRANSFER_OK; + } + } + else + { + size_t current_payload_index = 0; + uint32_t current_payload_pos = 0; - if (current_transfer_frame_payload_size > available_frame_size) - { - current_transfer_frame_payload_size = available_frame_size; - } + /* break it down into different deliveries */ + while (payload_size > 0) + { + uint32_t transfer_frame_payload_count = 0; + uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size; + uint32_t byte_counter; + size_t temp_current_payload_index = current_payload_index; + uint32_t temp_current_payload_pos = current_payload_pos; + AMQP_VALUE multi_transfer_amqp_value; + bool more; + + if (current_transfer_frame_payload_size > available_frame_size) + { + current_transfer_frame_payload_size = available_frame_size; + } - if (available_frame_size >= payload_size) - { - more = false; - } - else - { - more = true; - } + if (available_frame_size >= payload_size) + { + more = false; + } + else + { + more = true; + } - if (transfer_set_more(transfer, more) != 0) - { - break; - } + if (transfer_set_more(transfer, more) != 0) + { + break; + } - multi_transfer_amqp_value = amqpvalue_create_transfer(transfer); - if (multi_transfer_amqp_value == NULL) - { - break; - } + multi_transfer_amqp_value = amqpvalue_create_transfer(transfer); + if (multi_transfer_amqp_value == NULL) + { + break; + } - byte_counter = current_transfer_frame_payload_size; - while (byte_counter > 0) - { - if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter) - { - /* more data than we need */ - temp_current_payload_pos += byte_counter; - byte_counter = 0; - } - else - { - byte_counter -= payloads[temp_current_payload_index].length - temp_current_payload_pos; - temp_current_payload_index++; - temp_current_payload_pos = 0; - } - } + byte_counter = current_transfer_frame_payload_size; + while (byte_counter > 0) + { + if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter) + { + /* more data than we need */ + temp_current_payload_pos += byte_counter; + byte_counter = 0; + } + else + { + byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos; + temp_current_payload_index++; + temp_current_payload_pos = 0; + } + } - transfer_frame_payload_count = temp_current_payload_index - current_payload_index + 1; - PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD)); - if (transfer_frame_payloads == NULL) - { - amqpvalue_destroy(multi_transfer_amqp_value); - break; - } + transfer_frame_payload_count = (uint32_t)(temp_current_payload_index - current_payload_index + 1); + PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD)); + if (transfer_frame_payloads == NULL) + { + amqpvalue_destroy(multi_transfer_amqp_value); + break; + } - /* copy data */ - byte_counter = current_transfer_frame_payload_size; - transfer_frame_payload_count = 0; + /* copy data */ + byte_counter = current_transfer_frame_payload_size; + transfer_frame_payload_count = 0; - while (byte_counter > 0) - { - if (payloads[current_payload_index].length - current_payload_pos > byte_counter) - { - /* more data than we need */ - transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; - transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter; - current_payload_pos += byte_counter; - byte_counter = 0; - } - else - { - /* copy entire payload and move to the next */ - transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; - transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos; - byte_counter -= payloads[current_payload_index].length - current_payload_pos; - current_payload_index++; - current_payload_pos = 0; - } + while (byte_counter > 0) + { + if (payloads[current_payload_index].length - current_payload_pos > byte_counter) + { + /* more data than we need */ + transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; + transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter; + current_payload_pos += byte_counter; + byte_counter = 0; + } + else + { + /* copy entire payload and move to the next */ + transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; + transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos; + byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos; + current_payload_index++; + current_payload_pos = 0; + } - transfer_frame_payload_count++; - } + transfer_frame_payload_count++; + } - if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0) - { - amqpalloc_free(transfer_frame_payloads); - amqpvalue_destroy(multi_transfer_amqp_value); - break; - } + if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0) + { + amqpalloc_free(transfer_frame_payloads); + amqpvalue_destroy(multi_transfer_amqp_value); + break; + } - amqpalloc_free(transfer_frame_payloads); - amqpvalue_destroy(multi_transfer_amqp_value); - payload_size -= current_transfer_frame_payload_size; - } + amqpalloc_free(transfer_frame_payloads); + amqpvalue_destroy(multi_transfer_amqp_value); + payload_size -= current_transfer_frame_payload_size; + } - if (payload_size > 0) - { - result = SESSION_SEND_TRANSFER_ERROR; - } - else - { - /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ - session_instance->next_outgoing_id++; - session_instance->remote_incoming_window--; - session_instance->outgoing_window--; + if (payload_size > 0) + { + result = SESSION_SEND_TRANSFER_ERROR; + } + else + { + /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ + session_instance->next_outgoing_id++; + session_instance->remote_incoming_window--; + session_instance->outgoing_window--; - result = SESSION_SEND_TRANSFER_OK; - } - } - } + result = SESSION_SEND_TRANSFER_OK; + } + } + } - amqpvalue_destroy(transfer_value); - } - } - } + amqpvalue_destroy(transfer_value); + } + } + } + } } }