A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: connection.c
- Revision:
- 43:4c1e4e94cdd3
- Parent:
- 41:0e723f9cbd89
- Child:
- 45:83b4eda4891c
--- a/connection.c Fri May 04 13:24:52 2018 -0700 +++ b/connection.c Mon Jun 11 15:39:52 2018 -0700 @@ -18,10 +18,10 @@ #include "azure_uamqp_c/amqpvalue_to_string.h" /* Requirements satisfied by the virtue of implementing the ISO:*/ -/* Codes_SRS_CONNECTION_01_088: [Any data appearing beyond the protocol header MUST match the version indicated by the protocol header.] */ -/* Codes_SRS_CONNECTION_01_015: [Implementations SHOULD NOT expect to be able to reuse open TCP sockets after close performatives have been exchanged.] */ +/* Codes_S_R_S_CONNECTION_01_088: [Any data appearing beyond the protocol header MUST match the version indicated by the protocol header.] */ +/* Codes_S_R_S_CONNECTION_01_015: [Implementations SHOULD NOT expect to be able to reuse open TCP sockets after close performatives have been exchanged.] */ -/* Codes_SRS_CONNECTION_01_087: [The protocol header consists of the upper case ASCII letters "AMQP" followed by a protocol id of zero, followed by three unsigned bytes representing the major, minor, and revision of the protocol version (currently 1 (MAJOR), 0 (MINOR), 0 (REVISION)). In total this is an 8-octet sequence] */ +/* Codes_S_R_S_CONNECTION_01_087: [The protocol header consists of the upper case ASCII letters "AMQP" followed by a protocol id of zero, followed by three unsigned bytes representing the major, minor, and revision of the protocol version (currently 1 (MAJOR), 0 (MINOR), 0 (REVISION)). In total this is an 8-octet sequence] */ static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }; typedef enum RECEIVE_FRAME_STATE_TAG @@ -30,6 +30,12 @@ RECEIVE_FRAME_STATE_FRAME_DATA } RECEIVE_FRAME_STATE; +typedef struct ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_TAG +{ + ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received; + void* context; +} ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION; + typedef struct ENDPOINT_INSTANCE_TAG { uint16_t incoming_channel; @@ -65,6 +71,8 @@ ON_IO_ERROR on_io_error; void* on_io_error_callback_context; + ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION on_connection_close_received_event_subscription; + /* options */ uint32_t max_frame_size; uint16_t channel_max; @@ -82,7 +90,7 @@ unsigned int is_trace_on : 1; } CONNECTION_INSTANCE; -/* Codes_SRS_CONNECTION_01_258: [on_connection_state_changed shall be invoked whenever the connection state changes.]*/ +/* Codes_S_R_S_CONNECTION_01_258: [on_connection_state_changed shall be invoked whenever the connection state changes.]*/ static void connection_set_state(CONNECTION_HANDLE connection, CONNECTION_STATE connection_state) { uint64_t i; @@ -90,21 +98,24 @@ CONNECTION_STATE previous_state = connection->connection_state; connection->connection_state = connection_state; - /* Codes_SRS_CONNECTION_22_001: [If a connection state changed occurs and a callback is registered the callback shall be called.] */ + /* Codes_S_R_S_CONNECTION_22_001: [If a connection state changed occurs and a callback is registered the callback shall be called.] */ if (connection->on_connection_state_changed) { connection->on_connection_state_changed(connection->on_connection_state_changed_callback_context, connection_state, previous_state); } - /* Codes_SRS_CONNECTION_01_260: [Each endpoint's on_connection_state_changed shall be called.] */ + /* Codes_S_R_S_CONNECTION_01_260: [Each endpoint's on_connection_state_changed shall be called.] */ for (i = 0; i < connection->endpoint_count; i++) { - /* Codes_SRS_CONNECTION_01_259: [The callback_context passed in connection_create_endpoint.] */ - connection->endpoints[i]->on_connection_state_changed(connection->endpoints[i]->callback_context, connection_state, previous_state); + /* Codes_S_R_S_CONNECTION_01_259: [The callback_context passed in connection_create_endpoint.] */ + if (connection->endpoints[i]->on_connection_state_changed != NULL) + { + connection->endpoints[i]->on_connection_state_changed(connection->endpoints[i]->callback_context, connection_state, previous_state); + } } } -// This callback usage needs to be either verified and commented or integrated into +// This callback usage needs to be either verified and commented or integrated into // the state machine. static void unchecked_on_send_complete(void* context, IO_SEND_RESULT send_result) { @@ -116,20 +127,20 @@ { int result; - /* Codes_SRS_CONNECTION_01_093: [_ When the client opens a new socket connection to a server, it MUST send a protocol header with the client's preferred protocol version.] */ - /* Codes_SRS_CONNECTION_01_104: [Sending the protocol header shall be done by using xio_send.] */ + /* Codes_S_R_S_CONNECTION_01_093: [_ When the client opens a new socket connection to a server, it MUST send a protocol header with the client's preferred protocol version.] */ + /* Codes_S_R_S_CONNECTION_01_104: [Sending the protocol header shall be done by using xio_send.] */ if (xio_send(connection->io, amqp_header, sizeof(amqp_header), unchecked_on_send_complete, NULL) != 0) { - /* Codes_SRS_CONNECTION_01_106: [When sending the protocol header fails, the connection shall be immediately closed.] */ + /* Codes_S_R_S_CONNECTION_01_106: [When sending the protocol header fails, the connection shall be immediately closed.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); } - /* Codes_SRS_CONNECTION_01_057: [END In this state it is illegal for either endpoint to write anything more onto the connection. The connection can be safely closed and discarded.] */ + /* Codes_S_R_S_CONNECTION_01_057: [END In this state it is illegal for either endpoint to write anything more onto the connection. The connection can be safely closed and discarded.] */ connection_set_state(connection, CONNECTION_STATE_END); - /* Codes_SRS_CONNECTION_01_105: [When xio_send fails, connection_dowork shall return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_105: [When xio_send fails, connection_dowork shall return a non-zero value.] */ result = __FAILURE__; } else @@ -139,7 +150,7 @@ LOG(AZ_LOG_TRACE, LOG_LINE, "-> Header (AMQP 0.1.0.0)"); } - /* Codes_SRS_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */ + /* Codes_S_R_S_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */ connection_set_state(connection, CONNECTION_STATE_HDR_SENT); result = 0; } @@ -147,6 +158,7 @@ return result; } +#ifndef NO_LOGGING static const char* get_frame_type_as_string(AMQP_VALUE descriptor) { const char* result; @@ -194,6 +206,7 @@ return result; } +#endif // NO_LOGGING static void log_incoming_frame(AMQP_VALUE performative) { @@ -248,7 +261,7 @@ static void on_bytes_encoded(void* context, const unsigned char* bytes, size_t length, bool encode_complete) { CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; - if (xio_send(connection->io, bytes, length, + if (xio_send(connection->io, bytes, length, (encode_complete && connection->on_send_complete != NULL) ? connection->on_send_complete : unchecked_on_send_complete, connection->on_send_complete_callback_context) != 0) { @@ -267,12 +280,12 @@ { int result; - /* Codes_SRS_CONNECTION_01_151: [The connection max_frame_size setting shall be passed down to the frame_codec when the Open frame is sent.] */ + /* Codes_S_R_S_CONNECTION_01_151: [The connection max_frame_size setting shall be passed down to the frame_codec when the Open frame is sent.] */ if (frame_codec_set_max_frame_size(connection->frame_codec, connection->max_frame_size) != 0) { LogError("Cannot set max frame size"); - /* Codes_SRS_CONNECTION_01_207: [If frame_codec_set_max_frame_size fails the connection shall be closed and the state set to END.] */ + /* Codes_S_R_S_CONNECTION_01_207: [If frame_codec_set_max_frame_size fails the connection shall be closed and the state set to END.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -283,13 +296,13 @@ } else { - /* Codes_SRS_CONNECTION_01_134: [The container id field shall be filled with the container id specified in connection_create.] */ + /* Codes_S_R_S_CONNECTION_01_134: [The container id field shall be filled with the container id specified in connection_create.] */ OPEN_HANDLE open_performative = open_create(connection->container_id); if (open_performative == NULL) { LogError("Cannot create OPEN performative"); - - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -300,12 +313,12 @@ } else { - /* Codes_SRS_CONNECTION_01_137: [The max_frame_size connection setting shall be set in the open frame by using open_set_max_frame_size.] */ + /* Codes_S_R_S_CONNECTION_01_137: [The max_frame_size connection setting shall be set in the open frame by using open_set_max_frame_size.] */ if (open_set_max_frame_size(open_performative, connection->max_frame_size) != 0) { LogError("Cannot set max frame size"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -314,12 +327,12 @@ connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } - /* Codes_SRS_CONNECTION_01_139: [The channel_max connection setting shall be set in the open frame by using open_set_channel_max.] */ + /* Codes_S_R_S_CONNECTION_01_139: [The channel_max connection setting shall be set in the open frame by using open_set_channel_max.] */ else if (open_set_channel_max(open_performative, connection->channel_max) != 0) { LogError("Cannot set max channel"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -328,12 +341,12 @@ connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } - /* Codes_SRS_CONNECTION_01_142: [If no idle_timeout value has been specified, no value shall be stamped in the open frame (no call to open_set_idle_time_out shall be made).] */ + /* Codes_S_R_S_CONNECTION_01_142: [If no idle_timeout value has been specified, no value shall be stamped in the open frame (no call to open_set_idle_time_out shall be made).] */ else if ((connection->idle_timeout_specified) && - /* Codes_SRS_CONNECTION_01_141: [If idle_timeout has been specified by a call to connection_set_idle_timeout, then that value shall be stamped in the open frame.] */ + /* Codes_S_R_S_CONNECTION_01_141: [If idle_timeout has been specified by a call to connection_set_idle_timeout, then that value shall be stamped in the open frame.] */ (open_set_idle_time_out(open_performative, connection->idle_timeout) != 0)) { - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -342,14 +355,14 @@ connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } - /* Codes_SRS_CONNECTION_01_136: [If no hostname value has been specified, no value shall be stamped in the open frame (no call to open_set_hostname shall be made).] */ + /* Codes_S_R_S_CONNECTION_01_136: [If no hostname value has been specified, no value shall be stamped in the open frame (no call to open_set_hostname shall be made).] */ else if ((connection->host_name != NULL) && - /* Codes_SRS_CONNECTION_01_135: [If hostname has been specified by a call to connection_set_hostname, then that value shall be stamped in the open frame.] */ + /* Codes_S_R_S_CONNECTION_01_135: [If hostname has been specified by a call to connection_set_hostname, then that value shall be stamped in the open frame.] */ (open_set_hostname(open_performative, connection->host_name) != 0)) { LogError("Cannot set hostname"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -358,14 +371,14 @@ connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } - /* Codes_SRS_CONNECTION_01_243: [If no properties value has been specified, no value shall be stamped in the open frame (no call to open_set_properties shall be made).] */ + /* Codes_S_R_S_CONNECTION_01_243: [If no properties value has been specified, no value shall be stamped in the open frame (no call to open_set_properties shall be made).] */ else if ((connection->properties != NULL) && - /* Codes_SRS_CONNECTION_01_244: [If properties has been specified by a call to connection_set_properties, then that value shall be stamped in the open frame.] */ + /* Codes_S_R_S_CONNECTION_01_244: [If properties has been specified by a call to connection_set_properties, then that value shall be stamped in the open frame.] */ (open_set_properties(open_performative, connection->properties) != 0)) { LogError("Cannot set properties"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -381,7 +394,7 @@ { LogError("Cannot create OPEN AMQP value"); - /* Codes_SRS_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -392,18 +405,18 @@ } else { - /* Codes_SRS_CONNECTION_01_002: [Each AMQP connection begins with an exchange of capabilities and limitations, including the maximum frame size.] */ - /* Codes_SRS_CONNECTION_01_004: [After establishing or accepting a TCP connection and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */ - /* Codes_SRS_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ - /* Codes_SRS_CONNECTION_01_205: [Sending the AMQP OPEN frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ - /* Codes_SRS_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ + /* Codes_S_R_S_CONNECTION_01_002: [Each AMQP connection begins with an exchange of capabilities and limitations, including the maximum frame size.] */ + /* Codes_S_R_S_CONNECTION_01_004: [After establishing or accepting a TCP connection and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */ + /* Codes_S_R_S_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ + /* Codes_S_R_S_CONNECTION_01_205: [Sending the AMQP OPEN frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ + /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ connection->on_send_complete = NULL; connection->on_send_complete_callback_context = NULL; if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, open_performative_value, NULL, 0, on_bytes_encoded, connection) != 0) { LogError("amqp_frame_codec_encode_frame failed"); - /* Codes_SRS_CONNECTION_01_206: [If sending the frame fails, the connection shall be closed and state set to END.] */ + /* Codes_S_R_S_CONNECTION_01_206: [If sending the frame fails, the connection shall be closed and state set to END.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -419,7 +432,7 @@ log_outgoing_frame(open_performative_value); } - /* Codes_SRS_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */ + /* Codes_S_R_S_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */ connection_set_state(connection, CONNECTION_STATE_OPEN_SENT); result = 0; } @@ -440,7 +453,7 @@ int result; CLOSE_HANDLE close_performative; - /* Codes_SRS_CONNECTION_01_217: [The CLOSE frame shall be constructed by using close_create.] */ + /* Codes_S_R_S_CONNECTION_01_217: [The CLOSE frame shall be constructed by using close_create.] */ close_performative = close_create(); if (close_performative == NULL) { @@ -450,7 +463,7 @@ else { if ((error_handle != NULL) && - /* Codes_SRS_CONNECTION_01_238: [If set, this field indicates that the connection is being closed due to an error condition.] */ + /* Codes_S_R_S_CONNECTION_01_238: [If set, this field indicates that the connection is being closed due to an error condition.] */ (close_set_error(close_performative, error_handle) != 0)) { LogError("Cannot set error on CLOSE"); @@ -466,8 +479,8 @@ } else { - /* Codes_SRS_CONNECTION_01_215: [Sending the AMQP CLOSE frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ - /* Codes_SRS_CONNECTION_01_013: [However, implementations SHOULD send it on channel 0] */ + /* Codes_S_R_S_CONNECTION_01_215: [Sending the AMQP CLOSE frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */ + /* Codes_S_R_S_CONNECTION_01_013: [However, implementations SHOULD send it on channel 0] */ connection->on_send_complete = NULL; connection->on_send_complete_callback_context = NULL; if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, close_performative_value, NULL, 0, on_bytes_encoded, connection) != 0) @@ -495,12 +508,13 @@ return result; } -static void close_connection_with_error(CONNECTION_HANDLE connection, const char* condition_value, const char* description) +static void close_connection_with_error(CONNECTION_HANDLE connection, const char* condition_value, const char* description, AMQP_VALUE info) { ERROR_HANDLE error_handle = error_create(condition_value); + if (error_handle == NULL) { - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -510,12 +524,25 @@ } else { - /* Codes_SRS_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ + /* Codes_S_R_S_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ if (error_set_description(error_handle, description) != 0) { LogError("Cannot set error description on CLOSE frame"); - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + if (xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } + + connection_set_state(connection, CONNECTION_STATE_END); + } + else if ((info != NULL) && + (error_set_info(error_handle, info) != 0)) + { + LogError("Cannot set error info on CLOSE frame"); + + /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -527,7 +554,7 @@ { LogError("Cannot send CLOSE frame"); - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -537,9 +564,9 @@ } else { - /* Codes_SRS_CONNECTION_01_213: [When passing the bytes to frame_codec fails, a CLOSE frame shall be sent and the state shall be set to DISCARDING.] */ - /* Codes_SRS_CONNECTION_01_055: [DISCARDING The DISCARDING state is a variant of the CLOSE SENT state where the close is triggered by an error.] */ - /* Codes_SRS_CONNECTION_01_010: [After writing this frame the peer SHOULD continue to read from the connection until it receives the partner's close frame ] */ + /* Codes_S_R_S_CONNECTION_01_213: [When passing the bytes to frame_codec fails, a CLOSE frame shall be sent and the state shall be set to DISCARDING.] */ + /* Codes_S_R_S_CONNECTION_01_055: [DISCARDING The DISCARDING state is a variant of the CLOSE SENT state where the close is triggered by an error.] */ + /* Codes_S_R_S_CONNECTION_01_010: [After writing this frame the peer SHOULD continue to read from the connection until it receives the partner's close frame ] */ connection_set_state(connection, CONNECTION_STATE_DISCARDING); } @@ -610,14 +637,14 @@ result = __FAILURE__; break; - /* Codes_SRS_CONNECTION_01_039: [START In this state a connection exists, but nothing has been sent or received. This is the state an implementation would be in immediately after performing a socket connect or socket accept.] */ + /* Codes_S_R_S_CONNECTION_01_039: [START In this state a connection exists, but nothing has been sent or received. This is the state an implementation would be in immediately after performing a socket connect or socket accept.] */ case CONNECTION_STATE_START: - /* Codes_SRS_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */ + /* Codes_S_R_S_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */ case CONNECTION_STATE_HDR_SENT: if (b != amqp_header[connection->header_bytes_received]) { - /* Codes_SRS_CONNECTION_01_089: [If the incoming and outgoing protocol headers do not match, both peers MUST close their outgoing stream] */ + /* Codes_S_R_S_CONNECTION_01_089: [If the incoming and outgoing protocol headers do not match, both peers MUST close their outgoing stream] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -649,28 +676,28 @@ } break; - /* Codes_SRS_CONNECTION_01_040: [HDR RCVD In this state the connection header has been received from the peer but a connection header has not been sent.] */ + /* Codes_S_R_S_CONNECTION_01_040: [HDR RCVD In this state the connection header has been received from the peer but a connection header has not been sent.] */ case CONNECTION_STATE_HDR_RCVD: - /* Codes_SRS_CONNECTION_01_042: [HDR EXCH In this state the connection header has been sent to the peer and a connection header has been received from the peer.] */ + /* Codes_S_R_S_CONNECTION_01_042: [HDR EXCH In this state the connection header has been sent to the peer and a connection header has been received from the peer.] */ /* we should not really get into this state, but just in case, we would treat that in the same way as HDR_RCVD */ case CONNECTION_STATE_HDR_EXCH: - /* Codes_SRS_CONNECTION_01_045: [OPEN RCVD In this state the connection headers have been exchanged. An open frame has been received from the peer but an open frame has not been sent.] */ + /* Codes_S_R_S_CONNECTION_01_045: [OPEN RCVD In this state the connection headers have been exchanged. An open frame has been received from the peer but an open frame has not been sent.] */ case CONNECTION_STATE_OPEN_RCVD: - /* Codes_SRS_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */ + /* Codes_S_R_S_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */ case CONNECTION_STATE_OPEN_SENT: - /* Codes_SRS_CONNECTION_01_048: [OPENED In this state the connection header and the open frame have been both sent and received.] */ + /* Codes_S_R_S_CONNECTION_01_048: [OPENED In this state the connection header and the open frame have been both sent and received.] */ case CONNECTION_STATE_OPENED: - /* Codes_SRS_CONNECTION_01_212: [After the initial handshake has been done all bytes received from the io instance shall be passed to the frame_codec for decoding by calling frame_codec_receive_bytes.] */ + /* Codes_S_R_S_CONNECTION_01_212: [After the initial handshake has been done all bytes received from the io instance shall be passed to the frame_codec for decoding by calling frame_codec_receive_bytes.] */ if (frame_codec_receive_bytes(connection->frame_codec, &b, 1) != 0) { LogError("Cannot process received bytes"); - /* Codes_SRS_CONNECTION_01_218: [The error amqp:internal-error shall be set in the error.condition field of the CLOSE frame.] */ - /* Codes_SRS_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ - close_connection_with_error(connection, "amqp:internal-error", "connection_byte_received::frame_codec_receive_bytes failed"); + /* Codes_S_R_S_CONNECTION_01_218: [The error amqp:internal-error shall be set in the error.condition field of the CLOSE frame.] */ + /* Codes_S_R_S_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */ + close_connection_with_error(connection, "amqp:internal-error", "connection_byte_received::frame_codec_receive_bytes failed", NULL); result = __FAILURE__; } else @@ -704,7 +731,7 @@ if (io_open_result == IO_OPEN_OK) { - /* Codes_SRS_CONNECTION_01_084: [The connection_instance state machine implementing the protocol requirements shall be run as part of connection_dowork.] */ + /* Codes_S_R_S_CONNECTION_01_084: [The connection_instance state machine implementing the protocol requirements shall be run as part of connection_dowork.] */ switch (connection->connection_state) { default: @@ -712,8 +739,8 @@ break; case CONNECTION_STATE_START: - /* Codes_SRS_CONNECTION_01_086: [Prior to sending any frames on a connection_instance, each peer MUST start by sending a protocol header that indicates the protocol version used on the connection_instance.] */ - /* Codes_SRS_CONNECTION_01_091: [The AMQP peer which acted in the role of the TCP client (i.e. the peer that actively opened the connection_instance) MUST immediately send its outgoing protocol header on establishment of the TCP connection_instance.] */ + /* Codes_S_R_S_CONNECTION_01_086: [Prior to sending any frames on a connection_instance, each peer MUST start by sending a protocol header that indicates the protocol version used on the connection_instance.] */ + /* Codes_S_R_S_CONNECTION_01_091: [The AMQP peer which acted in the role of the TCP client (i.e. the peer that actively opened the connection_instance) MUST immediately send its outgoing protocol header on establishment of the TCP connection_instance.] */ if (send_header(connection) != 0) { LogError("Cannot send header"); @@ -726,9 +753,9 @@ break; case CONNECTION_STATE_HDR_EXCH: - /* Codes_SRS_CONNECTION_01_002: [Each AMQP connection_instance begins with an exchange of capabilities and limitations, including the maximum frame size.] */ - /* Codes_SRS_CONNECTION_01_004: [After establishing or accepting a TCP connection_instance and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */ - /* Codes_SRS_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ + /* Codes_S_R_S_CONNECTION_01_002: [Each AMQP connection_instance begins with an exchange of capabilities and limitations, including the maximum frame size.] */ + /* Codes_S_R_S_CONNECTION_01_004: [After establishing or accepting a TCP connection_instance and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */ + /* Codes_S_R_S_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */ if (send_open_frame(connection) != 0) { LogError("Cannot send OPEN frame"); @@ -750,7 +777,7 @@ { CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context; - /* Codes_SRS_CONNECTION_22_005: [If the io notifies the connection instance of an IO_STATE_ERROR state and an io error callback is registered, the connection shall call the registered callback.] */ + /* Codes_S_R_S_CONNECTION_22_005: [If the io notifies the connection instance of an IO_STATE_ERROR state and an io error callback is registered, the connection shall call the registered callback.] */ if (connection->on_io_error) { connection->on_io_error(connection->on_io_error_callback_context); @@ -758,7 +785,7 @@ if (connection->connection_state != CONNECTION_STATE_END) { - /* Codes_SRS_CONNECTION_01_202: [If the io notifies the connection instance of an IO_STATE_ERROR state the connection shall be closed and the state set to END.] */ + /* Codes_S_R_S_CONNECTION_01_202: [If the io notifies the connection instance of an IO_STATE_ERROR state the connection shall be closed and the state set to END.] */ connection_set_state(connection, CONNECTION_STATE_ERROR); if (xio_close(connection->io, NULL, NULL) != 0) { @@ -792,7 +819,7 @@ if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0) { LogError("Cannot get tickcounter value"); - close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count"); + close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count", NULL); } else { @@ -803,8 +830,8 @@ default: if (performative == NULL) { - /* Codes_SRS_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */ - close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative"); + /* Codes_S_R_S_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */ + close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative", NULL); LogError("connection_endpoint_frame_received::NULL performative"); } else @@ -821,16 +848,16 @@ { if (channel != 0) { - /* Codes_SRS_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ - /* Codes_SRS_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */ - close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0"); + /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */ + /* Codes_S_R_S_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */ + close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0", NULL); LogError("OPEN frame received on a channel that is not 0"); } if (connection->connection_state == CONNECTION_STATE_OPENED) { - /* Codes_SRS_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */ - close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state"); + /* Codes_S_R_S_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */ + close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state", NULL); LogError("OPEN frame received in the OPENED state"); } else if ((connection->connection_state == CONNECTION_STATE_OPEN_SENT) || @@ -839,9 +866,9 @@ OPEN_HANDLE open_handle; if (amqpvalue_get_open(performative, &open_handle) != 0) { - /* Codes_SRS_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ - /* Codes_SRS_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ - close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame"); + /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ + /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ + close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL); LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); } else @@ -853,12 +880,12 @@ } if ((open_get_max_frame_size(open_handle, &connection->remote_max_frame_size) != 0) || - /* Codes_SRS_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ + /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ (connection->remote_max_frame_size < 512)) { - /* Codes_SRS_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ - /* Codes_SRS_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ - close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame"); + /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */ + /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */ + close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL); LogError("connection_endpoint_frame_received::failed parsing OPEN frame"); } else @@ -890,18 +917,17 @@ } else if (is_close_type_by_descriptor(descriptor)) { - /* Codes_SRS_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */ - /* Codes_SRS_CONNECTION_01_242: [The connection module shall accept CLOSE frames even if they have extra payload bytes besides the Close performative.] */ + /* Codes_S_R_S_CONNECTION_01_242: [The connection module shall accept CLOSE frames even if they have extra payload bytes besides the Close performative.] */ - /* Codes_SRS_CONNECTION_01_225: [HDR_RCVD HDR OPEN] */ + /* Codes_S_R_S_CONNECTION_01_225: [HDR_RCVD HDR OPEN] */ if ((connection->connection_state == CONNECTION_STATE_HDR_RCVD) || - /* Codes_SRS_CONNECTION_01_227: [HDR_EXCH OPEN OPEN] */ + /* Codes_S_R_S_CONNECTION_01_227: [HDR_EXCH OPEN OPEN] */ (connection->connection_state == CONNECTION_STATE_HDR_EXCH) || - /* Codes_SRS_CONNECTION_01_228: [OPEN_RCVD OPEN *] */ + /* Codes_S_R_S_CONNECTION_01_228: [OPEN_RCVD OPEN *] */ (connection->connection_state == CONNECTION_STATE_OPEN_RCVD) || - /* Codes_SRS_CONNECTION_01_235: [CLOSE_SENT - * TCP Close for Write] */ + /* Codes_S_R_S_CONNECTION_01_235: [CLOSE_SENT - * TCP Close for Write] */ (connection->connection_state == CONNECTION_STATE_CLOSE_SENT) || - /* Codes_SRS_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */ + /* Codes_S_R_S_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */ (connection->connection_state == CONNECTION_STATE_DISCARDING)) { if (xio_close(connection->io, NULL, NULL) != 0) @@ -913,21 +939,28 @@ { CLOSE_HANDLE close_handle; - /* Codes_SRS_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */ + /* Codes_S_R_S_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */ if (channel > connection->channel_max) { - close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame"); + close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL); LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); } else { if (amqpvalue_get_close(performative, &close_handle) != 0) { - close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame"); + close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL); LogError("connection_endpoint_frame_received::failed parsing CLOSE frame"); } else { + ERROR_HANDLE error; + + if (close_get_error(close_handle, &error) != 0) + { + error = NULL; + } + close_destroy(close_handle); connection_set_state(connection, CONNECTION_STATE_CLOSE_RCVD); @@ -937,13 +970,20 @@ LogError("Cannot send CLOSE frame"); } - /* Codes_SRS_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ + /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); } connection_set_state(connection, CONNECTION_STATE_END); + + if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL) + { + connection->on_connection_close_received_event_subscription.on_connection_close_received(connection->on_connection_close_received_event_subscription.context, error); + } + + error_destroy(error); } } } @@ -1039,17 +1079,17 @@ break; case CONNECTION_STATE_START: - /* Codes_SRS_CONNECTION_01_224: [START HDR HDR] */ + /* Codes_S_R_S_CONNECTION_01_224: [START HDR HDR] */ case CONNECTION_STATE_HDR_SENT: - /* Codes_SRS_CONNECTION_01_226: [HDR_SENT OPEN HDR] */ + /* Codes_S_R_S_CONNECTION_01_226: [HDR_SENT OPEN HDR] */ case CONNECTION_STATE_OPEN_PIPE: - /* Codes_SRS_CONNECTION_01_230: [OPEN_PIPE ** HDR] */ + /* Codes_S_R_S_CONNECTION_01_230: [OPEN_PIPE ** HDR] */ case CONNECTION_STATE_OC_PIPE: - /* Codes_SRS_CONNECTION_01_232: [OC_PIPE - HDR TCP Close for Write] */ + /* Codes_S_R_S_CONNECTION_01_232: [OC_PIPE - HDR TCP Close for Write] */ case CONNECTION_STATE_CLOSE_RCVD: - /* Codes_SRS_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */ + /* Codes_S_R_S_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */ case CONNECTION_STATE_END: - /* Codes_SRS_CONNECTION_01_237: [END - - TCP Close] */ + /* Codes_S_R_S_CONNECTION_01_237: [END - - TCP Close] */ if (xio_close(connection->io, NULL, NULL) != 0) { LogError("xio_close failed"); @@ -1062,7 +1102,7 @@ static void frame_codec_error(void* context) { - /* Bug: some error handling should happen here + /* Bug: some error handling should happen here Filed: uAMQP: frame_codec error and amqp_frame_codec_error should handle the errors */ LogError("A frame_codec_error occured"); (void)context; @@ -1076,14 +1116,14 @@ (void)context; } -/* Codes_SRS_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */ +/* Codes_S_R_S_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */ CONNECTION_HANDLE connection_create(XIO_HANDLE xio, const char* hostname, const char* container_id, ON_NEW_ENDPOINT on_new_endpoint, void* callback_context) { return connection_create2(xio, hostname, container_id, on_new_endpoint, callback_context, NULL, NULL, NULL, NULL); } -/* Codes_SRS_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */ -/* Codes_SRS_CONNECTION_22_002: [connection_create shall allow registering connections state and io error callbacks.] */ +/* Codes_S_R_S_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */ +/* Codes_S_R_S_CONNECTION_22_002: [connection_create shall allow registering connections state and io error callbacks.] */ CONNECTION_HANDLE connection_create2(XIO_HANDLE xio, const char* hostname, const char* container_id, ON_NEW_ENDPOINT on_new_endpoint, void* callback_context, ON_CONNECTION_STATE_CHANGED on_connection_state_changed, void* on_connection_state_changed_context, ON_IO_ERROR on_io_error, void* on_io_error_context) { CONNECTION_HANDLE connection; @@ -1091,7 +1131,7 @@ if ((xio == NULL) || (container_id == NULL)) { - /* Codes_SRS_CONNECTION_01_071: [If xio or container_id is NULL, connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_071: [If xio or container_id is NULL, connection_create shall return NULL.] */ LogError("Bad arguments: xio = %p, container_id = %p", xio, container_id); connection = NULL; @@ -1099,7 +1139,7 @@ else { connection = (CONNECTION_HANDLE)malloc(sizeof(CONNECTION_INSTANCE)); - /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ if (connection == NULL) { LogError("Cannot allocate memory for connection"); @@ -1108,11 +1148,11 @@ { connection->io = xio; - /* Codes_SRS_CONNECTION_01_082: [connection_create shall allocate a new frame_codec instance to be used for frame encoding/decoding.] */ + /* Codes_S_R_S_CONNECTION_01_082: [connection_create shall allocate a new frame_codec instance to be used for frame encoding/decoding.] */ connection->frame_codec = frame_codec_create(frame_codec_error, connection); if (connection->frame_codec == NULL) { - /* Codes_SRS_CONNECTION_01_083: [If frame_codec_create fails then connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_083: [If frame_codec_create fails then connection_create shall return NULL.] */ LogError("Cannot create frame_codec"); free(connection); connection = NULL; @@ -1122,7 +1162,7 @@ connection->amqp_frame_codec = amqp_frame_codec_create(connection->frame_codec, on_amqp_frame_received, on_empty_amqp_frame_received, amqp_frame_codec_error, connection); if (connection->amqp_frame_codec == NULL) { - /* Codes_SRS_CONNECTION_01_108: [If amqp_frame_codec_create fails, connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_108: [If amqp_frame_codec_create fails, connection_create shall return NULL.] */ LogError("Cannot create amqp_frame_codec"); frame_codec_destroy(connection->frame_codec); free(connection); @@ -1136,7 +1176,7 @@ connection->host_name = (char*)malloc(hostname_length + 1); if (connection->host_name == NULL) { - /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ LogError("Cannot allocate memory for host name"); amqp_frame_codec_destroy(connection->amqp_frame_codec); frame_codec_destroy(connection->frame_codec); @@ -1159,7 +1199,7 @@ connection->container_id = (char*)malloc(container_id_length + 1); if (connection->container_id == NULL) { - /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */ LogError("Cannot allocate memory for container_id"); free(connection->host_name); amqp_frame_codec_destroy(connection->amqp_frame_codec); @@ -1184,13 +1224,13 @@ { (void)memcpy(connection->container_id, container_id, container_id_length + 1); - /* Codes_SRS_CONNECTION_01_173: [<field name="max-frame-size" type="uint" default="4294967295"/>] */ + /* Codes_S_R_S_CONNECTION_01_173: [<field name="max-frame-size" type="uint" default="4294967295"/>] */ connection->max_frame_size = 4294967295u; /* Codes: [<field name="channel-max" type="ushort" default="65535"/>] */ connection->channel_max = 65535; - /* Codes_SRS_CONNECTION_01_175: [<field name="idle-time-out" type="milliseconds"/>] */ - /* Codes_SRS_CONNECTION_01_192: [A value of zero is the same as if it was not set (null).] */ + /* Codes_S_R_S_CONNECTION_01_175: [<field name="idle-time-out" type="milliseconds"/>] */ + /* Codes_S_R_S_CONNECTION_01_192: [A value of zero is the same as if it was not set (null).] */ connection->idle_timeout = 0; connection->remote_idle_timeout = 0; connection->remote_idle_timeout_send_frame_millisecond = 0; @@ -1212,6 +1252,9 @@ connection->on_new_endpoint = on_new_endpoint; connection->on_new_endpoint_callback_context = callback_context; + connection->on_connection_close_received_event_subscription.on_connection_close_received = NULL; + connection->on_connection_close_received_event_subscription.context = NULL; + connection->on_io_error = on_io_error; connection->on_io_error_callback_context = on_io_error_context; connection->on_connection_state_changed = on_connection_state_changed; @@ -1232,7 +1275,7 @@ { connection->last_frame_sent_time = connection->last_frame_received_time; - /* Codes_SRS_CONNECTION_01_072: [When connection_create succeeds, the state of the connection shall be CONNECTION_STATE_START.] */ + /* Codes_S_R_S_CONNECTION_01_072: [When connection_create succeeds, the state of the connection shall be CONNECTION_STATE_START.] */ connection_set_state(connection, CONNECTION_STATE_START); } } @@ -1248,17 +1291,17 @@ void connection_destroy(CONNECTION_HANDLE connection) { - /* Codes_SRS_CONNECTION_01_079: [If handle is NULL, connection_destroy shall do nothing.] */ + /* Codes_S_R_S_CONNECTION_01_079: [If handle is NULL, connection_destroy shall do nothing.] */ if (connection == NULL) { LogError("NULL connection"); } else { - /* Codes_SRS_CONNECTION_01_073: [connection_destroy shall free all resources associated with a connection.] */ + /* Codes_S_R_S_CONNECTION_01_073: [connection_destroy shall free all resources associated with a connection.] */ if (connection->is_underlying_io_open) { - (void)connection_close(connection, NULL, NULL); + (void)connection_close(connection, NULL, NULL, NULL); } amqp_frame_codec_destroy(connection->amqp_frame_codec); @@ -1272,7 +1315,7 @@ free(connection->host_name); free(connection->container_id); - /* Codes_SRS_CONNECTION_01_074: [connection_destroy shall close the socket connection.] */ + /* Codes_S_R_S_CONNECTION_01_074: [connection_destroy shall close the socket connection.] */ free(connection); } } @@ -1351,7 +1394,7 @@ return result; } -int connection_close(CONNECTION_HANDLE connection, const char* condition_value, const char* description) +int connection_close(CONNECTION_HANDLE connection, const char* condition_value, const char* description, AMQP_VALUE info) { int result; @@ -1360,11 +1403,18 @@ LogError("NULL connection"); result = __FAILURE__; } + else if ((info != NULL) && + (amqpvalue_get_type(info) != AMQP_TYPE_MAP) && + (amqpvalue_get_type(info) != AMQP_TYPE_NULL)) + { + LogError("Invalid info, expected a map"); + result = __FAILURE__; + } else { if (condition_value != NULL) { - close_connection_with_error(connection, condition_value, description); + close_connection_with_error(connection, condition_value, description, info); } else { @@ -1393,14 +1443,14 @@ { int result; - /* Codes_SRS_CONNECTION_01_163: [If connection is NULL, connection_set_max_frame_size shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_163: [If connection is NULL, connection_set_max_frame_size shall fail and return a non-zero value.] */ if (connection == NULL) { LogError("NULL connection"); result = __FAILURE__; } - /* Codes_SRS_CONNECTION_01_150: [If the max_frame_size is invalid then connection_set_max_frame_size shall fail and return a non-zero value.] */ - /* Codes_SRS_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ + /* Codes_S_R_S_CONNECTION_01_150: [If the max_frame_size is invalid then connection_set_max_frame_size shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */ else if (max_frame_size < 512) { LogError("max_frame_size too small"); @@ -1408,7 +1458,7 @@ } else { - /* Codes_SRS_CONNECTION_01_157: [If connection_set_max_frame_size is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_157: [If connection_set_max_frame_size is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { LogError("Connection already open"); @@ -1416,11 +1466,11 @@ } else { - /* Codes_SRS_CONNECTION_01_148: [connection_set_max_frame_size shall set the max_frame_size associated with a connection.] */ - /* Codes_SRS_CONNECTION_01_164: [If connection_set_max_frame_size fails, the previous max_frame_size setting shall be retained.] */ + /* Codes_S_R_S_CONNECTION_01_148: [connection_set_max_frame_size shall set the max_frame_size associated with a connection.] */ + /* Codes_S_R_S_CONNECTION_01_164: [If connection_set_max_frame_size fails, the previous max_frame_size setting shall be retained.] */ connection->max_frame_size = max_frame_size; - /* Codes_SRS_CONNECTION_01_149: [On success connection_set_max_frame_size shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_149: [On success connection_set_max_frame_size shall return 0.] */ result = 0; } } @@ -1432,7 +1482,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_170: [If connection or max_frame_size is NULL, connection_get_max_frame_size shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_170: [If connection or max_frame_size is NULL, connection_get_max_frame_size shall fail and return a non-zero value.] */ if ((connection == NULL) || (max_frame_size == NULL)) { @@ -1442,10 +1492,10 @@ } else { - /* Codes_SRS_CONNECTION_01_168: [connection_get_max_frame_size shall return in the max_frame_size argument the current max frame size setting.] */ + /* Codes_S_R_S_CONNECTION_01_168: [connection_get_max_frame_size shall return in the max_frame_size argument the current max frame size setting.] */ *max_frame_size = connection->max_frame_size; - /* Codes_SRS_CONNECTION_01_169: [On success, connection_get_max_frame_size shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_169: [On success, connection_get_max_frame_size shall return 0.] */ result = 0; } @@ -1456,7 +1506,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_181: [If connection is NULL then connection_set_channel_max shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_181: [If connection is NULL then connection_set_channel_max shall fail and return a non-zero value.] */ if (connection == NULL) { LogError("NULL connection"); @@ -1464,7 +1514,7 @@ } else { - /* Codes_SRS_CONNECTION_01_156: [If connection_set_channel_max is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_156: [If connection_set_channel_max is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { LogError("Connection already open"); @@ -1472,11 +1522,11 @@ } else { - /* Codes_SRS_CONNECTION_01_153: [connection_set_channel_max shall set the channel_max associated with a connection.] */ - /* Codes_SRS_CONNECTION_01_165: [If connection_set_channel_max fails, the previous channel_max setting shall be retained.] */ + /* Codes_S_R_S_CONNECTION_01_153: [connection_set_channel_max shall set the channel_max associated with a connection.] */ + /* Codes_S_R_S_CONNECTION_01_165: [If connection_set_channel_max fails, the previous channel_max setting shall be retained.] */ connection->channel_max = channel_max; - /* Codes_SRS_CONNECTION_01_154: [On success connection_set_channel_max shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_154: [On success connection_set_channel_max shall return 0.] */ result = 0; } } @@ -1488,7 +1538,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_184: [If connection or channel_max is NULL, connection_get_channel_max shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_184: [If connection or channel_max is NULL, connection_get_channel_max shall fail and return a non-zero value.] */ if ((connection == NULL) || (channel_max == NULL)) { @@ -1498,10 +1548,10 @@ } else { - /* Codes_SRS_CONNECTION_01_182: [connection_get_channel_max shall return in the channel_max argument the current channel_max setting.] */ + /* Codes_S_R_S_CONNECTION_01_182: [connection_get_channel_max shall return in the channel_max argument the current channel_max setting.] */ *channel_max = connection->channel_max; - /* Codes_SRS_CONNECTION_01_183: [On success, connection_get_channel_max shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_183: [On success, connection_get_channel_max shall return 0.] */ result = 0; } @@ -1512,7 +1562,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_191: [If connection is NULL, connection_set_idle_timeout shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_191: [If connection is NULL, connection_set_idle_timeout shall fail and return a non-zero value.] */ if (connection == NULL) { LogError("NULL connection"); @@ -1520,7 +1570,7 @@ } else { - /* Codes_SRS_CONNECTION_01_158: [If connection_set_idle_timeout is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_158: [If connection_set_idle_timeout is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { LogError("Connection already open"); @@ -1528,12 +1578,12 @@ } else { - /* Codes_SRS_CONNECTION_01_159: [connection_set_idle_timeout shall set the idle_timeout associated with a connection.] */ - /* Codes_SRS_CONNECTION_01_166: [If connection_set_idle_timeout fails, the previous idle_timeout setting shall be retained.] */ + /* Codes_S_R_S_CONNECTION_01_159: [connection_set_idle_timeout shall set the idle_timeout associated with a connection.] */ + /* Codes_S_R_S_CONNECTION_01_166: [If connection_set_idle_timeout fails, the previous idle_timeout setting shall be retained.] */ connection->idle_timeout = idle_timeout; connection->idle_timeout_specified = true; - /* Codes_SRS_CONNECTION_01_160: [On success connection_set_idle_timeout shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_160: [On success connection_set_idle_timeout shall return 0.] */ result = 0; } } @@ -1545,7 +1595,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_190: [If connection or idle_timeout is NULL, connection_get_idle_timeout shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_190: [If connection or idle_timeout is NULL, connection_get_idle_timeout shall fail and return a non-zero value.] */ if ((connection == NULL) || (idle_timeout == NULL)) { @@ -1555,10 +1605,10 @@ } else { - /* Codes_SRS_CONNECTION_01_188: [connection_get_idle_timeout shall return in the idle_timeout argument the current idle_timeout setting.] */ + /* Codes_S_R_S_CONNECTION_01_188: [connection_get_idle_timeout shall return in the idle_timeout argument the current idle_timeout setting.] */ *idle_timeout = connection->idle_timeout; - /* Codes_SRS_CONNECTION_01_189: [On success, connection_get_idle_timeout shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_189: [On success, connection_get_idle_timeout shall return 0.] */ result = 0; } @@ -1569,7 +1619,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_261: [If connection is NULL, connection_set_properties shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_261: [If connection is NULL, connection_set_properties shall fail and return a non-zero value.] */ if (connection == NULL) { LogError("NULL connection"); @@ -1577,7 +1627,7 @@ } else { - /* Codes_SRS_CONNECTION_01_262: [If connection_set_properties is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_262: [If connection_set_properties is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_START) { LogError("Connection already open"); @@ -1587,32 +1637,32 @@ { if (properties == NULL) { - /* Codes_SRS_CONNECTION_01_263: [ If `properties` is NULL, the previously stored properties associated with `connection` shall be freed. ]*/ + /* Codes_S_R_S_CONNECTION_01_263: [ If `properties` is NULL, the previously stored properties associated with `connection` shall be freed. ]*/ if (connection->properties != NULL) { fields_destroy(connection->properties); connection->properties = NULL; } - /* Codes_SRS_CONNECTION_01_264: [ On success it shall return 0. ]*/ + /* Codes_S_R_S_CONNECTION_01_264: [ On success it shall return 0. ]*/ result = 0; } else { fields new_properties; - /* Codes_SRS_CONNECTION_01_265: [ `connection_set_properties` shall copy the contents of `properties` as the properties contents for the connection instance identified by `connection`. ]*/ - /* Codes_SRS_CONNECTION_01_266: [ Cloning the properties shall be done by calling `fields_clone`. ]*/ + /* Codes_S_R_S_CONNECTION_01_265: [ `connection_set_properties` shall copy the contents of `properties` as the properties contents for the connection instance identified by `connection`. ]*/ + /* Codes_S_R_S_CONNECTION_01_266: [ Cloning the properties shall be done by calling `fields_clone`. ]*/ new_properties = fields_clone(properties); if (new_properties == NULL) { - /* Codes_SRS_CONNECTION_01_267: [ If `fields_clone` fails, `connection_set_properties` shall fail and return a non-zero value. ]*/ + /* Codes_S_R_S_CONNECTION_01_267: [ If `fields_clone` fails, `connection_set_properties` shall fail and return a non-zero value. ]*/ LogError("Cannot clone connection properties"); result = __FAILURE__; } else { - /* Codes_SRS_CONNECTION_01_268: [ If setting the properties fails, the previous value shall be preserved. ]*/ + /* Codes_S_R_S_CONNECTION_01_268: [ If setting the properties fails, the previous value shall be preserved. ]*/ /* Only do the free of the previous value if we could clone the new one*/ if (connection->properties != NULL) { @@ -1621,7 +1671,7 @@ connection->properties = new_properties; - /* Codes_SRS_CONNECTION_01_264: [ On success it shall return 0. ]*/ + /* Codes_S_R_S_CONNECTION_01_264: [ On success it shall return 0. ]*/ result = 0; } } @@ -1635,7 +1685,7 @@ { int result; - /* Codes_SRS_CONNECTION_01_269: [If connection or properties is NULL, connection_get_properties shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_269: [If connection or properties is NULL, connection_get_properties shall fail and return a non-zero value.] */ if ((connection == NULL) || (properties == NULL)) { @@ -1647,26 +1697,26 @@ { if (connection->properties == NULL) { - /* Codes_SRS_CONNECTION_01_270: [ If no properties have been set, `connection_get_properties` shall set `properties` to NULL. ]*/ + /* Codes_S_R_S_CONNECTION_01_270: [ If no properties have been set, `connection_get_properties` shall set `properties` to NULL. ]*/ *properties = NULL; - /* Codes_SRS_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */ result = 0; } else { - /* Codes_SRS_CONNECTION_01_272: [connection_get_properties shall return in the properties argument the current properties setting.] */ - /* Codes_SRS_CONNECTION_01_273: [ Cloning the properties shall be done by calling `fields_clone`. ]*/ + /* Codes_S_R_S_CONNECTION_01_272: [connection_get_properties shall return in the properties argument the current properties setting.] */ + /* Codes_S_R_S_CONNECTION_01_273: [ Cloning the properties shall be done by calling `fields_clone`. ]*/ *properties = fields_clone(connection->properties); if (*properties == NULL) { - /* Codes_SRS_CONNECTION_01_274: [ If `fields_clone` fails, `connection_get_properties` shall fail and return a non-zero value. ]*/ + /* Codes_S_R_S_CONNECTION_01_274: [ If `fields_clone` fails, `connection_get_properties` shall fail and return a non-zero value. ]*/ LogError("Cannot clone properties"); result = __FAILURE__; } else { - /* Codes_SRS_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */ result = 0; } } @@ -1712,7 +1762,7 @@ if (tickcounter_get_current_ms(connection->tick_counter, ¤t_ms) != 0) { LogError("Could not get tick counter value"); - close_connection_with_error(connection, "amqp:internal-error", "Could not get tick count"); + close_connection_with_error(connection, "amqp:internal-error", "Could not get tick count", NULL); } else { @@ -1730,7 +1780,7 @@ local_deadline = 0; /* close connection */ - close_connection_with_error(connection, "amqp:internal-error", "No frame received for the idle timeout"); + close_connection_with_error(connection, "amqp:internal-error", "No frame received for the idle timeout", NULL); } } @@ -1752,7 +1802,7 @@ { LogError("Encoding the empty frame failed"); /* close connection */ - close_connection_with_error(connection, "amqp:internal-error", "Cannot send empty frame"); + close_connection_with_error(connection, "amqp:internal-error", "Cannot send empty frame", NULL); } else { @@ -1776,7 +1826,7 @@ void connection_dowork(CONNECTION_HANDLE connection) { - /* Codes_SRS_CONNECTION_01_078: [If handle is NULL, connection_dowork shall do nothing.] */ + /* Codes_S_R_S_CONNECTION_01_078: [If handle is NULL, connection_dowork shall do nothing.] */ if (connection == NULL) { LogError("NULL connection"); @@ -1785,7 +1835,7 @@ { if (connection_handle_deadlines(connection) > 0) { - /* Codes_SRS_CONNECTION_01_076: [connection_dowork shall schedule the underlying IO interface to do its work by calling xio_dowork.] */ + /* Codes_S_R_S_CONNECTION_01_076: [connection_dowork shall schedule the underlying IO interface to do its work by calling xio_dowork.] */ xio_dowork(connection->io); } } @@ -1795,8 +1845,8 @@ { ENDPOINT_HANDLE result; - /* Codes_SRS_CONNECTION_01_113: [If connection, on_endpoint_frame_received or on_connection_state_changed is NULL, connection_create_endpoint shall fail and return NULL.] */ - /* Codes_SRS_CONNECTION_01_193: [The context argument shall be allowed to be NULL.] */ + /* Codes_S_R_S_CONNECTION_01_113: [If connection, on_endpoint_frame_received or on_connection_state_changed is NULL, connection_create_endpoint shall fail and return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_193: [The context argument shall be allowed to be NULL.] */ if (connection == NULL) { LogError("NULL connection"); @@ -1804,7 +1854,7 @@ } else { - /* Codes_SRS_CONNECTION_01_115: [If no more endpoints can be created due to all channels being used, connection_create_endpoint shall fail and return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_115: [If no more endpoints can be created due to all channels being used, connection_create_endpoint shall fail and return NULL.] */ if (connection->endpoint_count >= connection->channel_max) { result = NULL; @@ -1813,7 +1863,7 @@ { uint32_t i = 0; - /* Codes_SRS_CONNECTION_01_128: [The lowest number outgoing channel shall be associated with the newly created endpoint.] */ + /* Codes_S_R_S_CONNECTION_01_128: [The lowest number outgoing channel shall be associated with the newly created endpoint.] */ for (i = 0; i < connection->endpoint_count; i++) { if (connection->endpoints[i]->outgoing_channel > i) @@ -1823,9 +1873,9 @@ } } - /* Codes_SRS_CONNECTION_01_127: [On success, connection_create_endpoint shall return a non-NULL handle to the newly created endpoint.] */ + /* Codes_S_R_S_CONNECTION_01_127: [On success, connection_create_endpoint shall return a non-NULL handle to the newly created endpoint.] */ result = (ENDPOINT_HANDLE)malloc(sizeof(ENDPOINT_INSTANCE)); - /* Codes_SRS_CONNECTION_01_196: [If memory cannot be allocated for the new endpoint, connection_create_endpoint shall fail and return NULL.] */ + /* Codes_S_R_S_CONNECTION_01_196: [If memory cannot be allocated for the new endpoint, connection_create_endpoint shall fail and return NULL.] */ if (result == NULL) { LogError("Cannot allocate memory for endpoint"); @@ -1840,11 +1890,11 @@ result->outgoing_channel = (uint16_t)i; result->connection = connection; - /* Codes_SRS_CONNECTION_01_197: [The newly created endpoint shall be added to the endpoints list, so that it can be tracked.] */ + /* Codes_S_R_S_CONNECTION_01_197: [The newly created endpoint shall be added to the endpoints list, so that it can be tracked.] */ new_endpoints = (ENDPOINT_HANDLE*)realloc(connection->endpoints, sizeof(ENDPOINT_HANDLE) * (connection->endpoint_count + 1)); if (new_endpoints == NULL) { - /* Tests_SRS_CONNECTION_01_198: [If adding the endpoint to the endpoints list tracked by the connection fails, connection_create_endpoint shall fail and return NULL.] */ + /* Tests_S_R_S_CONNECTION_01_198: [If adding the endpoint to the endpoints list tracked by the connection fails, connection_create_endpoint shall fail and return NULL.] */ LogError("Cannot reallocate memory for connection endpoints"); free(result); result = NULL; @@ -1861,7 +1911,7 @@ connection->endpoints[i] = result; connection->endpoint_count++; - /* Codes_SRS_CONNECTION_01_112: [connection_create_endpoint shall create a new endpoint that can be used by a session.] */ + /* Codes_S_R_S_CONNECTION_01_112: [connection_create_endpoint shall create a new endpoint that can be used by a session.] */ } } } @@ -1914,7 +1964,7 @@ return result; } -/* Codes_SRS_CONNECTION_01_129: [connection_destroy_endpoint shall free all resources associated with an endpoint created by connection_create_endpoint.] */ +/* Codes_S_R_S_CONNECTION_01_129: [connection_destroy_endpoint shall free all resources associated with an endpoint created by connection_create_endpoint.] */ void connection_destroy_endpoint(ENDPOINT_HANDLE endpoint) { if (endpoint == NULL) @@ -1934,8 +1984,8 @@ } } - /* Codes_SRS_CONNECTION_01_130: [The outgoing channel associated with the endpoint shall be released by removing the endpoint from the endpoint list.] */ - /* Codes_SRS_CONNECTION_01_131: [Any incoming channel number associated with the endpoint shall be released.] */ + /* Codes_S_R_S_CONNECTION_01_130: [The outgoing channel associated with the endpoint shall be released by removing the endpoint from the endpoint list.] */ + /* Codes_S_R_S_CONNECTION_01_131: [Any incoming channel number associated with the endpoint shall be released.] */ if ((i < connection->endpoint_count) && (i > 0)) { ENDPOINT_HANDLE* new_endpoints; @@ -1960,12 +2010,12 @@ } } -/* Codes_SRS_CONNECTION_01_247: [connection_encode_frame shall send a frame for a certain endpoint.] */ +/* Codes_S_R_S_CONNECTION_01_247: [connection_encode_frame shall send a frame for a certain endpoint.] */ int connection_encode_frame(ENDPOINT_HANDLE endpoint, AMQP_VALUE performative, PAYLOAD* payloads, size_t payload_count, ON_SEND_COMPLETE on_send_complete, void* callback_context) { int result; - /* Codes_SRS_CONNECTION_01_249: [If endpoint or performative are NULL, connection_encode_frame shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_249: [If endpoint or performative are NULL, connection_encode_frame shall fail and return a non-zero value.] */ if ((endpoint == NULL) || (performative == NULL)) { @@ -1978,7 +2028,7 @@ CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection; AMQP_FRAME_CODEC_HANDLE amqp_frame_codec = connection->amqp_frame_codec; - /* Codes_SRS_CONNECTION_01_254: [If connection_encode_frame is called before the connection is in the OPENED state, connection_encode_frame shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_254: [If connection_encode_frame is called before the connection is in the OPENED state, connection_encode_frame shall fail and return a non-zero value.] */ if (connection->connection_state != CONNECTION_STATE_OPENED) { LogError("Connection not open"); @@ -1986,15 +2036,15 @@ } else { - /* Codes_SRS_CONNECTION_01_255: [The payload size shall be computed based on all the payload chunks passed as argument in payloads.] */ - /* Codes_SRS_CONNECTION_01_250: [connection_encode_frame shall initiate the frame send by calling amqp_frame_codec_begin_encode_frame.] */ - /* Codes_SRS_CONNECTION_01_251: [The channel number passed to amqp_frame_codec_begin_encode_frame shall be the outgoing channel number associated with the endpoint by connection_create_endpoint.] */ - /* Codes_SRS_CONNECTION_01_252: [The performative passed to amqp_frame_codec_begin_encode_frame shall be the performative argument of connection_encode_frame.] */ + /* Codes_S_R_S_CONNECTION_01_255: [The payload size shall be computed based on all the payload chunks passed as argument in payloads.] */ + /* Codes_S_R_S_CONNECTION_01_250: [connection_encode_frame shall initiate the frame send by calling amqp_frame_codec_begin_encode_frame.] */ + /* Codes_S_R_S_CONNECTION_01_251: [The channel number passed to amqp_frame_codec_begin_encode_frame shall be the outgoing channel number associated with the endpoint by connection_create_endpoint.] */ + /* Codes_S_R_S_CONNECTION_01_252: [The performative passed to amqp_frame_codec_begin_encode_frame shall be the performative argument of connection_encode_frame.] */ connection->on_send_complete = on_send_complete; connection->on_send_complete_callback_context = callback_context; if (amqp_frame_codec_encode_frame(amqp_frame_codec, endpoint->outgoing_channel, performative, payloads, payload_count, on_bytes_encoded, connection) != 0) { - /* Codes_SRS_CONNECTION_01_253: [If amqp_frame_codec_begin_encode_frame or amqp_frame_codec_encode_payload_bytes fails, then connection_encode_frame shall fail and return a non-zero value.] */ + /* Codes_S_R_S_CONNECTION_01_253: [If amqp_frame_codec_begin_encode_frame or amqp_frame_codec_encode_payload_bytes fails, then connection_encode_frame shall fail and return a non-zero value.] */ LogError("Encoding AMQP frame failed"); result = __FAILURE__; } @@ -2012,7 +2062,7 @@ } else { - /* Codes_SRS_CONNECTION_01_248: [On success it shall return 0.] */ + /* Codes_S_R_S_CONNECTION_01_248: [On success it shall return 0.] */ result = 0; } } @@ -2024,14 +2074,14 @@ void connection_set_trace(CONNECTION_HANDLE connection, bool trace_on) { - /* Codes_SRS_CONNECTION_07_002: [If connection is NULL then connection_set_trace shall do nothing.] */ + /* Codes_S_R_S_CONNECTION_07_002: [If connection is NULL then connection_set_trace shall do nothing.] */ if (connection == NULL) { LogError("NULL connection"); } else { - /* Codes_SRS_CONNECTION_07_001: [connection_set_trace shall set the ability to turn on and off trace logging.] */ + /* Codes_S_R_S_CONNECTION_07_001: [connection_set_trace shall set the ability to turn on and off trace logging.] */ connection->is_trace_on = trace_on ? 1 : 0; } } @@ -2056,3 +2106,54 @@ return result; } + +ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE connection_subscribe_on_connection_close_received(CONNECTION_HANDLE connection, ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received, void* context) +{ + ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE result; + + /* Codes_S_R_S_CONNECTION_01_279: [ `context` shall be allowed to be NULL. ]*/ + + /* Codes_S_R_S_CONNECTION_01_277: [ If `connection` is NULL, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/ + if ((connection == NULL) || + /* Codes_S_R_S_CONNECTION_01_278: [ If `on_connection_close_received` is NULL, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/ + (on_connection_close_received == NULL)) + { + LogError("Invalid arguments: connection = %p, on_connection_close_received = %p, context = %p", + connection, on_connection_close_received, context); + result = NULL; + } + else + { + if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL) + { + /* Codes_S_R_S_CONNECTION_01_280: [ Only one subscription shall be allowed per connection, if a subsequent second even subscription is done while a subscription is active, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/ + LogError("Already subscribed for on_connection_close_received events"); + result = NULL; + } + else + { + /* Codes_S_R_S_CONNECTION_01_275: [ `connection_subscribe_on_connection_close_received` shall register the `on_connection_close_received` handler to be triggered whenever a CLOSE performative is received.. ]*/ + connection->on_connection_close_received_event_subscription.on_connection_close_received = on_connection_close_received; + connection->on_connection_close_received_event_subscription.context = context; + + /* Codes_S_R_S_CONNECTION_01_276: [ On success, `connection_subscribe_on_connection_close_received` shall return a non-NULL handle to the event subcription. ]*/ + result = &connection->on_connection_close_received_event_subscription; + } + } + + return result; +} + +void connection_unsubscribe_on_connection_close_received(ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE event_subscription) +{ + if (event_subscription == NULL) + { + LogError("NULL event_subscription"); + } + else + { + /* Codes_S_R_S_CONNECTION_01_281: [ `connection_unsubscribe_on_connection_close_received` shall remove the subscription for the connection closed event that was made by calling `connection_subscribe_on_connection_close_received`. ]*/ + event_subscription->on_connection_close_received = NULL; + event_subscription->context = NULL; + } +}