A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: connection.c
- Revision:
- 28:add19eb7defa
- Parent:
- 25:1101516ee67d
- Child:
- 30:0407b2db334c
--- a/connection.c Fri Jun 02 15:53:07 2017 -0700 +++ b/connection.c Fri Jun 30 10:41:22 2017 -0700 @@ -109,10 +109,10 @@ if (xio_send(connection->io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) { /* Codes_SRS_CONNECTION_01_106: [When sending the protocol header fails, the connection shall be immediately closed.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } /* Codes_SRS_CONNECTION_01_057: [END In this state it is illegal for either endpoint to write anything more onto the connection. The connection can be safely closed and discarded.] */ connection_set_state(connection, CONNECTION_STATE_END); @@ -189,13 +189,13 @@ UNUSED(performative); #else AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); - if (descriptor == NULL) - { - LogError("Error getting performative descriptor"); - } - else - { - char* performative_as_string; + if (descriptor == NULL) + { + LogError("Error getting performative descriptor"); + } + else + { + char* performative_as_string; LOG(AZ_LOG_TRACE, 0, "<- "); LOG(AZ_LOG_TRACE, 0, (char*)get_frame_type_as_string(descriptor)); performative_as_string = NULL; @@ -214,14 +214,14 @@ UNUSED(performative); #else AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); - if (descriptor == NULL) - { - LogError("Error getting performative descriptor"); - } - else - { - char* performative_as_string; - LOG(AZ_LOG_TRACE, 0, "-> "); + if (descriptor == NULL) + { + LogError("Error getting performative descriptor"); + } + else + { + char* performative_as_string; + LOG(AZ_LOG_TRACE, 0, "-> "); LOG(AZ_LOG_TRACE, 0, (char*)get_frame_type_as_string(descriptor)); performative_as_string = NULL; LOG(AZ_LOG_TRACE, LOG_LINE, (performative_as_string = amqpvalue_to_string(performative))); @@ -238,12 +238,12 @@ CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; if (xio_send(connection->io, bytes, length, encode_complete ? connection->on_send_complete : NULL, connection->on_send_complete_callback_context) != 0) { - LogError("Cannot send encoded bytes"); + LogError("Cannot send encoded bytes"); - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } connection_set_state(connection, CONNECTION_STATE_END); } @@ -256,15 +256,15 @@ /* Codes_SRS_CONNECTION_01_151: [The connection max_frame_size setting shall be passed down to the frame_codec when the Open frame is sent.] */ if (frame_codec_set_max_frame_size(connection->frame_codec, connection->max_frame_size) != 0) { - LogError("Cannot set max frame size"); + LogError("Cannot set max frame size"); /* Codes_SRS_CONNECTION_01_207: [If frame_codec_set_max_frame_size fails the connection shall be closed and the state set to END.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -273,15 +273,15 @@ OPEN_HANDLE open_performative = open_create(connection->container_id); if (open_performative == NULL) { - LogError("Cannot create OPEN performative"); - - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + LogError("Cannot create OPEN performative"); + + /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -289,29 +289,29 @@ /* Codes_SRS_CONNECTION_01_137: [The max_frame_size connection setting shall be set in the open frame by using open_set_max_frame_size.] */ if (open_set_max_frame_size(open_performative, connection->max_frame_size) != 0) { - LogError("Cannot set max frame size"); + LogError("Cannot set max frame size"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } /* Codes_SRS_CONNECTION_01_139: [The channel_max connection setting shall be set in the open frame by using open_set_channel_max.] */ else if (open_set_channel_max(open_performative, connection->channel_max) != 0) { - LogError("Cannot set max channel"); + LogError("Cannot set max channel"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } /* Codes_SRS_CONNECTION_01_142: [If no idle_timeout value has been specified, no value shall be stamped in the open frame (no call to open_set_idle_time_out shall be made).] */ @@ -320,12 +320,12 @@ (open_set_idle_time_out(open_performative, connection->idle_timeout) != 0)) { /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } /* Codes_SRS_CONNECTION_01_136: [If no hostname value has been specified, no value shall be stamped in the open frame (no call to open_set_hostname shall be made).] */ @@ -333,15 +333,15 @@ /* Codes_SRS_CONNECTION_01_135: [If hostname has been specified by a call to connection_set_hostname, then that value shall be stamped in the open frame.] */ (open_set_hostname(open_performative, connection->host_name) != 0)) { - LogError("Cannot set hostname"); + LogError("Cannot set hostname"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -349,15 +349,15 @@ AMQP_VALUE open_performative_value = amqpvalue_create_open(open_performative); if (open_performative_value == NULL) { - LogError("Cannot create OPEN AMQP value"); + LogError("Cannot create OPEN AMQP value"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -367,19 +367,19 @@ /* Codes_SRS_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ /* Codes_SRS_CONNECTION_01_205: [Sending the AMQP OPEN frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ /* Codes_SRS_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ - connection->on_send_complete = NULL; - connection->on_send_complete_callback_context = NULL; + connection->on_send_complete = NULL; + connection->on_send_complete_callback_context = NULL; if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, open_performative_value, NULL, 0, on_bytes_encoded, connection) != 0) { - LogError("amqp_frame_codec_encode_frame failed"); + LogError("amqp_frame_codec_encode_frame failed"); - /* Codes_SRS_CONNECTION_01_206: [If sending the frame fails, the connection shall be closed and state set to END.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_206: [If sending the frame fails, the connection shall be closed and state set to END.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -414,8 +414,8 @@ close_performative = close_create(); if (close_performative == NULL) { - LogError("Cannot create close performative"); - result = __FAILURE__; + LogError("Cannot create close performative"); + result = __FAILURE__; } else { @@ -423,27 +423,27 @@ /* Codes_SRS_CONNECTION_01_238: [If set, this field indicates that the connection is being closed due to an error condition.] */ (close_set_error(close_performative, error_handle) != 0)) { - LogError("Cannot set error on CLOSE"); - result = __FAILURE__; + LogError("Cannot set error on CLOSE"); + result = __FAILURE__; } else { AMQP_VALUE close_performative_value = amqpvalue_create_close(close_performative); if (close_performative_value == NULL) { - LogError("Cannot create AMQP CLOSE performative value"); - result = __FAILURE__; + LogError("Cannot create AMQP CLOSE performative value"); + result = __FAILURE__; } else { /* Codes_SRS_CONNECTION_01_215: [Sending the AMQP CLOSE frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ /* Codes_SRS_CONNECTION_01_013: [However, implementations SHOULD send it on channel 0] */ - connection->on_send_complete = NULL; - connection->on_send_complete_callback_context = NULL; + connection->on_send_complete = NULL; + connection->on_send_complete_callback_context = NULL; if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, close_performative_value, NULL, 0, on_bytes_encoded, connection) != 0) { - LogError("amqp_frame_codec_encode_frame failed"); - result = __FAILURE__; + LogError("amqp_frame_codec_encode_frame failed"); + result = __FAILURE__; } else { @@ -471,37 +471,37 @@ if (error_handle == NULL) { /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); } else { /* Codes_SRS_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ - if (error_set_description(error_handle, description) != 0) - { - LogError("Cannot set error description on CLOSE frame"); + if (error_set_description(error_handle, description) != 0) + { + LogError("Cannot set error description on CLOSE frame"); - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); - } - else if (send_close_frame(connection, error_handle) != 0) - { - LogError("Cannot send CLOSE frame"); + connection_set_state(connection, CONNECTION_STATE_END); + } + else if (send_close_frame(connection, error_handle) != 0) + { + LogError("Cannot send CLOSE frame"); - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } connection_set_state(connection, CONNECTION_STATE_END); } @@ -532,8 +532,8 @@ if (i == connection->endpoint_count) { - LogError("Cannot find session endpoint for channel %u", (unsigned int)outgoing_channel); - result = NULL; + LogError("Cannot find session endpoint for channel %u", (unsigned int)outgoing_channel); + result = NULL; } else { @@ -558,8 +558,8 @@ if (i == connection->endpoint_count) { - LogError("Cannot find session endpoint for channel %u", (unsigned int)incoming_channel); - result = NULL; + LogError("Cannot find session endpoint for channel %u", (unsigned int)incoming_channel); + result = NULL; } else { @@ -576,8 +576,8 @@ switch (connection->connection_state) { default: - LogError("Unknown connection state: %d", (int)connection->connection_state); - result = __FAILURE__; + LogError("Unknown connection state: %d", (int)connection->connection_state); + result = __FAILURE__; break; /* Codes_SRS_CONNECTION_01_039: [START In this state a connection exists, but nothing has been sent or received. This is the state an implementation would be in immediately after performing a socket connect or socket accept.] */ @@ -588,17 +588,17 @@ if (b != amqp_header[connection->header_bytes_received]) { /* Codes_SRS_CONNECTION_01_089: [If the incoming and outgoing protocol headers do not match, both peers MUST close their outgoing stream] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection_set_state(connection, CONNECTION_STATE_END); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else { - connection->header_bytes_received++; + connection->header_bytes_received++; if (connection->header_bytes_received == sizeof(amqp_header)) { if (connection->is_trace_on == 1) @@ -610,8 +610,8 @@ if (send_open_frame(connection) != 0) { - LogError("Cannot send open frame"); - connection_set_state(connection, CONNECTION_STATE_END); + LogError("Cannot send open frame"); + connection_set_state(connection, CONNECTION_STATE_END); } } @@ -637,8 +637,8 @@ /* Codes_SRS_CONNECTION_01_212: [After the initial handshake has been done all bytes received from the io instance shall be passed to the frame_codec for decoding by calling frame_codec_receive_bytes.] */ if (frame_codec_receive_bytes(connection->frame_codec, &b, 1) != 0) { - LogError("Cannot process received bytes"); - /* Codes_SRS_CONNECTION_01_218: [The error amqp:internal-error shall be set in the error.condition field of the CLOSE frame.] */ + LogError("Cannot process received bytes"); + /* Codes_SRS_CONNECTION_01_218: [The error amqp:internal-error shall be set in the error.condition field of the CLOSE frame.] */ /* Codes_SRS_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ close_connection_with_error(connection, "amqp:internal-error", "connection_byte_received::frame_codec_receive_bytes failed"); result = __FAILURE__; @@ -662,8 +662,8 @@ { if (connection_byte_received((CONNECTION_HANDLE)context, buffer[i]) != 0) { - LogError("Cannot process received bytes"); - break; + LogError("Cannot process received bytes"); + break; } } } @@ -678,16 +678,16 @@ switch (connection->connection_state) { default: - LogError("Unknown connection state: %d", (int)connection->connection_state); - break; + LogError("Unknown connection state: %d", (int)connection->connection_state); + break; case CONNECTION_STATE_START: /* Codes_SRS_CONNECTION_01_086: [Prior to sending any frames on a connection_instance, each peer MUST start by sending a protocol header that indicates the protocol version used on the connection_instance.] */ /* Codes_SRS_CONNECTION_01_091: [The AMQP peer which acted in the role of the TCP client (i.e. the peer that actively opened the connection_instance) MUST immediately send its outgoing protocol header on establishment of the TCP connection_instance.] */ - if (send_header(connection) != 0) - { - LogError("Cannot send header"); - } + if (send_header(connection) != 0) + { + LogError("Cannot send header"); + } break; case CONNECTION_STATE_HDR_SENT: @@ -701,8 +701,8 @@ /* Codes_SRS_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ if (send_open_frame(connection) != 0) { - LogError("Cannot send OPEN frame"); - connection_set_state(connection, CONNECTION_STATE_END); + LogError("Cannot send OPEN frame"); + connection_set_state(connection, CONNECTION_STATE_END); } break; @@ -718,51 +718,51 @@ static void connection_on_io_error(void* context) { - CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; + CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; /* Codes_SRS_CONNECTION_22_005: [If the io notifies the connection instance of an IO_STATE_ERROR state and an io error callback is registered, the connection shall call the registered callback.] */ if (connection->on_io_error) { - connection->on_io_error(connection->on_io_error_callback_context); + connection->on_io_error(connection->on_io_error_callback_context); } if (connection->connection_state != CONNECTION_STATE_END) { /* Codes_SRS_CONNECTION_01_202: [If the io notifies the connection instance of an IO_STATE_ERROR state the connection shall be closed and the state set to END.] */ connection_set_state(connection, CONNECTION_STATE_ERROR); - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } + } } static void on_empty_amqp_frame_received(void* context, uint16_t channel) { - CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; - /* It does not matter on which channel we received the frame */ - (void)channel; + CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; + /* It does not matter on which channel we received the frame */ + (void)channel; - if (connection->is_trace_on == 1) + if (connection->is_trace_on == 1) { LOG(AZ_LOG_TRACE, LOG_LINE, "<- Empty frame"); } if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0) { - LogError("Cannot get tickcounter value"); - } + LogError("Cannot get tickcounter value"); + } } static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE performative, const unsigned char* payload_bytes, uint32_t payload_size) { CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; - (void)channel; + (void)channel; if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0) { - LogError("Cannot get tickcounter value"); - close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count"); + LogError("Cannot get tickcounter value"); + close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count"); } else { @@ -773,10 +773,10 @@ default: if (performative == NULL) { - /* Codes_SRS_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */ + /* Codes_SRS_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */ close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative"); - LogError("connection_endpoint_frame_received::NULL performative"); - } + LogError("connection_endpoint_frame_received::NULL performative"); + } else { AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); @@ -791,18 +791,18 @@ { if (channel != 0) { - /* Codes_SRS_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ + /* Codes_SRS_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ /* Codes_SRS_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */ close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0"); - LogError("OPEN frame received on a channel that is not 0"); - } + LogError("OPEN frame received on a channel that is not 0"); + } if (connection->connection_state == CONNECTION_STATE_OPENED) { /* Codes_SRS_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */ close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state"); - LogError("OPEN frame received in the OPENED state"); - } + LogError("OPEN frame received in the OPENED state"); + } else if ((connection->connection_state == CONNECTION_STATE_OPEN_SENT) || (connection->connection_state == CONNECTION_STATE_HDR_EXCH)) { @@ -812,8 +812,8 @@ /* Codes_SRS_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ /* Codes_SRS_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame"); - LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); - } + LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); + } else { (void)open_get_idle_time_out(open_handle, &connection->remote_idle_timeout); @@ -824,8 +824,8 @@ /* Codes_SRS_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ /* Codes_SRS_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame"); - LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); - } + LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); + } else { if (connection->connection_state == CONNECTION_STATE_OPEN_SENT) @@ -869,11 +869,11 @@ /* Codes_SRS_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */ (connection->connection_state == CONNECTION_STATE_DISCARDING)) { - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } + } else { CLOSE_HANDLE close_handle; @@ -882,31 +882,31 @@ if (channel > connection->channel_max) { close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame"); - LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); - } + LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); + } else { if (amqpvalue_get_close(performative, &close_handle) != 0) { close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame"); - LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); - } + LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); + } else { close_destroy(close_handle); connection_set_state(connection, CONNECTION_STATE_CLOSE_RCVD); - if (send_close_frame(connection, NULL) != 0) - { - LogError("Cannot send CLOSE frame"); - } + if (send_close_frame(connection, NULL) != 0) + { + LogError("Cannot send CLOSE frame"); + } /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } connection_set_state(connection, CONNECTION_STATE_END); } @@ -920,7 +920,7 @@ switch (performative_ulong) { default: - LogError("Bad performative: %02x", performative); + LogError("Bad performative: %02x", performative); break; case AMQP_BEGIN: @@ -929,8 +929,8 @@ if (amqpvalue_get_begin(performative, &begin) != 0) { - LogError("Cannot get begin performative"); - } + LogError("Cannot get begin performative"); + } else { uint16_t remote_channel; @@ -939,7 +939,7 @@ if (begin_get_remote_channel(begin, &remote_channel) != 0) { - remote_begin = true; + remote_begin = true; if (connection->on_new_endpoint != NULL) { new_endpoint = connection_create_endpoint(connection); @@ -956,8 +956,8 @@ ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel); if (session_endpoint == NULL) { - LogError("Cannot create session endpoint"); - } + LogError("Cannot create session endpoint"); + } else { session_endpoint->incoming_channel = channel; @@ -989,8 +989,8 @@ ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_incoming_channel(connection, channel); if (session_endpoint == NULL) { - LogError("Cannot find session endpoint for channel %u", (unsigned int)channel); - } + LogError("Cannot find session endpoint for channel %u", (unsigned int)channel); + } else { session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes); @@ -1015,11 +1015,11 @@ /* Codes_SRS_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */ case CONNECTION_STATE_END: /* Codes_SRS_CONNECTION_01_237: [END - - TCP Close] */ - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } - break; + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } + break; } } } @@ -1029,16 +1029,16 @@ { /* Bug: some error handling should happen here Filed: uAMQP: frame_codec error and amqp_frame_codec_error should handle the errors */ - LogError("A frame_codec_error occured"); - (void)context; + LogError("A frame_codec_error occured"); + (void)context; } static void amqp_frame_codec_error(void* context) { /* Bug: some error handling should happen here Filed: uAMQP: frame_codec error and amqp_frame_codec_error should handle the errors */ - LogError("An amqp_frame_codec_error occured"); - (void)context; + LogError("An amqp_frame_codec_error occured"); + (void)context; } /* Codes_SRS_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */ @@ -1057,20 +1057,20 @@ (container_id == NULL)) { /* Codes_SRS_CONNECTION_01_071: [If xio or container_id is NULL, connection_create shall return NULL.] */ - LogError("Bad arguments: xio = %p, container_id = %p", - xio, container_id); - result = NULL; + LogError("Bad arguments: xio = %p, container_id = %p", + xio, container_id); + result = NULL; } else { result = (CONNECTION_HANDLE)malloc(sizeof(CONNECTION_INSTANCE)); /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ - if (result == NULL) - { - LogError("Cannot allocate memory for connection"); - } - else - { + if (result == NULL) + { + LogError("Cannot allocate memory for connection"); + } + else + { result->io = xio; /* Codes_SRS_CONNECTION_01_082: [connection_create shall allocate a new frame_codec instance to be used for frame encoding/decoding.] */ @@ -1078,8 +1078,8 @@ if (result->frame_codec == NULL) { /* Codes_SRS_CONNECTION_01_083: [If frame_codec_create fails then connection_create shall return NULL.] */ - LogError("Cannot create frame_codec"); - free(result); + LogError("Cannot create frame_codec"); + free(result); result = NULL; } else @@ -1088,8 +1088,8 @@ if (result->amqp_frame_codec == NULL) { /* Codes_SRS_CONNECTION_01_108: [If amqp_frame_codec_create fails, connection_create shall return NULL.] */ - LogError("Cannot create amqp_frame_codec"); - frame_codec_destroy(result->frame_codec); + LogError("Cannot create amqp_frame_codec"); + frame_codec_destroy(result->frame_codec); free(result); result = NULL; } @@ -1102,8 +1102,8 @@ if (result->host_name == NULL) { /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ - LogError("Cannot allocate memory for host name"); - amqp_frame_codec_destroy(result->amqp_frame_codec); + LogError("Cannot allocate memory for host name"); + amqp_frame_codec_destroy(result->amqp_frame_codec); frame_codec_destroy(result->frame_codec); free(result); result = NULL; @@ -1125,8 +1125,8 @@ if (result->container_id == NULL) { /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ - LogError("Cannot allocate memory for container_id"); - free(result->host_name); + LogError("Cannot allocate memory for container_id"); + free(result->host_name); amqp_frame_codec_destroy(result->amqp_frame_codec); frame_codec_destroy(result->frame_codec); free(result); @@ -1137,8 +1137,8 @@ result->tick_counter = tickcounter_create(); if (result->tick_counter == NULL) { - LogError("Cannot create tick counter"); - free(result->container_id); + LogError("Cannot create tick counter"); + free(result->container_id); free(result->host_name); amqp_frame_codec_destroy(result->amqp_frame_codec); frame_codec_destroy(result->frame_codec); @@ -1211,12 +1211,12 @@ void connection_destroy(CONNECTION_HANDLE connection) { /* Codes_SRS_CONNECTION_01_079: [If handle is NULL, connection_destroy shall do nothing.] */ - if (connection == NULL) - { - LogError("NULL connection"); - } - else - { + if (connection == NULL) + { + LogError("NULL connection"); + } + else + { /* Codes_SRS_CONNECTION_01_073: [connection_destroy shall free all resources associated with a connection.] */ if (connection->is_underlying_io_open) { @@ -1241,8 +1241,8 @@ if (connection == NULL) { - LogError("NULL connection"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; } else { @@ -1250,8 +1250,8 @@ { if (xio_open(connection->io, connection_on_io_open_complete, connection, connection_on_bytes_received, connection, connection_on_io_error, connection) != 0) { - LogError("Opening the underlying IO failed"); - connection_set_state(connection, CONNECTION_STATE_END); + LogError("Opening the underlying IO failed"); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -1278,8 +1278,8 @@ if (connection == NULL) { - LogError("NULL connection"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; } else { @@ -1287,8 +1287,8 @@ { if (xio_open(connection->io, connection_on_io_open_complete, connection, connection_on_bytes_received, connection, connection_on_io_error, connection) != 0) { - LogError("Opening the underlying IO failed"); - connection_set_state(connection, CONNECTION_STATE_END); + LogError("Opening the underlying IO failed"); + connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } else @@ -1315,8 +1315,8 @@ if (connection == NULL) { - LogError("NULL connection"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; } else { @@ -1326,20 +1326,20 @@ } else { - if (send_close_frame(connection, NULL) != 0) - { - LogError("Sending CLOSE frame failed"); - } + if (send_close_frame(connection, NULL) != 0) + { + LogError("Sending CLOSE frame failed"); + } connection_set_state(connection, CONNECTION_STATE_END); } - if (xio_close(connection->io, NULL, NULL) != 0) - { - LogError("xio_close failed"); - } + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } - connection->is_underlying_io_open = 1; + connection->is_underlying_io_open = 1; result = 0; } @@ -1352,25 +1352,25 @@ int result; /* Codes_SRS_CONNECTION_01_163: [If connection is NULL, connection_set_max_frame_size shall fail and return a non-zero value.] */ - if (connection == NULL) - { - LogError("NULL connection"); - result = __FAILURE__; - } - /* Codes_SRS_CONNECTION_01_150: [If the max_frame_size is invalid then connection_set_max_frame_size shall fail and return a non-zero value.] */ - /* Codes_SRS_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ - else if (max_frame_size < 512) + if (connection == NULL) { - LogError("max_frame_size too small"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; + } + /* Codes_SRS_CONNECTION_01_150: [If the max_frame_size is invalid then connection_set_max_frame_size shall fail and return a non-zero value.] */ + /* Codes_SRS_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ + else if (max_frame_size < 512) + { + LogError("max_frame_size too small"); + result = __FAILURE__; } else { /* Codes_SRS_CONNECTION_01_157: [If connection_set_max_frame_size is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { - LogError("Connection already open"); - result = __FAILURE__; + LogError("Connection already open"); + result = __FAILURE__; } else { @@ -1394,9 +1394,9 @@ if ((connection == NULL) || (max_frame_size == NULL)) { - LogError("Bad arguments: connection = %p, max_frame_size = %p", - connection, max_frame_size); - result = __FAILURE__; + LogError("Bad arguments: connection = %p, max_frame_size = %p", + connection, max_frame_size); + result = __FAILURE__; } else { @@ -1417,16 +1417,16 @@ /* Codes_SRS_CONNECTION_01_181: [If connection is NULL then connection_set_channel_max shall fail and return a non-zero value.] */ if (connection == NULL) { - LogError("NULL connection"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; } else { /* Codes_SRS_CONNECTION_01_156: [If connection_set_channel_max is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { - LogError("Connection already open"); - result = __FAILURE__; + LogError("Connection already open"); + result = __FAILURE__; } else { @@ -1450,9 +1450,9 @@ if ((connection == NULL) || (channel_max == NULL)) { - LogError("Bad arguments: connection = %p, channel_max = %p", - connection, channel_max); - result = __FAILURE__; + LogError("Bad arguments: connection = %p, channel_max = %p", + connection, channel_max); + result = __FAILURE__; } else { @@ -1473,16 +1473,16 @@ /* Codes_SRS_CONNECTION_01_191: [If connection is NULL, connection_set_idle_timeout shall fail and return a non-zero value.] */ if (connection == NULL) { - LogError("NULL connection"); - result = __FAILURE__; + LogError("NULL connection"); + result = __FAILURE__; } else { /* Codes_SRS_CONNECTION_01_158: [If connection_set_idle_timeout is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { - LogError("Connection already open"); - result = __FAILURE__; + LogError("Connection already open"); + result = __FAILURE__; } else { @@ -1507,9 +1507,9 @@ if ((connection == NULL) || (idle_timeout == NULL)) { - LogError("Bad arguments: connection = %p, idle_timeout = %p", - connection, idle_timeout); - result = __FAILURE__; + LogError("Bad arguments: connection = %p, idle_timeout = %p", + connection, idle_timeout); + result = __FAILURE__; } else { @@ -1530,9 +1530,9 @@ if ((connection == NULL) || (remote_max_frame_size == NULL)) { - LogError("Bad arguments: connection = %p, remote_max_frame_size = %p", - connection, remote_max_frame_size); - result = __FAILURE__; + LogError("Bad arguments: connection = %p, remote_max_frame_size = %p", + connection, remote_max_frame_size); + result = __FAILURE__; } else { @@ -1549,18 +1549,18 @@ uint64_t local_deadline = (uint64_t )-1; uint64_t remote_deadline = (uint64_t)-1; - if (connection == NULL) - { - LogError("NULL connection"); - } - else - { + if (connection == NULL) + { + LogError("NULL connection"); + } + else + { tickcounter_ms_t current_ms; if (tickcounter_get_current_ms(connection->tick_counter, ¤t_ms) != 0) { - LogError("Could not get tick counter value"); - close_connection_with_error(connection, "amqp:internal-error", "Could not get tick count"); + LogError("Could not get tick counter value"); + close_connection_with_error(connection, "amqp:internal-error", "Could not get tick count"); } else { @@ -1598,9 +1598,9 @@ connection->on_send_complete = NULL; if (amqp_frame_codec_encode_empty_frame(connection->amqp_frame_codec, 0, on_bytes_encoded, connection) != 0) { - LogError("Encoding the empty frame failed"); - /* close connection */ - close_connection_with_error(connection, "amqp:internal-error", "Cannot send empty frame"); + LogError("Encoding the empty frame failed"); + /* close connection */ + close_connection_with_error(connection, "amqp:internal-error", "Cannot send empty frame"); } else { @@ -1625,12 +1625,12 @@ void connection_dowork(CONNECTION_HANDLE connection) { /* Codes_SRS_CONNECTION_01_078: [If handle is NULL, connection_dowork shall do nothing.] */ - if (connection == NULL) - { - LogError("NULL connection"); - } - else - { + if (connection == NULL) + { + LogError("NULL connection"); + } + else + { if (connection_handle_deadlines(connection) > 0) { /* Codes_SRS_CONNECTION_01_076: [connection_dowork shall schedule the underlying IO interface to do its work by calling xio_dowork.] */ @@ -1641,14 +1641,14 @@ ENDPOINT_HANDLE connection_create_endpoint(CONNECTION_HANDLE connection) { - ENDPOINT_HANDLE result; + ENDPOINT_HANDLE result; /* Codes_SRS_CONNECTION_01_113: [If connection, on_endpoint_frame_received or on_connection_state_changed is NULL, connection_create_endpoint shall fail and return NULL.] */ /* Codes_SRS_CONNECTION_01_193: [The context argument shall be allowed to be NULL.] */ if (connection == NULL) { - LogError("NULL connection"); - result = NULL; + LogError("NULL connection"); + result = NULL; } else { @@ -1674,13 +1674,13 @@ /* Codes_SRS_CONNECTION_01_127: [On success, connection_create_endpoint shall return a non-NULL handle to the newly created endpoint.] */ result = (ENDPOINT_HANDLE)malloc(sizeof(ENDPOINT_INSTANCE)); /* Codes_SRS_CONNECTION_01_196: [If memory cannot be allocated for the new endpoint, connection_create_endpoint shall fail and return NULL.] */ - if (result == NULL) - { - LogError("Cannot allocate memory for endpoint"); - } - else - { - ENDPOINT_HANDLE* new_endpoints; + if (result == NULL) + { + LogError("Cannot allocate memory for endpoint"); + } + else + { + ENDPOINT_HANDLE* new_endpoints; result->on_endpoint_frame_received = NULL; result->on_connection_state_changed = NULL; @@ -1693,8 +1693,8 @@ if (new_endpoints == NULL) { /* Tests_SRS_CONNECTION_01_198: [If adding the endpoint to the endpoints list tracked by the connection fails, connection_create_endpoint shall fail and return NULL.] */ - LogError("Cannot reallocate memory for connection endpoints"); - free(result); + LogError("Cannot reallocate memory for connection endpoints"); + free(result); result = NULL; } else @@ -1726,9 +1726,9 @@ (on_endpoint_frame_received == NULL) || (on_connection_state_changed == NULL)) { - LogError("Bad arguments: endpoint = %p, on_endpoint_frame_received = %p, on_connection_state_changed = %p", - endpoint, on_endpoint_frame_received, on_connection_state_changed); - result = __FAILURE__; + LogError("Bad arguments: endpoint = %p, on_endpoint_frame_received = %p, on_connection_state_changed = %p", + endpoint, on_endpoint_frame_received, on_connection_state_changed); + result = __FAILURE__; } else { @@ -1749,9 +1749,9 @@ if ((endpoint == NULL) || (incoming_channel == NULL)) { - LogError("Bad arguments: endpoint = %p, incoming_channel = %p", - endpoint, incoming_channel); - result = __FAILURE__; + LogError("Bad arguments: endpoint = %p, incoming_channel = %p", + endpoint, incoming_channel); + result = __FAILURE__; } else { @@ -1765,12 +1765,12 @@ /* Codes_SRS_CONNECTION_01_129: [connection_destroy_endpoint shall free all resources associated with an endpoint created by connection_create_endpoint.] */ void connection_destroy_endpoint(ENDPOINT_HANDLE endpoint) { - if (endpoint == NULL) - { - LogError("NULL endpoint"); - } - else - { + if (endpoint == NULL) + { + LogError("NULL endpoint"); + } + else + { CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection; size_t i; @@ -1784,25 +1784,25 @@ /* Codes_SRS_CONNECTION_01_130: [The outgoing channel associated with the endpoint shall be released by removing the endpoint from the endpoint list.] */ /* Codes_SRS_CONNECTION_01_131: [Any incoming channel number associated with the endpoint shall be released.] */ - if ((i < connection->endpoint_count) && (i > 0)) - { - ENDPOINT_HANDLE* new_endpoints; - (void)memmove(connection->endpoints + i, connection->endpoints + i + 1, sizeof(ENDPOINT_HANDLE) * (connection->endpoint_count - i - 1)); + if ((i < connection->endpoint_count) && (i > 0)) + { + ENDPOINT_HANDLE* new_endpoints; + (void)memmove(connection->endpoints + i, connection->endpoints + i + 1, sizeof(ENDPOINT_HANDLE) * (connection->endpoint_count - i - 1)); - new_endpoints = (ENDPOINT_HANDLE*)realloc(connection->endpoints, (connection->endpoint_count - 1) * sizeof(ENDPOINT_HANDLE)); - if (new_endpoints != NULL) - { - connection->endpoints = new_endpoints; - } + new_endpoints = (ENDPOINT_HANDLE*)realloc(connection->endpoints, (connection->endpoint_count - 1) * sizeof(ENDPOINT_HANDLE)); + if (new_endpoints != NULL) + { + connection->endpoints = new_endpoints; + } - connection->endpoint_count--; - } - else if (connection->endpoint_count == 1) - { - free(connection->endpoints); - connection->endpoints = NULL; - connection->endpoint_count = 0; - } + connection->endpoint_count--; + } + else if (connection->endpoint_count == 1) + { + free(connection->endpoints); + connection->endpoints = NULL; + connection->endpoint_count = 0; + } free(endpoint); } @@ -1817,20 +1817,20 @@ if ((endpoint == NULL) || (performative == NULL)) { - LogError("Bad arguments: endpoint = %p, performative = %p", - endpoint, performative); - result = __FAILURE__; + LogError("Bad arguments: endpoint = %p, performative = %p", + endpoint, performative); + result = __FAILURE__; } else { - CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection; + CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection; AMQP_FRAME_CODEC_HANDLE amqp_frame_codec = connection->amqp_frame_codec; /* Codes_SRS_CONNECTION_01_254: [If connection_encode_frame is called before the connection is in the OPENED state, connection_encode_frame shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_OPENED) { - LogError("Connection not open"); - result = __FAILURE__; + LogError("Connection not open"); + result = __FAILURE__; } else { @@ -1843,8 +1843,8 @@ if (amqp_frame_codec_encode_frame(amqp_frame_codec, endpoint->outgoing_channel, performative, payloads, payload_count, on_bytes_encoded, connection) != 0) { /* Codes_SRS_CONNECTION_01_253: [If amqp_frame_codec_begin_encode_frame or amqp_frame_codec_encode_payload_bytes fails, then connection_encode_frame shall fail and return a non-zero value.] */ - LogError("Encoding AMQP frame failed"); - result = __FAILURE__; + LogError("Encoding AMQP frame failed"); + result = __FAILURE__; } else { @@ -1855,8 +1855,8 @@ if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_sent_time) != 0) { - LogError("Getting tick counter value failed"); - result = __FAILURE__; + LogError("Getting tick counter value failed"); + result = __FAILURE__; } else { @@ -1873,12 +1873,12 @@ void connection_set_trace(CONNECTION_HANDLE connection, bool trace_on) { /* Codes_SRS_CONNECTION_07_002: [If connection is NULL then connection_set_trace shall do nothing.] */ - if (connection == NULL) - { - LogError("NULL connection"); - } - else - { + if (connection == NULL) + { + LogError("NULL connection"); + } + else + { /* Codes_SRS_CONNECTION_07_001: [connection_set_trace shall set the ability to turn on and off trace logging.] */ connection->is_trace_on = trace_on ? 1 : 0; }