Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed
Fork of iothub_client_sample_amqp by
session.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include <stdlib.h> 00005 #ifdef _CRTDBG_MAP_ALLOC 00006 #include <crtdbg.h> 00007 #endif 00008 #include <string.h> 00009 #include "azure_uamqp_c/session.h" 00010 #include "azure_uamqp_c/connection.h" 00011 #include "azure_uamqp_c/amqpalloc.h" 00012 #include "azure_c_shared_utility/xlogging.h" 00013 00014 typedef struct LINK_ENDPOINT_INSTANCE_TAG 00015 { 00016 char* name; 00017 handle input_handle; 00018 handle output_handle; 00019 ON_ENDPOINT_FRAME_RECEIVED frame_received_callback; 00020 ON_SESSION_STATE_CHANGED on_session_state_changed; 00021 ON_SESSION_FLOW_ON on_session_flow_on; 00022 void* callback_context; 00023 SESSION_HANDLE session; 00024 } LINK_ENDPOINT_INSTANCE; 00025 00026 typedef struct SESSION_INSTANCE_TAG 00027 { 00028 ON_ENDPOINT_FRAME_RECEIVED frame_received_callback; 00029 void* frame_received_callback_context; 00030 SESSION_STATE session_state; 00031 SESSION_STATE previous_session_state; 00032 CONNECTION_HANDLE connection; 00033 ENDPOINT_HANDLE endpoint; 00034 LINK_ENDPOINT_INSTANCE** link_endpoints; 00035 uint32_t link_endpoint_count; 00036 00037 ON_LINK_ATTACHED on_link_attached; 00038 void* on_link_attached_callback_context; 00039 00040 /* Codes_SRS_SESSION_01_016: [next-outgoing-id The next-outgoing-id is the transfer-id to assign to the next transfer frame.] */ 00041 transfer_number next_outgoing_id; 00042 transfer_number next_incoming_id; 00043 uint32_t desired_incoming_window; 00044 uint32_t incoming_window; 00045 uint32_t outgoing_window; 00046 handle handle_max; 00047 uint32_t remote_incoming_window; 00048 uint32_t remote_outgoing_window; 00049 int is_underlying_connection_open : 1; 00050 } SESSION_INSTANCE; 00051 00052 #define UNDERLYING_CONNECTION_NOT_OPEN 0 00053 #define UNDERLYING_CONNECTION_OPEN -1 00054 00055 static void session_set_state(SESSION_INSTANCE* session_instance, SESSION_STATE session_state) 00056 { 00057 uint64_t i; 00058 00059 session_instance->previous_session_state = session_instance->session_state; 00060 session_instance->session_state = session_state; 00061 00062 for (i = 0; i < session_instance->link_endpoint_count; i++) 00063 { 00064 if (session_instance->link_endpoints[i]->on_session_state_changed != NULL) 00065 { 00066 session_instance->link_endpoints[i]->on_session_state_changed(session_instance->link_endpoints[i]->callback_context, session_state, session_instance->previous_session_state); 00067 } 00068 } 00069 } 00070 00071 static int send_end_frame(SESSION_INSTANCE* session_instance, ERROR_HANDLE error_handle) 00072 { 00073 int result; 00074 END_HANDLE end_performative; 00075 00076 end_performative = end_create(); 00077 if (end_performative == NULL) 00078 { 00079 result = __LINE__; 00080 } 00081 else 00082 { 00083 if ((error_handle != NULL) && 00084 (end_set_error(end_performative, error_handle) != 0)) 00085 { 00086 result = __LINE__; 00087 } 00088 else 00089 { 00090 AMQP_VALUE end_performative_value = amqpvalue_create_end(end_performative); 00091 if (end_performative_value == NULL) 00092 { 00093 result = __LINE__; 00094 } 00095 else 00096 { 00097 if (connection_encode_frame(session_instance->endpoint, end_performative_value, NULL, 0, NULL, NULL) != 0) 00098 { 00099 result = __LINE__; 00100 } 00101 else 00102 { 00103 result = 0; 00104 } 00105 00106 amqpvalue_destroy(end_performative_value); 00107 } 00108 } 00109 00110 end_destroy(end_performative); 00111 } 00112 00113 return result; 00114 } 00115 00116 static void end_session_with_error(SESSION_INSTANCE* session_instance, const char* condition_value, const char* description) 00117 { 00118 ERROR_HANDLE error_handle = error_create(condition_value); 00119 if (error_handle == NULL) 00120 { 00121 /* fatal error */ 00122 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00123 (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot allocate error handle to end session"); 00124 } 00125 else 00126 { 00127 if ((error_set_description(error_handle, description) != 0) || 00128 (send_end_frame(session_instance, error_handle) != 0)) 00129 { 00130 /* fatal error */ 00131 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00132 (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot allocate error handle to end session"); 00133 } 00134 else 00135 { 00136 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00137 } 00138 00139 error_destroy(error_handle); 00140 } 00141 } 00142 00143 static int send_begin(SESSION_INSTANCE* session_instance) 00144 { 00145 int result; 00146 BEGIN_HANDLE begin = begin_create(session_instance->next_outgoing_id, session_instance->incoming_window, session_instance->outgoing_window); 00147 00148 if (begin == NULL) 00149 { 00150 result = __LINE__; 00151 } 00152 else 00153 { 00154 uint16_t remote_channel; 00155 if (begin_set_handle_max(begin, session_instance->handle_max) != 0) 00156 { 00157 result = __LINE__; 00158 } 00159 else if ((session_instance->session_state == SESSION_STATE_BEGIN_RCVD) && 00160 ((connection_endpoint_get_incoming_channel(session_instance->endpoint, &remote_channel) != 0) || 00161 (begin_set_remote_channel(begin, remote_channel) != 0))) 00162 { 00163 result = __LINE__; 00164 } 00165 else 00166 { 00167 AMQP_VALUE begin_performative_value = amqpvalue_create_begin(begin); 00168 if (begin_performative_value == NULL) 00169 { 00170 result = __LINE__; 00171 } 00172 else 00173 { 00174 if (connection_encode_frame(session_instance->endpoint, begin_performative_value, NULL, 0, NULL, NULL) != 0) 00175 { 00176 result = __LINE__; 00177 } 00178 else 00179 { 00180 result = 0; 00181 } 00182 00183 amqpvalue_destroy(begin_performative_value); 00184 } 00185 } 00186 00187 begin_destroy(begin); 00188 } 00189 00190 return result; 00191 } 00192 00193 static int send_flow(SESSION_INSTANCE* session) 00194 { 00195 int result; 00196 if (session == NULL) 00197 { 00198 result = __LINE__; 00199 } 00200 else 00201 { 00202 FLOW_HANDLE flow = flow_create(session->incoming_window, session->next_outgoing_id, session->outgoing_window); 00203 00204 if (flow == NULL) 00205 { 00206 result = __LINE__; 00207 } 00208 else 00209 { 00210 if (flow_set_next_incoming_id(flow, session->next_incoming_id) != 0) 00211 { 00212 result = __LINE__; 00213 } 00214 else 00215 { 00216 AMQP_VALUE flow_performative_value = amqpvalue_create_flow(flow); 00217 if (flow_performative_value == NULL) 00218 { 00219 result = __LINE__; 00220 } 00221 else 00222 { 00223 if (connection_encode_frame(session->endpoint, flow_performative_value, NULL, 0, NULL, NULL) != 0) 00224 { 00225 result = __LINE__; 00226 } 00227 else 00228 { 00229 result = 0; 00230 } 00231 00232 amqpvalue_destroy(flow_performative_value); 00233 } 00234 } 00235 00236 flow_destroy(flow); 00237 } 00238 } 00239 00240 return result; 00241 } 00242 00243 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_name(SESSION_INSTANCE* session, const char* name) 00244 { 00245 uint32_t i; 00246 LINK_ENDPOINT_INSTANCE* result; 00247 00248 for (i = 0; i < session->link_endpoint_count; i++) 00249 { 00250 if (strcmp(session->link_endpoints[i]->name, name) == 0) 00251 { 00252 break; 00253 } 00254 } 00255 00256 if (i == session->link_endpoint_count) 00257 { 00258 result = NULL; 00259 } 00260 else 00261 { 00262 result = session->link_endpoints[i]; 00263 } 00264 00265 return result; 00266 } 00267 00268 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_input_handle(SESSION_INSTANCE* session, handle input_handle) 00269 { 00270 uint32_t i; 00271 LINK_ENDPOINT_INSTANCE* result; 00272 00273 for (i = 0; i < session->link_endpoint_count; i++) 00274 { 00275 if (session->link_endpoints[i]->input_handle == input_handle) 00276 { 00277 break; 00278 } 00279 } 00280 00281 if (i == session->link_endpoint_count) 00282 { 00283 result = NULL; 00284 } 00285 else 00286 { 00287 result = session->link_endpoints[i]; 00288 } 00289 00290 return result; 00291 } 00292 00293 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_output_handle(SESSION_INSTANCE* session, handle output_handle) 00294 { 00295 uint32_t i; 00296 LINK_ENDPOINT_INSTANCE* result; 00297 00298 for (i = 0; i < session->link_endpoint_count; i++) 00299 { 00300 if (session->link_endpoints[i]->output_handle == output_handle) 00301 { 00302 break; 00303 } 00304 } 00305 00306 if (i == session->link_endpoint_count) 00307 { 00308 result = NULL; 00309 } 00310 else 00311 { 00312 result = session->link_endpoints[i]; 00313 } 00314 00315 return result; 00316 } 00317 00318 static void on_connection_state_changed(void* context, CONNECTION_STATE new_connection_state, CONNECTION_STATE previous_connection_state) 00319 { 00320 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context; 00321 00322 /* Codes_SRS_SESSION_01_060: [If the previous connection state is not OPENED and the new connection state is OPENED, the BEGIN frame shall be sent out and the state shall be switched to BEGIN_SENT.] */ 00323 if ((new_connection_state == CONNECTION_STATE_OPENED) && (previous_connection_state != CONNECTION_STATE_OPENED) && (session_instance->session_state == SESSION_STATE_UNMAPPED)) 00324 { 00325 if (send_begin(session_instance) == 0) 00326 { 00327 session_set_state(session_instance, SESSION_STATE_BEGIN_SENT); 00328 } 00329 } 00330 /* Codes_SRS_SESSION_01_061: [If the previous connection state is OPENED and the new connection state is not OPENED anymore, the state shall be switched to DISCARDING.] */ 00331 else if ((new_connection_state == CONNECTION_STATE_CLOSE_RCVD) || (new_connection_state == CONNECTION_STATE_END)) 00332 { 00333 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00334 } 00335 /* Codes_SRS_SESSION_09_001: [If the new connection state is ERROR, the state shall be switched to ERROR.] */ 00336 else if (new_connection_state == CONNECTION_STATE_ERROR) 00337 { 00338 session_set_state(session_instance, SESSION_STATE_ERROR); 00339 } 00340 } 00341 00342 static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes) 00343 { 00344 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context; 00345 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); 00346 00347 if (is_begin_type_by_descriptor(descriptor)) 00348 { 00349 BEGIN_HANDLE begin_handle; 00350 00351 if (amqpvalue_get_begin(performative, &begin_handle) != 0) 00352 { 00353 connection_close(session_instance->connection, "amqp:decode-error", "Cannot decode BEGIN frame"); 00354 } 00355 else 00356 { 00357 if ((begin_get_incoming_window(begin_handle, &session_instance->remote_incoming_window) != 0) || 00358 (begin_get_next_outgoing_id(begin_handle, &session_instance->next_incoming_id) != 0)) 00359 { 00360 /* error */ 00361 begin_destroy(begin_handle); 00362 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00363 connection_close(session_instance->connection, "amqp:decode-error", "Cannot get incoming windows and next outgoing id"); 00364 } 00365 else 00366 { 00367 begin_destroy(begin_handle); 00368 00369 if (session_instance->session_state == SESSION_STATE_BEGIN_SENT) 00370 { 00371 session_set_state(session_instance, SESSION_STATE_MAPPED); 00372 } 00373 else if(session_instance->session_state == SESSION_STATE_UNMAPPED) 00374 { 00375 session_set_state(session_instance, SESSION_STATE_BEGIN_RCVD); 00376 if (send_begin(session_instance) != 0) 00377 { 00378 connection_close(session_instance->connection, "amqp:internal-error", "Failed sending BEGIN frame"); 00379 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00380 } 00381 else 00382 { 00383 session_set_state(session_instance, SESSION_STATE_MAPPED); 00384 } 00385 } 00386 } 00387 } 00388 } 00389 else if (is_attach_type_by_descriptor(descriptor)) 00390 { 00391 const char* name = NULL; 00392 ATTACH_HANDLE attach_handle; 00393 00394 if (amqpvalue_get_attach(performative, &attach_handle) != 0) 00395 { 00396 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode ATTACH frame"); 00397 } 00398 else 00399 { 00400 role role; 00401 AMQP_VALUE source; 00402 AMQP_VALUE target; 00403 00404 if ((attach_get_name(attach_handle, &name) != 0) || 00405 (attach_get_role(attach_handle, &role) != 0) || 00406 (attach_get_source(attach_handle, &source) != 0) || 00407 (attach_get_target(attach_handle, &target) != 0)) 00408 { 00409 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link name from ATTACH frame"); 00410 } 00411 else 00412 { 00413 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_name(session_instance, name); 00414 if (link_endpoint == NULL) 00415 { 00416 /* new link attach */ 00417 if (session_instance->on_link_attached != NULL) 00418 { 00419 LINK_ENDPOINT_HANDLE new_link_endpoint = session_create_link_endpoint(session_instance, name); 00420 if (new_link_endpoint == NULL) 00421 { 00422 end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint"); 00423 } 00424 else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0) 00425 { 00426 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame"); 00427 } 00428 else 00429 { 00430 if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target)) 00431 { 00432 session_destroy_link_endpoint(new_link_endpoint); 00433 new_link_endpoint = NULL; 00434 } 00435 else 00436 { 00437 if (new_link_endpoint->frame_received_callback != NULL) 00438 { 00439 new_link_endpoint->frame_received_callback(new_link_endpoint->callback_context, performative, payload_size, payload_bytes); 00440 } 00441 } 00442 } 00443 } 00444 } 00445 else 00446 { 00447 if (attach_get_handle(attach_handle, &link_endpoint->input_handle) != 0) 00448 { 00449 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame"); 00450 } 00451 else 00452 { 00453 link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes); 00454 } 00455 } 00456 } 00457 00458 attach_destroy(attach_handle); 00459 } 00460 } 00461 else if (is_detach_type_by_descriptor(descriptor)) 00462 { 00463 DETACH_HANDLE detach_handle; 00464 00465 if (amqpvalue_get_detach(performative, &detach_handle) != 0) 00466 { 00467 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode DETACH frame"); 00468 } 00469 else 00470 { 00471 uint32_t remote_handle; 00472 if (detach_get_handle(detach_handle, &remote_handle) != 0) 00473 { 00474 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from DETACH frame"); 00475 00476 detach_destroy(detach_handle); 00477 } 00478 else 00479 { 00480 detach_destroy(detach_handle); 00481 00482 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle); 00483 if (link_endpoint == NULL) 00484 { 00485 end_session_with_error(session_instance, "amqp:session:unattached-handle", ""); 00486 } 00487 else 00488 { 00489 link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes); 00490 } 00491 } 00492 } 00493 } 00494 else if (is_flow_type_by_descriptor(descriptor)) 00495 { 00496 FLOW_HANDLE flow_handle; 00497 00498 if (amqpvalue_get_flow(performative, &flow_handle) != 0) 00499 { 00500 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame"); 00501 } 00502 else 00503 { 00504 uint32_t remote_handle; 00505 transfer_number flow_next_incoming_id; 00506 uint32_t flow_incoming_window; 00507 00508 if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0) 00509 { 00510 /* 00511 If the next-incoming-id field of the flow frame is not set, 00512 then remote-incomingwindow is computed as follows: 00513 initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint) 00514 */ 00515 flow_next_incoming_id = session_instance->next_outgoing_id; 00516 } 00517 00518 if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) || 00519 (flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0)) 00520 { 00521 flow_destroy(flow_handle); 00522 00523 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame"); 00524 } 00525 else 00526 { 00527 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = NULL; 00528 00529 session_instance->remote_incoming_window = flow_next_incoming_id + flow_incoming_window - session_instance->next_outgoing_id; 00530 00531 if (flow_get_handle(flow_handle, &remote_handle) == 0) 00532 { 00533 link_endpoint_instance = find_link_endpoint_by_input_handle(session_instance, remote_handle); 00534 } 00535 00536 flow_destroy(flow_handle); 00537 00538 if (link_endpoint_instance != NULL) 00539 { 00540 link_endpoint_instance->frame_received_callback(link_endpoint_instance->callback_context, performative, payload_size, payload_bytes); 00541 } 00542 00543 size_t i = 0; 00544 while ((session_instance->remote_incoming_window > 0) && (i < session_instance->link_endpoint_count)) 00545 { 00546 /* notify the caller that it can send here */ 00547 if (session_instance->link_endpoints[i]->on_session_flow_on != NULL) 00548 { 00549 session_instance->link_endpoints[i]->on_session_flow_on(session_instance->link_endpoints[i]->callback_context); 00550 } 00551 00552 i++; 00553 } 00554 } 00555 } 00556 } 00557 else if (is_transfer_type_by_descriptor(descriptor)) 00558 { 00559 TRANSFER_HANDLE transfer_handle; 00560 00561 if (amqpvalue_get_transfer(performative, &transfer_handle) != 0) 00562 { 00563 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode TRANSFER frame"); 00564 } 00565 else 00566 { 00567 uint32_t remote_handle; 00568 delivery_number delivery_id; 00569 00570 transfer_get_delivery_id(transfer_handle, &delivery_id); 00571 if (transfer_get_handle(transfer_handle, &remote_handle) != 0) 00572 { 00573 transfer_destroy(transfer_handle); 00574 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from TRANSFER frame"); 00575 } 00576 else 00577 { 00578 transfer_destroy(transfer_handle); 00579 00580 session_instance->next_incoming_id++; 00581 session_instance->remote_outgoing_window--; 00582 session_instance->incoming_window--; 00583 00584 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle); 00585 if (link_endpoint == NULL) 00586 { 00587 end_session_with_error(session_instance, "amqp:session:unattached-handle", ""); 00588 } 00589 else 00590 { 00591 link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes); 00592 } 00593 00594 if (session_instance->incoming_window == 0) 00595 { 00596 session_instance->incoming_window = session_instance->desired_incoming_window; 00597 send_flow(session_instance); 00598 } 00599 } 00600 } 00601 } 00602 else if (is_disposition_type_by_descriptor(descriptor)) 00603 { 00604 uint32_t i; 00605 00606 for (i = 0; i < session_instance->link_endpoint_count; i++) 00607 { 00608 LINK_ENDPOINT_INSTANCE* link_endpoint = session_instance->link_endpoints[i]; 00609 link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes); 00610 } 00611 } 00612 else if (is_end_type_by_descriptor(descriptor)) 00613 { 00614 END_HANDLE end_handle; 00615 00616 if (amqpvalue_get_end(performative, &end_handle) != 0) 00617 { 00618 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode END frame"); 00619 } 00620 else 00621 { 00622 if ((session_instance->session_state != SESSION_STATE_END_RCVD) && 00623 (session_instance->session_state != SESSION_STATE_DISCARDING)) 00624 { 00625 session_set_state(session_instance, SESSION_STATE_END_RCVD); 00626 if (send_end_frame(session_instance, NULL) != 0) 00627 { 00628 /* fatal error */ 00629 (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot send END frame."); 00630 } 00631 00632 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00633 } 00634 } 00635 } 00636 } 00637 00638 SESSION_HANDLE session_create(CONNECTION_HANDLE connection, ON_LINK_ATTACHED on_link_attached, void* callback_context) 00639 { 00640 SESSION_INSTANCE* result; 00641 00642 if (connection == NULL) 00643 { 00644 /* Codes_SRS_SESSION_01_031: [If connection is NULL, session_create shall fail and return NULL.] */ 00645 result = NULL; 00646 } 00647 else 00648 { 00649 /* Codes_SRS_SESSION_01_030: [session_create shall create a new session instance and return a non-NULL handle to it.] */ 00650 result = amqpalloc_malloc(sizeof(SESSION_INSTANCE)); 00651 /* Codes_SRS_SESSION_01_042: [If allocating memory for the session fails, session_create shall fail and return NULL.] */ 00652 if (result != NULL) 00653 { 00654 result->connection = connection; 00655 result->link_endpoints = NULL; 00656 result->link_endpoint_count = 0; 00657 result->handle_max = 4294967295u; 00658 00659 /* Codes_SRS_SESSION_01_057: [The delivery ids shall be assigned starting at 0.] */ 00660 /* Codes_SRS_SESSION_01_017: [The nextoutgoing-id MAY be initialized to an arbitrary value ] */ 00661 result->next_outgoing_id = 0; 00662 00663 result->desired_incoming_window = 1; 00664 result->incoming_window = 1; 00665 result->outgoing_window = 1; 00666 result->handle_max = 4294967295u; 00667 result->remote_incoming_window = 0; 00668 result->remote_outgoing_window = 0; 00669 result->previous_session_state = SESSION_STATE_UNMAPPED; 00670 result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; 00671 result->session_state = SESSION_STATE_UNMAPPED; 00672 result->on_link_attached = on_link_attached; 00673 result->on_link_attached_callback_context = callback_context; 00674 00675 /* Codes_SRS_SESSION_01_032: [session_create shall create a new session endpoint by calling connection_create_endpoint.] */ 00676 result->endpoint = connection_create_endpoint(connection); 00677 if (result->endpoint == NULL) 00678 { 00679 /* Codes_SRS_SESSION_01_033: [If connection_create_endpoint fails, session_create shall fail and return NULL.] */ 00680 amqpalloc_free(result); 00681 result = NULL; 00682 } 00683 else 00684 { 00685 session_set_state(result, SESSION_STATE_UNMAPPED); 00686 } 00687 } 00688 } 00689 00690 return result; 00691 } 00692 00693 SESSION_HANDLE session_create_from_endpoint(CONNECTION_HANDLE connection, ENDPOINT_HANDLE endpoint, ON_LINK_ATTACHED on_link_attached, void* callback_context) 00694 { 00695 SESSION_INSTANCE* result; 00696 00697 if (endpoint == NULL) 00698 { 00699 result = NULL; 00700 } 00701 else 00702 { 00703 result = amqpalloc_malloc(sizeof(SESSION_INSTANCE)); 00704 if (result != NULL) 00705 { 00706 result->connection = connection; 00707 result->link_endpoints = NULL; 00708 result->link_endpoint_count = 0; 00709 result->handle_max = 4294967295u; 00710 00711 result->next_outgoing_id = 0; 00712 00713 result->incoming_window = 1; 00714 result->outgoing_window = 1; 00715 result->handle_max = 4294967295u; 00716 result->remote_incoming_window = 0; 00717 result->remote_outgoing_window = 0; 00718 result->previous_session_state = SESSION_STATE_UNMAPPED; 00719 result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; 00720 result->session_state = SESSION_STATE_UNMAPPED; 00721 result->on_link_attached = on_link_attached; 00722 result->on_link_attached_callback_context = callback_context; 00723 00724 result->endpoint = endpoint; 00725 session_set_state(result, SESSION_STATE_UNMAPPED); 00726 } 00727 } 00728 00729 return result; 00730 } 00731 00732 void session_destroy(SESSION_HANDLE session) 00733 { 00734 /* Codes_SRS_SESSION_01_036: [If session is NULL, session_destroy shall do nothing.] */ 00735 if (session != NULL) 00736 { 00737 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00738 00739 session_end(session, NULL, NULL); 00740 00741 /* Codes_SRS_SESSION_01_034: [session_destroy shall free all resources allocated by session_create.] */ 00742 /* Codes_SRS_SESSION_01_035: [The endpoint created in session_create shall be freed by calling connection_destroy_endpoint.] */ 00743 connection_destroy_endpoint(session_instance->endpoint); 00744 if (session_instance->link_endpoints != NULL) 00745 { 00746 amqpalloc_free(session_instance->link_endpoints); 00747 } 00748 00749 amqpalloc_free(session); 00750 } 00751 } 00752 00753 int session_begin(SESSION_HANDLE session) 00754 { 00755 int result; 00756 00757 if (session == NULL) 00758 { 00759 result = __LINE__; 00760 } 00761 else 00762 { 00763 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00764 00765 if (connection_start_endpoint(session_instance->endpoint, on_frame_received, on_connection_state_changed, session_instance) != 0) 00766 { 00767 result = __LINE__; 00768 } 00769 else 00770 { 00771 if (!session_instance->is_underlying_connection_open) 00772 { 00773 if (connection_open(session_instance->connection) != 0) 00774 { 00775 session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN; 00776 result = __LINE__; 00777 } 00778 else 00779 { 00780 session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_OPEN; 00781 result = 0; 00782 } 00783 } 00784 else 00785 { 00786 result = 0; 00787 } 00788 } 00789 } 00790 00791 return result; 00792 } 00793 00794 int session_end(SESSION_HANDLE session, const char* condition_value, const char* description) 00795 { 00796 int result; 00797 00798 if (session == NULL) 00799 { 00800 result = __LINE__; 00801 } 00802 else 00803 { 00804 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00805 00806 if ((session_instance->session_state != SESSION_STATE_UNMAPPED) && 00807 (session_instance->session_state != SESSION_STATE_DISCARDING)) 00808 { 00809 ERROR_HANDLE error_handle = NULL; 00810 result = 0; 00811 00812 if (condition_value != NULL) 00813 { 00814 error_handle = error_create(condition_value); 00815 if (error_handle == NULL) 00816 { 00817 result = __LINE__; 00818 } 00819 else 00820 { 00821 if (error_set_description(error_handle, description) != 0) 00822 { 00823 result = __LINE__; 00824 } 00825 } 00826 } 00827 00828 if (result == 0) 00829 { 00830 if (send_end_frame(session_instance, error_handle) != 0) 00831 { 00832 result = __LINE__; 00833 } 00834 else 00835 { 00836 session_set_state(session_instance, SESSION_STATE_DISCARDING); 00837 result = 0; 00838 } 00839 } 00840 00841 if (error_handle != NULL) 00842 { 00843 error_destroy(error_handle); 00844 } 00845 } 00846 else 00847 { 00848 result = 0; 00849 } 00850 } 00851 00852 return result; 00853 } 00854 00855 int session_set_incoming_window(SESSION_HANDLE session, uint32_t incoming_window) 00856 { 00857 int result; 00858 00859 if (session == NULL) 00860 { 00861 result = __LINE__; 00862 } 00863 else 00864 { 00865 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00866 00867 session_instance->desired_incoming_window = incoming_window; 00868 session_instance->incoming_window = incoming_window; 00869 00870 result = 0; 00871 } 00872 00873 return result; 00874 } 00875 00876 int session_get_incoming_window(SESSION_HANDLE session, uint32_t* incoming_window) 00877 { 00878 int result; 00879 00880 if ((session == NULL) || 00881 (incoming_window == NULL)) 00882 { 00883 result = __LINE__; 00884 } 00885 else 00886 { 00887 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00888 00889 *incoming_window = session_instance->incoming_window; 00890 00891 result = 0; 00892 } 00893 00894 return result; 00895 } 00896 00897 int session_set_outgoing_window(SESSION_HANDLE session, uint32_t outgoing_window) 00898 { 00899 int result; 00900 00901 if (session == NULL) 00902 { 00903 result = __LINE__; 00904 } 00905 else 00906 { 00907 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00908 00909 session_instance->outgoing_window = outgoing_window; 00910 00911 result = 0; 00912 } 00913 00914 return result; 00915 } 00916 00917 int session_get_outgoing_window(SESSION_HANDLE session, uint32_t* outgoing_window) 00918 { 00919 int result; 00920 00921 if ((session == NULL) || 00922 (outgoing_window == NULL)) 00923 { 00924 result = __LINE__; 00925 } 00926 else 00927 { 00928 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00929 00930 *outgoing_window = session_instance->outgoing_window; 00931 00932 result = 0; 00933 } 00934 00935 return result; 00936 } 00937 00938 int session_set_handle_max(SESSION_HANDLE session, handle handle_max) 00939 { 00940 int result; 00941 00942 if (session == NULL) 00943 { 00944 result = __LINE__; 00945 } 00946 else 00947 { 00948 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00949 00950 session_instance->handle_max = handle_max; 00951 00952 result = 0; 00953 } 00954 00955 return result; 00956 } 00957 00958 int session_get_handle_max(SESSION_HANDLE session, handle* handle_max) 00959 { 00960 int result; 00961 00962 if ((session == NULL) || 00963 (handle_max == NULL)) 00964 { 00965 result = __LINE__; 00966 } 00967 else 00968 { 00969 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00970 00971 *handle_max = session_instance->handle_max; 00972 00973 result = 0; 00974 } 00975 00976 return result; 00977 } 00978 00979 LINK_ENDPOINT_HANDLE session_create_link_endpoint(SESSION_HANDLE session, const char* name) 00980 { 00981 LINK_ENDPOINT_INSTANCE* result; 00982 00983 /* Codes_SRS_SESSION_01_044: [If session, name or frame_received_callback is NULL, session_create_link_endpoint shall fail and return NULL.] */ 00984 if ((session == NULL) || 00985 (name == NULL)) 00986 { 00987 result = NULL; 00988 } 00989 else 00990 { 00991 /* Codes_SRS_SESSION_01_043: [session_create_link_endpoint shall create a link endpoint associated with a given session and return a non-NULL handle to it.] */ 00992 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; 00993 00994 result = (LINK_ENDPOINT_INSTANCE*)amqpalloc_malloc(sizeof(LINK_ENDPOINT_INSTANCE)); 00995 /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */ 00996 if (result != NULL) 00997 { 00998 /* Codes_SRS_SESSION_01_046: [An unused handle shall be assigned to the link endpoint.] */ 00999 handle selected_handle = 0; 01000 size_t i; 01001 01002 for (i = 0; i < session_instance->link_endpoint_count; i++) 01003 { 01004 if (session_instance->link_endpoints[i]->output_handle > selected_handle) 01005 { 01006 break; 01007 } 01008 01009 selected_handle++; 01010 } 01011 01012 result->on_session_state_changed = NULL; 01013 result->on_session_flow_on = NULL; 01014 result->frame_received_callback = NULL; 01015 result->callback_context = NULL; 01016 result->output_handle = selected_handle; 01017 result->input_handle = 0xFFFFFFFF; 01018 result->name = amqpalloc_malloc(strlen(name) + 1); 01019 if (result->name == NULL) 01020 { 01021 /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */ 01022 amqpalloc_free(result); 01023 result = NULL; 01024 } 01025 else 01026 { 01027 LINK_ENDPOINT_INSTANCE** new_link_endpoints; 01028 strcpy(result->name, name); 01029 result->session = session; 01030 01031 new_link_endpoints = amqpalloc_realloc(session_instance->link_endpoints, sizeof(LINK_ENDPOINT_INSTANCE*) * (session_instance->link_endpoint_count + 1)); 01032 if (new_link_endpoints == NULL) 01033 { 01034 /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */ 01035 amqpalloc_free(result->name); 01036 amqpalloc_free(result); 01037 result = NULL; 01038 } 01039 else 01040 { 01041 session_instance->link_endpoints = new_link_endpoints; 01042 01043 if (session_instance->link_endpoint_count - selected_handle > 0) 01044 { 01045 (void)memmove(&session_instance->link_endpoints[selected_handle + 1], &session_instance->link_endpoints[selected_handle], (session_instance->link_endpoint_count - selected_handle) * sizeof(LINK_ENDPOINT_INSTANCE*)); 01046 } 01047 01048 session_instance->link_endpoints[selected_handle] = result; 01049 session_instance->link_endpoint_count++; 01050 } 01051 } 01052 } 01053 } 01054 01055 return result; 01056 } 01057 01058 void session_destroy_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint) 01059 { 01060 /* Codes_SRS_SESSION_01_050: [If link_endpoint is NULL, session_destroy_link_endpoint shall do nothing.] */ 01061 if (link_endpoint != NULL) 01062 { 01063 LINK_ENDPOINT_INSTANCE* endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01064 SESSION_INSTANCE* session_instance = endpoint_instance->session; 01065 uint64_t i; 01066 01067 /* Codes_SRS_SESSION_01_049: [session_destroy_link_endpoint shall free all resources associated with the endpoint.] */ 01068 for (i = 0; i < session_instance->link_endpoint_count; i++) 01069 { 01070 if (session_instance->link_endpoints[i] == link_endpoint) 01071 { 01072 break; 01073 } 01074 } 01075 01076 if (i < session_instance->link_endpoint_count) 01077 { 01078 LINK_ENDPOINT_INSTANCE** new_endpoints; 01079 01080 if (i < (session_instance->link_endpoint_count - 1)) 01081 { 01082 (void)memmove(&session_instance->link_endpoints[i], &session_instance->link_endpoints[i + 1], (session_instance->link_endpoint_count - (uint32_t)i - 1) * sizeof(LINK_ENDPOINT_INSTANCE*)); 01083 } 01084 01085 session_instance->link_endpoint_count--; 01086 01087 if (session_instance->link_endpoint_count == 0) 01088 { 01089 amqpalloc_free(session_instance->link_endpoints); 01090 session_instance->link_endpoints = NULL; 01091 } 01092 else 01093 { 01094 new_endpoints = (LINK_ENDPOINT_INSTANCE**)amqpalloc_realloc(session_instance->link_endpoints, sizeof(LINK_ENDPOINT_INSTANCE*) * session_instance->link_endpoint_count); 01095 if (new_endpoints != NULL) 01096 { 01097 session_instance->link_endpoints = new_endpoints; 01098 } 01099 } 01100 } 01101 01102 if (endpoint_instance->name != NULL) 01103 { 01104 amqpalloc_free(endpoint_instance->name); 01105 } 01106 01107 amqpalloc_free(endpoint_instance); 01108 } 01109 } 01110 01111 int session_start_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint, ON_ENDPOINT_FRAME_RECEIVED frame_received_callback, ON_SESSION_STATE_CHANGED on_session_state_changed, ON_SESSION_FLOW_ON on_session_flow_on, void* context) 01112 { 01113 int result; 01114 01115 if ((link_endpoint == NULL) || 01116 (frame_received_callback == NULL)) 01117 { 01118 result = __LINE__; 01119 } 01120 else 01121 { 01122 link_endpoint->frame_received_callback = frame_received_callback; 01123 link_endpoint->on_session_state_changed = on_session_state_changed; 01124 link_endpoint->on_session_flow_on = on_session_flow_on; 01125 link_endpoint->callback_context = context; 01126 01127 if (link_endpoint->on_session_state_changed != NULL) 01128 { 01129 link_endpoint->on_session_state_changed(link_endpoint->callback_context, link_endpoint->session->session_state, link_endpoint->session->previous_session_state); 01130 } 01131 01132 result = 0; 01133 } 01134 01135 return result; 01136 } 01137 01138 static int encode_frame(LINK_ENDPOINT_HANDLE link_endpoint, const AMQP_VALUE performative, PAYLOAD* payloads, size_t payload_count) 01139 { 01140 int result; 01141 01142 if ((link_endpoint == NULL) || 01143 (performative == NULL)) 01144 { 01145 result = __LINE__; 01146 } 01147 else 01148 { 01149 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01150 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session; 01151 01152 if (connection_encode_frame(session_instance->endpoint, performative, payloads, payload_count, NULL, NULL) != 0) 01153 { 01154 result = __LINE__; 01155 } 01156 else 01157 { 01158 result = 0; 01159 } 01160 } 01161 01162 return result; 01163 } 01164 01165 int session_send_flow(LINK_ENDPOINT_HANDLE link_endpoint, FLOW_HANDLE flow) 01166 { 01167 int result; 01168 01169 if ((link_endpoint == NULL) || 01170 (flow == NULL)) 01171 { 01172 result = __LINE__; 01173 } 01174 else 01175 { 01176 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01177 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session; 01178 01179 result = 0; 01180 01181 if (session_instance->session_state == SESSION_STATE_BEGIN_RCVD) 01182 { 01183 if (flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0) 01184 { 01185 result = __LINE__; 01186 } 01187 } 01188 01189 if (result == 0) 01190 { 01191 if ((flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0) || 01192 (flow_set_incoming_window(flow, session_instance->incoming_window) != 0) || 01193 (flow_set_next_outgoing_id(flow, session_instance->next_outgoing_id) != 0) || 01194 (flow_set_outgoing_window(flow, session_instance->outgoing_window) != 0) || 01195 (flow_set_handle(flow, link_endpoint_instance->output_handle) != 0)) 01196 { 01197 result = __LINE__; 01198 } 01199 else 01200 { 01201 AMQP_VALUE flow_performative_value = amqpvalue_create_flow(flow); 01202 if (flow_performative_value == NULL) 01203 { 01204 result = __LINE__; 01205 } 01206 else 01207 { 01208 if (encode_frame(link_endpoint, flow_performative_value, NULL, 0) != 0) 01209 { 01210 result = __LINE__; 01211 } 01212 else 01213 { 01214 result = 0; 01215 } 01216 01217 amqpvalue_destroy(flow_performative_value); 01218 } 01219 } 01220 } 01221 } 01222 01223 return result; 01224 } 01225 01226 int session_send_attach(LINK_ENDPOINT_HANDLE link_endpoint, ATTACH_HANDLE attach) 01227 { 01228 int result; 01229 01230 if ((link_endpoint == NULL) || 01231 (attach == NULL)) 01232 { 01233 result = __LINE__; 01234 } 01235 else 01236 { 01237 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01238 01239 if (attach_set_handle(attach, link_endpoint_instance->output_handle) != 0) 01240 { 01241 result = __LINE__; 01242 } 01243 else 01244 { 01245 AMQP_VALUE attach_performative_value = amqpvalue_create_attach(attach); 01246 if (attach_performative_value == NULL) 01247 { 01248 result = __LINE__; 01249 } 01250 else 01251 { 01252 if (encode_frame(link_endpoint, attach_performative_value, NULL, 0) != 0) 01253 { 01254 result = __LINE__; 01255 } 01256 else 01257 { 01258 result = 0; 01259 } 01260 01261 amqpvalue_destroy(attach_performative_value); 01262 } 01263 } 01264 } 01265 01266 return result; 01267 } 01268 01269 int session_send_disposition(LINK_ENDPOINT_HANDLE link_endpoint, DISPOSITION_HANDLE disposition) 01270 { 01271 int result; 01272 01273 if ((link_endpoint == NULL) || 01274 (disposition == NULL)) 01275 { 01276 result = __LINE__; 01277 } 01278 else 01279 { 01280 AMQP_VALUE disposition_performative_value = amqpvalue_create_disposition(disposition); 01281 if (disposition_performative_value == NULL) 01282 { 01283 result = __LINE__; 01284 } 01285 else 01286 { 01287 if (encode_frame(link_endpoint, disposition_performative_value, NULL, 0) != 0) 01288 { 01289 result = __LINE__; 01290 } 01291 else 01292 { 01293 result = 0; 01294 } 01295 01296 amqpvalue_destroy(disposition_performative_value); 01297 } 01298 } 01299 01300 return result; 01301 } 01302 01303 int session_send_detach(LINK_ENDPOINT_HANDLE link_endpoint, DETACH_HANDLE detach) 01304 { 01305 int result; 01306 01307 if ((link_endpoint == NULL) || 01308 (detach == NULL)) 01309 { 01310 result = __LINE__; 01311 } 01312 else 01313 { 01314 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01315 01316 if (detach_set_handle(detach, link_endpoint_instance->output_handle) != 0) 01317 { 01318 result = __LINE__; 01319 } 01320 else 01321 { 01322 AMQP_VALUE detach_performative_value = amqpvalue_create_detach(detach); 01323 if (detach_performative_value == NULL) 01324 { 01325 result = __LINE__; 01326 } 01327 else 01328 { 01329 if (encode_frame(link_endpoint, detach_performative_value, NULL, 0) != 0) 01330 { 01331 result = __LINE__; 01332 } 01333 else 01334 { 01335 result = 0; 01336 } 01337 01338 amqpvalue_destroy(detach_performative_value); 01339 } 01340 } 01341 } 01342 01343 return result; 01344 } 01345 01346 /* Codes_SRS_SESSION_01_051: [session_send_transfer shall send a transfer frame with the performative indicated in the transfer argument.] */ 01347 SESSION_SEND_TRANSFER_RESULT session_send_transfer(LINK_ENDPOINT_HANDLE link_endpoint, TRANSFER_HANDLE transfer, PAYLOAD* payloads, size_t payload_count, delivery_number* delivery_id, ON_SEND_COMPLETE on_send_complete, void* callback_context) 01348 { 01349 SESSION_SEND_TRANSFER_RESULT result; 01350 01351 /* Codes_SRS_SESSION_01_054: [If link_endpoint or transfer is NULL, session_send_transfer shall fail and return a non-zero value.] */ 01352 if ((link_endpoint == NULL) || 01353 (transfer == NULL)) 01354 { 01355 result = SESSION_SEND_TRANSFER_ERROR; 01356 } 01357 else 01358 { 01359 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint; 01360 SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session; 01361 01362 /* Codes_SRS_SESSION_01_059: [When session_send_transfer is called while the session is not in the MAPPED state, session_send_transfer shall fail and return a non-zero value.] */ 01363 if (session_instance->session_state != SESSION_STATE_MAPPED) 01364 { 01365 result = SESSION_SEND_TRANSFER_ERROR; 01366 } 01367 else 01368 { 01369 size_t payload_size = 0; 01370 size_t i; 01371 01372 for (i = 0; i < payload_count; i++) 01373 { 01374 if ((payloads[i].length > UINT32_MAX) || 01375 (payload_size + payloads[i].length < payload_size)) 01376 { 01377 break; 01378 } 01379 01380 payload_size += payloads[i].length; 01381 } 01382 01383 if ((i < payload_count) || 01384 (payload_size > UINT32_MAX)) 01385 { 01386 result = SESSION_SEND_TRANSFER_ERROR; 01387 } 01388 else 01389 { 01390 if (session_instance->remote_incoming_window == 0) 01391 { 01392 result = SESSION_SEND_TRANSFER_BUSY; 01393 } 01394 else 01395 { 01396 /* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */ 01397 /* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */ 01398 *delivery_id = session_instance->next_outgoing_id; 01399 if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) || 01400 (transfer_set_delivery_id(transfer, *delivery_id) != 0) || 01401 (transfer_set_more(transfer, false) != 0)) 01402 { 01403 /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ 01404 result = SESSION_SEND_TRANSFER_ERROR; 01405 } 01406 else 01407 { 01408 AMQP_VALUE transfer_value; 01409 01410 transfer_value = amqpvalue_create_transfer(transfer); 01411 if (transfer_value == NULL) 01412 { 01413 /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */ 01414 result = SESSION_SEND_TRANSFER_ERROR; 01415 } 01416 else 01417 { 01418 uint32_t available_frame_size; 01419 size_t encoded_size; 01420 01421 if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) || 01422 (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0)) 01423 { 01424 result = SESSION_SEND_TRANSFER_ERROR; 01425 } 01426 else 01427 { 01428 payload_size = 0; 01429 01430 for (i = 0; i < payload_count; i++) 01431 { 01432 payload_size += payloads[i].length; 01433 } 01434 01435 available_frame_size -= (uint32_t)encoded_size; 01436 available_frame_size -= 8; 01437 01438 if (available_frame_size >= payload_size) 01439 { 01440 /* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */ 01441 if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0) 01442 { 01443 /* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */ 01444 result = SESSION_SEND_TRANSFER_ERROR; 01445 } 01446 else 01447 { 01448 /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ 01449 session_instance->next_outgoing_id++; 01450 session_instance->remote_incoming_window--; 01451 session_instance->outgoing_window--; 01452 01453 /* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */ 01454 result = SESSION_SEND_TRANSFER_OK; 01455 } 01456 } 01457 else 01458 { 01459 size_t current_payload_index = 0; 01460 uint32_t current_payload_pos = 0; 01461 01462 /* break it down into different deliveries */ 01463 while (payload_size > 0) 01464 { 01465 uint32_t transfer_frame_payload_count = 0; 01466 uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size; 01467 uint32_t byte_counter; 01468 size_t temp_current_payload_index = current_payload_index; 01469 uint32_t temp_current_payload_pos = current_payload_pos; 01470 AMQP_VALUE multi_transfer_amqp_value; 01471 bool more; 01472 01473 if (current_transfer_frame_payload_size > available_frame_size) 01474 { 01475 current_transfer_frame_payload_size = available_frame_size; 01476 } 01477 01478 if (available_frame_size >= payload_size) 01479 { 01480 more = false; 01481 } 01482 else 01483 { 01484 more = true; 01485 } 01486 01487 if (transfer_set_more(transfer, more) != 0) 01488 { 01489 break; 01490 } 01491 01492 multi_transfer_amqp_value = amqpvalue_create_transfer(transfer); 01493 if (multi_transfer_amqp_value == NULL) 01494 { 01495 break; 01496 } 01497 01498 byte_counter = current_transfer_frame_payload_size; 01499 while (byte_counter > 0) 01500 { 01501 if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter) 01502 { 01503 /* more data than we need */ 01504 temp_current_payload_pos += byte_counter; 01505 byte_counter = 0; 01506 } 01507 else 01508 { 01509 byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos; 01510 temp_current_payload_index++; 01511 temp_current_payload_pos = 0; 01512 } 01513 } 01514 01515 transfer_frame_payload_count = (uint32_t)(temp_current_payload_index - current_payload_index + 1); 01516 PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD)); 01517 if (transfer_frame_payloads == NULL) 01518 { 01519 amqpvalue_destroy(multi_transfer_amqp_value); 01520 break; 01521 } 01522 01523 /* copy data */ 01524 byte_counter = current_transfer_frame_payload_size; 01525 transfer_frame_payload_count = 0; 01526 01527 while (byte_counter > 0) 01528 { 01529 if (payloads[current_payload_index].length - current_payload_pos > byte_counter) 01530 { 01531 /* more data than we need */ 01532 transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; 01533 transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter; 01534 current_payload_pos += byte_counter; 01535 byte_counter = 0; 01536 } 01537 else 01538 { 01539 /* copy entire payload and move to the next */ 01540 transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos; 01541 transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos; 01542 byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos; 01543 current_payload_index++; 01544 current_payload_pos = 0; 01545 } 01546 01547 transfer_frame_payload_count++; 01548 } 01549 01550 if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0) 01551 { 01552 amqpalloc_free(transfer_frame_payloads); 01553 amqpvalue_destroy(multi_transfer_amqp_value); 01554 break; 01555 } 01556 01557 amqpalloc_free(transfer_frame_payloads); 01558 amqpvalue_destroy(multi_transfer_amqp_value); 01559 payload_size -= current_transfer_frame_payload_size; 01560 } 01561 01562 if (payload_size > 0) 01563 { 01564 result = SESSION_SEND_TRANSFER_ERROR; 01565 } 01566 else 01567 { 01568 /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */ 01569 session_instance->next_outgoing_id++; 01570 session_instance->remote_incoming_window--; 01571 session_instance->outgoing_window--; 01572 01573 result = SESSION_SEND_TRANSFER_OK; 01574 } 01575 } 01576 } 01577 01578 amqpvalue_destroy(transfer_value); 01579 } 01580 } 01581 } 01582 } 01583 } 01584 } 01585 01586 return result; 01587 }
Generated on Tue Jul 12 2022 12:43:22 by
