Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

User | Revision | Line number | New contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5 #include <string.h>
XinZhangMS 0:f7f1f0d76dd6 6
XinZhangMS 0:f7f1f0d76dd6 7 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_c_shared_utility/xio.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_c_shared_utility/tickcounter.h"
XinZhangMS 0:f7f1f0d76dd6 12
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_uamqp_c/connection.h"
XinZhangMS 0:f7f1f0d76dd6 14 #include "azure_uamqp_c/frame_codec.h"
XinZhangMS 0:f7f1f0d76dd6 15 #include "azure_uamqp_c/amqpvalue.h"
XinZhangMS 0:f7f1f0d76dd6 16 #include "azure_uamqp_c/amqp_frame_codec.h"
XinZhangMS 0:f7f1f0d76dd6 17 #include "azure_uamqp_c/amqp_definitions.h"
XinZhangMS 0:f7f1f0d76dd6 18 #include "azure_uamqp_c/amqpvalue_to_string.h"
XinZhangMS 0:f7f1f0d76dd6 19
XinZhangMS 0:f7f1f0d76dd6 20 /* Requirements satisfied by the virtue of implementing the ISO:*/
XinZhangMS 0:f7f1f0d76dd6 21 /* Codes_S_R_S_CONNECTION_01_088: [Any data appearing beyond the protocol header MUST match the version indicated by the protocol header.] */
XinZhangMS 0:f7f1f0d76dd6 22 /* Codes_S_R_S_CONNECTION_01_015: [Implementations SHOULD NOT expect to be able to reuse open TCP sockets after close performatives have been exchanged.] */
XinZhangMS 0:f7f1f0d76dd6 23
/* Codes_S_R_S_CONNECTION_01_087: [The protocol header consists of the upper case ASCII letters "AMQP" followed by a protocol id of zero, followed by three unsigned bytes representing the major, minor, and revision of the protocol version (currently 1 (MAJOR), 0 (MINOR), 0 (REVISION)). In total this is an 8-octet sequence] */
/* The 8-octet protocol header, laid out field by field. */
static const unsigned char amqp_header[] =
{
    'A', 'M', 'Q', 'P', /* protocol name */
    0,                  /* protocol id */
    1,                  /* major */
    0,                  /* minor */
    0                   /* revision */
};
XinZhangMS 0:f7f1f0d76dd6 26
/* Frame reception states. Judging by the enumerator names, the frame size is
   handled before the frame data - TODO confirm against the code that uses this enum. */
enum RECEIVE_FRAME_STATE_TAG
{
    RECEIVE_FRAME_STATE_FRAME_SIZE,
    RECEIVE_FRAME_STATE_FRAME_DATA
};

