Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed
Fork of iothub_client_sample_amqp by
link.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include <stdlib.h> 00005 #ifdef _CRTDBG_MAP_ALLOC 00006 #include <crtdbg.h> 00007 #endif 00008 #include <string.h> 00009 #include <stdint.h> 00010 #include <stdbool.h> 00011 #include "azure_uamqp_c/link.h" 00012 #include "azure_uamqp_c/session.h" 00013 #include "azure_uamqp_c/amqpvalue.h" 00014 #include "azure_uamqp_c/amqp_definitions.h" 00015 #include "azure_uamqp_c/amqpalloc.h" 00016 #include "azure_uamqp_c/amqp_frame_codec.h" 00017 #include "azure_c_shared_utility/xlogging.h" 00018 #include "azure_c_shared_utility/singlylinkedlist.h" 00019 00020 #define DEFAULT_LINK_CREDIT 10000 00021 00022 typedef struct DELIVERY_INSTANCE_TAG 00023 { 00024 delivery_number delivery_id; 00025 ON_DELIVERY_SETTLED on_delivery_settled; 00026 void* callback_context; 00027 void* link; 00028 } DELIVERY_INSTANCE; 00029 00030 typedef struct LINK_INSTANCE_TAG 00031 { 00032 SESSION_HANDLE session; 00033 LINK_STATE link_state; 00034 LINK_STATE previous_link_state; 00035 AMQP_VALUE source; 00036 AMQP_VALUE target; 00037 handle handle; 00038 LINK_ENDPOINT_HANDLE link_endpoint; 00039 char* name; 00040 SINGLYLINKEDLIST_HANDLE pending_deliveries; 00041 sequence_no delivery_count; 00042 role role; 00043 ON_LINK_STATE_CHANGED on_link_state_changed; 00044 ON_LINK_FLOW_ON on_link_flow_on; 00045 ON_TRANSFER_RECEIVED on_transfer_received; 00046 void* callback_context; 00047 sender_settle_mode snd_settle_mode; 00048 receiver_settle_mode rcv_settle_mode; 00049 sequence_no initial_delivery_count; 00050 uint64_t max_message_size; 00051 uint32_t link_credit; 00052 uint32_t available; 00053 fields attach_properties; 00054 bool is_underlying_session_begun; 00055 bool is_closed; 00056 unsigned char* received_payload; 00057 uint32_t received_payload_size; 00058 delivery_number received_delivery_id; 00059 } LINK_INSTANCE; 00060 00061 
static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) 00062 { 00063 link_instance->previous_link_state = link_instance->link_state; 00064 link_instance->link_state = link_state; 00065 00066 if (link_instance->on_link_state_changed != NULL) 00067 { 00068 link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state); 00069 } 00070 } 00071 00072 static int send_flow(LINK_INSTANCE* link) 00073 { 00074 int result; 00075 FLOW_HANDLE flow = flow_create(0, 0, 0); 00076 00077 if (flow == NULL) 00078 { 00079 result = __LINE__; 00080 } 00081 else 00082 { 00083 if ((flow_set_link_credit(flow, link->link_credit) != 0) || 00084 (flow_set_handle(flow, link->handle) != 0) || 00085 (flow_set_delivery_count(flow, link->delivery_count) != 0)) 00086 { 00087 result = __LINE__; 00088 } 00089 else 00090 { 00091 if (session_send_flow(link->link_endpoint, flow) != 0) 00092 { 00093 result = __LINE__; 00094 } 00095 else 00096 { 00097 result = 0; 00098 } 00099 } 00100 00101 flow_destroy(flow); 00102 } 00103 00104 return result; 00105 } 00106 00107 static int send_disposition(LINK_INSTANCE* link_instance, delivery_number delivery_number, AMQP_VALUE delivery_state) 00108 { 00109 int result; 00110 00111 DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number); 00112 if (disposition == NULL) 00113 { 00114 result = __LINE__; 00115 } 00116 else 00117 { 00118 if ((disposition_set_last(disposition, delivery_number) != 0) || 00119 (disposition_set_settled(disposition, true) != 0) || 00120 ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0))) 00121 { 00122 result = __LINE__; 00123 } 00124 else 00125 { 00126 if (session_send_disposition(link_instance->link_endpoint, disposition) != 0) 00127 { 00128 result = __LINE__; 00129 } 00130 else 00131 { 00132 result = 0; 00133 } 00134 } 00135 00136 disposition_destroy(disposition); 00137 } 00138 00139 return result; 
00140 } 00141 00142 static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle) 00143 { 00144 int result; 00145 DETACH_HANDLE detach_performative; 00146 00147 detach_performative = detach_create(0); 00148 if (detach_performative == NULL) 00149 { 00150 result = __LINE__; 00151 } 00152 else 00153 { 00154 if ((error_handle != NULL) && 00155 (detach_set_error(detach_performative, error_handle) != 0)) 00156 { 00157 result = __LINE__; 00158 } 00159 else if (close && 00160 (detach_set_closed(detach_performative, true) != 0)) 00161 { 00162 result = __LINE__; 00163 } 00164 else 00165 { 00166 if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) 00167 { 00168 result = __LINE__; 00169 } 00170 else 00171 { 00172 if (close) 00173 { 00174 /* Declare link to be closed */ 00175 link_instance->is_closed = true; 00176 } 00177 00178 result = 0; 00179 } 00180 } 00181 00182 detach_destroy(detach_performative); 00183 } 00184 00185 return result; 00186 } 00187 00188 static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role) 00189 { 00190 int result; 00191 ATTACH_HANDLE attach = attach_create(name, handle, role); 00192 00193 if (attach == NULL) 00194 { 00195 result = __LINE__; 00196 } 00197 else 00198 { 00199 result = 0; 00200 00201 link->delivery_count = link->initial_delivery_count; 00202 00203 attach_set_snd_settle_mode(attach, link->snd_settle_mode); 00204 attach_set_rcv_settle_mode(attach, link->rcv_settle_mode); 00205 attach_set_role(attach, role); 00206 attach_set_source(attach, link->source); 00207 attach_set_target(attach, link->target); 00208 attach_set_properties(attach, link->attach_properties); 00209 00210 if (role == role_sender) 00211 { 00212 if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0) 00213 { 00214 result = __LINE__; 00215 } 00216 } 00217 00218 if (result == 0) 00219 { 00220 if ((attach_set_max_message_size(attach, link->max_message_size) != 0) || 00221 
(session_send_attach(link->link_endpoint, attach) != 0)) 00222 { 00223 result = __LINE__; 00224 } 00225 else 00226 { 00227 result = 0; 00228 } 00229 } 00230 00231 attach_destroy(attach); 00232 } 00233 00234 return result; 00235 } 00236 00237 static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes) 00238 { 00239 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; 00240 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); 00241 00242 if (is_attach_type_by_descriptor(descriptor)) 00243 { 00244 ATTACH_HANDLE attach_handle; 00245 if (amqpvalue_get_attach(performative, &attach_handle) == 0) 00246 { 00247 if ((link_instance->role == role_receiver) && 00248 (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0)) 00249 { 00250 /* error */ 00251 set_link_state(link_instance, LINK_STATE_DETACHED); 00252 } 00253 else 00254 { 00255 if (link_instance->link_state == LINK_STATE_HALF_ATTACHED) 00256 { 00257 if (link_instance->role == role_receiver) 00258 { 00259 link_instance->link_credit = DEFAULT_LINK_CREDIT; 00260 send_flow(link_instance); 00261 } 00262 else 00263 { 00264 link_instance->link_credit = 0; 00265 } 00266 00267 set_link_state(link_instance, LINK_STATE_ATTACHED); 00268 } 00269 } 00270 00271 attach_destroy(attach_handle); 00272 } 00273 } 00274 else if (is_flow_type_by_descriptor(descriptor)) 00275 { 00276 FLOW_HANDLE flow_handle; 00277 if (amqpvalue_get_flow(performative, &flow_handle) == 0) 00278 { 00279 if (link_instance->role == role_sender) 00280 { 00281 delivery_number rcv_delivery_count; 00282 uint32_t rcv_link_credit; 00283 00284 if ((flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) || 00285 (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)) 00286 { 00287 /* error */ 00288 set_link_state(link_instance, LINK_STATE_DETACHED); 00289 } 00290 else 00291 { 00292 link_instance->link_credit = rcv_delivery_count + 
rcv_link_credit - link_instance->delivery_count; 00293 if (link_instance->link_credit > 0) 00294 { 00295 link_instance->on_link_flow_on(link_instance->callback_context); 00296 } 00297 } 00298 } 00299 } 00300 00301 flow_destroy(flow_handle); 00302 } 00303 else if (is_transfer_type_by_descriptor(descriptor)) 00304 { 00305 if (link_instance->on_transfer_received != NULL) 00306 { 00307 TRANSFER_HANDLE transfer_handle; 00308 if (amqpvalue_get_transfer(performative, &transfer_handle) == 0) 00309 { 00310 AMQP_VALUE delivery_state; 00311 bool more; 00312 bool is_error; 00313 00314 link_instance->link_credit--; 00315 link_instance->delivery_count++; 00316 if (link_instance->link_credit == 0) 00317 { 00318 link_instance->link_credit = DEFAULT_LINK_CREDIT; 00319 send_flow(link_instance); 00320 } 00321 00322 more = false; 00323 /* Attempt to get more flag, default to false */ 00324 (void)transfer_get_more(transfer_handle, &more); 00325 is_error = false; 00326 00327 if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) 00328 { 00329 /* is this not a continuation transfer? 
*/ 00330 if (link_instance->received_payload_size == 0) 00331 { 00332 LogError("Could not get the delivery Id from the transfer performative"); 00333 is_error = true; 00334 } 00335 } 00336 00337 if (!is_error) 00338 { 00339 /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */ 00340 if ((link_instance->received_payload_size > 0) || more) 00341 { 00342 unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size); 00343 if (new_received_payload == NULL) 00344 { 00345 LogError("Could not allocate memory for the received payload"); 00346 } 00347 else 00348 { 00349 link_instance->received_payload = new_received_payload; 00350 (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size); 00351 link_instance->received_payload_size += payload_size; 00352 } 00353 } 00354 00355 if (!more) 00356 { 00357 const unsigned char* indicate_payload_bytes; 00358 uint32_t indicate_payload_size; 00359 00360 /* if no previously stored chunks then simply report the current payload */ 00361 if (link_instance->received_payload_size > 0) 00362 { 00363 indicate_payload_size = link_instance->received_payload_size; 00364 indicate_payload_bytes = link_instance->received_payload; 00365 } 00366 else 00367 { 00368 indicate_payload_size = payload_size; 00369 indicate_payload_bytes = payload_bytes; 00370 } 00371 00372 delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes); 00373 00374 if (link_instance->received_payload_size > 0) 00375 { 00376 free(link_instance->received_payload); 00377 link_instance->received_payload = NULL; 00378 link_instance->received_payload_size = 0; 00379 } 00380 00381 if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0) 00382 { 00383 LogError("Cannot send 
disposition frame"); 00384 } 00385 00386 if (delivery_state != NULL) 00387 { 00388 amqpvalue_destroy(delivery_state); 00389 } 00390 } 00391 } 00392 00393 transfer_destroy(transfer_handle); 00394 } 00395 } 00396 } 00397 else if (is_disposition_type_by_descriptor(descriptor)) 00398 { 00399 DISPOSITION_HANDLE disposition; 00400 if (amqpvalue_get_disposition(performative, &disposition) != 0) 00401 { 00402 /* error */ 00403 } 00404 else 00405 { 00406 delivery_number first; 00407 delivery_number last; 00408 00409 if (disposition_get_first(disposition, &first) != 0) 00410 { 00411 /* error */ 00412 } 00413 else 00414 { 00415 bool settled; 00416 00417 if (disposition_get_last(disposition, &last) != 0) 00418 { 00419 last = first; 00420 } 00421 00422 if (disposition_get_settled(disposition, &settled) != 0) 00423 { 00424 /* Error */ 00425 settled = false; 00426 } 00427 00428 if (settled) 00429 { 00430 LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries); 00431 while (pending_delivery != NULL) 00432 { 00433 LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery); 00434 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery); 00435 if (delivery_instance == NULL) 00436 { 00437 /* error */ 00438 break; 00439 } 00440 else 00441 { 00442 if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last)) 00443 { 00444 AMQP_VALUE delivery_state; 00445 if (disposition_get_state(disposition, &delivery_state) != 0) 00446 { 00447 /* error */ 00448 } 00449 else 00450 { 00451 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, delivery_state); 00452 amqpalloc_free(delivery_instance); 00453 if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0) 00454 { 00455 /* error */ 00456 break; 00457 } 00458 else 00459 { 00460 pending_delivery = 
next_pending_delivery; 00461 } 00462 } 00463 } 00464 else 00465 { 00466 pending_delivery = next_pending_delivery; 00467 } 00468 } 00469 } 00470 } 00471 } 00472 00473 disposition_destroy(disposition); 00474 } 00475 } 00476 else if (is_detach_type_by_descriptor(descriptor)) 00477 { 00478 DETACH_HANDLE detach; 00479 00480 /* Set link state appropriately based on whether we received detach condition */ 00481 if (amqpvalue_get_detach(performative, &detach) == 0) 00482 { 00483 bool closed = false; 00484 ERROR_HANDLE error; 00485 if (detach_get_error(detach, &error) == 0) 00486 { 00487 error_destroy(error); 00488 00489 set_link_state(link_instance, LINK_STATE_ERROR); 00490 } 00491 else 00492 { 00493 (void)detach_get_closed(detach, &closed); 00494 00495 set_link_state(link_instance, LINK_STATE_DETACHED); 00496 } 00497 00498 /* Received a detach while attached */ 00499 if (link_instance->previous_link_state == LINK_STATE_ATTACHED) 00500 { 00501 /* Respond with ack */ 00502 (void)send_detach(link_instance, closed, NULL); 00503 } 00504 00505 /* Received a closing detach after we sent a non-closing detach. 
*/ 00506 else if (closed && 00507 (link_instance->previous_link_state == LINK_STATE_HALF_ATTACHED) && 00508 !link_instance->is_closed) 00509 { 00510 00511 /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/ 00512 (void)send_attach(link_instance, link_instance->name, 0, link_instance->role); 00513 (void)send_detach(link_instance, true, NULL); 00514 } 00515 00516 detach_destroy(detach); 00517 } 00518 } 00519 } 00520 00521 static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state) 00522 { 00523 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; 00524 (void)previous_session_state; 00525 00526 if (new_session_state == SESSION_STATE_MAPPED) 00527 { 00528 if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed)) 00529 { 00530 if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0) 00531 { 00532 set_link_state(link_instance, LINK_STATE_HALF_ATTACHED); 00533 } 00534 } 00535 } 00536 else if (new_session_state == SESSION_STATE_DISCARDING) 00537 { 00538 set_link_state(link_instance, LINK_STATE_DETACHED); 00539 } 00540 else if (new_session_state == SESSION_STATE_ERROR) 00541 { 00542 set_link_state(link_instance, LINK_STATE_ERROR); 00543 } 00544 } 00545 00546 static void on_session_flow_on(void* context) 00547 { 00548 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; 00549 if (link_instance->role == role_sender) 00550 { 00551 link_instance->on_link_flow_on(link_instance->callback_context); 00552 } 00553 } 00554 00555 static void on_send_complete(void* context, IO_SEND_RESULT send_result) 00556 { 00557 LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; 00558 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item); 00559 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link; 00560 (void)send_result; 00561 
if (link_instance->snd_settle_mode == sender_settle_mode_settled) 00562 { 00563 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, NULL); 00564 amqpalloc_free(delivery_instance); 00565 (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item); 00566 } 00567 } 00568 00569 LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) 00570 { 00571 LINK_INSTANCE* result = amqpalloc_malloc(sizeof(LINK_INSTANCE)); 00572 if (result != NULL) 00573 { 00574 result->link_state = LINK_STATE_DETACHED; 00575 result->previous_link_state = LINK_STATE_DETACHED; 00576 result->role = role; 00577 result->source = amqpvalue_clone(source); 00578 result->target = amqpvalue_clone(target); 00579 result->session = session; 00580 result->handle = 0; 00581 result->snd_settle_mode = sender_settle_mode_unsettled; 00582 result->rcv_settle_mode = receiver_settle_mode_first; 00583 result->delivery_count = 0; 00584 result->initial_delivery_count = 0; 00585 result->max_message_size = 0; 00586 result->is_underlying_session_begun = false; 00587 result->is_closed = false; 00588 result->attach_properties = NULL; 00589 result->received_payload = NULL; 00590 result->received_payload_size = 0; 00591 result->received_delivery_id = 0; 00592 00593 result->pending_deliveries = singlylinkedlist_create(); 00594 if (result->pending_deliveries == NULL) 00595 { 00596 amqpalloc_free(result); 00597 result = NULL; 00598 } 00599 else 00600 { 00601 result->name = amqpalloc_malloc(strlen(name) + 1); 00602 if (result->name == NULL) 00603 { 00604 singlylinkedlist_destroy(result->pending_deliveries); 00605 amqpalloc_free(result); 00606 result = NULL; 00607 } 00608 else 00609 { 00610 result->on_link_state_changed = NULL; 00611 result->callback_context = NULL; 00612 set_link_state(result, LINK_STATE_DETACHED); 00613 00614 (void)strcpy(result->name, name); 00615 
result->link_endpoint = session_create_link_endpoint(session, name); 00616 if (result->link_endpoint == NULL) 00617 { 00618 singlylinkedlist_destroy(result->pending_deliveries); 00619 amqpalloc_free(result->name); 00620 amqpalloc_free(result); 00621 result = NULL; 00622 } 00623 } 00624 } 00625 } 00626 00627 return result; 00628 } 00629 00630 LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) 00631 { 00632 LINK_INSTANCE* result = amqpalloc_malloc(sizeof(LINK_INSTANCE)); 00633 if (result != NULL) 00634 { 00635 result->link_state = LINK_STATE_DETACHED; 00636 result->previous_link_state = LINK_STATE_DETACHED; 00637 result->session = session; 00638 result->handle = 0; 00639 result->snd_settle_mode = sender_settle_mode_unsettled; 00640 result->rcv_settle_mode = receiver_settle_mode_first; 00641 result->delivery_count = 0; 00642 result->initial_delivery_count = 0; 00643 result->max_message_size = 0; 00644 result->is_underlying_session_begun = false; 00645 result->is_closed = false; 00646 result->attach_properties = NULL; 00647 result->received_payload = NULL; 00648 result->received_payload_size = 0; 00649 result->received_delivery_id = 0; 00650 result->source = amqpvalue_clone(target); 00651 result->target = amqpvalue_clone(source); 00652 if (role == role_sender) 00653 { 00654 result->role = role_receiver; 00655 } 00656 else 00657 { 00658 result->role = role_sender; 00659 } 00660 00661 result->pending_deliveries = singlylinkedlist_create(); 00662 if (result->pending_deliveries == NULL) 00663 { 00664 amqpalloc_free(result); 00665 result = NULL; 00666 } 00667 else 00668 { 00669 result->name = amqpalloc_malloc(strlen(name) + 1); 00670 if (result->name == NULL) 00671 { 00672 singlylinkedlist_destroy(result->pending_deliveries); 00673 amqpalloc_free(result); 00674 result = NULL; 00675 } 00676 else 00677 { 00678 (void)strcpy(result->name, name); 00679 
result->on_link_state_changed = NULL; 00680 result->callback_context = NULL; 00681 result->link_endpoint = link_endpoint; 00682 } 00683 } 00684 } 00685 00686 return result; 00687 } 00688 00689 void link_destroy(LINK_HANDLE link) 00690 { 00691 if (link != NULL) 00692 { 00693 link->on_link_state_changed = NULL; 00694 (void)link_detach(link, true); 00695 session_destroy_link_endpoint(link->link_endpoint); 00696 amqpvalue_destroy(link->source); 00697 amqpvalue_destroy(link->target); 00698 if (link->pending_deliveries != NULL) 00699 { 00700 LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries); 00701 while (item != NULL) 00702 { 00703 LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item); 00704 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item); 00705 if (delivery_instance != NULL) 00706 { 00707 amqpalloc_free(delivery_instance); 00708 } 00709 00710 item = next_item; 00711 } 00712 00713 singlylinkedlist_destroy(link->pending_deliveries); 00714 } 00715 00716 if (link->name != NULL) 00717 { 00718 amqpalloc_free(link->name); 00719 } 00720 00721 if (link->attach_properties != NULL) 00722 { 00723 amqpvalue_destroy(link->attach_properties); 00724 } 00725 00726 if (link->received_payload != NULL) 00727 { 00728 free(link->received_payload); 00729 } 00730 00731 amqpalloc_free(link); 00732 } 00733 } 00734 00735 int link_set_snd_settle_mode(LINK_HANDLE link, sender_settle_mode snd_settle_mode) 00736 { 00737 int result; 00738 00739 if (link == NULL) 00740 { 00741 result = __LINE__; 00742 } 00743 else 00744 { 00745 link->snd_settle_mode = snd_settle_mode; 00746 result = 0; 00747 } 00748 00749 return result; 00750 } 00751 00752 int link_get_snd_settle_mode(LINK_HANDLE link, sender_settle_mode* snd_settle_mode) 00753 { 00754 int result; 00755 00756 if ((link == NULL) || 00757 (snd_settle_mode == NULL)) 00758 { 00759 result = __LINE__; 00760 } 00761 else 00762 { 00763 *snd_settle_mode = 
link->snd_settle_mode; 00764 00765 result = 0; 00766 } 00767 00768 return result; 00769 } 00770 00771 int link_set_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode rcv_settle_mode) 00772 { 00773 int result; 00774 00775 if (link == NULL) 00776 { 00777 result = __LINE__; 00778 } 00779 else 00780 { 00781 link->rcv_settle_mode = rcv_settle_mode; 00782 result = 0; 00783 } 00784 00785 return result; 00786 } 00787 00788 int link_get_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode* rcv_settle_mode) 00789 { 00790 int result; 00791 00792 if ((link == NULL) || 00793 (rcv_settle_mode == NULL)) 00794 { 00795 result = __LINE__; 00796 } 00797 else 00798 { 00799 *rcv_settle_mode = link->rcv_settle_mode; 00800 result = 0; 00801 } 00802 00803 return result; 00804 } 00805 00806 int link_set_initial_delivery_count(LINK_HANDLE link, sequence_no initial_delivery_count) 00807 { 00808 int result; 00809 00810 if (link == NULL) 00811 { 00812 result = __LINE__; 00813 } 00814 else 00815 { 00816 link->initial_delivery_count = initial_delivery_count; 00817 result = 0; 00818 } 00819 00820 return result; 00821 } 00822 00823 int link_get_initial_delivery_count(LINK_HANDLE link, sequence_no* initial_delivery_count) 00824 { 00825 int result; 00826 00827 if ((link == NULL) || 00828 (initial_delivery_count == NULL)) 00829 { 00830 result = __LINE__; 00831 } 00832 else 00833 { 00834 *initial_delivery_count = link->initial_delivery_count; 00835 result = 0; 00836 } 00837 00838 return result; 00839 } 00840 00841 int link_set_max_message_size(LINK_HANDLE link, uint64_t max_message_size) 00842 { 00843 int result; 00844 00845 if (link == NULL) 00846 { 00847 result = __LINE__; 00848 } 00849 else 00850 { 00851 link->max_message_size = max_message_size; 00852 result = 0; 00853 } 00854 00855 return result; 00856 } 00857 00858 int link_get_max_message_size(LINK_HANDLE link, uint64_t* max_message_size) 00859 { 00860 int result; 00861 00862 if ((link == NULL) || 00863 (max_message_size == NULL)) 00864 { 
00865 result = __LINE__; 00866 } 00867 else 00868 { 00869 *max_message_size = link->max_message_size; 00870 result = 0; 00871 } 00872 00873 return result; 00874 } 00875 00876 int link_set_attach_properties(LINK_HANDLE link, fields attach_properties) 00877 { 00878 int result; 00879 00880 if (link == NULL) 00881 { 00882 result = __LINE__; 00883 } 00884 else 00885 { 00886 link->attach_properties = amqpvalue_clone(attach_properties); 00887 if (link->attach_properties == NULL) 00888 { 00889 result = __LINE__; 00890 } 00891 else 00892 { 00893 result = 0; 00894 } 00895 } 00896 00897 return result; 00898 } 00899 00900 int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context) 00901 { 00902 int result; 00903 00904 if ((link == NULL) || 00905 (link->is_closed)) 00906 { 00907 result = __LINE__; 00908 } 00909 else 00910 { 00911 if (!link->is_underlying_session_begun) 00912 { 00913 link->on_link_state_changed = on_link_state_changed; 00914 link->on_transfer_received = on_transfer_received; 00915 link->on_link_flow_on = on_link_flow_on; 00916 link->callback_context = callback_context; 00917 00918 if (session_begin(link->session) != 0) 00919 { 00920 result = __LINE__; 00921 } 00922 else 00923 { 00924 link->is_underlying_session_begun = true; 00925 00926 if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) 00927 { 00928 result = __LINE__; 00929 } 00930 else 00931 { 00932 link->received_payload_size = 0; 00933 00934 result = 0; 00935 } 00936 } 00937 } 00938 else 00939 { 00940 result = 0; 00941 } 00942 } 00943 00944 return result; 00945 } 00946 00947 int link_detach(LINK_HANDLE link, bool close) 00948 { 00949 int result; 00950 00951 if ((link == NULL) || 00952 (link->is_closed)) 00953 { 00954 result = __LINE__; 00955 } 00956 else 00957 { 00958 switch (link->link_state) 00959 { 00960 
00961 case LINK_STATE_HALF_ATTACHED: 00962 /* Sending detach when remote is not yet attached */ 00963 if (send_detach(link, close, NULL) != 0) 00964 { 00965 result = __LINE__; 00966 } 00967 else 00968 { 00969 set_link_state(link, LINK_STATE_DETACHED); 00970 result = 0; 00971 } 00972 break; 00973 00974 case LINK_STATE_ATTACHED: 00975 /* Send detach and wait for remote to respond */ 00976 if (send_detach(link, close, NULL) != 0) 00977 { 00978 result = __LINE__; 00979 } 00980 else 00981 { 00982 set_link_state(link, LINK_STATE_HALF_ATTACHED); 00983 result = 0; 00984 } 00985 break; 00986 00987 case LINK_STATE_DETACHED: 00988 /* Already detached */ 00989 result = 0; 00990 break; 00991 00992 default: 00993 case LINK_STATE_ERROR: 00994 /* Already detached and in error state */ 00995 result = __LINE__; 00996 break; 00997 } 00998 } 00999 01000 return result; 01001 } 01002 01003 LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context) 01004 { 01005 LINK_TRANSFER_RESULT result; 01006 01007 if (link == NULL) 01008 { 01009 result = LINK_TRANSFER_ERROR; 01010 } 01011 else 01012 { 01013 if ((link->role != role_sender) || 01014 (link->link_state != LINK_STATE_ATTACHED)) 01015 { 01016 result = LINK_TRANSFER_ERROR; 01017 } 01018 else if (link->link_credit == 0) 01019 { 01020 result = LINK_TRANSFER_BUSY; 01021 } 01022 else 01023 { 01024 TRANSFER_HANDLE transfer = transfer_create(0); 01025 if (transfer == NULL) 01026 { 01027 result = LINK_TRANSFER_ERROR; 01028 } 01029 else 01030 { 01031 sequence_no delivery_count = link->delivery_count + 1; 01032 unsigned char delivery_tag_bytes[sizeof(delivery_count)]; 01033 delivery_tag delivery_tag; 01034 bool settled; 01035 01036 (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); 01037 01038 delivery_tag.bytes = &delivery_tag_bytes; 01039 delivery_tag.length = sizeof(delivery_tag_bytes); 01040 
01041 if (link->snd_settle_mode == sender_settle_mode_unsettled) 01042 { 01043 settled = false; 01044 } 01045 else 01046 { 01047 settled = true; 01048 } 01049 01050 if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || 01051 (transfer_set_message_format(transfer, message_format) != 0) || 01052 (transfer_set_settled(transfer, settled) != 0)) 01053 { 01054 result = LINK_TRANSFER_ERROR; 01055 } 01056 else 01057 { 01058 AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); 01059 01060 if (transfer_value == NULL) 01061 { 01062 result = LINK_TRANSFER_ERROR; 01063 } 01064 else 01065 { 01066 DELIVERY_INSTANCE* pending_delivery = amqpalloc_malloc(sizeof(DELIVERY_INSTANCE)); 01067 if (pending_delivery == NULL) 01068 { 01069 result = LINK_TRANSFER_ERROR; 01070 } 01071 else 01072 { 01073 LIST_ITEM_HANDLE delivery_instance_list_item; 01074 pending_delivery->on_delivery_settled = on_delivery_settled; 01075 pending_delivery->callback_context = callback_context; 01076 pending_delivery->link = link; 01077 delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); 01078 01079 if (delivery_instance_list_item == NULL) 01080 { 01081 amqpalloc_free(pending_delivery); 01082 result = LINK_TRANSFER_ERROR; 01083 } 01084 else 01085 { 01086 /* here we should feed data to the transfer frame */ 01087 switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? 
on_send_complete : NULL, delivery_instance_list_item)) 01088 { 01089 default: 01090 case SESSION_SEND_TRANSFER_ERROR: 01091 singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); 01092 amqpalloc_free(pending_delivery); 01093 result = LINK_TRANSFER_ERROR; 01094 break; 01095 01096 case SESSION_SEND_TRANSFER_BUSY: 01097 /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ 01098 singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); 01099 amqpalloc_free(pending_delivery); 01100 result = LINK_TRANSFER_BUSY; 01101 break; 01102 01103 case SESSION_SEND_TRANSFER_OK: 01104 link->delivery_count = delivery_count; 01105 link->link_credit--; 01106 result = LINK_TRANSFER_OK; 01107 break; 01108 } 01109 } 01110 } 01111 01112 amqpvalue_destroy(transfer_value); 01113 } 01114 } 01115 01116 transfer_destroy(transfer); 01117 } 01118 } 01119 } 01120 01121 return result; 01122 }
Generated on Tue Jul 12 2022 12:43:19 by
