Mark Radbourne / Mbed 2 deprecated iothub_client_sample_amqp

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers link.c Source File

link.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include <stdlib.h>
00005 #ifdef _CRTDBG_MAP_ALLOC
00006 #include <crtdbg.h>
00007 #endif
00008 #include <string.h>
00009 #include <stdint.h>
00010 #include <stdbool.h>
00011 #include "azure_uamqp_c/link.h"
00012 #include "azure_uamqp_c/session.h"
00013 #include "azure_uamqp_c/amqpvalue.h"
00014 #include "azure_uamqp_c/amqp_definitions.h"
00015 #include "azure_uamqp_c/amqpalloc.h"
00016 #include "azure_uamqp_c/amqp_frame_codec.h"
00017 #include "azure_c_shared_utility/xlogging.h"
00018 #include "azure_c_shared_utility/singlylinkedlist.h"
00019 
/* Link credit assigned to a receiver link when its attach completes, and
 * re-assigned (followed by a flow frame) each time the credit drops to zero. */
#define DEFAULT_LINK_CREDIT 10000
00021 
00022 typedef struct DELIVERY_INSTANCE_TAG
00023 {
00024     delivery_number delivery_id;
00025     ON_DELIVERY_SETTLED on_delivery_settled;
00026     void* callback_context;
00027     void* link;
00028 } DELIVERY_INSTANCE;
00029 
00030 typedef struct LINK_INSTANCE_TAG
00031 {
00032     SESSION_HANDLE session;
00033     LINK_STATE link_state;
00034     LINK_STATE previous_link_state;
00035     AMQP_VALUE source;
00036     AMQP_VALUE target;
00037     handle handle;
00038     LINK_ENDPOINT_HANDLE link_endpoint;
00039     char* name;
00040     SINGLYLINKEDLIST_HANDLE pending_deliveries;
00041     sequence_no delivery_count;
00042     role role;
00043     ON_LINK_STATE_CHANGED on_link_state_changed;
00044     ON_LINK_FLOW_ON on_link_flow_on;
00045     ON_TRANSFER_RECEIVED on_transfer_received;
00046     void* callback_context;
00047     sender_settle_mode snd_settle_mode;
00048     receiver_settle_mode rcv_settle_mode;
00049     sequence_no initial_delivery_count;
00050     uint64_t max_message_size;
00051     uint32_t link_credit;
00052     uint32_t available;
00053     fields attach_properties;
00054     bool is_underlying_session_begun;
00055     bool is_closed;
00056     unsigned char* received_payload;
00057     uint32_t received_payload_size;
00058     delivery_number received_delivery_id;
00059 } LINK_INSTANCE;
00060 
00061 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
00062 {
00063     link_instance->previous_link_state = link_instance->link_state;
00064     link_instance->link_state = link_state;
00065 
00066     if (link_instance->on_link_state_changed != NULL)
00067     {
00068         link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state);
00069     }
00070 }
00071 
00072 static int send_flow(LINK_INSTANCE* link)
00073 {
00074     int result;
00075     FLOW_HANDLE flow = flow_create(0, 0, 0);
00076 
00077     if (flow == NULL)
00078     {
00079         result = __LINE__;
00080     }
00081     else
00082     {
00083         if ((flow_set_link_credit(flow, link->link_credit) != 0) ||
00084             (flow_set_handle(flow, link->handle) != 0) ||
00085             (flow_set_delivery_count(flow, link->delivery_count) != 0))
00086         {
00087             result = __LINE__;
00088         }
00089         else
00090         {
00091             if (session_send_flow(link->link_endpoint, flow) != 0)
00092             {
00093                 result = __LINE__;
00094             }
00095             else
00096             {
00097                 result = 0;
00098             }
00099         }
00100 
00101         flow_destroy(flow);
00102     }
00103 
00104     return result;
00105 }
00106 
00107 static int send_disposition(LINK_INSTANCE* link_instance, delivery_number delivery_number, AMQP_VALUE delivery_state)
00108 {
00109     int result;
00110 
00111     DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number);
00112     if (disposition == NULL)
00113     {
00114         result = __LINE__;
00115     }
00116     else
00117     {
00118         if ((disposition_set_last(disposition, delivery_number) != 0) ||
00119             (disposition_set_settled(disposition, true) != 0) ||
00120             ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0)))
00121         {
00122             result = __LINE__;
00123         }
00124         else
00125         {
00126             if (session_send_disposition(link_instance->link_endpoint, disposition) != 0)
00127             {
00128                 result = __LINE__;
00129             }
00130             else
00131             {
00132                 result = 0;
00133             }
00134         }
00135 
00136         disposition_destroy(disposition);
00137     }
00138 
00139     return result;
00140 }
00141 
00142 static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle)
00143 {
00144     int result;
00145     DETACH_HANDLE detach_performative;
00146 
00147     detach_performative = detach_create(0);
00148     if (detach_performative == NULL)
00149     {
00150         result = __LINE__;
00151     }
00152     else
00153     {
00154         if ((error_handle != NULL) &&
00155             (detach_set_error(detach_performative, error_handle) != 0))
00156         {
00157             result = __LINE__;
00158         }
00159         else if (close &&
00160             (detach_set_closed(detach_performative, true) != 0))
00161         {
00162             result = __LINE__;
00163         }
00164         else
00165         {
00166             if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0)
00167             {
00168                 result = __LINE__;
00169             }
00170             else
00171             {
00172                 if (close)
00173                 {
00174                     /* Declare link to be closed */
00175                     link_instance->is_closed = true;
00176                 }
00177 
00178                 result = 0;
00179             }
00180         }
00181 
00182         detach_destroy(detach_performative);
00183     }
00184 
00185     return result;
00186 }
00187 
00188 static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role)
00189 {
00190     int result;
00191     ATTACH_HANDLE attach = attach_create(name, handle, role);
00192 
00193     if (attach == NULL)
00194     {
00195         result = __LINE__;
00196     }
00197     else
00198     {
00199         result = 0;
00200 
00201         link->delivery_count = link->initial_delivery_count;
00202 
00203         attach_set_snd_settle_mode(attach, link->snd_settle_mode);
00204         attach_set_rcv_settle_mode(attach, link->rcv_settle_mode);
00205         attach_set_role(attach, role);
00206         attach_set_source(attach, link->source);
00207         attach_set_target(attach, link->target);
00208         attach_set_properties(attach, link->attach_properties);
00209 
00210         if (role == role_sender)
00211         {
00212             if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0)
00213             {
00214                 result = __LINE__;
00215             }
00216         }
00217 
00218         if (result == 0)
00219         {
00220             if ((attach_set_max_message_size(attach, link->max_message_size) != 0) ||
00221                 (session_send_attach(link->link_endpoint, attach) != 0))
00222             {
00223                 result = __LINE__;
00224             }
00225             else
00226             {
00227                 result = 0;
00228             }
00229         }
00230 
00231         attach_destroy(attach);
00232     }
00233 
00234     return result;
00235 }
00236 
00237 static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
00238 {
00239     LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
00240     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
00241 
00242     if (is_attach_type_by_descriptor(descriptor))
00243     {
00244         ATTACH_HANDLE attach_handle;
00245         if (amqpvalue_get_attach(performative, &attach_handle) == 0)
00246         {
00247             if ((link_instance->role == role_receiver) &&
00248                 (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0))
00249             {
00250                 /* error */
00251                 set_link_state(link_instance, LINK_STATE_DETACHED);
00252             }
00253             else
00254             {
00255                 if (link_instance->link_state == LINK_STATE_HALF_ATTACHED)
00256                 {
00257                     if (link_instance->role == role_receiver)
00258                     {
00259                         link_instance->link_credit = DEFAULT_LINK_CREDIT;
00260                         send_flow(link_instance);
00261                     }
00262                     else
00263                     {
00264                         link_instance->link_credit = 0;
00265                     }
00266 
00267                     set_link_state(link_instance, LINK_STATE_ATTACHED);
00268                 }
00269             }
00270 
00271             attach_destroy(attach_handle);
00272         }
00273     }
00274     else if (is_flow_type_by_descriptor(descriptor))
00275     {
00276         FLOW_HANDLE flow_handle;
00277         if (amqpvalue_get_flow(performative, &flow_handle) == 0)
00278         {
00279             if (link_instance->role == role_sender)
00280             {
00281                 delivery_number rcv_delivery_count;
00282                 uint32_t rcv_link_credit;
00283 
00284                 if ((flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) ||
00285                     (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0))
00286                 {
00287                     /* error */
00288                     set_link_state(link_instance, LINK_STATE_DETACHED);
00289                 }
00290                 else
00291                 {
00292                     link_instance->link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count;
00293                     if (link_instance->link_credit > 0)
00294                     {
00295                         link_instance->on_link_flow_on(link_instance->callback_context);
00296                     }
00297                 }
00298             }
00299         }
00300 
00301         flow_destroy(flow_handle);
00302     }
00303     else if (is_transfer_type_by_descriptor(descriptor))
00304     {
00305         if (link_instance->on_transfer_received != NULL)
00306         {
00307             TRANSFER_HANDLE transfer_handle;
00308             if (amqpvalue_get_transfer(performative, &transfer_handle) == 0)
00309             {
00310                 AMQP_VALUE delivery_state;
00311                 bool more;
00312                 bool is_error;
00313 
00314                 link_instance->link_credit--;
00315                 link_instance->delivery_count++;
00316                 if (link_instance->link_credit == 0)
00317                 {
00318                     link_instance->link_credit = DEFAULT_LINK_CREDIT;
00319                     send_flow(link_instance);
00320                 }
00321 
00322                 more = false;
00323                 /* Attempt to get more flag, default to false */
00324                 (void)transfer_get_more(transfer_handle, &more);
00325                 is_error = false;
00326 
00327                 if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0)
00328                 {
00329                     /* is this not a continuation transfer? */
00330                     if (link_instance->received_payload_size == 0)
00331                     {
00332                         LogError("Could not get the delivery Id from the transfer performative");
00333                         is_error = true;
00334                     }
00335                 }
00336                     
00337                 if (!is_error)
00338                 {
00339                     /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */
00340                     if ((link_instance->received_payload_size > 0) || more)
00341                     {
00342                         unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size);
00343                         if (new_received_payload == NULL)
00344                         {
00345                             LogError("Could not allocate memory for the received payload");
00346                         }
00347                         else
00348                         {
00349                             link_instance->received_payload = new_received_payload;
00350                             (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size);
00351                             link_instance->received_payload_size += payload_size;
00352                         }
00353                     }
00354 
00355                     if (!more)
00356                     {
00357                         const unsigned char* indicate_payload_bytes;
00358                         uint32_t indicate_payload_size;
00359 
00360                         /* if no previously stored chunks then simply report the current payload */
00361                         if (link_instance->received_payload_size > 0)
00362                         {
00363                             indicate_payload_size = link_instance->received_payload_size;
00364                             indicate_payload_bytes = link_instance->received_payload;
00365                         }
00366                         else
00367                         {
00368                             indicate_payload_size = payload_size;
00369                             indicate_payload_bytes = payload_bytes;
00370                         }
00371 
00372                         delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes);
00373 
00374                         if (link_instance->received_payload_size > 0)
00375                         {
00376                             free(link_instance->received_payload);
00377                             link_instance->received_payload = NULL;
00378                             link_instance->received_payload_size = 0;
00379                         }
00380 
00381                         if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0)
00382                         {
00383                             LogError("Cannot send disposition frame");
00384                         }
00385 
00386                         if (delivery_state != NULL)
00387                         {
00388                             amqpvalue_destroy(delivery_state);
00389                         }
00390                     }
00391                 }
00392 
00393                 transfer_destroy(transfer_handle);
00394             }
00395         }
00396     }
00397     else if (is_disposition_type_by_descriptor(descriptor))
00398     {
00399         DISPOSITION_HANDLE disposition;
00400         if (amqpvalue_get_disposition(performative, &disposition) != 0)
00401         {
00402             /* error */
00403         }
00404         else
00405         {
00406             delivery_number first;
00407             delivery_number last;
00408 
00409             if (disposition_get_first(disposition, &first) != 0)
00410             {
00411                 /* error */
00412             }
00413             else
00414             {
00415                 bool settled;
00416 
00417                 if (disposition_get_last(disposition, &last) != 0)
00418                 {
00419                     last = first;
00420                 }
00421 
00422                 if (disposition_get_settled(disposition, &settled) != 0)
00423                 {
00424                     /* Error */
00425                     settled = false;
00426                 }
00427 
00428                 if (settled)
00429                 {
00430                     LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries);
00431                     while (pending_delivery != NULL)
00432                     {
00433                         LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery);
00434                         DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery);
00435                         if (delivery_instance == NULL)
00436                         {
00437                             /* error */
00438                             break;
00439                         }
00440                         else
00441                         {
00442                             if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last))
00443                             {
00444                                 AMQP_VALUE delivery_state;
00445                                 if (disposition_get_state(disposition, &delivery_state) != 0)
00446                                 {
00447                                     /* error */
00448                                 }
00449                                 else
00450                                 {
00451                                     delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, delivery_state);
00452                                     amqpalloc_free(delivery_instance);
00453                                     if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0)
00454                                     {
00455                                         /* error */
00456                                         break;
00457                                     }
00458                                     else
00459                                     {
00460                                         pending_delivery = next_pending_delivery;
00461                                     }
00462                                 }
00463                             }
00464                             else
00465                             {
00466                                 pending_delivery = next_pending_delivery;
00467                             }
00468                         }
00469                     }
00470                 }
00471             }
00472 
00473             disposition_destroy(disposition);
00474         }
00475     }
00476     else if (is_detach_type_by_descriptor(descriptor))
00477     {
00478         DETACH_HANDLE detach;
00479 
00480         /* Set link state appropriately based on whether we received detach condition */
00481         if (amqpvalue_get_detach(performative, &detach) == 0)
00482         {
00483             bool closed = false;
00484             ERROR_HANDLE error;
00485             if (detach_get_error(detach, &error) == 0)
00486             {
00487                 error_destroy(error);
00488 
00489                 set_link_state(link_instance, LINK_STATE_ERROR);
00490             }
00491             else 
00492             {
00493                 (void)detach_get_closed(detach, &closed);
00494 
00495                 set_link_state(link_instance, LINK_STATE_DETACHED);
00496             }
00497 
00498             /* Received a detach while attached */
00499             if (link_instance->previous_link_state == LINK_STATE_ATTACHED)
00500             {
00501                 /* Respond with ack */
00502                 (void)send_detach(link_instance, closed, NULL);
00503             }
00504 
00505             /* Received a closing detach after we sent a non-closing detach. */
00506             else if (closed &&
00507                 (link_instance->previous_link_state == LINK_STATE_HALF_ATTACHED) &&
00508                 !link_instance->is_closed)
00509             {
00510 
00511                 /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/
00512                 (void)send_attach(link_instance, link_instance->name, 0, link_instance->role);
00513                 (void)send_detach(link_instance, true, NULL);
00514             }
00515 
00516             detach_destroy(detach);
00517         }
00518     }
00519 }
00520 
00521 static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state)
00522 {
00523     LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
00524     (void)previous_session_state;
00525 
00526     if (new_session_state == SESSION_STATE_MAPPED)
00527     {
00528         if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed))
00529         {
00530             if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0)
00531             {
00532                 set_link_state(link_instance, LINK_STATE_HALF_ATTACHED);
00533             }
00534         }
00535     }
00536     else if (new_session_state == SESSION_STATE_DISCARDING)
00537     {
00538         set_link_state(link_instance, LINK_STATE_DETACHED);
00539     }
00540     else if (new_session_state == SESSION_STATE_ERROR)
00541     {
00542         set_link_state(link_instance, LINK_STATE_ERROR);
00543     }
00544 }
00545 
00546 static void on_session_flow_on(void* context)
00547 {
00548     LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
00549     if (link_instance->role == role_sender)
00550     {
00551         link_instance->on_link_flow_on(link_instance->callback_context);
00552     }
00553 }
00554 
00555 static void on_send_complete(void* context, IO_SEND_RESULT send_result)
00556 {
00557     LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context;
00558     DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item);
00559     LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link;
00560     (void)send_result;
00561     if (link_instance->snd_settle_mode == sender_settle_mode_settled)
00562     {
00563         delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, NULL);
00564         amqpalloc_free(delivery_instance);
00565         (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item);
00566     }
00567 }
00568 
00569 LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target)
00570 {
00571     LINK_INSTANCE* result = amqpalloc_malloc(sizeof(LINK_INSTANCE));
00572     if (result != NULL)
00573     {
00574         result->link_state = LINK_STATE_DETACHED;
00575         result->previous_link_state = LINK_STATE_DETACHED;
00576         result->role = role;
00577         result->source = amqpvalue_clone(source);
00578         result->target = amqpvalue_clone(target);
00579         result->session = session;
00580         result->handle = 0;
00581         result->snd_settle_mode = sender_settle_mode_unsettled;
00582         result->rcv_settle_mode = receiver_settle_mode_first;
00583         result->delivery_count = 0;
00584         result->initial_delivery_count = 0;
00585         result->max_message_size = 0;
00586         result->is_underlying_session_begun = false;
00587         result->is_closed = false;
00588         result->attach_properties = NULL;
00589         result->received_payload = NULL;
00590         result->received_payload_size = 0;
00591         result->received_delivery_id = 0;
00592 
00593         result->pending_deliveries = singlylinkedlist_create();
00594         if (result->pending_deliveries == NULL)
00595         {
00596             amqpalloc_free(result);
00597             result = NULL;
00598         }
00599         else
00600         {
00601             result->name = amqpalloc_malloc(strlen(name) + 1);
00602             if (result->name == NULL)
00603             {
00604                 singlylinkedlist_destroy(result->pending_deliveries);
00605                 amqpalloc_free(result);
00606                 result = NULL;
00607             }
00608             else
00609             {
00610                 result->on_link_state_changed = NULL;
00611                 result->callback_context = NULL;
00612                 set_link_state(result, LINK_STATE_DETACHED);
00613 
00614                 (void)strcpy(result->name, name);
00615                 result->link_endpoint = session_create_link_endpoint(session, name);
00616                 if (result->link_endpoint == NULL)
00617                 {
00618                     singlylinkedlist_destroy(result->pending_deliveries);
00619                     amqpalloc_free(result->name);
00620                     amqpalloc_free(result);
00621                     result = NULL;
00622                 }
00623             }
00624         }
00625     }
00626 
00627     return result;
00628 }
00629 
00630 LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target)
00631 {
00632     LINK_INSTANCE* result = amqpalloc_malloc(sizeof(LINK_INSTANCE));
00633     if (result != NULL)
00634     {
00635         result->link_state = LINK_STATE_DETACHED;
00636         result->previous_link_state = LINK_STATE_DETACHED;
00637         result->session = session;
00638         result->handle = 0;
00639         result->snd_settle_mode = sender_settle_mode_unsettled;
00640         result->rcv_settle_mode = receiver_settle_mode_first;
00641         result->delivery_count = 0;
00642         result->initial_delivery_count = 0;
00643         result->max_message_size = 0;
00644         result->is_underlying_session_begun = false;
00645         result->is_closed = false;
00646         result->attach_properties = NULL;
00647         result->received_payload = NULL;
00648         result->received_payload_size = 0;
00649         result->received_delivery_id = 0;
00650         result->source = amqpvalue_clone(target);
00651         result->target = amqpvalue_clone(source);
00652         if (role == role_sender)
00653         {
00654             result->role = role_receiver;
00655         }
00656         else
00657         {
00658             result->role = role_sender;
00659         }
00660 
00661         result->pending_deliveries = singlylinkedlist_create();
00662         if (result->pending_deliveries == NULL)
00663         {
00664             amqpalloc_free(result);
00665             result = NULL;
00666         }
00667         else
00668         {
00669             result->name = amqpalloc_malloc(strlen(name) + 1);
00670             if (result->name == NULL)
00671             {
00672                 singlylinkedlist_destroy(result->pending_deliveries);
00673                 amqpalloc_free(result);
00674                 result = NULL;
00675             }
00676             else
00677             {
00678                 (void)strcpy(result->name, name);
00679                 result->on_link_state_changed = NULL;
00680                 result->callback_context = NULL;
00681                 result->link_endpoint = link_endpoint;
00682             }
00683         }
00684     }
00685 
00686     return result;
00687 }
00688 
00689 void link_destroy(LINK_HANDLE link)
00690 {
00691     if (link != NULL)
00692     {
00693         link->on_link_state_changed = NULL;
00694         (void)link_detach(link, true);
00695         session_destroy_link_endpoint(link->link_endpoint);
00696         amqpvalue_destroy(link->source);
00697         amqpvalue_destroy(link->target);
00698         if (link->pending_deliveries != NULL)
00699         {
00700             LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
00701             while (item != NULL)
00702             {
00703                 LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
00704                 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
00705                 if (delivery_instance != NULL)
00706                 {
00707                     amqpalloc_free(delivery_instance);
00708                 }
00709 
00710                 item = next_item;
00711             }
00712 
00713             singlylinkedlist_destroy(link->pending_deliveries);
00714         }
00715 
00716         if (link->name != NULL)
00717         {
00718             amqpalloc_free(link->name);
00719         }
00720 
00721         if (link->attach_properties != NULL)
00722         {
00723             amqpvalue_destroy(link->attach_properties);
00724         }
00725 
00726         if (link->received_payload != NULL)
00727         {
00728             free(link->received_payload);
00729         }
00730 
00731         amqpalloc_free(link);
00732     }
00733 }
00734 
00735 int link_set_snd_settle_mode(LINK_HANDLE link, sender_settle_mode snd_settle_mode)
00736 {
00737     int result;
00738 
00739     if (link == NULL)
00740     {
00741         result = __LINE__;
00742     }
00743     else
00744     {
00745         link->snd_settle_mode = snd_settle_mode;
00746         result = 0;
00747     }
00748 
00749     return result;
00750 }
00751 
00752 int link_get_snd_settle_mode(LINK_HANDLE link, sender_settle_mode* snd_settle_mode)
00753 {
00754     int result;
00755 
00756     if ((link == NULL) ||
00757         (snd_settle_mode == NULL))
00758     {
00759         result = __LINE__;
00760     }
00761     else
00762     {
00763         *snd_settle_mode = link->snd_settle_mode;
00764 
00765         result = 0;
00766     }
00767 
00768     return result;
00769 }
00770 
00771 int link_set_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode rcv_settle_mode)
00772 {
00773     int result;
00774 
00775     if (link == NULL)
00776     {
00777         result = __LINE__;
00778     }
00779     else
00780     {
00781         link->rcv_settle_mode = rcv_settle_mode;
00782         result = 0;
00783     }
00784 
00785     return result;
00786 }
00787 
00788 int link_get_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode* rcv_settle_mode)
00789 {
00790     int result;
00791 
00792     if ((link == NULL) ||
00793         (rcv_settle_mode == NULL))
00794     {
00795         result = __LINE__;
00796     }
00797     else
00798     {
00799         *rcv_settle_mode = link->rcv_settle_mode;
00800         result = 0;
00801     }
00802 
00803     return result;
00804 }
00805 
00806 int link_set_initial_delivery_count(LINK_HANDLE link, sequence_no initial_delivery_count)
00807 {
00808     int result;
00809 
00810     if (link == NULL)
00811     {
00812         result = __LINE__;
00813     }
00814     else
00815     {
00816         link->initial_delivery_count = initial_delivery_count;
00817         result = 0;
00818     }
00819 
00820     return result;
00821 }
00822 
00823 int link_get_initial_delivery_count(LINK_HANDLE link, sequence_no* initial_delivery_count)
00824 {
00825     int result;
00826 
00827     if ((link == NULL) ||
00828         (initial_delivery_count == NULL))
00829     {
00830         result = __LINE__;
00831     }
00832     else
00833     {
00834         *initial_delivery_count = link->initial_delivery_count;
00835         result = 0;
00836     }
00837 
00838     return result;
00839 }
00840 
00841 int link_set_max_message_size(LINK_HANDLE link, uint64_t max_message_size)
00842 {
00843     int result;
00844 
00845     if (link == NULL)
00846     {
00847         result = __LINE__;
00848     }
00849     else
00850     {
00851         link->max_message_size = max_message_size;
00852         result = 0;
00853     }
00854 
00855     return result;
00856 }
00857 
00858 int link_get_max_message_size(LINK_HANDLE link, uint64_t* max_message_size)
00859 {
00860     int result;
00861 
00862     if ((link == NULL) ||
00863         (max_message_size == NULL))
00864     {
00865         result = __LINE__;
00866     }
00867     else
00868     {
00869         *max_message_size = link->max_message_size;
00870         result = 0;
00871     }
00872 
00873     return result;
00874 }
00875 
00876 int link_set_attach_properties(LINK_HANDLE link, fields attach_properties)
00877 {
00878     int result;
00879 
00880     if (link == NULL)
00881     {
00882         result = __LINE__;
00883     }
00884     else
00885     {
00886         link->attach_properties = amqpvalue_clone(attach_properties);
00887         if (link->attach_properties == NULL)
00888         {
00889             result = __LINE__;
00890         }
00891         else
00892         {
00893             result = 0;
00894         }
00895     }
00896 
00897     return result;
00898 }
00899 
00900 int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context)
00901 {
00902     int result;
00903 
00904     if ((link == NULL) ||
00905         (link->is_closed))
00906     {
00907         result = __LINE__;
00908     }
00909     else
00910     {
00911         if (!link->is_underlying_session_begun)
00912         {
00913             link->on_link_state_changed = on_link_state_changed;
00914             link->on_transfer_received = on_transfer_received;
00915             link->on_link_flow_on = on_link_flow_on;
00916             link->callback_context = callback_context;
00917 
00918             if (session_begin(link->session) != 0)
00919             {
00920                 result = __LINE__;
00921             }
00922             else
00923             {
00924                 link->is_underlying_session_begun = true;
00925 
00926                 if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0)
00927                 {
00928                     result = __LINE__;
00929                 }
00930                 else
00931                 {
00932                     link->received_payload_size = 0;
00933 
00934                     result = 0;
00935                 }
00936             }
00937         }
00938         else
00939         {
00940             result = 0;
00941         }
00942     }
00943 
00944     return result;
00945 }
00946 
00947 int link_detach(LINK_HANDLE link, bool close)
00948 {
00949     int result;
00950 
00951     if ((link == NULL) ||
00952         (link->is_closed))
00953     {
00954         result = __LINE__;
00955     }
00956     else
00957     {
00958         switch (link->link_state)
00959         {
00960 
00961         case LINK_STATE_HALF_ATTACHED:
00962             /* Sending detach when remote is not yet attached */
00963             if (send_detach(link, close, NULL) != 0)
00964             {
00965                 result = __LINE__;
00966             }
00967             else
00968             {
00969                 set_link_state(link, LINK_STATE_DETACHED);
00970                 result = 0;
00971             }
00972             break;
00973 
00974         case LINK_STATE_ATTACHED:
00975             /* Send detach and wait for remote to respond */
00976             if (send_detach(link, close, NULL) != 0)
00977             {
00978                 result = __LINE__;
00979             }
00980             else
00981             {
00982                 set_link_state(link, LINK_STATE_HALF_ATTACHED);
00983                 result = 0;
00984             }
00985             break;
00986 
00987         case LINK_STATE_DETACHED:
00988             /* Already detached */
00989             result = 0;
00990             break;
00991 
00992         default:
00993         case LINK_STATE_ERROR:
00994             /* Already detached and in error state */
00995             result = __LINE__;
00996             break;
00997         }
00998     }
00999 
01000     return result;
01001 }
01002 
01003 LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context)
01004 {
01005     LINK_TRANSFER_RESULT result;
01006 
01007     if (link == NULL)
01008     {
01009         result = LINK_TRANSFER_ERROR;
01010     }
01011     else
01012     {
01013         if ((link->role != role_sender) ||
01014             (link->link_state != LINK_STATE_ATTACHED))
01015         {
01016             result = LINK_TRANSFER_ERROR;
01017         }
01018         else if (link->link_credit == 0)
01019         {
01020             result = LINK_TRANSFER_BUSY;
01021         }
01022         else
01023         {
01024             TRANSFER_HANDLE transfer = transfer_create(0);
01025             if (transfer == NULL)
01026             {
01027                 result = LINK_TRANSFER_ERROR;
01028             }
01029             else
01030             {
01031                 sequence_no delivery_count = link->delivery_count + 1;
01032                 unsigned char delivery_tag_bytes[sizeof(delivery_count)];
01033                 delivery_tag delivery_tag;
01034                 bool settled;
01035 
01036                 (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));
01037 
01038                 delivery_tag.bytes = &delivery_tag_bytes;
01039                 delivery_tag.length = sizeof(delivery_tag_bytes);
01040 
01041                 if (link->snd_settle_mode == sender_settle_mode_unsettled)
01042                 {
01043                     settled = false;
01044                 }
01045                 else
01046                 {
01047                     settled = true;
01048                 }
01049 
01050                 if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) ||
01051                     (transfer_set_message_format(transfer, message_format) != 0) ||
01052                     (transfer_set_settled(transfer, settled) != 0))
01053                 {
01054                     result = LINK_TRANSFER_ERROR;
01055                 }
01056                 else
01057                 {
01058                     AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
01059 
01060                     if (transfer_value == NULL)
01061                     {
01062                         result = LINK_TRANSFER_ERROR;
01063                     }
01064                     else
01065                     {
01066                         DELIVERY_INSTANCE* pending_delivery = amqpalloc_malloc(sizeof(DELIVERY_INSTANCE));
01067                         if (pending_delivery == NULL)
01068                         {
01069                             result = LINK_TRANSFER_ERROR;
01070                         }
01071                         else
01072                         {
01073                             LIST_ITEM_HANDLE delivery_instance_list_item;
01074                             pending_delivery->on_delivery_settled = on_delivery_settled;
01075                             pending_delivery->callback_context = callback_context;
01076                             pending_delivery->link = link;
01077                             delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
01078 
01079                             if (delivery_instance_list_item == NULL)
01080                             {
01081                                 amqpalloc_free(pending_delivery);
01082                                 result = LINK_TRANSFER_ERROR;
01083                             }
01084                             else
01085                             {
01086                                 /* here we should feed data to the transfer frame */
01087                                 switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
01088                                 {
01089                                 default:
01090                                 case SESSION_SEND_TRANSFER_ERROR:
01091                                     singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
01092                                     amqpalloc_free(pending_delivery);
01093                                     result = LINK_TRANSFER_ERROR;
01094                                     break;
01095 
01096                                 case SESSION_SEND_TRANSFER_BUSY:
01097                                     /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
01098                                     singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
01099                                     amqpalloc_free(pending_delivery);
01100                                     result = LINK_TRANSFER_BUSY;
01101                                     break;
01102 
01103                                 case SESSION_SEND_TRANSFER_OK:
01104                                     link->delivery_count = delivery_count;
01105                                     link->link_credit--;
01106                                     result = LINK_TRANSFER_OK;
01107                                     break;
01108                                 }
01109                             }
01110                         }
01111 
01112                         amqpvalue_destroy(transfer_value);
01113                     }
01114                 }
01115 
01116                 transfer_destroy(transfer);
01117             }
01118         }
01119     }
01120 
01121     return result;
01122 }