typedef enum RECEIVE_FRAME_STATE_TAG RECEIVE_FRAME_STATE;
XinZhangMS 0:f7f1f0d76dd6 32
XinZhangMS 0:f7f1f0d76dd6 33 typedef struct ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_TAG
XinZhangMS 0:f7f1f0d76dd6 34 {
XinZhangMS 0:f7f1f0d76dd6 35 ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received;
XinZhangMS 0:f7f1f0d76dd6 36 void* context;
XinZhangMS 0:f7f1f0d76dd6 37 } ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION;
XinZhangMS 0:f7f1f0d76dd6 38
XinZhangMS 0:f7f1f0d76dd6 39 typedef struct ENDPOINT_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 40 {
XinZhangMS 0:f7f1f0d76dd6 41 uint16_t incoming_channel;
XinZhangMS 0:f7f1f0d76dd6 42 uint16_t outgoing_channel;
XinZhangMS 0:f7f1f0d76dd6 43 ON_ENDPOINT_FRAME_RECEIVED on_endpoint_frame_received;
XinZhangMS 0:f7f1f0d76dd6 44 ON_CONNECTION_STATE_CHANGED on_connection_state_changed;
XinZhangMS 0:f7f1f0d76dd6 45 void* callback_context;
XinZhangMS 0:f7f1f0d76dd6 46 CONNECTION_HANDLE connection;
XinZhangMS 0:f7f1f0d76dd6 47 } ENDPOINT_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 48
XinZhangMS 0:f7f1f0d76dd6 49 typedef struct CONNECTION_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 50 {
XinZhangMS 0:f7f1f0d76dd6 51 XIO_HANDLE io;
XinZhangMS 0:f7f1f0d76dd6 52 size_t header_bytes_received;
XinZhangMS 0:f7f1f0d76dd6 53 CONNECTION_STATE connection_state;
XinZhangMS 0:f7f1f0d76dd6 54 FRAME_CODEC_HANDLE frame_codec;
XinZhangMS 0:f7f1f0d76dd6 55 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec;
XinZhangMS 0:f7f1f0d76dd6 56 ENDPOINT_INSTANCE** endpoints;
XinZhangMS 0:f7f1f0d76dd6 57 uint32_t endpoint_count;
XinZhangMS 0:f7f1f0d76dd6 58 char* host_name;
XinZhangMS 0:f7f1f0d76dd6 59 char* container_id;
XinZhangMS 0:f7f1f0d76dd6 60 TICK_COUNTER_HANDLE tick_counter;
XinZhangMS 0:f7f1f0d76dd6 61 uint32_t remote_max_frame_size;
XinZhangMS 0:f7f1f0d76dd6 62
XinZhangMS 0:f7f1f0d76dd6 63 ON_SEND_COMPLETE on_send_complete;
XinZhangMS 0:f7f1f0d76dd6 64 void* on_send_complete_callback_context;
XinZhangMS 0:f7f1f0d76dd6 65
XinZhangMS 0:f7f1f0d76dd6 66 ON_NEW_ENDPOINT on_new_endpoint;
XinZhangMS 0:f7f1f0d76dd6 67 void* on_new_endpoint_callback_context;
XinZhangMS 0:f7f1f0d76dd6 68
XinZhangMS 0:f7f1f0d76dd6 69 ON_CONNECTION_STATE_CHANGED on_connection_state_changed;
XinZhangMS 0:f7f1f0d76dd6 70 void* on_connection_state_changed_callback_context;
XinZhangMS 0:f7f1f0d76dd6 71 ON_IO_ERROR on_io_error;
XinZhangMS 0:f7f1f0d76dd6 72 void* on_io_error_callback_context;
XinZhangMS 0:f7f1f0d76dd6 73
XinZhangMS 0:f7f1f0d76dd6 74 ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION on_connection_close_received_event_subscription;
XinZhangMS 0:f7f1f0d76dd6 75
XinZhangMS 0:f7f1f0d76dd6 76 /* options */
XinZhangMS 0:f7f1f0d76dd6 77 uint32_t max_frame_size;
XinZhangMS 0:f7f1f0d76dd6 78 uint16_t channel_max;
XinZhangMS 0:f7f1f0d76dd6 79 milliseconds idle_timeout;
XinZhangMS 0:f7f1f0d76dd6 80 milliseconds remote_idle_timeout;
XinZhangMS 0:f7f1f0d76dd6 81 milliseconds remote_idle_timeout_send_frame_millisecond;
XinZhangMS 0:f7f1f0d76dd6 82 double idle_timeout_empty_frame_send_ratio;
XinZhangMS 0:f7f1f0d76dd6 83 tickcounter_ms_t last_frame_received_time;
XinZhangMS 0:f7f1f0d76dd6 84 tickcounter_ms_t last_frame_sent_time;
XinZhangMS 0:f7f1f0d76dd6 85 fields properties;
XinZhangMS 0:f7f1f0d76dd6 86
XinZhangMS 0:f7f1f0d76dd6 87 unsigned int is_underlying_io_open : 1;
XinZhangMS 0:f7f1f0d76dd6 88 unsigned int idle_timeout_specified : 1;
XinZhangMS 0:f7f1f0d76dd6 89 unsigned int is_remote_frame_received : 1;
XinZhangMS 0:f7f1f0d76dd6 90 unsigned int is_trace_on : 1;
XinZhangMS 0:f7f1f0d76dd6 91 } CONNECTION_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 92
XinZhangMS 0:f7f1f0d76dd6 93 /* Codes_S_R_S_CONNECTION_01_258: [on_connection_state_changed shall be invoked whenever the connection state changes.]*/
XinZhangMS 0:f7f1f0d76dd6 94 static void connection_set_state(CONNECTION_HANDLE connection, CONNECTION_STATE connection_state)
XinZhangMS 0:f7f1f0d76dd6 95 {
XinZhangMS 0:f7f1f0d76dd6 96 uint64_t i;
XinZhangMS 0:f7f1f0d76dd6 97
XinZhangMS 0:f7f1f0d76dd6 98 CONNECTION_STATE previous_state = connection->connection_state;
XinZhangMS 0:f7f1f0d76dd6 99 connection->connection_state = connection_state;
XinZhangMS 0:f7f1f0d76dd6 100
XinZhangMS 0:f7f1f0d76dd6 101 /* Codes_S_R_S_CONNECTION_22_001: [If a connection state changed occurs and a callback is registered the callback shall be called.] */
XinZhangMS 0:f7f1f0d76dd6 102 if (connection->on_connection_state_changed)
XinZhangMS 0:f7f1f0d76dd6 103 {
XinZhangMS 0:f7f1f0d76dd6 104 connection->on_connection_state_changed(connection->on_connection_state_changed_callback_context, connection_state, previous_state);
XinZhangMS 0:f7f1f0d76dd6 105 }
XinZhangMS 0:f7f1f0d76dd6 106
XinZhangMS 0:f7f1f0d76dd6 107 /* Codes_S_R_S_CONNECTION_01_260: [Each endpoint's on_connection_state_changed shall be called.] */
XinZhangMS 0:f7f1f0d76dd6 108 for (i = 0; i < connection->endpoint_count; i++)
XinZhangMS 0:f7f1f0d76dd6 109 {
XinZhangMS 0:f7f1f0d76dd6 110 /* Codes_S_R_S_CONNECTION_01_259: [The callback_context passed in connection_create_endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 111 if (connection->endpoints[i]->on_connection_state_changed != NULL)
XinZhangMS 0:f7f1f0d76dd6 112 {
XinZhangMS 0:f7f1f0d76dd6 113 connection->endpoints[i]->on_connection_state_changed(connection->endpoints[i]->callback_context, connection_state, previous_state);
XinZhangMS 0:f7f1f0d76dd6 114 }
XinZhangMS 0:f7f1f0d76dd6 115 }
XinZhangMS 0:f7f1f0d76dd6 116 }
XinZhangMS 0:f7f1f0d76dd6 117
XinZhangMS 0:f7f1f0d76dd6 118 // This callback usage needs to be either verified and commented or integrated into
XinZhangMS 0:f7f1f0d76dd6 119 // the state machine.
XinZhangMS 0:f7f1f0d76dd6 120 static void unchecked_on_send_complete(void* context, IO_SEND_RESULT send_result)
XinZhangMS 0:f7f1f0d76dd6 121 {
XinZhangMS 0:f7f1f0d76dd6 122 (void)context;
XinZhangMS 0:f7f1f0d76dd6 123 (void)send_result;
XinZhangMS 0:f7f1f0d76dd6 124 }
XinZhangMS 0:f7f1f0d76dd6 125
XinZhangMS 0:f7f1f0d76dd6 126 static int send_header(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 127 {
XinZhangMS 0:f7f1f0d76dd6 128 int result;
XinZhangMS 0:f7f1f0d76dd6 129
XinZhangMS 0:f7f1f0d76dd6 130 /* Codes_S_R_S_CONNECTION_01_093: [_ When the client opens a new socket connection to a server, it MUST send a protocol header with the client's preferred protocol version.] */
XinZhangMS 0:f7f1f0d76dd6 131 /* Codes_S_R_S_CONNECTION_01_104: [Sending the protocol header shall be done by using xio_send.] */
XinZhangMS 0:f7f1f0d76dd6 132 if (xio_send(connection->io, amqp_header, sizeof(amqp_header), unchecked_on_send_complete, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 133 {
XinZhangMS 0:f7f1f0d76dd6 134 /* Codes_S_R_S_CONNECTION_01_106: [When sending the protocol header fails, the connection shall be immediately closed.] */
XinZhangMS 0:f7f1f0d76dd6 135 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 136 {
XinZhangMS 0:f7f1f0d76dd6 137 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 138 }
XinZhangMS 0:f7f1f0d76dd6 139
XinZhangMS 0:f7f1f0d76dd6 140 /* Codes_S_R_S_CONNECTION_01_057: [END In this state it is illegal for either endpoint to write anything more onto the connection. The connection can be safely closed and discarded.] */
XinZhangMS 0:f7f1f0d76dd6 141 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 142
XinZhangMS 0:f7f1f0d76dd6 143 /* Codes_S_R_S_CONNECTION_01_105: [When xio_send fails, connection_dowork shall return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 144 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 145 }
XinZhangMS 0:f7f1f0d76dd6 146 else
XinZhangMS 0:f7f1f0d76dd6 147 {
XinZhangMS 0:f7f1f0d76dd6 148 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 149 {
XinZhangMS 0:f7f1f0d76dd6 150 LOG(AZ_LOG_TRACE, LOG_LINE, "-> Header (AMQP 0.1.0.0)");
XinZhangMS 0:f7f1f0d76dd6 151 }
XinZhangMS 0:f7f1f0d76dd6 152
XinZhangMS 0:f7f1f0d76dd6 153 /* Codes_S_R_S_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */
XinZhangMS 0:f7f1f0d76dd6 154 connection_set_state(connection, CONNECTION_STATE_HDR_SENT);
XinZhangMS 0:f7f1f0d76dd6 155 result = 0;
XinZhangMS 0:f7f1f0d76dd6 156 }
XinZhangMS 0:f7f1f0d76dd6 157
XinZhangMS 0:f7f1f0d76dd6 158 return result;
XinZhangMS 0:f7f1f0d76dd6 159 }
XinZhangMS 0:f7f1f0d76dd6 160
XinZhangMS 0:f7f1f0d76dd6 161 #ifndef NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 162 static const char* get_frame_type_as_string(AMQP_VALUE descriptor)
XinZhangMS 0:f7f1f0d76dd6 163 {
XinZhangMS 0:f7f1f0d76dd6 164 const char* result;
XinZhangMS 0:f7f1f0d76dd6 165
XinZhangMS 0:f7f1f0d76dd6 166 if (is_open_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 167 {
XinZhangMS 0:f7f1f0d76dd6 168 result = "[OPEN]";
XinZhangMS 0:f7f1f0d76dd6 169 }
XinZhangMS 0:f7f1f0d76dd6 170 else if (is_begin_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 171 {
XinZhangMS 0:f7f1f0d76dd6 172 result = "[BEGIN]";
XinZhangMS 0:f7f1f0d76dd6 173 }
XinZhangMS 0:f7f1f0d76dd6 174 else if (is_attach_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 175 {
XinZhangMS 0:f7f1f0d76dd6 176 result = "[ATTACH]";
XinZhangMS 0:f7f1f0d76dd6 177 }
XinZhangMS 0:f7f1f0d76dd6 178 else if (is_flow_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 179 {
XinZhangMS 0:f7f1f0d76dd6 180 result = "[FLOW]";
XinZhangMS 0:f7f1f0d76dd6 181 }
XinZhangMS 0:f7f1f0d76dd6 182 else if (is_disposition_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 183 {
XinZhangMS 0:f7f1f0d76dd6 184 result = "[DISPOSITION]";
XinZhangMS 0:f7f1f0d76dd6 185 }
XinZhangMS 0:f7f1f0d76dd6 186 else if (is_transfer_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 187 {
XinZhangMS 0:f7f1f0d76dd6 188 result = "[TRANSFER]";
XinZhangMS 0:f7f1f0d76dd6 189 }
XinZhangMS 0:f7f1f0d76dd6 190 else if (is_detach_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 191 {
XinZhangMS 0:f7f1f0d76dd6 192 result = "[DETACH]";
XinZhangMS 0:f7f1f0d76dd6 193 }
XinZhangMS 0:f7f1f0d76dd6 194 else if (is_end_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 195 {
XinZhangMS 0:f7f1f0d76dd6 196 result = "[END]";
XinZhangMS 0:f7f1f0d76dd6 197 }
XinZhangMS 0:f7f1f0d76dd6 198 else if (is_close_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 199 {
XinZhangMS 0:f7f1f0d76dd6 200 result = "[CLOSE]";
XinZhangMS 0:f7f1f0d76dd6 201 }
XinZhangMS 0:f7f1f0d76dd6 202 else
XinZhangMS 0:f7f1f0d76dd6 203 {
XinZhangMS 0:f7f1f0d76dd6 204 result = "[Unknown]";
XinZhangMS 0:f7f1f0d76dd6 205 }
XinZhangMS 0:f7f1f0d76dd6 206
XinZhangMS 0:f7f1f0d76dd6 207 return result;
XinZhangMS 0:f7f1f0d76dd6 208 }
XinZhangMS 0:f7f1f0d76dd6 209 #endif // NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 210
XinZhangMS 0:f7f1f0d76dd6 211 static void log_incoming_frame(AMQP_VALUE performative)
XinZhangMS 0:f7f1f0d76dd6 212 {
XinZhangMS 0:f7f1f0d76dd6 213 #ifdef NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 214 UNUSED(performative);
XinZhangMS 0:f7f1f0d76dd6 215 #else
XinZhangMS 0:f7f1f0d76dd6 216 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
XinZhangMS 0:f7f1f0d76dd6 217 if (descriptor == NULL)
XinZhangMS 0:f7f1f0d76dd6 218 {
XinZhangMS 0:f7f1f0d76dd6 219 LogError("Error getting performative descriptor");
XinZhangMS 0:f7f1f0d76dd6 220 }
XinZhangMS 0:f7f1f0d76dd6 221 else
XinZhangMS 0:f7f1f0d76dd6 222 {
XinZhangMS 0:f7f1f0d76dd6 223 char* performative_as_string;
XinZhangMS 0:f7f1f0d76dd6 224 LOG(AZ_LOG_TRACE, 0, "<- ");
XinZhangMS 0:f7f1f0d76dd6 225 LOG(AZ_LOG_TRACE, 0, (char*)get_frame_type_as_string(descriptor));
XinZhangMS 0:f7f1f0d76dd6 226 performative_as_string = NULL;
XinZhangMS 0:f7f1f0d76dd6 227 LOG(AZ_LOG_TRACE, LOG_LINE, (performative_as_string = amqpvalue_to_string(performative)));
XinZhangMS 0:f7f1f0d76dd6 228 if (performative_as_string != NULL)
XinZhangMS 0:f7f1f0d76dd6 229 {
XinZhangMS 0:f7f1f0d76dd6 230 free(performative_as_string);
XinZhangMS 0:f7f1f0d76dd6 231 }
XinZhangMS 0:f7f1f0d76dd6 232 }
XinZhangMS 0:f7f1f0d76dd6 233 #endif
XinZhangMS 0:f7f1f0d76dd6 234 }
XinZhangMS 0:f7f1f0d76dd6 235
XinZhangMS 0:f7f1f0d76dd6 236 static void log_outgoing_frame(AMQP_VALUE performative)
XinZhangMS 0:f7f1f0d76dd6 237 {
XinZhangMS 0:f7f1f0d76dd6 238 #ifdef NO_LOGGING
XinZhangMS 0:f7f1f0d76dd6 239 UNUSED(performative);
XinZhangMS 0:f7f1f0d76dd6 240 #else
XinZhangMS 0:f7f1f0d76dd6 241 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
XinZhangMS 0:f7f1f0d76dd6 242 if (descriptor == NULL)
XinZhangMS 0:f7f1f0d76dd6 243 {
XinZhangMS 0:f7f1f0d76dd6 244 LogError("Error getting performative descriptor");
XinZhangMS 0:f7f1f0d76dd6 245 }
XinZhangMS 0:f7f1f0d76dd6 246 else
XinZhangMS 0:f7f1f0d76dd6 247 {
XinZhangMS 0:f7f1f0d76dd6 248 char* performative_as_string;
XinZhangMS 0:f7f1f0d76dd6 249 LOG(AZ_LOG_TRACE, 0, "-> ");
XinZhangMS 0:f7f1f0d76dd6 250 LOG(AZ_LOG_TRACE, 0, (char*)get_frame_type_as_string(descriptor));
XinZhangMS 0:f7f1f0d76dd6 251 performative_as_string = NULL;
XinZhangMS 0:f7f1f0d76dd6 252 LOG(AZ_LOG_TRACE, LOG_LINE, (performative_as_string = amqpvalue_to_string(performative)));
XinZhangMS 0:f7f1f0d76dd6 253 if (performative_as_string != NULL)
XinZhangMS 0:f7f1f0d76dd6 254 {
XinZhangMS 0:f7f1f0d76dd6 255 free(performative_as_string);
XinZhangMS 0:f7f1f0d76dd6 256 }
XinZhangMS 0:f7f1f0d76dd6 257 }
XinZhangMS 0:f7f1f0d76dd6 258 #endif
XinZhangMS 0:f7f1f0d76dd6 259 }
XinZhangMS 0:f7f1f0d76dd6 260
XinZhangMS 0:f7f1f0d76dd6 261 static void on_bytes_encoded(void* context, const unsigned char* bytes, size_t length, bool encode_complete)
XinZhangMS 0:f7f1f0d76dd6 262 {
XinZhangMS 0:f7f1f0d76dd6 263 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 264 if (xio_send(connection->io, bytes, length,
XinZhangMS 0:f7f1f0d76dd6 265 (encode_complete && connection->on_send_complete != NULL) ? connection->on_send_complete : unchecked_on_send_complete,
XinZhangMS 0:f7f1f0d76dd6 266 connection->on_send_complete_callback_context) != 0)
XinZhangMS 0:f7f1f0d76dd6 267 {
XinZhangMS 0:f7f1f0d76dd6 268 LogError("Cannot send encoded bytes");
XinZhangMS 0:f7f1f0d76dd6 269
XinZhangMS 0:f7f1f0d76dd6 270 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 271 {
XinZhangMS 0:f7f1f0d76dd6 272 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 273 }
XinZhangMS 0:f7f1f0d76dd6 274
XinZhangMS 0:f7f1f0d76dd6 275 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 276 }
XinZhangMS 0:f7f1f0d76dd6 277 }
XinZhangMS 0:f7f1f0d76dd6 278
XinZhangMS 0:f7f1f0d76dd6 279 static int send_open_frame(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 280 {
XinZhangMS 0:f7f1f0d76dd6 281 int result;
XinZhangMS 0:f7f1f0d76dd6 282
XinZhangMS 0:f7f1f0d76dd6 283 /* Codes_S_R_S_CONNECTION_01_151: [The connection max_frame_size setting shall be passed down to the frame_codec when the Open frame is sent.] */
XinZhangMS 0:f7f1f0d76dd6 284 if (frame_codec_set_max_frame_size(connection->frame_codec, connection->max_frame_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 285 {
XinZhangMS 0:f7f1f0d76dd6 286 LogError("Cannot set max frame size");
XinZhangMS 0:f7f1f0d76dd6 287
XinZhangMS 0:f7f1f0d76dd6 288 /* Codes_S_R_S_CONNECTION_01_207: [If frame_codec_set_max_frame_size fails the connection shall be closed and the state set to END.] */
XinZhangMS 0:f7f1f0d76dd6 289 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 290 {
XinZhangMS 0:f7f1f0d76dd6 291 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 292 }
XinZhangMS 0:f7f1f0d76dd6 293
XinZhangMS 0:f7f1f0d76dd6 294 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 295 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 296 }
XinZhangMS 0:f7f1f0d76dd6 297 else
XinZhangMS 0:f7f1f0d76dd6 298 {
XinZhangMS 0:f7f1f0d76dd6 299 /* Codes_S_R_S_CONNECTION_01_134: [The container id field shall be filled with the container id specified in connection_create.] */
XinZhangMS 0:f7f1f0d76dd6 300 OPEN_HANDLE open_performative = open_create(connection->container_id);
XinZhangMS 0:f7f1f0d76dd6 301 if (open_performative == NULL)
XinZhangMS 0:f7f1f0d76dd6 302 {
XinZhangMS 0:f7f1f0d76dd6 303 LogError("Cannot create OPEN performative");
XinZhangMS 0:f7f1f0d76dd6 304
XinZhangMS 0:f7f1f0d76dd6 305 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 306 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 307 {
XinZhangMS 0:f7f1f0d76dd6 308 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 309 }
XinZhangMS 0:f7f1f0d76dd6 310
XinZhangMS 0:f7f1f0d76dd6 311 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 312 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 313 }
XinZhangMS 0:f7f1f0d76dd6 314 else
XinZhangMS 0:f7f1f0d76dd6 315 {
XinZhangMS 0:f7f1f0d76dd6 316 /* Codes_S_R_S_CONNECTION_01_137: [The max_frame_size connection setting shall be set in the open frame by using open_set_max_frame_size.] */
XinZhangMS 0:f7f1f0d76dd6 317 if (open_set_max_frame_size(open_performative, connection->max_frame_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 318 {
XinZhangMS 0:f7f1f0d76dd6 319 LogError("Cannot set max frame size");
XinZhangMS 0:f7f1f0d76dd6 320
XinZhangMS 0:f7f1f0d76dd6 321 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 322 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 323 {
XinZhangMS 0:f7f1f0d76dd6 324 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 325 }
XinZhangMS 0:f7f1f0d76dd6 326
XinZhangMS 0:f7f1f0d76dd6 327 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 328 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 329 }
XinZhangMS 0:f7f1f0d76dd6 330 /* Codes_S_R_S_CONNECTION_01_139: [The channel_max connection setting shall be set in the open frame by using open_set_channel_max.] */
XinZhangMS 0:f7f1f0d76dd6 331 else if (open_set_channel_max(open_performative, connection->channel_max) != 0)
XinZhangMS 0:f7f1f0d76dd6 332 {
XinZhangMS 0:f7f1f0d76dd6 333 LogError("Cannot set max channel");
XinZhangMS 0:f7f1f0d76dd6 334
XinZhangMS 0:f7f1f0d76dd6 335 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 336 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 337 {
XinZhangMS 0:f7f1f0d76dd6 338 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 339 }
XinZhangMS 0:f7f1f0d76dd6 340
XinZhangMS 0:f7f1f0d76dd6 341 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 342 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 343 }
XinZhangMS 0:f7f1f0d76dd6 344 /* Codes_S_R_S_CONNECTION_01_142: [If no idle_timeout value has been specified, no value shall be stamped in the open frame (no call to open_set_idle_time_out shall be made).] */
XinZhangMS 0:f7f1f0d76dd6 345 else if ((connection->idle_timeout_specified) &&
XinZhangMS 0:f7f1f0d76dd6 346 /* Codes_S_R_S_CONNECTION_01_141: [If idle_timeout has been specified by a call to connection_set_idle_timeout, then that value shall be stamped in the open frame.] */
XinZhangMS 0:f7f1f0d76dd6 347 (open_set_idle_time_out(open_performative, connection->idle_timeout) != 0))
XinZhangMS 0:f7f1f0d76dd6 348 {
XinZhangMS 0:f7f1f0d76dd6 349 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 350 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 351 {
XinZhangMS 0:f7f1f0d76dd6 352 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 353 }
XinZhangMS 0:f7f1f0d76dd6 354
XinZhangMS 0:f7f1f0d76dd6 355 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 356 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 357 }
XinZhangMS 0:f7f1f0d76dd6 358 /* Codes_S_R_S_CONNECTION_01_136: [If no hostname value has been specified, no value shall be stamped in the open frame (no call to open_set_hostname shall be made).] */
XinZhangMS 0:f7f1f0d76dd6 359 else if ((connection->host_name != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 360 /* Codes_S_R_S_CONNECTION_01_135: [If hostname has been specified by a call to connection_set_hostname, then that value shall be stamped in the open frame.] */
XinZhangMS 0:f7f1f0d76dd6 361 (open_set_hostname(open_performative, connection->host_name) != 0))
XinZhangMS 0:f7f1f0d76dd6 362 {
XinZhangMS 0:f7f1f0d76dd6 363 LogError("Cannot set hostname");
XinZhangMS 0:f7f1f0d76dd6 364
XinZhangMS 0:f7f1f0d76dd6 365 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 366 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 367 {
XinZhangMS 0:f7f1f0d76dd6 368 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 369 }
XinZhangMS 0:f7f1f0d76dd6 370
XinZhangMS 0:f7f1f0d76dd6 371 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 372 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 373 }
XinZhangMS 0:f7f1f0d76dd6 374 /* Codes_S_R_S_CONNECTION_01_243: [If no properties value has been specified, no value shall be stamped in the open frame (no call to open_set_properties shall be made).] */
XinZhangMS 0:f7f1f0d76dd6 375 else if ((connection->properties != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 376 /* Codes_S_R_S_CONNECTION_01_244: [If properties has been specified by a call to connection_set_properties, then that value shall be stamped in the open frame.] */
XinZhangMS 0:f7f1f0d76dd6 377 (open_set_properties(open_performative, connection->properties) != 0))
XinZhangMS 0:f7f1f0d76dd6 378 {
XinZhangMS 0:f7f1f0d76dd6 379 LogError("Cannot set properties");
XinZhangMS 0:f7f1f0d76dd6 380
XinZhangMS 0:f7f1f0d76dd6 381 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 382 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 383 {
XinZhangMS 0:f7f1f0d76dd6 384 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 385 }
XinZhangMS 0:f7f1f0d76dd6 386
XinZhangMS 0:f7f1f0d76dd6 387 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 388 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 389 }
XinZhangMS 0:f7f1f0d76dd6 390 else
XinZhangMS 0:f7f1f0d76dd6 391 {
XinZhangMS 0:f7f1f0d76dd6 392 AMQP_VALUE open_performative_value = amqpvalue_create_open(open_performative);
XinZhangMS 0:f7f1f0d76dd6 393 if (open_performative_value == NULL)
XinZhangMS 0:f7f1f0d76dd6 394 {
XinZhangMS 0:f7f1f0d76dd6 395 LogError("Cannot create OPEN AMQP value");
XinZhangMS 0:f7f1f0d76dd6 396
XinZhangMS 0:f7f1f0d76dd6 397 /* Codes_S_R_S_CONNECTION_01_208: [If the open frame cannot be constructed, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 398 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 399 {
XinZhangMS 0:f7f1f0d76dd6 400 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 401 }
XinZhangMS 0:f7f1f0d76dd6 402
XinZhangMS 0:f7f1f0d76dd6 403 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 404 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 405 }
XinZhangMS 0:f7f1f0d76dd6 406 else
XinZhangMS 0:f7f1f0d76dd6 407 {
XinZhangMS 0:f7f1f0d76dd6 408 /* Codes_S_R_S_CONNECTION_01_002: [Each AMQP connection begins with an exchange of capabilities and limitations, including the maximum frame size.] */
XinZhangMS 0:f7f1f0d76dd6 409 /* Codes_S_R_S_CONNECTION_01_004: [After establishing or accepting a TCP connection and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */
XinZhangMS 0:f7f1f0d76dd6 410 /* Codes_S_R_S_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */
XinZhangMS 0:f7f1f0d76dd6 411 /* Codes_S_R_S_CONNECTION_01_205: [Sending the AMQP OPEN frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */
XinZhangMS 0:f7f1f0d76dd6 412 /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */
XinZhangMS 0:f7f1f0d76dd6 413 connection->on_send_complete = NULL;
XinZhangMS 0:f7f1f0d76dd6 414 connection->on_send_complete_callback_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 415 if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, open_performative_value, NULL, 0, on_bytes_encoded, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 416 {
XinZhangMS 0:f7f1f0d76dd6 417 LogError("amqp_frame_codec_encode_frame failed");
XinZhangMS 0:f7f1f0d76dd6 418
XinZhangMS 0:f7f1f0d76dd6 419 /* Codes_S_R_S_CONNECTION_01_206: [If sending the frame fails, the connection shall be closed and state set to END.] */
XinZhangMS 0:f7f1f0d76dd6 420 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 421 {
XinZhangMS 0:f7f1f0d76dd6 422 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 423 }
XinZhangMS 0:f7f1f0d76dd6 424
XinZhangMS 0:f7f1f0d76dd6 425 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 426 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 427 }
XinZhangMS 0:f7f1f0d76dd6 428 else
XinZhangMS 0:f7f1f0d76dd6 429 {
XinZhangMS 0:f7f1f0d76dd6 430 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 431 {
XinZhangMS 0:f7f1f0d76dd6 432 log_outgoing_frame(open_performative_value);
XinZhangMS 0:f7f1f0d76dd6 433 }
XinZhangMS 0:f7f1f0d76dd6 434
XinZhangMS 0:f7f1f0d76dd6 435 /* Codes_S_R_S_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */
XinZhangMS 0:f7f1f0d76dd6 436 connection_set_state(connection, CONNECTION_STATE_OPEN_SENT);
XinZhangMS 0:f7f1f0d76dd6 437 result = 0;
XinZhangMS 0:f7f1f0d76dd6 438 }
XinZhangMS 0:f7f1f0d76dd6 439
XinZhangMS 0:f7f1f0d76dd6 440 amqpvalue_destroy(open_performative_value);
XinZhangMS 0:f7f1f0d76dd6 441 }
XinZhangMS 0:f7f1f0d76dd6 442 }
XinZhangMS 0:f7f1f0d76dd6 443
XinZhangMS 0:f7f1f0d76dd6 444 open_destroy(open_performative);
XinZhangMS 0:f7f1f0d76dd6 445 }
XinZhangMS 0:f7f1f0d76dd6 446 }
XinZhangMS 0:f7f1f0d76dd6 447
XinZhangMS 0:f7f1f0d76dd6 448 return result;
XinZhangMS 0:f7f1f0d76dd6 449 }
XinZhangMS 0:f7f1f0d76dd6 450
XinZhangMS 0:f7f1f0d76dd6 451 static int send_close_frame(CONNECTION_HANDLE connection, ERROR_HANDLE error_handle)
XinZhangMS 0:f7f1f0d76dd6 452 {
XinZhangMS 0:f7f1f0d76dd6 453 int result;
XinZhangMS 0:f7f1f0d76dd6 454 CLOSE_HANDLE close_performative;
XinZhangMS 0:f7f1f0d76dd6 455
XinZhangMS 0:f7f1f0d76dd6 456 /* Codes_S_R_S_CONNECTION_01_217: [The CLOSE frame shall be constructed by using close_create.] */
XinZhangMS 0:f7f1f0d76dd6 457 close_performative = close_create();
XinZhangMS 0:f7f1f0d76dd6 458 if (close_performative == NULL)
XinZhangMS 0:f7f1f0d76dd6 459 {
XinZhangMS 0:f7f1f0d76dd6 460 LogError("Cannot create close performative");
XinZhangMS 0:f7f1f0d76dd6 461 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 462 }
XinZhangMS 0:f7f1f0d76dd6 463 else
XinZhangMS 0:f7f1f0d76dd6 464 {
XinZhangMS 0:f7f1f0d76dd6 465 if ((error_handle != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 466 /* Codes_S_R_S_CONNECTION_01_238: [If set, this field indicates that the connection is being closed due to an error condition.] */
XinZhangMS 0:f7f1f0d76dd6 467 (close_set_error(close_performative, error_handle) != 0))
XinZhangMS 0:f7f1f0d76dd6 468 {
XinZhangMS 0:f7f1f0d76dd6 469 LogError("Cannot set error on CLOSE");
XinZhangMS 0:f7f1f0d76dd6 470 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 471 }
XinZhangMS 0:f7f1f0d76dd6 472 else
XinZhangMS 0:f7f1f0d76dd6 473 {
XinZhangMS 0:f7f1f0d76dd6 474 AMQP_VALUE close_performative_value = amqpvalue_create_close(close_performative);
XinZhangMS 0:f7f1f0d76dd6 475 if (close_performative_value == NULL)
XinZhangMS 0:f7f1f0d76dd6 476 {
XinZhangMS 0:f7f1f0d76dd6 477 LogError("Cannot create AMQP CLOSE performative value");
XinZhangMS 0:f7f1f0d76dd6 478 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 479 }
XinZhangMS 0:f7f1f0d76dd6 480 else
XinZhangMS 0:f7f1f0d76dd6 481 {
XinZhangMS 0:f7f1f0d76dd6 482 /* Codes_S_R_S_CONNECTION_01_215: [Sending the AMQP CLOSE frame shall be done by calling amqp_frame_codec_begin_encode_frame with channel number 0, the actual performative payload and 0 as payload_size.] */
XinZhangMS 0:f7f1f0d76dd6 483 /* Codes_S_R_S_CONNECTION_01_013: [However, implementations SHOULD send it on channel 0] */
XinZhangMS 0:f7f1f0d76dd6 484 connection->on_send_complete = NULL;
XinZhangMS 0:f7f1f0d76dd6 485 connection->on_send_complete_callback_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 486 if (amqp_frame_codec_encode_frame(connection->amqp_frame_codec, 0, close_performative_value, NULL, 0, on_bytes_encoded, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 487 {
XinZhangMS 0:f7f1f0d76dd6 488 LogError("amqp_frame_codec_encode_frame failed");
XinZhangMS 0:f7f1f0d76dd6 489 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 490 }
XinZhangMS 0:f7f1f0d76dd6 491 else
XinZhangMS 0:f7f1f0d76dd6 492 {
XinZhangMS 0:f7f1f0d76dd6 493 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 494 {
XinZhangMS 0:f7f1f0d76dd6 495 log_outgoing_frame(close_performative_value);
XinZhangMS 0:f7f1f0d76dd6 496 }
XinZhangMS 0:f7f1f0d76dd6 497
XinZhangMS 0:f7f1f0d76dd6 498 result = 0;
XinZhangMS 0:f7f1f0d76dd6 499 }
XinZhangMS 0:f7f1f0d76dd6 500
XinZhangMS 0:f7f1f0d76dd6 501 amqpvalue_destroy(close_performative_value);
XinZhangMS 0:f7f1f0d76dd6 502 }
XinZhangMS 0:f7f1f0d76dd6 503 }
XinZhangMS 0:f7f1f0d76dd6 504
XinZhangMS 0:f7f1f0d76dd6 505 close_destroy(close_performative);
XinZhangMS 0:f7f1f0d76dd6 506 }
XinZhangMS 0:f7f1f0d76dd6 507
XinZhangMS 0:f7f1f0d76dd6 508 return result;
XinZhangMS 0:f7f1f0d76dd6 509 }
XinZhangMS 0:f7f1f0d76dd6 510
XinZhangMS 0:f7f1f0d76dd6 511 static void close_connection_with_error(CONNECTION_HANDLE connection, const char* condition_value, const char* description, AMQP_VALUE info)
XinZhangMS 0:f7f1f0d76dd6 512 {
XinZhangMS 0:f7f1f0d76dd6 513 ERROR_HANDLE error_handle = error_create(condition_value);
XinZhangMS 0:f7f1f0d76dd6 514
XinZhangMS 0:f7f1f0d76dd6 515 if (error_handle == NULL)
XinZhangMS 0:f7f1f0d76dd6 516 {
XinZhangMS 0:f7f1f0d76dd6 517 /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 518 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 519 {
XinZhangMS 0:f7f1f0d76dd6 520 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 521 }
XinZhangMS 0:f7f1f0d76dd6 522
XinZhangMS 0:f7f1f0d76dd6 523 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 524 }
XinZhangMS 0:f7f1f0d76dd6 525 else
XinZhangMS 0:f7f1f0d76dd6 526 {
XinZhangMS 0:f7f1f0d76dd6 527 /* Codes_S_R_S_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */
XinZhangMS 0:f7f1f0d76dd6 528 if (error_set_description(error_handle, description) != 0)
XinZhangMS 0:f7f1f0d76dd6 529 {
XinZhangMS 0:f7f1f0d76dd6 530 LogError("Cannot set error description on CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 531
XinZhangMS 0:f7f1f0d76dd6 532 /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 533 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 534 {
XinZhangMS 0:f7f1f0d76dd6 535 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 536 }
XinZhangMS 0:f7f1f0d76dd6 537
XinZhangMS 0:f7f1f0d76dd6 538 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 539 }
XinZhangMS 0:f7f1f0d76dd6 540 else if ((info != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 541 (error_set_info(error_handle, info) != 0))
XinZhangMS 0:f7f1f0d76dd6 542 {
XinZhangMS 0:f7f1f0d76dd6 543 LogError("Cannot set error info on CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 544
XinZhangMS 0:f7f1f0d76dd6 545 /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 546 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 547 {
XinZhangMS 0:f7f1f0d76dd6 548 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 549 }
XinZhangMS 0:f7f1f0d76dd6 550
XinZhangMS 0:f7f1f0d76dd6 551 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 552 }
XinZhangMS 0:f7f1f0d76dd6 553 else if (send_close_frame(connection, error_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 554 {
XinZhangMS 0:f7f1f0d76dd6 555 LogError("Cannot send CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 556
XinZhangMS 0:f7f1f0d76dd6 557 /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 558 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 559 {
XinZhangMS 0:f7f1f0d76dd6 560 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 561 }
XinZhangMS 0:f7f1f0d76dd6 562
XinZhangMS 0:f7f1f0d76dd6 563 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 564 }
XinZhangMS 0:f7f1f0d76dd6 565 else
XinZhangMS 0:f7f1f0d76dd6 566 {
XinZhangMS 0:f7f1f0d76dd6 567 /* Codes_S_R_S_CONNECTION_01_213: [When passing the bytes to frame_codec fails, a CLOSE frame shall be sent and the state shall be set to DISCARDING.] */
XinZhangMS 0:f7f1f0d76dd6 568 /* Codes_S_R_S_CONNECTION_01_055: [DISCARDING The DISCARDING state is a variant of the CLOSE SENT state where the close is triggered by an error.] */
XinZhangMS 0:f7f1f0d76dd6 569 /* Codes_S_R_S_CONNECTION_01_010: [After writing this frame the peer SHOULD continue to read from the connection until it receives the partner's close frame ] */
XinZhangMS 0:f7f1f0d76dd6 570 connection_set_state(connection, CONNECTION_STATE_DISCARDING);
XinZhangMS 0:f7f1f0d76dd6 571 }
XinZhangMS 0:f7f1f0d76dd6 572
XinZhangMS 0:f7f1f0d76dd6 573 error_destroy(error_handle);
XinZhangMS 0:f7f1f0d76dd6 574 }
XinZhangMS 0:f7f1f0d76dd6 575 }
XinZhangMS 0:f7f1f0d76dd6 576
XinZhangMS 0:f7f1f0d76dd6 577 static ENDPOINT_INSTANCE* find_session_endpoint_by_outgoing_channel(CONNECTION_HANDLE connection, uint16_t outgoing_channel)
XinZhangMS 0:f7f1f0d76dd6 578 {
XinZhangMS 0:f7f1f0d76dd6 579 uint32_t i;
XinZhangMS 0:f7f1f0d76dd6 580 ENDPOINT_INSTANCE* result;
XinZhangMS 0:f7f1f0d76dd6 581
XinZhangMS 0:f7f1f0d76dd6 582 for (i = 0; i < connection->endpoint_count; i++)
XinZhangMS 0:f7f1f0d76dd6 583 {
XinZhangMS 0:f7f1f0d76dd6 584 if (connection->endpoints[i]->outgoing_channel == outgoing_channel)
XinZhangMS 0:f7f1f0d76dd6 585 {
XinZhangMS 0:f7f1f0d76dd6 586 break;
XinZhangMS 0:f7f1f0d76dd6 587 }
XinZhangMS 0:f7f1f0d76dd6 588 }
XinZhangMS 0:f7f1f0d76dd6 589
XinZhangMS 0:f7f1f0d76dd6 590 if (i == connection->endpoint_count)
XinZhangMS 0:f7f1f0d76dd6 591 {
XinZhangMS 0:f7f1f0d76dd6 592 LogError("Cannot find session endpoint for channel %u", (unsigned int)outgoing_channel);
XinZhangMS 0:f7f1f0d76dd6 593 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 594 }
XinZhangMS 0:f7f1f0d76dd6 595 else
XinZhangMS 0:f7f1f0d76dd6 596 {
XinZhangMS 0:f7f1f0d76dd6 597 result = connection->endpoints[i];
XinZhangMS 0:f7f1f0d76dd6 598 }
XinZhangMS 0:f7f1f0d76dd6 599
XinZhangMS 0:f7f1f0d76dd6 600 return result;
XinZhangMS 0:f7f1f0d76dd6 601 }
XinZhangMS 0:f7f1f0d76dd6 602
XinZhangMS 0:f7f1f0d76dd6 603 static ENDPOINT_INSTANCE* find_session_endpoint_by_incoming_channel(CONNECTION_HANDLE connection, uint16_t incoming_channel)
XinZhangMS 0:f7f1f0d76dd6 604 {
XinZhangMS 0:f7f1f0d76dd6 605 uint32_t i;
XinZhangMS 0:f7f1f0d76dd6 606 ENDPOINT_INSTANCE* result;
XinZhangMS 0:f7f1f0d76dd6 607
XinZhangMS 0:f7f1f0d76dd6 608 for (i = 0; i < connection->endpoint_count; i++)
XinZhangMS 0:f7f1f0d76dd6 609 {
XinZhangMS 0:f7f1f0d76dd6 610 if (connection->endpoints[i]->incoming_channel == incoming_channel)
XinZhangMS 0:f7f1f0d76dd6 611 {
XinZhangMS 0:f7f1f0d76dd6 612 break;
XinZhangMS 0:f7f1f0d76dd6 613 }
XinZhangMS 0:f7f1f0d76dd6 614 }
XinZhangMS 0:f7f1f0d76dd6 615
XinZhangMS 0:f7f1f0d76dd6 616 if (i == connection->endpoint_count)
XinZhangMS 0:f7f1f0d76dd6 617 {
XinZhangMS 0:f7f1f0d76dd6 618 LogError("Cannot find session endpoint for channel %u", (unsigned int)incoming_channel);
XinZhangMS 0:f7f1f0d76dd6 619 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 620 }
XinZhangMS 0:f7f1f0d76dd6 621 else
XinZhangMS 0:f7f1f0d76dd6 622 {
XinZhangMS 0:f7f1f0d76dd6 623 result = connection->endpoints[i];
XinZhangMS 0:f7f1f0d76dd6 624 }
XinZhangMS 0:f7f1f0d76dd6 625
XinZhangMS 0:f7f1f0d76dd6 626 return result;
XinZhangMS 0:f7f1f0d76dd6 627 }
XinZhangMS 0:f7f1f0d76dd6 628
XinZhangMS 0:f7f1f0d76dd6 629 static int connection_byte_received(CONNECTION_HANDLE connection, unsigned char b)
XinZhangMS 0:f7f1f0d76dd6 630 {
XinZhangMS 0:f7f1f0d76dd6 631 int result;
XinZhangMS 0:f7f1f0d76dd6 632
XinZhangMS 0:f7f1f0d76dd6 633 switch (connection->connection_state)
XinZhangMS 0:f7f1f0d76dd6 634 {
XinZhangMS 0:f7f1f0d76dd6 635 default:
XinZhangMS 0:f7f1f0d76dd6 636 LogError("Unknown connection state: %d", (int)connection->connection_state);
XinZhangMS 0:f7f1f0d76dd6 637 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 638 break;
XinZhangMS 0:f7f1f0d76dd6 639
XinZhangMS 0:f7f1f0d76dd6 640 /* Codes_S_R_S_CONNECTION_01_039: [START In this state a connection exists, but nothing has been sent or received. This is the state an implementation would be in immediately after performing a socket connect or socket accept.] */
XinZhangMS 0:f7f1f0d76dd6 641 case CONNECTION_STATE_START:
XinZhangMS 0:f7f1f0d76dd6 642
XinZhangMS 0:f7f1f0d76dd6 643 /* Codes_S_R_S_CONNECTION_01_041: [HDR SENT In this state the connection header has been sent to the peer but no connection header has been received.] */
XinZhangMS 0:f7f1f0d76dd6 644 case CONNECTION_STATE_HDR_SENT:
XinZhangMS 0:f7f1f0d76dd6 645 if (b != amqp_header[connection->header_bytes_received])
XinZhangMS 0:f7f1f0d76dd6 646 {
XinZhangMS 0:f7f1f0d76dd6 647 /* Codes_S_R_S_CONNECTION_01_089: [If the incoming and outgoing protocol headers do not match, both peers MUST close their outgoing stream] */
XinZhangMS 0:f7f1f0d76dd6 648 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 649 {
XinZhangMS 0:f7f1f0d76dd6 650 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 651 }
XinZhangMS 0:f7f1f0d76dd6 652
XinZhangMS 0:f7f1f0d76dd6 653 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 654 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 655 }
XinZhangMS 0:f7f1f0d76dd6 656 else
XinZhangMS 0:f7f1f0d76dd6 657 {
XinZhangMS 0:f7f1f0d76dd6 658 connection->header_bytes_received++;
XinZhangMS 0:f7f1f0d76dd6 659 if (connection->header_bytes_received == sizeof(amqp_header))
XinZhangMS 0:f7f1f0d76dd6 660 {
XinZhangMS 0:f7f1f0d76dd6 661 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 662 {
XinZhangMS 0:f7f1f0d76dd6 663 LOG(AZ_LOG_TRACE, LOG_LINE, "<- Header (AMQP 0.1.0.0)");
XinZhangMS 0:f7f1f0d76dd6 664 }
XinZhangMS 0:f7f1f0d76dd6 665
XinZhangMS 0:f7f1f0d76dd6 666 connection_set_state(connection, CONNECTION_STATE_HDR_EXCH);
XinZhangMS 0:f7f1f0d76dd6 667
XinZhangMS 0:f7f1f0d76dd6 668 if (send_open_frame(connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 669 {
XinZhangMS 0:f7f1f0d76dd6 670 LogError("Cannot send open frame");
XinZhangMS 0:f7f1f0d76dd6 671 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 672 }
XinZhangMS 0:f7f1f0d76dd6 673 }
XinZhangMS 0:f7f1f0d76dd6 674
XinZhangMS 0:f7f1f0d76dd6 675 result = 0;
XinZhangMS 0:f7f1f0d76dd6 676 }
XinZhangMS 0:f7f1f0d76dd6 677 break;
XinZhangMS 0:f7f1f0d76dd6 678
XinZhangMS 0:f7f1f0d76dd6 679 /* Codes_S_R_S_CONNECTION_01_040: [HDR RCVD In this state the connection header has been received from the peer but a connection header has not been sent.] */
XinZhangMS 0:f7f1f0d76dd6 680 case CONNECTION_STATE_HDR_RCVD:
XinZhangMS 0:f7f1f0d76dd6 681
XinZhangMS 0:f7f1f0d76dd6 682 /* Codes_S_R_S_CONNECTION_01_042: [HDR EXCH In this state the connection header has been sent to the peer and a connection header has been received from the peer.] */
XinZhangMS 0:f7f1f0d76dd6 683 /* we should not really get into this state, but just in case, we would treat that in the same way as HDR_RCVD */
XinZhangMS 0:f7f1f0d76dd6 684 case CONNECTION_STATE_HDR_EXCH:
XinZhangMS 0:f7f1f0d76dd6 685
XinZhangMS 0:f7f1f0d76dd6 686 /* Codes_S_R_S_CONNECTION_01_045: [OPEN RCVD In this state the connection headers have been exchanged. An open frame has been received from the peer but an open frame has not been sent.] */
XinZhangMS 0:f7f1f0d76dd6 687 case CONNECTION_STATE_OPEN_RCVD:
XinZhangMS 0:f7f1f0d76dd6 688
XinZhangMS 0:f7f1f0d76dd6 689 /* Codes_S_R_S_CONNECTION_01_046: [OPEN SENT In this state the connection headers have been exchanged. An open frame has been sent to the peer but no open frame has yet been received.] */
XinZhangMS 0:f7f1f0d76dd6 690 case CONNECTION_STATE_OPEN_SENT:
XinZhangMS 0:f7f1f0d76dd6 691
XinZhangMS 0:f7f1f0d76dd6 692 /* Codes_S_R_S_CONNECTION_01_048: [OPENED In this state the connection header and the open frame have been both sent and received.] */
XinZhangMS 0:f7f1f0d76dd6 693 case CONNECTION_STATE_OPENED:
XinZhangMS 0:f7f1f0d76dd6 694 /* Codes_S_R_S_CONNECTION_01_212: [After the initial handshake has been done all bytes received from the io instance shall be passed to the frame_codec for decoding by calling frame_codec_receive_bytes.] */
XinZhangMS 0:f7f1f0d76dd6 695 if (frame_codec_receive_bytes(connection->frame_codec, &b, 1) != 0)
XinZhangMS 0:f7f1f0d76dd6 696 {
XinZhangMS 0:f7f1f0d76dd6 697 LogError("Cannot process received bytes");
XinZhangMS 0:f7f1f0d76dd6 698 /* Codes_S_R_S_CONNECTION_01_218: [The error amqp:internal-error shall be set in the error.condition field of the CLOSE frame.] */
XinZhangMS 0:f7f1f0d76dd6 699 /* Codes_S_R_S_CONNECTION_01_219: [The error description shall be set to an implementation defined string.] */
XinZhangMS 0:f7f1f0d76dd6 700 close_connection_with_error(connection, "amqp:internal-error", "connection_byte_received::frame_codec_receive_bytes failed", NULL);
XinZhangMS 0:f7f1f0d76dd6 701 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 702 }
XinZhangMS 0:f7f1f0d76dd6 703 else
XinZhangMS 0:f7f1f0d76dd6 704 {
XinZhangMS 0:f7f1f0d76dd6 705 result = 0;
XinZhangMS 0:f7f1f0d76dd6 706 }
XinZhangMS 0:f7f1f0d76dd6 707
XinZhangMS 0:f7f1f0d76dd6 708 break;
XinZhangMS 0:f7f1f0d76dd6 709 }
XinZhangMS 0:f7f1f0d76dd6 710
XinZhangMS 0:f7f1f0d76dd6 711 return result;
XinZhangMS 0:f7f1f0d76dd6 712 }
XinZhangMS 0:f7f1f0d76dd6 713
XinZhangMS 0:f7f1f0d76dd6 714 static void connection_on_bytes_received(void* context, const unsigned char* buffer, size_t size)
XinZhangMS 0:f7f1f0d76dd6 715 {
XinZhangMS 0:f7f1f0d76dd6 716 size_t i;
XinZhangMS 0:f7f1f0d76dd6 717
XinZhangMS 0:f7f1f0d76dd6 718 for (i = 0; i < size; i++)
XinZhangMS 0:f7f1f0d76dd6 719 {
XinZhangMS 0:f7f1f0d76dd6 720 if (connection_byte_received((CONNECTION_HANDLE)context, buffer[i]) != 0)
XinZhangMS 0:f7f1f0d76dd6 721 {
XinZhangMS 0:f7f1f0d76dd6 722 LogError("Cannot process received bytes");
XinZhangMS 0:f7f1f0d76dd6 723 break;
XinZhangMS 0:f7f1f0d76dd6 724 }
XinZhangMS 0:f7f1f0d76dd6 725 }
XinZhangMS 0:f7f1f0d76dd6 726 }
XinZhangMS 0:f7f1f0d76dd6 727
XinZhangMS 0:f7f1f0d76dd6 728 static void connection_on_io_open_complete(void* context, IO_OPEN_RESULT io_open_result)
XinZhangMS 0:f7f1f0d76dd6 729 {
XinZhangMS 0:f7f1f0d76dd6 730 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 731
XinZhangMS 0:f7f1f0d76dd6 732 if (io_open_result == IO_OPEN_OK)
XinZhangMS 0:f7f1f0d76dd6 733 {
XinZhangMS 0:f7f1f0d76dd6 734 /* Codes_S_R_S_CONNECTION_01_084: [The connection_instance state machine implementing the protocol requirements shall be run as part of connection_dowork.] */
XinZhangMS 0:f7f1f0d76dd6 735 switch (connection->connection_state)
XinZhangMS 0:f7f1f0d76dd6 736 {
XinZhangMS 0:f7f1f0d76dd6 737 default:
XinZhangMS 0:f7f1f0d76dd6 738 LogError("Unknown connection state: %d", (int)connection->connection_state);
XinZhangMS 0:f7f1f0d76dd6 739 break;
XinZhangMS 0:f7f1f0d76dd6 740
XinZhangMS 0:f7f1f0d76dd6 741 case CONNECTION_STATE_START:
XinZhangMS 0:f7f1f0d76dd6 742 /* Codes_S_R_S_CONNECTION_01_086: [Prior to sending any frames on a connection_instance, each peer MUST start by sending a protocol header that indicates the protocol version used on the connection_instance.] */
XinZhangMS 0:f7f1f0d76dd6 743 /* Codes_S_R_S_CONNECTION_01_091: [The AMQP peer which acted in the role of the TCP client (i.e. the peer that actively opened the connection_instance) MUST immediately send its outgoing protocol header on establishment of the TCP connection_instance.] */
XinZhangMS 0:f7f1f0d76dd6 744 if (send_header(connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 745 {
XinZhangMS 0:f7f1f0d76dd6 746 LogError("Cannot send header");
XinZhangMS 0:f7f1f0d76dd6 747 }
XinZhangMS 0:f7f1f0d76dd6 748 break;
XinZhangMS 0:f7f1f0d76dd6 749
XinZhangMS 0:f7f1f0d76dd6 750 case CONNECTION_STATE_HDR_SENT:
XinZhangMS 0:f7f1f0d76dd6 751 case CONNECTION_STATE_OPEN_SENT:
XinZhangMS 0:f7f1f0d76dd6 752 case CONNECTION_STATE_OPENED:
XinZhangMS 0:f7f1f0d76dd6 753 break;
XinZhangMS 0:f7f1f0d76dd6 754
XinZhangMS 0:f7f1f0d76dd6 755 case CONNECTION_STATE_HDR_EXCH:
XinZhangMS 0:f7f1f0d76dd6 756 /* Codes_S_R_S_CONNECTION_01_002: [Each AMQP connection_instance begins with an exchange of capabilities and limitations, including the maximum frame size.] */
XinZhangMS 0:f7f1f0d76dd6 757 /* Codes_S_R_S_CONNECTION_01_004: [After establishing or accepting a TCP connection_instance and sending the protocol header, each peer MUST send an open frame before sending any other frames.] */
XinZhangMS 0:f7f1f0d76dd6 758 /* Codes_S_R_S_CONNECTION_01_005: [The open frame describes the capabilities and limits of that peer.] */
XinZhangMS 0:f7f1f0d76dd6 759 if (send_open_frame(connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 760 {
XinZhangMS 0:f7f1f0d76dd6 761 LogError("Cannot send OPEN frame");
XinZhangMS 0:f7f1f0d76dd6 762 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 763 }
XinZhangMS 0:f7f1f0d76dd6 764 break;
XinZhangMS 0:f7f1f0d76dd6 765
XinZhangMS 0:f7f1f0d76dd6 766 case CONNECTION_STATE_OPEN_RCVD:
XinZhangMS 0:f7f1f0d76dd6 767 break;
XinZhangMS 0:f7f1f0d76dd6 768 }
XinZhangMS 0:f7f1f0d76dd6 769 }
XinZhangMS 0:f7f1f0d76dd6 770 else
XinZhangMS 0:f7f1f0d76dd6 771 {
XinZhangMS 0:f7f1f0d76dd6 772 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 773 }
XinZhangMS 0:f7f1f0d76dd6 774 }
XinZhangMS 0:f7f1f0d76dd6 775
XinZhangMS 0:f7f1f0d76dd6 776 static void connection_on_io_error(void* context)
XinZhangMS 0:f7f1f0d76dd6 777 {
XinZhangMS 0:f7f1f0d76dd6 778 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 779
XinZhangMS 0:f7f1f0d76dd6 780 /* Codes_S_R_S_CONNECTION_22_005: [If the io notifies the connection instance of an IO_STATE_ERROR state and an io error callback is registered, the connection shall call the registered callback.] */
XinZhangMS 0:f7f1f0d76dd6 781 if (connection->on_io_error)
XinZhangMS 0:f7f1f0d76dd6 782 {
XinZhangMS 0:f7f1f0d76dd6 783 connection->on_io_error(connection->on_io_error_callback_context);
XinZhangMS 0:f7f1f0d76dd6 784 }
XinZhangMS 0:f7f1f0d76dd6 785
XinZhangMS 0:f7f1f0d76dd6 786 if (connection->connection_state != CONNECTION_STATE_END)
XinZhangMS 0:f7f1f0d76dd6 787 {
XinZhangMS 0:f7f1f0d76dd6 788 /* Codes_S_R_S_CONNECTION_01_202: [If the io notifies the connection instance of an IO_STATE_ERROR state the connection shall be closed and the state set to END.] */
XinZhangMS 0:f7f1f0d76dd6 789 connection_set_state(connection, CONNECTION_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 790 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 791 {
XinZhangMS 0:f7f1f0d76dd6 792 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 793 }
XinZhangMS 0:f7f1f0d76dd6 794 }
XinZhangMS 0:f7f1f0d76dd6 795 }
XinZhangMS 0:f7f1f0d76dd6 796
XinZhangMS 0:f7f1f0d76dd6 797 static void on_empty_amqp_frame_received(void* context, uint16_t channel)
XinZhangMS 0:f7f1f0d76dd6 798 {
XinZhangMS 0:f7f1f0d76dd6 799 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 800 /* It does not matter on which channel we received the frame */
XinZhangMS 0:f7f1f0d76dd6 801 (void)channel;
XinZhangMS 0:f7f1f0d76dd6 802
XinZhangMS 0:f7f1f0d76dd6 803 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 804 {
XinZhangMS 0:f7f1f0d76dd6 805 LOG(AZ_LOG_TRACE, LOG_LINE, "<- Empty frame");
XinZhangMS 0:f7f1f0d76dd6 806 }
XinZhangMS 0:f7f1f0d76dd6 807 if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
XinZhangMS 0:f7f1f0d76dd6 808 {
XinZhangMS 0:f7f1f0d76dd6 809 LogError("Cannot get tickcounter value");
XinZhangMS 0:f7f1f0d76dd6 810 }
XinZhangMS 0:f7f1f0d76dd6 811 }
XinZhangMS 0:f7f1f0d76dd6 812
XinZhangMS 0:f7f1f0d76dd6 813 static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE performative, const unsigned char* payload_bytes, uint32_t payload_size)
XinZhangMS 0:f7f1f0d76dd6 814 {
XinZhangMS 0:f7f1f0d76dd6 815 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 816
XinZhangMS 0:f7f1f0d76dd6 817 (void)channel;
XinZhangMS 0:f7f1f0d76dd6 818
XinZhangMS 0:f7f1f0d76dd6 819 if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
XinZhangMS 0:f7f1f0d76dd6 820 {
XinZhangMS 0:f7f1f0d76dd6 821 LogError("Cannot get tickcounter value");
XinZhangMS 0:f7f1f0d76dd6 822 close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count", NULL);
XinZhangMS 0:f7f1f0d76dd6 823 }
XinZhangMS 0:f7f1f0d76dd6 824 else
XinZhangMS 0:f7f1f0d76dd6 825 {
XinZhangMS 0:f7f1f0d76dd6 826 if (connection->is_underlying_io_open)
XinZhangMS 0:f7f1f0d76dd6 827 {
XinZhangMS 0:f7f1f0d76dd6 828 switch (connection->connection_state)
XinZhangMS 0:f7f1f0d76dd6 829 {
XinZhangMS 0:f7f1f0d76dd6 830 default:
XinZhangMS 0:f7f1f0d76dd6 831 if (performative == NULL)
XinZhangMS 0:f7f1f0d76dd6 832 {
XinZhangMS 0:f7f1f0d76dd6 833 /* Codes_S_R_S_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */
XinZhangMS 0:f7f1f0d76dd6 834 close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative", NULL);
XinZhangMS 0:f7f1f0d76dd6 835 LogError("connection_endpoint_frame_received::NULL performative");
XinZhangMS 0:f7f1f0d76dd6 836 }
XinZhangMS 0:f7f1f0d76dd6 837 else
XinZhangMS 0:f7f1f0d76dd6 838 {
XinZhangMS 0:f7f1f0d76dd6 839 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
XinZhangMS 0:f7f1f0d76dd6 840
XinZhangMS 0:f7f1f0d76dd6 841 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 842 {
XinZhangMS 0:f7f1f0d76dd6 843 log_incoming_frame(performative);
XinZhangMS 0:f7f1f0d76dd6 844 }
XinZhangMS 0:f7f1f0d76dd6 845
XinZhangMS 0:f7f1f0d76dd6 846 if (is_open_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 847 {
XinZhangMS 0:f7f1f0d76dd6 848 if (channel != 0)
XinZhangMS 0:f7f1f0d76dd6 849 {
XinZhangMS 0:f7f1f0d76dd6 850 /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */
XinZhangMS 0:f7f1f0d76dd6 851 /* Codes_S_R_S_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */
XinZhangMS 0:f7f1f0d76dd6 852 close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0", NULL);
XinZhangMS 0:f7f1f0d76dd6 853 LogError("OPEN frame received on a channel that is not 0");
XinZhangMS 0:f7f1f0d76dd6 854 }
XinZhangMS 0:f7f1f0d76dd6 855
XinZhangMS 0:f7f1f0d76dd6 856 if (connection->connection_state == CONNECTION_STATE_OPENED)
XinZhangMS 0:f7f1f0d76dd6 857 {
XinZhangMS 0:f7f1f0d76dd6 858 /* Codes_S_R_S_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */
XinZhangMS 0:f7f1f0d76dd6 859 close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state", NULL);
XinZhangMS 0:f7f1f0d76dd6 860 LogError("OPEN frame received in the OPENED state");
XinZhangMS 0:f7f1f0d76dd6 861 }
XinZhangMS 0:f7f1f0d76dd6 862 else if ((connection->connection_state == CONNECTION_STATE_OPEN_SENT) ||
XinZhangMS 0:f7f1f0d76dd6 863 (connection->connection_state == CONNECTION_STATE_HDR_EXCH))
XinZhangMS 0:f7f1f0d76dd6 864 {
XinZhangMS 0:f7f1f0d76dd6 865 OPEN_HANDLE open_handle;
XinZhangMS 0:f7f1f0d76dd6 866 if (amqpvalue_get_open(performative, &open_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 867 {
XinZhangMS 0:f7f1f0d76dd6 868 /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
XinZhangMS 0:f7f1f0d76dd6 869 /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
XinZhangMS 0:f7f1f0d76dd6 870 close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
XinZhangMS 0:f7f1f0d76dd6 871 LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
XinZhangMS 0:f7f1f0d76dd6 872 }
XinZhangMS 0:f7f1f0d76dd6 873 else
XinZhangMS 0:f7f1f0d76dd6 874 {
XinZhangMS 0:f7f1f0d76dd6 875 if (open_get_idle_time_out(open_handle, &connection->remote_idle_timeout) == 0)
XinZhangMS 0:f7f1f0d76dd6 876 {
XinZhangMS 0:f7f1f0d76dd6 877 /* since we obtained the remote_idle_timeout, compute at what millisecond we should send the empty frame */
XinZhangMS 0:f7f1f0d76dd6 878 connection->remote_idle_timeout_send_frame_millisecond = (milliseconds)(connection->idle_timeout_empty_frame_send_ratio * connection->remote_idle_timeout);
XinZhangMS 0:f7f1f0d76dd6 879 }
XinZhangMS 0:f7f1f0d76dd6 880
XinZhangMS 0:f7f1f0d76dd6 881 if ((open_get_max_frame_size(open_handle, &connection->remote_max_frame_size) != 0) ||
XinZhangMS 0:f7f1f0d76dd6 882 /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */
XinZhangMS 0:f7f1f0d76dd6 883 (connection->remote_max_frame_size < 512))
XinZhangMS 0:f7f1f0d76dd6 884 {
XinZhangMS 0:f7f1f0d76dd6 885 /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
XinZhangMS 0:f7f1f0d76dd6 886 /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
XinZhangMS 0:f7f1f0d76dd6 887 close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
XinZhangMS 0:f7f1f0d76dd6 888 LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
XinZhangMS 0:f7f1f0d76dd6 889 }
XinZhangMS 0:f7f1f0d76dd6 890 else
XinZhangMS 0:f7f1f0d76dd6 891 {
XinZhangMS 0:f7f1f0d76dd6 892 if (connection->connection_state == CONNECTION_STATE_OPEN_SENT)
XinZhangMS 0:f7f1f0d76dd6 893 {
XinZhangMS 0:f7f1f0d76dd6 894 connection_set_state(connection, CONNECTION_STATE_OPENED);
XinZhangMS 0:f7f1f0d76dd6 895 }
XinZhangMS 0:f7f1f0d76dd6 896 else
XinZhangMS 0:f7f1f0d76dd6 897 {
XinZhangMS 0:f7f1f0d76dd6 898 if (send_open_frame(connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 899 {
XinZhangMS 0:f7f1f0d76dd6 900 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 901 }
XinZhangMS 0:f7f1f0d76dd6 902 else
XinZhangMS 0:f7f1f0d76dd6 903 {
XinZhangMS 0:f7f1f0d76dd6 904 connection_set_state(connection, CONNECTION_STATE_OPENED);
XinZhangMS 0:f7f1f0d76dd6 905 }
XinZhangMS 0:f7f1f0d76dd6 906 }
XinZhangMS 0:f7f1f0d76dd6 907 }
XinZhangMS 0:f7f1f0d76dd6 908
XinZhangMS 0:f7f1f0d76dd6 909 open_destroy(open_handle);
XinZhangMS 0:f7f1f0d76dd6 910 }
XinZhangMS 0:f7f1f0d76dd6 911 }
XinZhangMS 0:f7f1f0d76dd6 912 else
XinZhangMS 0:f7f1f0d76dd6 913 {
XinZhangMS 0:f7f1f0d76dd6 914 /* do nothing for now ... */
XinZhangMS 0:f7f1f0d76dd6 915 }
XinZhangMS 0:f7f1f0d76dd6 916 }
XinZhangMS 0:f7f1f0d76dd6 917 else if (is_close_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 918 {
XinZhangMS 0:f7f1f0d76dd6 919 /* Codes_S_R_S_CONNECTION_01_242: [The connection module shall accept CLOSE frames even if they have extra payload bytes besides the Close performative.] */
XinZhangMS 0:f7f1f0d76dd6 920
XinZhangMS 0:f7f1f0d76dd6 921 /* Codes_S_R_S_CONNECTION_01_225: [HDR_RCVD HDR OPEN] */
XinZhangMS 0:f7f1f0d76dd6 922 if ((connection->connection_state == CONNECTION_STATE_HDR_RCVD) ||
XinZhangMS 0:f7f1f0d76dd6 923 /* Codes_S_R_S_CONNECTION_01_227: [HDR_EXCH OPEN OPEN] */
XinZhangMS 0:f7f1f0d76dd6 924 (connection->connection_state == CONNECTION_STATE_HDR_EXCH) ||
XinZhangMS 0:f7f1f0d76dd6 925 /* Codes_S_R_S_CONNECTION_01_228: [OPEN_RCVD OPEN *] */
XinZhangMS 0:f7f1f0d76dd6 926 (connection->connection_state == CONNECTION_STATE_OPEN_RCVD) ||
XinZhangMS 0:f7f1f0d76dd6 927 /* Codes_S_R_S_CONNECTION_01_235: [CLOSE_SENT - * TCP Close for Write] */
XinZhangMS 0:f7f1f0d76dd6 928 (connection->connection_state == CONNECTION_STATE_CLOSE_SENT) ||
XinZhangMS 0:f7f1f0d76dd6 929 /* Codes_S_R_S_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */
XinZhangMS 0:f7f1f0d76dd6 930 (connection->connection_state == CONNECTION_STATE_DISCARDING))
XinZhangMS 0:f7f1f0d76dd6 931 {
XinZhangMS 0:f7f1f0d76dd6 932 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 933 {
XinZhangMS 0:f7f1f0d76dd6 934 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 935 }
XinZhangMS 0:f7f1f0d76dd6 936 }
XinZhangMS 0:f7f1f0d76dd6 937 else
XinZhangMS 0:f7f1f0d76dd6 938 {
XinZhangMS 0:f7f1f0d76dd6 939 CLOSE_HANDLE close_handle;
XinZhangMS 0:f7f1f0d76dd6 940
XinZhangMS 0:f7f1f0d76dd6 941 /* Codes_S_R_S_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */
XinZhangMS 0:f7f1f0d76dd6 942 if (channel > connection->channel_max)
XinZhangMS 0:f7f1f0d76dd6 943 {
XinZhangMS 0:f7f1f0d76dd6 944 close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
XinZhangMS 0:f7f1f0d76dd6 945 LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 946 }
XinZhangMS 0:f7f1f0d76dd6 947 else
XinZhangMS 0:f7f1f0d76dd6 948 {
XinZhangMS 0:f7f1f0d76dd6 949 if (amqpvalue_get_close(performative, &close_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 950 {
XinZhangMS 0:f7f1f0d76dd6 951 close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
XinZhangMS 0:f7f1f0d76dd6 952 LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 953 }
XinZhangMS 0:f7f1f0d76dd6 954 else
XinZhangMS 0:f7f1f0d76dd6 955 {
XinZhangMS 0:f7f1f0d76dd6 956 ERROR_HANDLE error;
XinZhangMS 0:f7f1f0d76dd6 957
XinZhangMS 0:f7f1f0d76dd6 958 if (close_get_error(close_handle, &error) != 0)
XinZhangMS 0:f7f1f0d76dd6 959 {
XinZhangMS 0:f7f1f0d76dd6 960 error = NULL;
XinZhangMS 0:f7f1f0d76dd6 961 }
XinZhangMS 0:f7f1f0d76dd6 962
XinZhangMS 0:f7f1f0d76dd6 963 close_destroy(close_handle);
XinZhangMS 0:f7f1f0d76dd6 964
XinZhangMS 0:f7f1f0d76dd6 965 connection_set_state(connection, CONNECTION_STATE_CLOSE_RCVD);
XinZhangMS 0:f7f1f0d76dd6 966
XinZhangMS 0:f7f1f0d76dd6 967 if (send_close_frame(connection, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 968 {
XinZhangMS 0:f7f1f0d76dd6 969 LogError("Cannot send CLOSE frame");
XinZhangMS 0:f7f1f0d76dd6 970 }
XinZhangMS 0:f7f1f0d76dd6 971
XinZhangMS 0:f7f1f0d76dd6 972 /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
XinZhangMS 0:f7f1f0d76dd6 973 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 974 {
XinZhangMS 0:f7f1f0d76dd6 975 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 976 }
XinZhangMS 0:f7f1f0d76dd6 977
XinZhangMS 0:f7f1f0d76dd6 978 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 979
XinZhangMS 0:f7f1f0d76dd6 980 if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 981 {
XinZhangMS 0:f7f1f0d76dd6 982 connection->on_connection_close_received_event_subscription.on_connection_close_received(connection->on_connection_close_received_event_subscription.context, error);
XinZhangMS 0:f7f1f0d76dd6 983 }
XinZhangMS 0:f7f1f0d76dd6 984
XinZhangMS 0:f7f1f0d76dd6 985 error_destroy(error);
XinZhangMS 0:f7f1f0d76dd6 986 }
XinZhangMS 0:f7f1f0d76dd6 987 }
XinZhangMS 0:f7f1f0d76dd6 988 }
XinZhangMS 0:f7f1f0d76dd6 989 }
XinZhangMS 0:f7f1f0d76dd6 990 else
XinZhangMS 0:f7f1f0d76dd6 991 {
XinZhangMS 0:f7f1f0d76dd6 992 uint64_t performative_ulong;
XinZhangMS 0:f7f1f0d76dd6 993
XinZhangMS 0:f7f1f0d76dd6 994 if (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0)
XinZhangMS 0:f7f1f0d76dd6 995 {
XinZhangMS 0:f7f1f0d76dd6 996 LogError("Failed getting ulong amqp performative");
XinZhangMS 0:f7f1f0d76dd6 997 }
XinZhangMS 0:f7f1f0d76dd6 998 else
XinZhangMS 0:f7f1f0d76dd6 999 {
XinZhangMS 0:f7f1f0d76dd6 1000 switch (performative_ulong)
XinZhangMS 0:f7f1f0d76dd6 1001 {
XinZhangMS 0:f7f1f0d76dd6 1002 default:
XinZhangMS 0:f7f1f0d76dd6 1003 LogError("Bad performative: %02x", performative);
XinZhangMS 0:f7f1f0d76dd6 1004 break;
XinZhangMS 0:f7f1f0d76dd6 1005
XinZhangMS 0:f7f1f0d76dd6 1006 case AMQP_BEGIN:
XinZhangMS 0:f7f1f0d76dd6 1007 {
XinZhangMS 0:f7f1f0d76dd6 1008 BEGIN_HANDLE begin;
XinZhangMS 0:f7f1f0d76dd6 1009
XinZhangMS 0:f7f1f0d76dd6 1010 if (amqpvalue_get_begin(performative, &begin) != 0)
XinZhangMS 0:f7f1f0d76dd6 1011 {
XinZhangMS 0:f7f1f0d76dd6 1012 LogError("Cannot get begin performative");
XinZhangMS 0:f7f1f0d76dd6 1013 }
XinZhangMS 0:f7f1f0d76dd6 1014 else
XinZhangMS 0:f7f1f0d76dd6 1015 {
XinZhangMS 0:f7f1f0d76dd6 1016 uint16_t remote_channel;
XinZhangMS 0:f7f1f0d76dd6 1017 ENDPOINT_HANDLE new_endpoint = NULL;
XinZhangMS 0:f7f1f0d76dd6 1018 bool remote_begin = false;
XinZhangMS 0:f7f1f0d76dd6 1019
XinZhangMS 0:f7f1f0d76dd6 1020 if (begin_get_remote_channel(begin, &remote_channel) != 0)
XinZhangMS 0:f7f1f0d76dd6 1021 {
XinZhangMS 0:f7f1f0d76dd6 1022 remote_begin = true;
XinZhangMS 0:f7f1f0d76dd6 1023 if (connection->on_new_endpoint != NULL)
XinZhangMS 0:f7f1f0d76dd6 1024 {
XinZhangMS 0:f7f1f0d76dd6 1025 new_endpoint = connection_create_endpoint(connection);
XinZhangMS 0:f7f1f0d76dd6 1026 if (!connection->on_new_endpoint(connection->on_new_endpoint_callback_context, new_endpoint))
XinZhangMS 0:f7f1f0d76dd6 1027 {
XinZhangMS 0:f7f1f0d76dd6 1028 connection_destroy_endpoint(new_endpoint);
XinZhangMS 0:f7f1f0d76dd6 1029 new_endpoint = NULL;
XinZhangMS 0:f7f1f0d76dd6 1030 }
XinZhangMS 0:f7f1f0d76dd6 1031 }
XinZhangMS 0:f7f1f0d76dd6 1032 }
XinZhangMS 0:f7f1f0d76dd6 1033
XinZhangMS 0:f7f1f0d76dd6 1034 if (!remote_begin)
XinZhangMS 0:f7f1f0d76dd6 1035 {
XinZhangMS 0:f7f1f0d76dd6 1036 ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel);
XinZhangMS 0:f7f1f0d76dd6 1037 if (session_endpoint == NULL)
XinZhangMS 0:f7f1f0d76dd6 1038 {
XinZhangMS 0:f7f1f0d76dd6 1039 LogError("Cannot create session endpoint");
XinZhangMS 0:f7f1f0d76dd6 1040 }
XinZhangMS 0:f7f1f0d76dd6 1041 else
XinZhangMS 0:f7f1f0d76dd6 1042 {
XinZhangMS 0:f7f1f0d76dd6 1043 session_endpoint->incoming_channel = channel;
XinZhangMS 0:f7f1f0d76dd6 1044 session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
XinZhangMS 0:f7f1f0d76dd6 1045 }
XinZhangMS 0:f7f1f0d76dd6 1046 }
XinZhangMS 0:f7f1f0d76dd6 1047 else
XinZhangMS 0:f7f1f0d76dd6 1048 {
XinZhangMS 0:f7f1f0d76dd6 1049 if (new_endpoint != NULL)
XinZhangMS 0:f7f1f0d76dd6 1050 {
XinZhangMS 0:f7f1f0d76dd6 1051 new_endpoint->incoming_channel = channel;
XinZhangMS 0:f7f1f0d76dd6 1052 new_endpoint->on_endpoint_frame_received(new_endpoint->callback_context, performative, payload_size, payload_bytes);
XinZhangMS 0:f7f1f0d76dd6 1053 }
XinZhangMS 0:f7f1f0d76dd6 1054 }
XinZhangMS 0:f7f1f0d76dd6 1055
XinZhangMS 0:f7f1f0d76dd6 1056 begin_destroy(begin);
XinZhangMS 0:f7f1f0d76dd6 1057 }
XinZhangMS 0:f7f1f0d76dd6 1058
XinZhangMS 0:f7f1f0d76dd6 1059 break;
XinZhangMS 0:f7f1f0d76dd6 1060 }
XinZhangMS 0:f7f1f0d76dd6 1061
XinZhangMS 0:f7f1f0d76dd6 1062 case AMQP_FLOW:
XinZhangMS 0:f7f1f0d76dd6 1063 case AMQP_TRANSFER:
XinZhangMS 0:f7f1f0d76dd6 1064 case AMQP_DISPOSITION:
XinZhangMS 0:f7f1f0d76dd6 1065 case AMQP_END:
XinZhangMS 0:f7f1f0d76dd6 1066 case AMQP_ATTACH:
XinZhangMS 0:f7f1f0d76dd6 1067 case AMQP_DETACH:
XinZhangMS 0:f7f1f0d76dd6 1068 {
XinZhangMS 0:f7f1f0d76dd6 1069 ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_incoming_channel(connection, channel);
XinZhangMS 0:f7f1f0d76dd6 1070 if (session_endpoint == NULL)
XinZhangMS 0:f7f1f0d76dd6 1071 {
XinZhangMS 0:f7f1f0d76dd6 1072 LogError("Cannot find session endpoint for channel %u", (unsigned int)channel);
XinZhangMS 0:f7f1f0d76dd6 1073 }
XinZhangMS 0:f7f1f0d76dd6 1074 else
XinZhangMS 0:f7f1f0d76dd6 1075 {
XinZhangMS 0:f7f1f0d76dd6 1076 session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
XinZhangMS 0:f7f1f0d76dd6 1077 }
XinZhangMS 0:f7f1f0d76dd6 1078
XinZhangMS 0:f7f1f0d76dd6 1079 break;
XinZhangMS 0:f7f1f0d76dd6 1080 }
XinZhangMS 0:f7f1f0d76dd6 1081 }
XinZhangMS 0:f7f1f0d76dd6 1082 }
XinZhangMS 0:f7f1f0d76dd6 1083 }
XinZhangMS 0:f7f1f0d76dd6 1084 }
XinZhangMS 0:f7f1f0d76dd6 1085 break;
XinZhangMS 0:f7f1f0d76dd6 1086
XinZhangMS 0:f7f1f0d76dd6 1087 case CONNECTION_STATE_START:
XinZhangMS 0:f7f1f0d76dd6 1088 /* Codes_S_R_S_CONNECTION_01_224: [START HDR HDR] */
XinZhangMS 0:f7f1f0d76dd6 1089 case CONNECTION_STATE_HDR_SENT:
XinZhangMS 0:f7f1f0d76dd6 1090 /* Codes_S_R_S_CONNECTION_01_226: [HDR_SENT OPEN HDR] */
XinZhangMS 0:f7f1f0d76dd6 1091 case CONNECTION_STATE_OPEN_PIPE:
XinZhangMS 0:f7f1f0d76dd6 1092 /* Codes_S_R_S_CONNECTION_01_230: [OPEN_PIPE ** HDR] */
XinZhangMS 0:f7f1f0d76dd6 1093 case CONNECTION_STATE_OC_PIPE:
XinZhangMS 0:f7f1f0d76dd6 1094 /* Codes_S_R_S_CONNECTION_01_232: [OC_PIPE - HDR TCP Close for Write] */
XinZhangMS 0:f7f1f0d76dd6 1095 case CONNECTION_STATE_CLOSE_RCVD:
XinZhangMS 0:f7f1f0d76dd6 1096 /* Codes_S_R_S_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */
XinZhangMS 0:f7f1f0d76dd6 1097 case CONNECTION_STATE_END:
XinZhangMS 0:f7f1f0d76dd6 1098 /* Codes_S_R_S_CONNECTION_01_237: [END - - TCP Close] */
XinZhangMS 0:f7f1f0d76dd6 1099 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 1100 {
XinZhangMS 0:f7f1f0d76dd6 1101 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 1102 }
XinZhangMS 0:f7f1f0d76dd6 1103 break;
XinZhangMS 0:f7f1f0d76dd6 1104 }
XinZhangMS 0:f7f1f0d76dd6 1105 }
XinZhangMS 0:f7f1f0d76dd6 1106 }
XinZhangMS 0:f7f1f0d76dd6 1107 }
XinZhangMS 0:f7f1f0d76dd6 1108
/* Error callback registered with frame_codec_create by connection_create2.
   It only logs that an error happened; `context` is accepted to satisfy the
   callback signature and is intentionally unused. */
static void frame_codec_error(void* context)
{
    (void)context;

    /* Bug: some error handling should happen here
       Filed: uAMQP: frame_codec error and amqp_frame_codec_error should handle the errors */
    LogError("A frame_codec_error occured");
}
XinZhangMS 0:f7f1f0d76dd6 1116
/* Error callback registered with amqp_frame_codec_create by connection_create2.
   It only logs that an error happened; `context` is accepted to satisfy the
   callback signature and is intentionally unused. */
static void amqp_frame_codec_error(void* context)
{
    (void)context;

    /* Bug: some error handling should happen here
       Filed: uAMQP: frame_codec error and amqp_frame_codec_error should handle the errors */
    LogError("An amqp_frame_codec_error occured");
}
XinZhangMS 0:f7f1f0d76dd6 1124
XinZhangMS 0:f7f1f0d76dd6 1125 /* Codes_S_R_S_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */
XinZhangMS 0:f7f1f0d76dd6 1126 CONNECTION_HANDLE connection_create(XIO_HANDLE xio, const char* hostname, const char* container_id, ON_NEW_ENDPOINT on_new_endpoint, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 1127 {
XinZhangMS 0:f7f1f0d76dd6 1128 return connection_create2(xio, hostname, container_id, on_new_endpoint, callback_context, NULL, NULL, NULL, NULL);
XinZhangMS 0:f7f1f0d76dd6 1129 }
XinZhangMS 0:f7f1f0d76dd6 1130
XinZhangMS 0:f7f1f0d76dd6 1131 /* Codes_S_R_S_CONNECTION_01_001: [connection_create shall open a new connection to a specified host/port.] */
XinZhangMS 0:f7f1f0d76dd6 1132 /* Codes_S_R_S_CONNECTION_22_002: [connection_create shall allow registering connections state and io error callbacks.] */
XinZhangMS 0:f7f1f0d76dd6 1133 CONNECTION_HANDLE connection_create2(XIO_HANDLE xio, const char* hostname, const char* container_id, ON_NEW_ENDPOINT on_new_endpoint, void* callback_context, ON_CONNECTION_STATE_CHANGED on_connection_state_changed, void* on_connection_state_changed_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
XinZhangMS 0:f7f1f0d76dd6 1134 {
XinZhangMS 0:f7f1f0d76dd6 1135 CONNECTION_HANDLE connection;
XinZhangMS 0:f7f1f0d76dd6 1136
XinZhangMS 0:f7f1f0d76dd6 1137 if ((xio == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1138 (container_id == NULL))
XinZhangMS 0:f7f1f0d76dd6 1139 {
XinZhangMS 0:f7f1f0d76dd6 1140 /* Codes_S_R_S_CONNECTION_01_071: [If xio or container_id is NULL, connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1141 LogError("Bad arguments: xio = %p, container_id = %p",
XinZhangMS 0:f7f1f0d76dd6 1142 xio, container_id);
XinZhangMS 0:f7f1f0d76dd6 1143 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1144 }
XinZhangMS 0:f7f1f0d76dd6 1145 else
XinZhangMS 0:f7f1f0d76dd6 1146 {
XinZhangMS 0:f7f1f0d76dd6 1147 connection = (CONNECTION_HANDLE)malloc(sizeof(CONNECTION_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 1148 /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1149 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1150 {
XinZhangMS 0:f7f1f0d76dd6 1151 LogError("Cannot allocate memory for connection");
XinZhangMS 0:f7f1f0d76dd6 1152 }
XinZhangMS 0:f7f1f0d76dd6 1153 else
XinZhangMS 0:f7f1f0d76dd6 1154 {
XinZhangMS 0:f7f1f0d76dd6 1155 connection->io = xio;
XinZhangMS 0:f7f1f0d76dd6 1156
XinZhangMS 0:f7f1f0d76dd6 1157 /* Codes_S_R_S_CONNECTION_01_082: [connection_create shall allocate a new frame_codec instance to be used for frame encoding/decoding.] */
XinZhangMS 0:f7f1f0d76dd6 1158 connection->frame_codec = frame_codec_create(frame_codec_error, connection);
XinZhangMS 0:f7f1f0d76dd6 1159 if (connection->frame_codec == NULL)
XinZhangMS 0:f7f1f0d76dd6 1160 {
XinZhangMS 0:f7f1f0d76dd6 1161 /* Codes_S_R_S_CONNECTION_01_083: [If frame_codec_create fails then connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1162 LogError("Cannot create frame_codec");
XinZhangMS 0:f7f1f0d76dd6 1163 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1164 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1165 }
XinZhangMS 0:f7f1f0d76dd6 1166 else
XinZhangMS 0:f7f1f0d76dd6 1167 {
XinZhangMS 0:f7f1f0d76dd6 1168 connection->amqp_frame_codec = amqp_frame_codec_create(connection->frame_codec, on_amqp_frame_received, on_empty_amqp_frame_received, amqp_frame_codec_error, connection);
XinZhangMS 0:f7f1f0d76dd6 1169 if (connection->amqp_frame_codec == NULL)
XinZhangMS 0:f7f1f0d76dd6 1170 {
XinZhangMS 0:f7f1f0d76dd6 1171 /* Codes_S_R_S_CONNECTION_01_108: [If amqp_frame_codec_create fails, connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1172 LogError("Cannot create amqp_frame_codec");
XinZhangMS 0:f7f1f0d76dd6 1173 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1174 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1175 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1176 }
XinZhangMS 0:f7f1f0d76dd6 1177 else
XinZhangMS 0:f7f1f0d76dd6 1178 {
XinZhangMS 0:f7f1f0d76dd6 1179 if (hostname != NULL)
XinZhangMS 0:f7f1f0d76dd6 1180 {
XinZhangMS 0:f7f1f0d76dd6 1181 size_t hostname_length = strlen(hostname);
XinZhangMS 0:f7f1f0d76dd6 1182 connection->host_name = (char*)malloc(hostname_length + 1);
XinZhangMS 0:f7f1f0d76dd6 1183 if (connection->host_name == NULL)
XinZhangMS 0:f7f1f0d76dd6 1184 {
XinZhangMS 0:f7f1f0d76dd6 1185 /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1186 LogError("Cannot allocate memory for host name");
XinZhangMS 0:f7f1f0d76dd6 1187 amqp_frame_codec_destroy(connection->amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1188 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1189 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1190 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1191 }
XinZhangMS 0:f7f1f0d76dd6 1192 else
XinZhangMS 0:f7f1f0d76dd6 1193 {
XinZhangMS 0:f7f1f0d76dd6 1194 (void)memcpy(connection->host_name, hostname, hostname_length + 1);
XinZhangMS 0:f7f1f0d76dd6 1195 }
XinZhangMS 0:f7f1f0d76dd6 1196 }
XinZhangMS 0:f7f1f0d76dd6 1197 else
XinZhangMS 0:f7f1f0d76dd6 1198 {
XinZhangMS 0:f7f1f0d76dd6 1199 connection->host_name = NULL;
XinZhangMS 0:f7f1f0d76dd6 1200 }
XinZhangMS 0:f7f1f0d76dd6 1201
XinZhangMS 0:f7f1f0d76dd6 1202 if (connection != NULL)
XinZhangMS 0:f7f1f0d76dd6 1203 {
XinZhangMS 0:f7f1f0d76dd6 1204 size_t container_id_length = strlen(container_id);
XinZhangMS 0:f7f1f0d76dd6 1205 connection->container_id = (char*)malloc(container_id_length + 1);
XinZhangMS 0:f7f1f0d76dd6 1206 if (connection->container_id == NULL)
XinZhangMS 0:f7f1f0d76dd6 1207 {
XinZhangMS 0:f7f1f0d76dd6 1208 /* Codes_S_R_S_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1209 LogError("Cannot allocate memory for container_id");
XinZhangMS 0:f7f1f0d76dd6 1210 free(connection->host_name);
XinZhangMS 0:f7f1f0d76dd6 1211 amqp_frame_codec_destroy(connection->amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1212 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1213 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1214 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1215 }
XinZhangMS 0:f7f1f0d76dd6 1216 else
XinZhangMS 0:f7f1f0d76dd6 1217 {
XinZhangMS 0:f7f1f0d76dd6 1218 connection->tick_counter = tickcounter_create();
XinZhangMS 0:f7f1f0d76dd6 1219 if (connection->tick_counter == NULL)
XinZhangMS 0:f7f1f0d76dd6 1220 {
XinZhangMS 0:f7f1f0d76dd6 1221 LogError("Cannot create tick counter");
XinZhangMS 0:f7f1f0d76dd6 1222 free(connection->container_id);
XinZhangMS 0:f7f1f0d76dd6 1223 free(connection->host_name);
XinZhangMS 0:f7f1f0d76dd6 1224 amqp_frame_codec_destroy(connection->amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1225 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1226 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1227 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1228 }
XinZhangMS 0:f7f1f0d76dd6 1229 else
XinZhangMS 0:f7f1f0d76dd6 1230 {
XinZhangMS 0:f7f1f0d76dd6 1231 (void)memcpy(connection->container_id, container_id, container_id_length + 1);
XinZhangMS 0:f7f1f0d76dd6 1232
XinZhangMS 0:f7f1f0d76dd6 1233 /* Codes_S_R_S_CONNECTION_01_173: [<field name="max-frame-size" type="uint" default="4294967295"/>] */
XinZhangMS 0:f7f1f0d76dd6 1234 connection->max_frame_size = 4294967295u;
XinZhangMS 0:f7f1f0d76dd6 1235 /* Codes: [<field name="channel-max" type="ushort" default="65535"/>] */
XinZhangMS 0:f7f1f0d76dd6 1236 connection->channel_max = 65535;
XinZhangMS 0:f7f1f0d76dd6 1237
XinZhangMS 0:f7f1f0d76dd6 1238 /* Codes_S_R_S_CONNECTION_01_175: [<field name="idle-time-out" type="milliseconds"/>] */
XinZhangMS 0:f7f1f0d76dd6 1239 /* Codes_S_R_S_CONNECTION_01_192: [A value of zero is the same as if it was not set (null).] */
XinZhangMS 0:f7f1f0d76dd6 1240 connection->idle_timeout = 0;
XinZhangMS 0:f7f1f0d76dd6 1241 connection->remote_idle_timeout = 0;
XinZhangMS 0:f7f1f0d76dd6 1242 connection->remote_idle_timeout_send_frame_millisecond = 0;
XinZhangMS 0:f7f1f0d76dd6 1243 connection->idle_timeout_empty_frame_send_ratio = 0.5;
XinZhangMS 0:f7f1f0d76dd6 1244
XinZhangMS 0:f7f1f0d76dd6 1245 connection->endpoint_count = 0;
XinZhangMS 0:f7f1f0d76dd6 1246 connection->endpoints = NULL;
XinZhangMS 0:f7f1f0d76dd6 1247 connection->header_bytes_received = 0;
XinZhangMS 0:f7f1f0d76dd6 1248 connection->is_remote_frame_received = 0;
XinZhangMS 0:f7f1f0d76dd6 1249 connection->properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 1250
XinZhangMS 0:f7f1f0d76dd6 1251 connection->is_underlying_io_open = 0;
XinZhangMS 0:f7f1f0d76dd6 1252 connection->remote_max_frame_size = 512;
XinZhangMS 0:f7f1f0d76dd6 1253 connection->is_trace_on = 0;
XinZhangMS 0:f7f1f0d76dd6 1254
XinZhangMS 0:f7f1f0d76dd6 1255 /* Mark that settings have not yet been set by the user */
XinZhangMS 0:f7f1f0d76dd6 1256 connection->idle_timeout_specified = 0;
XinZhangMS 0:f7f1f0d76dd6 1257
XinZhangMS 0:f7f1f0d76dd6 1258 connection->on_new_endpoint = on_new_endpoint;
XinZhangMS 0:f7f1f0d76dd6 1259 connection->on_new_endpoint_callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 1260
XinZhangMS 0:f7f1f0d76dd6 1261 connection->on_connection_close_received_event_subscription.on_connection_close_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 1262 connection->on_connection_close_received_event_subscription.context = NULL;
XinZhangMS 0:f7f1f0d76dd6 1263
XinZhangMS 0:f7f1f0d76dd6 1264 connection->on_io_error = on_io_error;
XinZhangMS 0:f7f1f0d76dd6 1265 connection->on_io_error_callback_context = on_io_error_context;
XinZhangMS 0:f7f1f0d76dd6 1266 connection->on_connection_state_changed = on_connection_state_changed;
XinZhangMS 0:f7f1f0d76dd6 1267 connection->on_connection_state_changed_callback_context = on_connection_state_changed_context;
XinZhangMS 0:f7f1f0d76dd6 1268
XinZhangMS 0:f7f1f0d76dd6 1269 if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
XinZhangMS 0:f7f1f0d76dd6 1270 {
XinZhangMS 0:f7f1f0d76dd6 1271 LogError("Could not retrieve time for last frame received time");
XinZhangMS 0:f7f1f0d76dd6 1272 tickcounter_destroy(connection->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 1273 free(connection->container_id);
XinZhangMS 0:f7f1f0d76dd6 1274 free(connection->host_name);
XinZhangMS 0:f7f1f0d76dd6 1275 amqp_frame_codec_destroy(connection->amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1276 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1277 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1278 connection = NULL;
XinZhangMS 0:f7f1f0d76dd6 1279 }
XinZhangMS 0:f7f1f0d76dd6 1280 else
XinZhangMS 0:f7f1f0d76dd6 1281 {
XinZhangMS 0:f7f1f0d76dd6 1282 connection->last_frame_sent_time = connection->last_frame_received_time;
XinZhangMS 0:f7f1f0d76dd6 1283
XinZhangMS 0:f7f1f0d76dd6 1284 /* Codes_S_R_S_CONNECTION_01_072: [When connection_create succeeds, the state of the connection shall be CONNECTION_STATE_START.] */
XinZhangMS 0:f7f1f0d76dd6 1285 connection_set_state(connection, CONNECTION_STATE_START);
XinZhangMS 0:f7f1f0d76dd6 1286 }
XinZhangMS 0:f7f1f0d76dd6 1287 }
XinZhangMS 0:f7f1f0d76dd6 1288 }
XinZhangMS 0:f7f1f0d76dd6 1289 }
XinZhangMS 0:f7f1f0d76dd6 1290 }
XinZhangMS 0:f7f1f0d76dd6 1291 }
XinZhangMS 0:f7f1f0d76dd6 1292 }
XinZhangMS 0:f7f1f0d76dd6 1293 }
XinZhangMS 0:f7f1f0d76dd6 1294
XinZhangMS 0:f7f1f0d76dd6 1295 return connection;
XinZhangMS 0:f7f1f0d76dd6 1296 }
XinZhangMS 0:f7f1f0d76dd6 1297
XinZhangMS 0:f7f1f0d76dd6 1298 void connection_destroy(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1299 {
XinZhangMS 0:f7f1f0d76dd6 1300 /* Codes_S_R_S_CONNECTION_01_079: [If handle is NULL, connection_destroy shall do nothing.] */
XinZhangMS 0:f7f1f0d76dd6 1301 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1302 {
XinZhangMS 0:f7f1f0d76dd6 1303 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1304 }
XinZhangMS 0:f7f1f0d76dd6 1305 else
XinZhangMS 0:f7f1f0d76dd6 1306 {
XinZhangMS 0:f7f1f0d76dd6 1307 /* Codes_S_R_S_CONNECTION_01_073: [connection_destroy shall free all resources associated with a connection.] */
XinZhangMS 0:f7f1f0d76dd6 1308 if (connection->is_underlying_io_open)
XinZhangMS 0:f7f1f0d76dd6 1309 {
XinZhangMS 0:f7f1f0d76dd6 1310 (void)connection_close(connection, NULL, NULL, NULL);
XinZhangMS 0:f7f1f0d76dd6 1311 }
XinZhangMS 0:f7f1f0d76dd6 1312
XinZhangMS 0:f7f1f0d76dd6 1313 amqp_frame_codec_destroy(connection->amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1314 frame_codec_destroy(connection->frame_codec);
XinZhangMS 0:f7f1f0d76dd6 1315 tickcounter_destroy(connection->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 1316 if (connection->properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 1317 {
XinZhangMS 0:f7f1f0d76dd6 1318 amqpvalue_destroy(connection->properties);
XinZhangMS 0:f7f1f0d76dd6 1319 }
XinZhangMS 0:f7f1f0d76dd6 1320
XinZhangMS 0:f7f1f0d76dd6 1321 free(connection->host_name);
XinZhangMS 0:f7f1f0d76dd6 1322 free(connection->container_id);
XinZhangMS 0:f7f1f0d76dd6 1323
XinZhangMS 0:f7f1f0d76dd6 1324 /* Codes_S_R_S_CONNECTION_01_074: [connection_destroy shall close the socket connection.] */
XinZhangMS 0:f7f1f0d76dd6 1325 free(connection);
XinZhangMS 0:f7f1f0d76dd6 1326 }
XinZhangMS 0:f7f1f0d76dd6 1327 }
XinZhangMS 0:f7f1f0d76dd6 1328
XinZhangMS 0:f7f1f0d76dd6 1329 int connection_open(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1330 {
XinZhangMS 0:f7f1f0d76dd6 1331 int result;
XinZhangMS 0:f7f1f0d76dd6 1332
XinZhangMS 0:f7f1f0d76dd6 1333 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1334 {
XinZhangMS 0:f7f1f0d76dd6 1335 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1336 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1337 }
XinZhangMS 0:f7f1f0d76dd6 1338 else
XinZhangMS 0:f7f1f0d76dd6 1339 {
XinZhangMS 0:f7f1f0d76dd6 1340 if (!connection->is_underlying_io_open)
XinZhangMS 0:f7f1f0d76dd6 1341 {
XinZhangMS 0:f7f1f0d76dd6 1342 if (xio_open(connection->io, connection_on_io_open_complete, connection, connection_on_bytes_received, connection, connection_on_io_error, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 1343 {
XinZhangMS 0:f7f1f0d76dd6 1344 LogError("Opening the underlying IO failed");
XinZhangMS 0:f7f1f0d76dd6 1345 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 1346 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1347 }
XinZhangMS 0:f7f1f0d76dd6 1348 else
XinZhangMS 0:f7f1f0d76dd6 1349 {
XinZhangMS 0:f7f1f0d76dd6 1350 connection->is_underlying_io_open = 1;
XinZhangMS 0:f7f1f0d76dd6 1351
XinZhangMS 0:f7f1f0d76dd6 1352 connection_set_state(connection, CONNECTION_STATE_START);
XinZhangMS 0:f7f1f0d76dd6 1353
XinZhangMS 0:f7f1f0d76dd6 1354 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1355 }
XinZhangMS 0:f7f1f0d76dd6 1356 }
XinZhangMS 0:f7f1f0d76dd6 1357 else
XinZhangMS 0:f7f1f0d76dd6 1358 {
XinZhangMS 0:f7f1f0d76dd6 1359 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1360 }
XinZhangMS 0:f7f1f0d76dd6 1361 }
XinZhangMS 0:f7f1f0d76dd6 1362
XinZhangMS 0:f7f1f0d76dd6 1363 return result;
XinZhangMS 0:f7f1f0d76dd6 1364 }
XinZhangMS 0:f7f1f0d76dd6 1365
XinZhangMS 0:f7f1f0d76dd6 1366 int connection_listen(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1367 {
XinZhangMS 0:f7f1f0d76dd6 1368 int result;
XinZhangMS 0:f7f1f0d76dd6 1369
XinZhangMS 0:f7f1f0d76dd6 1370 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1371 {
XinZhangMS 0:f7f1f0d76dd6 1372 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1373 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1374 }
XinZhangMS 0:f7f1f0d76dd6 1375 else
XinZhangMS 0:f7f1f0d76dd6 1376 {
XinZhangMS 0:f7f1f0d76dd6 1377 if (!connection->is_underlying_io_open)
XinZhangMS 0:f7f1f0d76dd6 1378 {
XinZhangMS 0:f7f1f0d76dd6 1379 if (xio_open(connection->io, connection_on_io_open_complete, connection, connection_on_bytes_received, connection, connection_on_io_error, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 1380 {
XinZhangMS 0:f7f1f0d76dd6 1381 LogError("Opening the underlying IO failed");
XinZhangMS 0:f7f1f0d76dd6 1382 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 1383 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1384 }
XinZhangMS 0:f7f1f0d76dd6 1385 else
XinZhangMS 0:f7f1f0d76dd6 1386 {
XinZhangMS 0:f7f1f0d76dd6 1387 connection->is_underlying_io_open = 1;
XinZhangMS 0:f7f1f0d76dd6 1388
XinZhangMS 0:f7f1f0d76dd6 1389 connection_set_state(connection, CONNECTION_STATE_HDR_EXCH);
XinZhangMS 0:f7f1f0d76dd6 1390
XinZhangMS 0:f7f1f0d76dd6 1391 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1392 }
XinZhangMS 0:f7f1f0d76dd6 1393 }
XinZhangMS 0:f7f1f0d76dd6 1394 else
XinZhangMS 0:f7f1f0d76dd6 1395 {
XinZhangMS 0:f7f1f0d76dd6 1396 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1397 }
XinZhangMS 0:f7f1f0d76dd6 1398 }
XinZhangMS 0:f7f1f0d76dd6 1399
XinZhangMS 0:f7f1f0d76dd6 1400 return result;
XinZhangMS 0:f7f1f0d76dd6 1401 }
XinZhangMS 0:f7f1f0d76dd6 1402
XinZhangMS 0:f7f1f0d76dd6 1403 int connection_close(CONNECTION_HANDLE connection, const char* condition_value, const char* description, AMQP_VALUE info)
XinZhangMS 0:f7f1f0d76dd6 1404 {
XinZhangMS 0:f7f1f0d76dd6 1405 int result;
XinZhangMS 0:f7f1f0d76dd6 1406
XinZhangMS 0:f7f1f0d76dd6 1407 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1408 {
XinZhangMS 0:f7f1f0d76dd6 1409 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1410 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1411 }
XinZhangMS 0:f7f1f0d76dd6 1412 else if ((info != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 1413 (amqpvalue_get_type(info) != AMQP_TYPE_MAP) &&
XinZhangMS 0:f7f1f0d76dd6 1414 (amqpvalue_get_type(info) != AMQP_TYPE_NULL))
XinZhangMS 0:f7f1f0d76dd6 1415 {
XinZhangMS 0:f7f1f0d76dd6 1416 LogError("Invalid info, expected a map");
XinZhangMS 0:f7f1f0d76dd6 1417 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1418 }
XinZhangMS 0:f7f1f0d76dd6 1419 else
XinZhangMS 0:f7f1f0d76dd6 1420 {
XinZhangMS 0:f7f1f0d76dd6 1421 if (condition_value != NULL)
XinZhangMS 0:f7f1f0d76dd6 1422 {
XinZhangMS 0:f7f1f0d76dd6 1423 close_connection_with_error(connection, condition_value, description, info);
XinZhangMS 0:f7f1f0d76dd6 1424 }
XinZhangMS 0:f7f1f0d76dd6 1425 else
XinZhangMS 0:f7f1f0d76dd6 1426 {
XinZhangMS 0:f7f1f0d76dd6 1427 if (send_close_frame(connection, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 1428 {
XinZhangMS 0:f7f1f0d76dd6 1429 LogError("Sending CLOSE frame failed");
XinZhangMS 0:f7f1f0d76dd6 1430 }
XinZhangMS 0:f7f1f0d76dd6 1431
XinZhangMS 0:f7f1f0d76dd6 1432 connection_set_state(connection, CONNECTION_STATE_END);
XinZhangMS 0:f7f1f0d76dd6 1433 }
XinZhangMS 0:f7f1f0d76dd6 1434
XinZhangMS 0:f7f1f0d76dd6 1435 if (xio_close(connection->io, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 1436 {
XinZhangMS 0:f7f1f0d76dd6 1437 LogError("xio_close failed");
XinZhangMS 0:f7f1f0d76dd6 1438 }
XinZhangMS 0:f7f1f0d76dd6 1439
XinZhangMS 0:f7f1f0d76dd6 1440 connection->is_underlying_io_open = 1;
XinZhangMS 0:f7f1f0d76dd6 1441
XinZhangMS 0:f7f1f0d76dd6 1442 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1443 }
XinZhangMS 0:f7f1f0d76dd6 1444
XinZhangMS 0:f7f1f0d76dd6 1445 return result;
XinZhangMS 0:f7f1f0d76dd6 1446 }
XinZhangMS 0:f7f1f0d76dd6 1447
XinZhangMS 0:f7f1f0d76dd6 1448 int connection_set_max_frame_size(CONNECTION_HANDLE connection, uint32_t max_frame_size)
XinZhangMS 0:f7f1f0d76dd6 1449 {
XinZhangMS 0:f7f1f0d76dd6 1450 int result;
XinZhangMS 0:f7f1f0d76dd6 1451
XinZhangMS 0:f7f1f0d76dd6 1452 /* Codes_S_R_S_CONNECTION_01_163: [If connection is NULL, connection_set_max_frame_size shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1453 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1454 {
XinZhangMS 0:f7f1f0d76dd6 1455 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1456 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1457 }
XinZhangMS 0:f7f1f0d76dd6 1458 /* Codes_S_R_S_CONNECTION_01_150: [If the max_frame_size is invalid then connection_set_max_frame_size shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1459 /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */
XinZhangMS 0:f7f1f0d76dd6 1460 else if (max_frame_size < 512)
XinZhangMS 0:f7f1f0d76dd6 1461 {
XinZhangMS 0:f7f1f0d76dd6 1462 LogError("max_frame_size too small");
XinZhangMS 0:f7f1f0d76dd6 1463 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1464 }
XinZhangMS 0:f7f1f0d76dd6 1465 else
XinZhangMS 0:f7f1f0d76dd6 1466 {
XinZhangMS 0:f7f1f0d76dd6 1467 /* Codes_S_R_S_CONNECTION_01_157: [If connection_set_max_frame_size is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1468 if (connection->connection_state != CONNECTION_STATE_START)
XinZhangMS 0:f7f1f0d76dd6 1469 {
XinZhangMS 0:f7f1f0d76dd6 1470 LogError("Connection already open");
XinZhangMS 0:f7f1f0d76dd6 1471 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1472 }
XinZhangMS 0:f7f1f0d76dd6 1473 else
XinZhangMS 0:f7f1f0d76dd6 1474 {
XinZhangMS 0:f7f1f0d76dd6 1475 /* Codes_S_R_S_CONNECTION_01_148: [connection_set_max_frame_size shall set the max_frame_size associated with a connection.] */
XinZhangMS 0:f7f1f0d76dd6 1476 /* Codes_S_R_S_CONNECTION_01_164: [If connection_set_max_frame_size fails, the previous max_frame_size setting shall be retained.] */
XinZhangMS 0:f7f1f0d76dd6 1477 connection->max_frame_size = max_frame_size;
XinZhangMS 0:f7f1f0d76dd6 1478
XinZhangMS 0:f7f1f0d76dd6 1479 /* Codes_S_R_S_CONNECTION_01_149: [On success connection_set_max_frame_size shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1480 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1481 }
XinZhangMS 0:f7f1f0d76dd6 1482 }
XinZhangMS 0:f7f1f0d76dd6 1483
XinZhangMS 0:f7f1f0d76dd6 1484 return result;
XinZhangMS 0:f7f1f0d76dd6 1485 }
XinZhangMS 0:f7f1f0d76dd6 1486
XinZhangMS 0:f7f1f0d76dd6 1487 int connection_get_max_frame_size(CONNECTION_HANDLE connection, uint32_t* max_frame_size)
XinZhangMS 0:f7f1f0d76dd6 1488 {
XinZhangMS 0:f7f1f0d76dd6 1489 int result;
XinZhangMS 0:f7f1f0d76dd6 1490
XinZhangMS 0:f7f1f0d76dd6 1491 /* Codes_S_R_S_CONNECTION_01_170: [If connection or max_frame_size is NULL, connection_get_max_frame_size shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1492 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1493 (max_frame_size == NULL))
XinZhangMS 0:f7f1f0d76dd6 1494 {
XinZhangMS 0:f7f1f0d76dd6 1495 LogError("Bad arguments: connection = %p, max_frame_size = %p",
XinZhangMS 0:f7f1f0d76dd6 1496 connection, max_frame_size);
XinZhangMS 0:f7f1f0d76dd6 1497 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1498 }
XinZhangMS 0:f7f1f0d76dd6 1499 else
XinZhangMS 0:f7f1f0d76dd6 1500 {
XinZhangMS 0:f7f1f0d76dd6 1501 /* Codes_S_R_S_CONNECTION_01_168: [connection_get_max_frame_size shall return in the max_frame_size argument the current max frame size setting.] */
XinZhangMS 0:f7f1f0d76dd6 1502 *max_frame_size = connection->max_frame_size;
XinZhangMS 0:f7f1f0d76dd6 1503
XinZhangMS 0:f7f1f0d76dd6 1504 /* Codes_S_R_S_CONNECTION_01_169: [On success, connection_get_max_frame_size shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1505 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1506 }
XinZhangMS 0:f7f1f0d76dd6 1507
XinZhangMS 0:f7f1f0d76dd6 1508 return result;
XinZhangMS 0:f7f1f0d76dd6 1509 }
XinZhangMS 0:f7f1f0d76dd6 1510
XinZhangMS 0:f7f1f0d76dd6 1511 int connection_set_channel_max(CONNECTION_HANDLE connection, uint16_t channel_max)
XinZhangMS 0:f7f1f0d76dd6 1512 {
XinZhangMS 0:f7f1f0d76dd6 1513 int result;
XinZhangMS 0:f7f1f0d76dd6 1514
XinZhangMS 0:f7f1f0d76dd6 1515 /* Codes_S_R_S_CONNECTION_01_181: [If connection is NULL then connection_set_channel_max shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1516 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1517 {
XinZhangMS 0:f7f1f0d76dd6 1518 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1519 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1520 }
XinZhangMS 0:f7f1f0d76dd6 1521 else
XinZhangMS 0:f7f1f0d76dd6 1522 {
XinZhangMS 0:f7f1f0d76dd6 1523 /* Codes_S_R_S_CONNECTION_01_156: [If connection_set_channel_max is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1524 if (connection->connection_state != CONNECTION_STATE_START)
XinZhangMS 0:f7f1f0d76dd6 1525 {
XinZhangMS 0:f7f1f0d76dd6 1526 LogError("Connection already open");
XinZhangMS 0:f7f1f0d76dd6 1527 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1528 }
XinZhangMS 0:f7f1f0d76dd6 1529 else
XinZhangMS 0:f7f1f0d76dd6 1530 {
XinZhangMS 0:f7f1f0d76dd6 1531 /* Codes_S_R_S_CONNECTION_01_153: [connection_set_channel_max shall set the channel_max associated with a connection.] */
XinZhangMS 0:f7f1f0d76dd6 1532 /* Codes_S_R_S_CONNECTION_01_165: [If connection_set_channel_max fails, the previous channel_max setting shall be retained.] */
XinZhangMS 0:f7f1f0d76dd6 1533 connection->channel_max = channel_max;
XinZhangMS 0:f7f1f0d76dd6 1534
XinZhangMS 0:f7f1f0d76dd6 1535 /* Codes_S_R_S_CONNECTION_01_154: [On success connection_set_channel_max shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1536 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1537 }
XinZhangMS 0:f7f1f0d76dd6 1538 }
XinZhangMS 0:f7f1f0d76dd6 1539
XinZhangMS 0:f7f1f0d76dd6 1540 return result;
XinZhangMS 0:f7f1f0d76dd6 1541 }
XinZhangMS 0:f7f1f0d76dd6 1542
XinZhangMS 0:f7f1f0d76dd6 1543 int connection_get_channel_max(CONNECTION_HANDLE connection, uint16_t* channel_max)
XinZhangMS 0:f7f1f0d76dd6 1544 {
XinZhangMS 0:f7f1f0d76dd6 1545 int result;
XinZhangMS 0:f7f1f0d76dd6 1546
XinZhangMS 0:f7f1f0d76dd6 1547 /* Codes_S_R_S_CONNECTION_01_184: [If connection or channel_max is NULL, connection_get_channel_max shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1548 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1549 (channel_max == NULL))
XinZhangMS 0:f7f1f0d76dd6 1550 {
XinZhangMS 0:f7f1f0d76dd6 1551 LogError("Bad arguments: connection = %p, channel_max = %p",
XinZhangMS 0:f7f1f0d76dd6 1552 connection, channel_max);
XinZhangMS 0:f7f1f0d76dd6 1553 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1554 }
XinZhangMS 0:f7f1f0d76dd6 1555 else
XinZhangMS 0:f7f1f0d76dd6 1556 {
XinZhangMS 0:f7f1f0d76dd6 1557 /* Codes_S_R_S_CONNECTION_01_182: [connection_get_channel_max shall return in the channel_max argument the current channel_max setting.] */
XinZhangMS 0:f7f1f0d76dd6 1558 *channel_max = connection->channel_max;
XinZhangMS 0:f7f1f0d76dd6 1559
XinZhangMS 0:f7f1f0d76dd6 1560 /* Codes_S_R_S_CONNECTION_01_183: [On success, connection_get_channel_max shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1561 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1562 }
XinZhangMS 0:f7f1f0d76dd6 1563
XinZhangMS 0:f7f1f0d76dd6 1564 return result;
XinZhangMS 0:f7f1f0d76dd6 1565 }
XinZhangMS 0:f7f1f0d76dd6 1566
XinZhangMS 0:f7f1f0d76dd6 1567 int connection_set_idle_timeout(CONNECTION_HANDLE connection, milliseconds idle_timeout)
XinZhangMS 0:f7f1f0d76dd6 1568 {
XinZhangMS 0:f7f1f0d76dd6 1569 int result;
XinZhangMS 0:f7f1f0d76dd6 1570
XinZhangMS 0:f7f1f0d76dd6 1571 /* Codes_S_R_S_CONNECTION_01_191: [If connection is NULL, connection_set_idle_timeout shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1572 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1573 {
XinZhangMS 0:f7f1f0d76dd6 1574 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1575 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1576 }
XinZhangMS 0:f7f1f0d76dd6 1577 else
XinZhangMS 0:f7f1f0d76dd6 1578 {
XinZhangMS 0:f7f1f0d76dd6 1579 /* Codes_S_R_S_CONNECTION_01_158: [If connection_set_idle_timeout is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1580 if (connection->connection_state != CONNECTION_STATE_START)
XinZhangMS 0:f7f1f0d76dd6 1581 {
XinZhangMS 0:f7f1f0d76dd6 1582 LogError("Connection already open");
XinZhangMS 0:f7f1f0d76dd6 1583 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1584 }
XinZhangMS 0:f7f1f0d76dd6 1585 else
XinZhangMS 0:f7f1f0d76dd6 1586 {
XinZhangMS 0:f7f1f0d76dd6 1587 /* Codes_S_R_S_CONNECTION_01_159: [connection_set_idle_timeout shall set the idle_timeout associated with a connection.] */
XinZhangMS 0:f7f1f0d76dd6 1588 /* Codes_S_R_S_CONNECTION_01_166: [If connection_set_idle_timeout fails, the previous idle_timeout setting shall be retained.] */
XinZhangMS 0:f7f1f0d76dd6 1589 connection->idle_timeout = idle_timeout;
XinZhangMS 0:f7f1f0d76dd6 1590 connection->idle_timeout_specified = true;
XinZhangMS 0:f7f1f0d76dd6 1591
XinZhangMS 0:f7f1f0d76dd6 1592 /* Codes_S_R_S_CONNECTION_01_160: [On success connection_set_idle_timeout shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1593 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1594 }
XinZhangMS 0:f7f1f0d76dd6 1595 }
XinZhangMS 0:f7f1f0d76dd6 1596
XinZhangMS 0:f7f1f0d76dd6 1597 return result;
XinZhangMS 0:f7f1f0d76dd6 1598 }
XinZhangMS 0:f7f1f0d76dd6 1599
XinZhangMS 0:f7f1f0d76dd6 1600 int connection_get_idle_timeout(CONNECTION_HANDLE connection, milliseconds* idle_timeout)
XinZhangMS 0:f7f1f0d76dd6 1601 {
XinZhangMS 0:f7f1f0d76dd6 1602 int result;
XinZhangMS 0:f7f1f0d76dd6 1603
XinZhangMS 0:f7f1f0d76dd6 1604 /* Codes_S_R_S_CONNECTION_01_190: [If connection or idle_timeout is NULL, connection_get_idle_timeout shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1605 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1606 (idle_timeout == NULL))
XinZhangMS 0:f7f1f0d76dd6 1607 {
XinZhangMS 0:f7f1f0d76dd6 1608 LogError("Bad arguments: connection = %p, idle_timeout = %p",
XinZhangMS 0:f7f1f0d76dd6 1609 connection, idle_timeout);
XinZhangMS 0:f7f1f0d76dd6 1610 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1611 }
XinZhangMS 0:f7f1f0d76dd6 1612 else
XinZhangMS 0:f7f1f0d76dd6 1613 {
XinZhangMS 0:f7f1f0d76dd6 1614 /* Codes_S_R_S_CONNECTION_01_188: [connection_get_idle_timeout shall return in the idle_timeout argument the current idle_timeout setting.] */
XinZhangMS 0:f7f1f0d76dd6 1615 *idle_timeout = connection->idle_timeout;
XinZhangMS 0:f7f1f0d76dd6 1616
XinZhangMS 0:f7f1f0d76dd6 1617 /* Codes_S_R_S_CONNECTION_01_189: [On success, connection_get_idle_timeout shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1618 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1619 }
XinZhangMS 0:f7f1f0d76dd6 1620
XinZhangMS 0:f7f1f0d76dd6 1621 return result;
XinZhangMS 0:f7f1f0d76dd6 1622 }
XinZhangMS 0:f7f1f0d76dd6 1623
XinZhangMS 0:f7f1f0d76dd6 1624 int connection_set_properties(CONNECTION_HANDLE connection, fields properties)
XinZhangMS 0:f7f1f0d76dd6 1625 {
XinZhangMS 0:f7f1f0d76dd6 1626 int result;
XinZhangMS 0:f7f1f0d76dd6 1627
XinZhangMS 0:f7f1f0d76dd6 1628 /* Codes_S_R_S_CONNECTION_01_261: [If connection is NULL, connection_set_properties shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1629 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1630 {
XinZhangMS 0:f7f1f0d76dd6 1631 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1632 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1633 }
XinZhangMS 0:f7f1f0d76dd6 1634 else
XinZhangMS 0:f7f1f0d76dd6 1635 {
XinZhangMS 0:f7f1f0d76dd6 1636 /* Codes_S_R_S_CONNECTION_01_262: [If connection_set_properties is called after the initial Open frame has been sent, it shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1637 if (connection->connection_state != CONNECTION_STATE_START)
XinZhangMS 0:f7f1f0d76dd6 1638 {
XinZhangMS 0:f7f1f0d76dd6 1639 LogError("Connection already open");
XinZhangMS 0:f7f1f0d76dd6 1640 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1641 }
XinZhangMS 0:f7f1f0d76dd6 1642 else
XinZhangMS 0:f7f1f0d76dd6 1643 {
XinZhangMS 0:f7f1f0d76dd6 1644 if (properties == NULL)
XinZhangMS 0:f7f1f0d76dd6 1645 {
XinZhangMS 0:f7f1f0d76dd6 1646 /* Codes_S_R_S_CONNECTION_01_263: [ If `properties` is NULL, the previously stored properties associated with `connection` shall be freed. ]*/
XinZhangMS 0:f7f1f0d76dd6 1647 if (connection->properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 1648 {
XinZhangMS 0:f7f1f0d76dd6 1649 fields_destroy(connection->properties);
XinZhangMS 0:f7f1f0d76dd6 1650 connection->properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 1651 }
XinZhangMS 0:f7f1f0d76dd6 1652
XinZhangMS 0:f7f1f0d76dd6 1653 /* Codes_S_R_S_CONNECTION_01_264: [ On success it shall return 0. ]*/
XinZhangMS 0:f7f1f0d76dd6 1654 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1655 }
XinZhangMS 0:f7f1f0d76dd6 1656 else
XinZhangMS 0:f7f1f0d76dd6 1657 {
XinZhangMS 0:f7f1f0d76dd6 1658 fields new_properties;
XinZhangMS 0:f7f1f0d76dd6 1659
XinZhangMS 0:f7f1f0d76dd6 1660 /* Codes_S_R_S_CONNECTION_01_265: [ `connection_set_properties` shall copy the contents of `properties` as the properties contents for the connection instance identified by `connection`. ]*/
XinZhangMS 0:f7f1f0d76dd6 1661 /* Codes_S_R_S_CONNECTION_01_266: [ Cloning the properties shall be done by calling `fields_clone`. ]*/
XinZhangMS 0:f7f1f0d76dd6 1662 new_properties = fields_clone(properties);
XinZhangMS 0:f7f1f0d76dd6 1663 if (new_properties == NULL)
XinZhangMS 0:f7f1f0d76dd6 1664 {
XinZhangMS 0:f7f1f0d76dd6 1665 /* Codes_S_R_S_CONNECTION_01_267: [ If `fields_clone` fails, `connection_set_properties` shall fail and return a non-zero value. ]*/
XinZhangMS 0:f7f1f0d76dd6 1666 LogError("Cannot clone connection properties");
XinZhangMS 0:f7f1f0d76dd6 1667 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1668 }
XinZhangMS 0:f7f1f0d76dd6 1669 else
XinZhangMS 0:f7f1f0d76dd6 1670 {
XinZhangMS 0:f7f1f0d76dd6 1671 /* Codes_S_R_S_CONNECTION_01_268: [ If setting the properties fails, the previous value shall be preserved. ]*/
XinZhangMS 0:f7f1f0d76dd6 1672 /* Only do the free of the previous value if we could clone the new one*/
XinZhangMS 0:f7f1f0d76dd6 1673 if (connection->properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 1674 {
XinZhangMS 0:f7f1f0d76dd6 1675 fields_destroy(connection->properties);
XinZhangMS 0:f7f1f0d76dd6 1676 }
XinZhangMS 0:f7f1f0d76dd6 1677
XinZhangMS 0:f7f1f0d76dd6 1678 connection->properties = new_properties;
XinZhangMS 0:f7f1f0d76dd6 1679
XinZhangMS 0:f7f1f0d76dd6 1680 /* Codes_S_R_S_CONNECTION_01_264: [ On success it shall return 0. ]*/
XinZhangMS 0:f7f1f0d76dd6 1681 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1682 }
XinZhangMS 0:f7f1f0d76dd6 1683 }
XinZhangMS 0:f7f1f0d76dd6 1684 }
XinZhangMS 0:f7f1f0d76dd6 1685 }
XinZhangMS 0:f7f1f0d76dd6 1686
XinZhangMS 0:f7f1f0d76dd6 1687 return result;
XinZhangMS 0:f7f1f0d76dd6 1688 }
XinZhangMS 0:f7f1f0d76dd6 1689
XinZhangMS 0:f7f1f0d76dd6 1690 int connection_get_properties(CONNECTION_HANDLE connection, fields* properties)
XinZhangMS 0:f7f1f0d76dd6 1691 {
XinZhangMS 0:f7f1f0d76dd6 1692 int result;
XinZhangMS 0:f7f1f0d76dd6 1693
XinZhangMS 0:f7f1f0d76dd6 1694 /* Codes_S_R_S_CONNECTION_01_269: [If connection or properties is NULL, connection_get_properties shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 1695 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1696 (properties == NULL))
XinZhangMS 0:f7f1f0d76dd6 1697 {
XinZhangMS 0:f7f1f0d76dd6 1698 LogError("Bad arguments: connection = %p, properties = %p",
XinZhangMS 0:f7f1f0d76dd6 1699 connection, properties);
XinZhangMS 0:f7f1f0d76dd6 1700 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1701 }
XinZhangMS 0:f7f1f0d76dd6 1702 else
XinZhangMS 0:f7f1f0d76dd6 1703 {
XinZhangMS 0:f7f1f0d76dd6 1704 if (connection->properties == NULL)
XinZhangMS 0:f7f1f0d76dd6 1705 {
XinZhangMS 0:f7f1f0d76dd6 1706 /* Codes_S_R_S_CONNECTION_01_270: [ If no properties have been set, `connection_get_properties` shall set `properties` to NULL. ]*/
XinZhangMS 0:f7f1f0d76dd6 1707 *properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 1708
XinZhangMS 0:f7f1f0d76dd6 1709 /* Codes_S_R_S_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1710 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1711 }
XinZhangMS 0:f7f1f0d76dd6 1712 else
XinZhangMS 0:f7f1f0d76dd6 1713 {
XinZhangMS 0:f7f1f0d76dd6 1714 /* Codes_S_R_S_CONNECTION_01_272: [connection_get_properties shall return in the properties argument the current properties setting.] */
XinZhangMS 0:f7f1f0d76dd6 1715 /* Codes_S_R_S_CONNECTION_01_273: [ Cloning the properties shall be done by calling `fields_clone`. ]*/
XinZhangMS 0:f7f1f0d76dd6 1716 *properties = fields_clone(connection->properties);
XinZhangMS 0:f7f1f0d76dd6 1717 if (*properties == NULL)
XinZhangMS 0:f7f1f0d76dd6 1718 {
XinZhangMS 0:f7f1f0d76dd6 1719 /* Codes_S_R_S_CONNECTION_01_274: [ If `fields_clone` fails, `connection_get_properties` shall fail and return a non-zero value. ]*/
XinZhangMS 0:f7f1f0d76dd6 1720 LogError("Cannot clone properties");
XinZhangMS 0:f7f1f0d76dd6 1721 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1722 }
XinZhangMS 0:f7f1f0d76dd6 1723 else
XinZhangMS 0:f7f1f0d76dd6 1724 {
XinZhangMS 0:f7f1f0d76dd6 1725 /* Codes_S_R_S_CONNECTION_01_271: [On success, connection_get_properties shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 1726 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1727 }
XinZhangMS 0:f7f1f0d76dd6 1728 }
XinZhangMS 0:f7f1f0d76dd6 1729 }
XinZhangMS 0:f7f1f0d76dd6 1730
XinZhangMS 0:f7f1f0d76dd6 1731 return result;
XinZhangMS 0:f7f1f0d76dd6 1732 }
XinZhangMS 0:f7f1f0d76dd6 1733
XinZhangMS 0:f7f1f0d76dd6 1734 int connection_get_remote_max_frame_size(CONNECTION_HANDLE connection, uint32_t* remote_max_frame_size)
XinZhangMS 0:f7f1f0d76dd6 1735 {
XinZhangMS 0:f7f1f0d76dd6 1736 int result;
XinZhangMS 0:f7f1f0d76dd6 1737
XinZhangMS 0:f7f1f0d76dd6 1738 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1739 (remote_max_frame_size == NULL))
XinZhangMS 0:f7f1f0d76dd6 1740 {
XinZhangMS 0:f7f1f0d76dd6 1741 LogError("Bad arguments: connection = %p, remote_max_frame_size = %p",
XinZhangMS 0:f7f1f0d76dd6 1742 connection, remote_max_frame_size);
XinZhangMS 0:f7f1f0d76dd6 1743 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1744 }
XinZhangMS 0:f7f1f0d76dd6 1745 else
XinZhangMS 0:f7f1f0d76dd6 1746 {
XinZhangMS 0:f7f1f0d76dd6 1747 *remote_max_frame_size = connection->remote_max_frame_size;
XinZhangMS 0:f7f1f0d76dd6 1748
XinZhangMS 0:f7f1f0d76dd6 1749 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1750 }
XinZhangMS 0:f7f1f0d76dd6 1751
XinZhangMS 0:f7f1f0d76dd6 1752 return result;
XinZhangMS 0:f7f1f0d76dd6 1753 }
XinZhangMS 0:f7f1f0d76dd6 1754
XinZhangMS 0:f7f1f0d76dd6 1755 uint64_t connection_handle_deadlines(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1756 {
XinZhangMS 0:f7f1f0d76dd6 1757 uint64_t local_deadline = (uint64_t)-1;
XinZhangMS 0:f7f1f0d76dd6 1758 uint64_t remote_deadline = (uint64_t)-1;
XinZhangMS 0:f7f1f0d76dd6 1759
XinZhangMS 0:f7f1f0d76dd6 1760 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1761 {
XinZhangMS 0:f7f1f0d76dd6 1762 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1763 }
XinZhangMS 0:f7f1f0d76dd6 1764 else
XinZhangMS 0:f7f1f0d76dd6 1765 {
XinZhangMS 0:f7f1f0d76dd6 1766 tickcounter_ms_t current_ms;
XinZhangMS 0:f7f1f0d76dd6 1767
XinZhangMS 0:f7f1f0d76dd6 1768 if (tickcounter_get_current_ms(connection->tick_counter, &current_ms) != 0)
XinZhangMS 0:f7f1f0d76dd6 1769 {
XinZhangMS 0:f7f1f0d76dd6 1770 LogError("Could not get tick counter value");
XinZhangMS 0:f7f1f0d76dd6 1771 close_connection_with_error(connection, "amqp:internal-error", "Could not get tick count", NULL);
XinZhangMS 0:f7f1f0d76dd6 1772 }
XinZhangMS 0:f7f1f0d76dd6 1773 else
XinZhangMS 0:f7f1f0d76dd6 1774 {
XinZhangMS 0:f7f1f0d76dd6 1775 if (connection->idle_timeout_specified && (connection->idle_timeout != 0))
XinZhangMS 0:f7f1f0d76dd6 1776 {
XinZhangMS 0:f7f1f0d76dd6 1777 /* Calculate time until configured idle timeout expires */
XinZhangMS 0:f7f1f0d76dd6 1778
XinZhangMS 0:f7f1f0d76dd6 1779 uint64_t time_since_last_received = current_ms - connection->last_frame_received_time;
XinZhangMS 0:f7f1f0d76dd6 1780 if (time_since_last_received < connection->idle_timeout)
XinZhangMS 0:f7f1f0d76dd6 1781 {
XinZhangMS 0:f7f1f0d76dd6 1782 local_deadline = connection->idle_timeout - time_since_last_received;
XinZhangMS 0:f7f1f0d76dd6 1783 }
XinZhangMS 0:f7f1f0d76dd6 1784 else
XinZhangMS 0:f7f1f0d76dd6 1785 {
XinZhangMS 0:f7f1f0d76dd6 1786 local_deadline = 0;
XinZhangMS 0:f7f1f0d76dd6 1787
XinZhangMS 0:f7f1f0d76dd6 1788 /* close connection */
XinZhangMS 0:f7f1f0d76dd6 1789 close_connection_with_error(connection, "amqp:internal-error", "No frame received for the idle timeout", NULL);
XinZhangMS 0:f7f1f0d76dd6 1790 }
XinZhangMS 0:f7f1f0d76dd6 1791 }
XinZhangMS 0:f7f1f0d76dd6 1792
XinZhangMS 0:f7f1f0d76dd6 1793 if (local_deadline != 0 && connection->remote_idle_timeout != 0)
XinZhangMS 0:f7f1f0d76dd6 1794 {
XinZhangMS 0:f7f1f0d76dd6 1795 /* Calculate time until remote idle timeout expires */
XinZhangMS 0:f7f1f0d76dd6 1796
XinZhangMS 0:f7f1f0d76dd6 1797 uint64_t remote_idle_timeout = connection->remote_idle_timeout_send_frame_millisecond;
XinZhangMS 0:f7f1f0d76dd6 1798 uint64_t time_since_last_sent = current_ms - connection->last_frame_sent_time;
XinZhangMS 0:f7f1f0d76dd6 1799
XinZhangMS 0:f7f1f0d76dd6 1800 if (time_since_last_sent < remote_idle_timeout)
XinZhangMS 0:f7f1f0d76dd6 1801 {
XinZhangMS 0:f7f1f0d76dd6 1802 remote_deadline = remote_idle_timeout - time_since_last_sent;
XinZhangMS 0:f7f1f0d76dd6 1803 }
XinZhangMS 0:f7f1f0d76dd6 1804 else
XinZhangMS 0:f7f1f0d76dd6 1805 {
XinZhangMS 0:f7f1f0d76dd6 1806 connection->on_send_complete = NULL;
XinZhangMS 0:f7f1f0d76dd6 1807 if (amqp_frame_codec_encode_empty_frame(connection->amqp_frame_codec, 0, on_bytes_encoded, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 1808 {
XinZhangMS 0:f7f1f0d76dd6 1809 LogError("Encoding the empty frame failed");
XinZhangMS 0:f7f1f0d76dd6 1810 /* close connection */
XinZhangMS 0:f7f1f0d76dd6 1811 close_connection_with_error(connection, "amqp:internal-error", "Cannot send empty frame", NULL);
XinZhangMS 0:f7f1f0d76dd6 1812 }
XinZhangMS 0:f7f1f0d76dd6 1813 else
XinZhangMS 0:f7f1f0d76dd6 1814 {
XinZhangMS 0:f7f1f0d76dd6 1815 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 1816 {
XinZhangMS 0:f7f1f0d76dd6 1817 LOG(AZ_LOG_TRACE, LOG_LINE, "-> Empty frame");
XinZhangMS 0:f7f1f0d76dd6 1818 }
XinZhangMS 0:f7f1f0d76dd6 1819
XinZhangMS 0:f7f1f0d76dd6 1820 connection->last_frame_sent_time = current_ms;
XinZhangMS 0:f7f1f0d76dd6 1821
XinZhangMS 0:f7f1f0d76dd6 1822 remote_deadline = remote_idle_timeout;
XinZhangMS 0:f7f1f0d76dd6 1823 }
XinZhangMS 0:f7f1f0d76dd6 1824 }
XinZhangMS 0:f7f1f0d76dd6 1825 }
XinZhangMS 0:f7f1f0d76dd6 1826 }
XinZhangMS 0:f7f1f0d76dd6 1827 }
XinZhangMS 0:f7f1f0d76dd6 1828
XinZhangMS 0:f7f1f0d76dd6 1829 /* Return the shorter of each deadline, or 0 to indicate connection closed */
XinZhangMS 0:f7f1f0d76dd6 1830 return local_deadline > remote_deadline ? remote_deadline : local_deadline;
XinZhangMS 0:f7f1f0d76dd6 1831 }
XinZhangMS 0:f7f1f0d76dd6 1832
XinZhangMS 0:f7f1f0d76dd6 1833 void connection_dowork(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1834 {
XinZhangMS 0:f7f1f0d76dd6 1835 /* Codes_S_R_S_CONNECTION_01_078: [If handle is NULL, connection_dowork shall do nothing.] */
XinZhangMS 0:f7f1f0d76dd6 1836 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1837 {
XinZhangMS 0:f7f1f0d76dd6 1838 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1839 }
XinZhangMS 0:f7f1f0d76dd6 1840 else
XinZhangMS 0:f7f1f0d76dd6 1841 {
XinZhangMS 0:f7f1f0d76dd6 1842 if (connection_handle_deadlines(connection) > 0)
XinZhangMS 0:f7f1f0d76dd6 1843 {
XinZhangMS 0:f7f1f0d76dd6 1844 /* Codes_S_R_S_CONNECTION_01_076: [connection_dowork shall schedule the underlying IO interface to do its work by calling xio_dowork.] */
XinZhangMS 0:f7f1f0d76dd6 1845 xio_dowork(connection->io);
XinZhangMS 0:f7f1f0d76dd6 1846 }
XinZhangMS 0:f7f1f0d76dd6 1847 }
XinZhangMS 0:f7f1f0d76dd6 1848 }
XinZhangMS 0:f7f1f0d76dd6 1849
XinZhangMS 0:f7f1f0d76dd6 1850 ENDPOINT_HANDLE connection_create_endpoint(CONNECTION_HANDLE connection)
XinZhangMS 0:f7f1f0d76dd6 1851 {
XinZhangMS 0:f7f1f0d76dd6 1852 ENDPOINT_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 1853
XinZhangMS 0:f7f1f0d76dd6 1854 /* Codes_S_R_S_CONNECTION_01_113: [If connection, on_endpoint_frame_received or on_connection_state_changed is NULL, connection_create_endpoint shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1855 /* Codes_S_R_S_CONNECTION_01_193: [The context argument shall be allowed to be NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1856 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 1857 {
XinZhangMS 0:f7f1f0d76dd6 1858 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 1859 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1860 }
XinZhangMS 0:f7f1f0d76dd6 1861 else
XinZhangMS 0:f7f1f0d76dd6 1862 {
XinZhangMS 0:f7f1f0d76dd6 1863 /* Codes_S_R_S_CONNECTION_01_115: [If no more endpoints can be created due to all channels being used, connection_create_endpoint shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1864 if (connection->endpoint_count >= connection->channel_max)
XinZhangMS 0:f7f1f0d76dd6 1865 {
XinZhangMS 0:f7f1f0d76dd6 1866 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1867 }
XinZhangMS 0:f7f1f0d76dd6 1868 else
XinZhangMS 0:f7f1f0d76dd6 1869 {
XinZhangMS 0:f7f1f0d76dd6 1870 uint32_t i = 0;
XinZhangMS 0:f7f1f0d76dd6 1871
XinZhangMS 0:f7f1f0d76dd6 1872 /* Codes_S_R_S_CONNECTION_01_128: [The lowest number outgoing channel shall be associated with the newly created endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 1873 for (i = 0; i < connection->endpoint_count; i++)
XinZhangMS 0:f7f1f0d76dd6 1874 {
XinZhangMS 0:f7f1f0d76dd6 1875 if (connection->endpoints[i]->outgoing_channel > i)
XinZhangMS 0:f7f1f0d76dd6 1876 {
XinZhangMS 0:f7f1f0d76dd6 1877 /* found a gap in the sorted endpoint array */
XinZhangMS 0:f7f1f0d76dd6 1878 break;
XinZhangMS 0:f7f1f0d76dd6 1879 }
XinZhangMS 0:f7f1f0d76dd6 1880 }
XinZhangMS 0:f7f1f0d76dd6 1881
XinZhangMS 0:f7f1f0d76dd6 1882 /* Codes_S_R_S_CONNECTION_01_127: [On success, connection_create_endpoint shall return a non-NULL handle to the newly created endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 1883 result = (ENDPOINT_HANDLE)malloc(sizeof(ENDPOINT_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 1884 /* Codes_S_R_S_CONNECTION_01_196: [If memory cannot be allocated for the new endpoint, connection_create_endpoint shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1885 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 1886 {
XinZhangMS 0:f7f1f0d76dd6 1887 LogError("Cannot allocate memory for endpoint");
XinZhangMS 0:f7f1f0d76dd6 1888 }
XinZhangMS 0:f7f1f0d76dd6 1889 else
XinZhangMS 0:f7f1f0d76dd6 1890 {
XinZhangMS 0:f7f1f0d76dd6 1891 ENDPOINT_HANDLE* new_endpoints;
XinZhangMS 0:f7f1f0d76dd6 1892
XinZhangMS 0:f7f1f0d76dd6 1893 result->on_endpoint_frame_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 1894 result->on_connection_state_changed = NULL;
XinZhangMS 0:f7f1f0d76dd6 1895 result->callback_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 1896 result->outgoing_channel = (uint16_t)i;
XinZhangMS 0:f7f1f0d76dd6 1897 result->connection = connection;
XinZhangMS 0:f7f1f0d76dd6 1898
XinZhangMS 0:f7f1f0d76dd6 1899 /* Codes_S_R_S_CONNECTION_01_197: [The newly created endpoint shall be added to the endpoints list, so that it can be tracked.] */
XinZhangMS 0:f7f1f0d76dd6 1900 new_endpoints = (ENDPOINT_HANDLE*)realloc(connection->endpoints, sizeof(ENDPOINT_HANDLE) * (connection->endpoint_count + 1));
XinZhangMS 0:f7f1f0d76dd6 1901 if (new_endpoints == NULL)
XinZhangMS 0:f7f1f0d76dd6 1902 {
XinZhangMS 0:f7f1f0d76dd6 1903 /* Tests_S_R_S_CONNECTION_01_198: [If adding the endpoint to the endpoints list tracked by the connection fails, connection_create_endpoint shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 1904 LogError("Cannot reallocate memory for connection endpoints");
XinZhangMS 0:f7f1f0d76dd6 1905 free(result);
XinZhangMS 0:f7f1f0d76dd6 1906 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1907 }
XinZhangMS 0:f7f1f0d76dd6 1908 else
XinZhangMS 0:f7f1f0d76dd6 1909 {
XinZhangMS 0:f7f1f0d76dd6 1910 connection->endpoints = new_endpoints;
XinZhangMS 0:f7f1f0d76dd6 1911
XinZhangMS 0:f7f1f0d76dd6 1912 if (i < connection->endpoint_count)
XinZhangMS 0:f7f1f0d76dd6 1913 {
XinZhangMS 0:f7f1f0d76dd6 1914 (void)memmove(&connection->endpoints[i + 1], &connection->endpoints[i], sizeof(ENDPOINT_INSTANCE*) * (connection->endpoint_count - i));
XinZhangMS 0:f7f1f0d76dd6 1915 }
XinZhangMS 0:f7f1f0d76dd6 1916
XinZhangMS 0:f7f1f0d76dd6 1917 connection->endpoints[i] = result;
XinZhangMS 0:f7f1f0d76dd6 1918 connection->endpoint_count++;
XinZhangMS 0:f7f1f0d76dd6 1919
XinZhangMS 0:f7f1f0d76dd6 1920 /* Codes_S_R_S_CONNECTION_01_112: [connection_create_endpoint shall create a new endpoint that can be used by a session.] */
XinZhangMS 0:f7f1f0d76dd6 1921 }
XinZhangMS 0:f7f1f0d76dd6 1922 }
XinZhangMS 0:f7f1f0d76dd6 1923 }
XinZhangMS 0:f7f1f0d76dd6 1924 }
XinZhangMS 0:f7f1f0d76dd6 1925
XinZhangMS 0:f7f1f0d76dd6 1926 return result;
XinZhangMS 0:f7f1f0d76dd6 1927 }
XinZhangMS 0:f7f1f0d76dd6 1928
XinZhangMS 0:f7f1f0d76dd6 1929 int connection_start_endpoint(ENDPOINT_HANDLE endpoint, ON_ENDPOINT_FRAME_RECEIVED on_endpoint_frame_received, ON_CONNECTION_STATE_CHANGED on_connection_state_changed, void* context)
XinZhangMS 0:f7f1f0d76dd6 1930 {
XinZhangMS 0:f7f1f0d76dd6 1931 int result;
XinZhangMS 0:f7f1f0d76dd6 1932
XinZhangMS 0:f7f1f0d76dd6 1933 if ((endpoint == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1934 (on_endpoint_frame_received == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1935 (on_connection_state_changed == NULL))
XinZhangMS 0:f7f1f0d76dd6 1936 {
XinZhangMS 0:f7f1f0d76dd6 1937 LogError("Bad arguments: endpoint = %p, on_endpoint_frame_received = %p, on_connection_state_changed = %p",
XinZhangMS 0:f7f1f0d76dd6 1938 endpoint, on_endpoint_frame_received, on_connection_state_changed);
XinZhangMS 0:f7f1f0d76dd6 1939 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1940 }
XinZhangMS 0:f7f1f0d76dd6 1941 else
XinZhangMS 0:f7f1f0d76dd6 1942 {
XinZhangMS 0:f7f1f0d76dd6 1943 endpoint->on_endpoint_frame_received = on_endpoint_frame_received;
XinZhangMS 0:f7f1f0d76dd6 1944 endpoint->on_connection_state_changed = on_connection_state_changed;
XinZhangMS 0:f7f1f0d76dd6 1945 endpoint->callback_context = context;
XinZhangMS 0:f7f1f0d76dd6 1946
XinZhangMS 0:f7f1f0d76dd6 1947 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1948 }
XinZhangMS 0:f7f1f0d76dd6 1949
XinZhangMS 0:f7f1f0d76dd6 1950 return result;
XinZhangMS 0:f7f1f0d76dd6 1951 }
XinZhangMS 0:f7f1f0d76dd6 1952
XinZhangMS 0:f7f1f0d76dd6 1953 int connection_endpoint_get_incoming_channel(ENDPOINT_HANDLE endpoint, uint16_t* incoming_channel)
XinZhangMS 0:f7f1f0d76dd6 1954 {
XinZhangMS 0:f7f1f0d76dd6 1955 int result;
XinZhangMS 0:f7f1f0d76dd6 1956
XinZhangMS 0:f7f1f0d76dd6 1957 if ((endpoint == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1958 (incoming_channel == NULL))
XinZhangMS 0:f7f1f0d76dd6 1959 {
XinZhangMS 0:f7f1f0d76dd6 1960 LogError("Bad arguments: endpoint = %p, incoming_channel = %p",
XinZhangMS 0:f7f1f0d76dd6 1961 endpoint, incoming_channel);
XinZhangMS 0:f7f1f0d76dd6 1962 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1963 }
XinZhangMS 0:f7f1f0d76dd6 1964 else
XinZhangMS 0:f7f1f0d76dd6 1965 {
XinZhangMS 0:f7f1f0d76dd6 1966 *incoming_channel = endpoint->incoming_channel;
XinZhangMS 0:f7f1f0d76dd6 1967 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1968 }
XinZhangMS 0:f7f1f0d76dd6 1969
XinZhangMS 0:f7f1f0d76dd6 1970 return result;
XinZhangMS 0:f7f1f0d76dd6 1971 }
XinZhangMS 0:f7f1f0d76dd6 1972
XinZhangMS 0:f7f1f0d76dd6 1973 /* Codes_S_R_S_CONNECTION_01_129: [connection_destroy_endpoint shall free all resources associated with an endpoint created by connection_create_endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 1974 void connection_destroy_endpoint(ENDPOINT_HANDLE endpoint)
XinZhangMS 0:f7f1f0d76dd6 1975 {
XinZhangMS 0:f7f1f0d76dd6 1976 if (endpoint == NULL)
XinZhangMS 0:f7f1f0d76dd6 1977 {
XinZhangMS 0:f7f1f0d76dd6 1978 LogError("NULL endpoint");
XinZhangMS 0:f7f1f0d76dd6 1979 }
XinZhangMS 0:f7f1f0d76dd6 1980 else
XinZhangMS 0:f7f1f0d76dd6 1981 {
XinZhangMS 0:f7f1f0d76dd6 1982 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection;
XinZhangMS 0:f7f1f0d76dd6 1983 size_t i;
XinZhangMS 0:f7f1f0d76dd6 1984
XinZhangMS 0:f7f1f0d76dd6 1985 for (i = 0; i < connection->endpoint_count; i++)
XinZhangMS 0:f7f1f0d76dd6 1986 {
XinZhangMS 0:f7f1f0d76dd6 1987 if (connection->endpoints[i] == endpoint)
XinZhangMS 0:f7f1f0d76dd6 1988 {
XinZhangMS 0:f7f1f0d76dd6 1989 break;
XinZhangMS 0:f7f1f0d76dd6 1990 }
XinZhangMS 0:f7f1f0d76dd6 1991 }
XinZhangMS 0:f7f1f0d76dd6 1992
XinZhangMS 0:f7f1f0d76dd6 1993 /* Codes_S_R_S_CONNECTION_01_130: [The outgoing channel associated with the endpoint shall be released by removing the endpoint from the endpoint list.] */
XinZhangMS 0:f7f1f0d76dd6 1994 /* Codes_S_R_S_CONNECTION_01_131: [Any incoming channel number associated with the endpoint shall be released.] */
XinZhangMS 0:f7f1f0d76dd6 1995 if (i < connection->endpoint_count)
XinZhangMS 0:f7f1f0d76dd6 1996 {
XinZhangMS 0:f7f1f0d76dd6 1997 // endpoint found
XinZhangMS 0:f7f1f0d76dd6 1998 if (connection->endpoint_count == 1)
XinZhangMS 0:f7f1f0d76dd6 1999 {
XinZhangMS 0:f7f1f0d76dd6 2000 free(connection->endpoints);
XinZhangMS 0:f7f1f0d76dd6 2001 connection->endpoints = NULL;
XinZhangMS 0:f7f1f0d76dd6 2002 connection->endpoint_count = 0;
XinZhangMS 0:f7f1f0d76dd6 2003 }
XinZhangMS 0:f7f1f0d76dd6 2004 else
XinZhangMS 0:f7f1f0d76dd6 2005 {
XinZhangMS 0:f7f1f0d76dd6 2006 ENDPOINT_HANDLE* new_endpoints;
XinZhangMS 0:f7f1f0d76dd6 2007
XinZhangMS 0:f7f1f0d76dd6 2008 if ((connection->endpoint_count - i - 1) > 0)
XinZhangMS 0:f7f1f0d76dd6 2009 {
XinZhangMS 0:f7f1f0d76dd6 2010 (void)memmove(connection->endpoints + i, connection->endpoints + i + 1, sizeof(ENDPOINT_HANDLE) * (connection->endpoint_count - i - 1));
XinZhangMS 0:f7f1f0d76dd6 2011 }
XinZhangMS 0:f7f1f0d76dd6 2012
XinZhangMS 0:f7f1f0d76dd6 2013 new_endpoints = (ENDPOINT_HANDLE*)realloc(connection->endpoints, (connection->endpoint_count - 1) * sizeof(ENDPOINT_HANDLE));
XinZhangMS 0:f7f1f0d76dd6 2014 if (new_endpoints != NULL)
XinZhangMS 0:f7f1f0d76dd6 2015 {
XinZhangMS 0:f7f1f0d76dd6 2016 connection->endpoints = new_endpoints;
XinZhangMS 0:f7f1f0d76dd6 2017 }
XinZhangMS 0:f7f1f0d76dd6 2018
XinZhangMS 0:f7f1f0d76dd6 2019 connection->endpoint_count--;
XinZhangMS 0:f7f1f0d76dd6 2020 }
XinZhangMS 0:f7f1f0d76dd6 2021 }
XinZhangMS 0:f7f1f0d76dd6 2022
XinZhangMS 0:f7f1f0d76dd6 2023 free(endpoint);
XinZhangMS 0:f7f1f0d76dd6 2024 }
XinZhangMS 0:f7f1f0d76dd6 2025 }
XinZhangMS 0:f7f1f0d76dd6 2026
XinZhangMS 0:f7f1f0d76dd6 2027 /* Codes_S_R_S_CONNECTION_01_247: [connection_encode_frame shall send a frame for a certain endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 2028 int connection_encode_frame(ENDPOINT_HANDLE endpoint, AMQP_VALUE performative, PAYLOAD* payloads, size_t payload_count, ON_SEND_COMPLETE on_send_complete, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 2029 {
XinZhangMS 0:f7f1f0d76dd6 2030 int result;
XinZhangMS 0:f7f1f0d76dd6 2031
XinZhangMS 0:f7f1f0d76dd6 2032 /* Codes_S_R_S_CONNECTION_01_249: [If endpoint or performative are NULL, connection_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 2033 if ((endpoint == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 2034 (performative == NULL))
XinZhangMS 0:f7f1f0d76dd6 2035 {
XinZhangMS 0:f7f1f0d76dd6 2036 LogError("Bad arguments: endpoint = %p, performative = %p",
XinZhangMS 0:f7f1f0d76dd6 2037 endpoint, performative);
XinZhangMS 0:f7f1f0d76dd6 2038 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 2039 }
XinZhangMS 0:f7f1f0d76dd6 2040 else
XinZhangMS 0:f7f1f0d76dd6 2041 {
XinZhangMS 0:f7f1f0d76dd6 2042 CONNECTION_HANDLE connection = (CONNECTION_HANDLE)endpoint->connection;
XinZhangMS 0:f7f1f0d76dd6 2043 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec = connection->amqp_frame_codec;
XinZhangMS 0:f7f1f0d76dd6 2044
XinZhangMS 0:f7f1f0d76dd6 2045 /* Codes_S_R_S_CONNECTION_01_254: [If connection_encode_frame is called before the connection is in the OPENED state, connection_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 2046 if (connection->connection_state != CONNECTION_STATE_OPENED)
XinZhangMS 0:f7f1f0d76dd6 2047 {
XinZhangMS 0:f7f1f0d76dd6 2048 LogError("Connection not open");
XinZhangMS 0:f7f1f0d76dd6 2049 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 2050 }
XinZhangMS 0:f7f1f0d76dd6 2051 else
XinZhangMS 0:f7f1f0d76dd6 2052 {
XinZhangMS 0:f7f1f0d76dd6 2053 /* Codes_S_R_S_CONNECTION_01_255: [The payload size shall be computed based on all the payload chunks passed as argument in payloads.] */
XinZhangMS 0:f7f1f0d76dd6 2054 /* Codes_S_R_S_CONNECTION_01_250: [connection_encode_frame shall initiate the frame send by calling amqp_frame_codec_begin_encode_frame.] */
XinZhangMS 0:f7f1f0d76dd6 2055 /* Codes_S_R_S_CONNECTION_01_251: [The channel number passed to amqp_frame_codec_begin_encode_frame shall be the outgoing channel number associated with the endpoint by connection_create_endpoint.] */
XinZhangMS 0:f7f1f0d76dd6 2056 /* Codes_S_R_S_CONNECTION_01_252: [The performative passed to amqp_frame_codec_begin_encode_frame shall be the performative argument of connection_encode_frame.] */
XinZhangMS 0:f7f1f0d76dd6 2057 connection->on_send_complete = on_send_complete;
XinZhangMS 0:f7f1f0d76dd6 2058 connection->on_send_complete_callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 2059 if (amqp_frame_codec_encode_frame(amqp_frame_codec, endpoint->outgoing_channel, performative, payloads, payload_count, on_bytes_encoded, connection) != 0)
XinZhangMS 0:f7f1f0d76dd6 2060 {
XinZhangMS 0:f7f1f0d76dd6 2061 /* Codes_S_R_S_CONNECTION_01_253: [If amqp_frame_codec_begin_encode_frame or amqp_frame_codec_encode_payload_bytes fails, then connection_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 2062 LogError("Encoding AMQP frame failed");
XinZhangMS 0:f7f1f0d76dd6 2063 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 2064 }
XinZhangMS 0:f7f1f0d76dd6 2065 else
XinZhangMS 0:f7f1f0d76dd6 2066 {
XinZhangMS 0:f7f1f0d76dd6 2067 if (connection->is_trace_on == 1)
XinZhangMS 0:f7f1f0d76dd6 2068 {
XinZhangMS 0:f7f1f0d76dd6 2069 log_outgoing_frame(performative);
XinZhangMS 0:f7f1f0d76dd6 2070 }
XinZhangMS 0:f7f1f0d76dd6 2071
XinZhangMS 0:f7f1f0d76dd6 2072 if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_sent_time) != 0)
XinZhangMS 0:f7f1f0d76dd6 2073 {
XinZhangMS 0:f7f1f0d76dd6 2074 LogError("Getting tick counter value failed");
XinZhangMS 0:f7f1f0d76dd6 2075 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 2076 }
XinZhangMS 0:f7f1f0d76dd6 2077 else
XinZhangMS 0:f7f1f0d76dd6 2078 {
XinZhangMS 0:f7f1f0d76dd6 2079 /* Codes_S_R_S_CONNECTION_01_248: [On success it shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 2080 result = 0;
XinZhangMS 0:f7f1f0d76dd6 2081 }
XinZhangMS 0:f7f1f0d76dd6 2082 }
XinZhangMS 0:f7f1f0d76dd6 2083 }
XinZhangMS 0:f7f1f0d76dd6 2084 }
XinZhangMS 0:f7f1f0d76dd6 2085
XinZhangMS 0:f7f1f0d76dd6 2086 return result;
XinZhangMS 0:f7f1f0d76dd6 2087 }
XinZhangMS 0:f7f1f0d76dd6 2088
XinZhangMS 0:f7f1f0d76dd6 2089 void connection_set_trace(CONNECTION_HANDLE connection, bool trace_on)
XinZhangMS 0:f7f1f0d76dd6 2090 {
XinZhangMS 0:f7f1f0d76dd6 2091 /* Codes_S_R_S_CONNECTION_07_002: [If connection is NULL then connection_set_trace shall do nothing.] */
XinZhangMS 0:f7f1f0d76dd6 2092 if (connection == NULL)
XinZhangMS 0:f7f1f0d76dd6 2093 {
XinZhangMS 0:f7f1f0d76dd6 2094 LogError("NULL connection");
XinZhangMS 0:f7f1f0d76dd6 2095 }
XinZhangMS 0:f7f1f0d76dd6 2096 else
XinZhangMS 0:f7f1f0d76dd6 2097 {
XinZhangMS 0:f7f1f0d76dd6 2098 /* Codes_S_R_S_CONNECTION_07_001: [connection_set_trace shall set the ability to turn on and off trace logging.] */
XinZhangMS 0:f7f1f0d76dd6 2099 connection->is_trace_on = trace_on ? 1 : 0;
XinZhangMS 0:f7f1f0d76dd6 2100 }
XinZhangMS 0:f7f1f0d76dd6 2101 }
XinZhangMS 0:f7f1f0d76dd6 2102
XinZhangMS 0:f7f1f0d76dd6 2103 int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE connection, double idle_timeout_empty_frame_send_ratio)
XinZhangMS 0:f7f1f0d76dd6 2104 {
XinZhangMS 0:f7f1f0d76dd6 2105 int result;
XinZhangMS 0:f7f1f0d76dd6 2106
XinZhangMS 0:f7f1f0d76dd6 2107 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 2108 (idle_timeout_empty_frame_send_ratio <= 0.0) ||
XinZhangMS 0:f7f1f0d76dd6 2109 (idle_timeout_empty_frame_send_ratio > 1.0))
XinZhangMS 0:f7f1f0d76dd6 2110 {
XinZhangMS 0:f7f1f0d76dd6 2111 LogError("Bad arguments: connection = %p, idle_timeout_empty_frame_send_ratio = %f",
XinZhangMS 0:f7f1f0d76dd6 2112 connection, idle_timeout_empty_frame_send_ratio);
XinZhangMS 0:f7f1f0d76dd6 2113 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 2114 }
XinZhangMS 0:f7f1f0d76dd6 2115 else
XinZhangMS 0:f7f1f0d76dd6 2116 {
XinZhangMS 0:f7f1f0d76dd6 2117 connection->idle_timeout_empty_frame_send_ratio = idle_timeout_empty_frame_send_ratio;
XinZhangMS 0:f7f1f0d76dd6 2118 result = 0;
XinZhangMS 0:f7f1f0d76dd6 2119 }
XinZhangMS 0:f7f1f0d76dd6 2120
XinZhangMS 0:f7f1f0d76dd6 2121 return result;
XinZhangMS 0:f7f1f0d76dd6 2122 }
XinZhangMS 0:f7f1f0d76dd6 2123
XinZhangMS 0:f7f1f0d76dd6 2124 ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE connection_subscribe_on_connection_close_received(CONNECTION_HANDLE connection, ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received, void* context)
XinZhangMS 0:f7f1f0d76dd6 2125 {
XinZhangMS 0:f7f1f0d76dd6 2126 ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 2127
XinZhangMS 0:f7f1f0d76dd6 2128 /* Codes_S_R_S_CONNECTION_01_279: [ `context` shall be allowed to be NULL. ]*/
XinZhangMS 0:f7f1f0d76dd6 2129
XinZhangMS 0:f7f1f0d76dd6 2130 /* Codes_S_R_S_CONNECTION_01_277: [ If `connection` is NULL, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/
XinZhangMS 0:f7f1f0d76dd6 2131 if ((connection == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 2132 /* Codes_S_R_S_CONNECTION_01_278: [ If `on_connection_close_received` is NULL, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/
XinZhangMS 0:f7f1f0d76dd6 2133 (on_connection_close_received == NULL))
XinZhangMS 0:f7f1f0d76dd6 2134 {
XinZhangMS 0:f7f1f0d76dd6 2135 LogError("Invalid arguments: connection = %p, on_connection_close_received = %p, context = %p",
XinZhangMS 0:f7f1f0d76dd6 2136 connection, on_connection_close_received, context);
XinZhangMS 0:f7f1f0d76dd6 2137 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 2138 }
XinZhangMS 0:f7f1f0d76dd6 2139 else
XinZhangMS 0:f7f1f0d76dd6 2140 {
XinZhangMS 0:f7f1f0d76dd6 2141 if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 2142 {
XinZhangMS 0:f7f1f0d76dd6 2143 /* Codes_S_R_S_CONNECTION_01_280: [ Only one subscription shall be allowed per connection, if a subsequent second even subscription is done while a subscription is active, `connection_subscribe_on_connection_close_received` shall fail and return NULL. ]*/
XinZhangMS 0:f7f1f0d76dd6 2144 LogError("Already subscribed for on_connection_close_received events");
XinZhangMS 0:f7f1f0d76dd6 2145 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 2146 }
XinZhangMS 0:f7f1f0d76dd6 2147 else
XinZhangMS 0:f7f1f0d76dd6 2148 {
XinZhangMS 0:f7f1f0d76dd6 2149 /* Codes_S_R_S_CONNECTION_01_275: [ `connection_subscribe_on_connection_close_received` shall register the `on_connection_close_received` handler to be triggered whenever a CLOSE performative is received.. ]*/
XinZhangMS 0:f7f1f0d76dd6 2150 connection->on_connection_close_received_event_subscription.on_connection_close_received = on_connection_close_received;
XinZhangMS 0:f7f1f0d76dd6 2151 connection->on_connection_close_received_event_subscription.context = context;
XinZhangMS 0:f7f1f0d76dd6 2152
XinZhangMS 0:f7f1f0d76dd6 2153 /* Codes_S_R_S_CONNECTION_01_276: [ On success, `connection_subscribe_on_connection_close_received` shall return a non-NULL handle to the event subcription. ]*/
XinZhangMS 0:f7f1f0d76dd6 2154 result = &connection->on_connection_close_received_event_subscription;
XinZhangMS 0:f7f1f0d76dd6 2155 }
XinZhangMS 0:f7f1f0d76dd6 2156 }
XinZhangMS 0:f7f1f0d76dd6 2157
XinZhangMS 0:f7f1f0d76dd6 2158 return result;
XinZhangMS 0:f7f1f0d76dd6 2159 }
XinZhangMS 0:f7f1f0d76dd6 2160
XinZhangMS 0:f7f1f0d76dd6 2161 void connection_unsubscribe_on_connection_close_received(ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE event_subscription)
XinZhangMS 0:f7f1f0d76dd6 2162 {
XinZhangMS 0:f7f1f0d76dd6 2163 if (event_subscription == NULL)
XinZhangMS 0:f7f1f0d76dd6 2164 {
XinZhangMS 0:f7f1f0d76dd6 2165 LogError("NULL event_subscription");
XinZhangMS 0:f7f1f0d76dd6 2166 }
XinZhangMS 0:f7f1f0d76dd6 2167 else
XinZhangMS 0:f7f1f0d76dd6 2168 {
XinZhangMS 0:f7f1f0d76dd6 2169 /* Codes_S_R_S_CONNECTION_01_281: [ `connection_unsubscribe_on_connection_close_received` shall remove the subscription for the connection closed event that was made by calling `connection_subscribe_on_connection_close_received`. ]*/
XinZhangMS 0:f7f1f0d76dd6 2170 event_subscription->on_connection_close_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 2171 event_subscription->context = NULL;
XinZhangMS 0:f7f1f0d76dd6 2172 }
XinZhangMS 0:f7f1f0d76dd6 2173 }