Mark Radbourne / Mbed 2 deprecated iothub_client_sample_amqp

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers session.c Source File

session.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include <stdlib.h>
00005 #ifdef _CRTDBG_MAP_ALLOC
00006 #include <crtdbg.h>
00007 #endif
00008 #include <string.h>
00009 #include "azure_uamqp_c/session.h"
00010 #include "azure_uamqp_c/connection.h"
00011 #include "azure_uamqp_c/amqpalloc.h"
00012 #include "azure_c_shared_utility/xlogging.h"
00013 
00014 typedef struct LINK_ENDPOINT_INSTANCE_TAG
00015 {
00016     char* name;
00017     handle input_handle;
00018     handle output_handle;
00019     ON_ENDPOINT_FRAME_RECEIVED frame_received_callback;
00020     ON_SESSION_STATE_CHANGED on_session_state_changed;
00021     ON_SESSION_FLOW_ON on_session_flow_on;
00022     void* callback_context;
00023     SESSION_HANDLE session;
00024 } LINK_ENDPOINT_INSTANCE;
00025 
00026 typedef struct SESSION_INSTANCE_TAG
00027 {
00028     ON_ENDPOINT_FRAME_RECEIVED frame_received_callback;
00029     void* frame_received_callback_context;
00030     SESSION_STATE session_state;
00031     SESSION_STATE previous_session_state;
00032     CONNECTION_HANDLE connection;
00033     ENDPOINT_HANDLE endpoint;
00034     LINK_ENDPOINT_INSTANCE** link_endpoints;
00035     uint32_t link_endpoint_count;
00036 
00037     ON_LINK_ATTACHED on_link_attached;
00038     void* on_link_attached_callback_context;
00039 
00040     /* Codes_SRS_SESSION_01_016: [next-outgoing-id The next-outgoing-id is the transfer-id to assign to the next transfer frame.] */
00041     transfer_number next_outgoing_id;
00042     transfer_number next_incoming_id;
00043     uint32_t desired_incoming_window;
00044     uint32_t incoming_window;
00045     uint32_t outgoing_window;
00046     handle handle_max;
00047     uint32_t remote_incoming_window;
00048     uint32_t remote_outgoing_window;
00049     int is_underlying_connection_open : 1;
00050 } SESSION_INSTANCE;
00051 
00052 #define UNDERLYING_CONNECTION_NOT_OPEN 0
00053 #define UNDERLYING_CONNECTION_OPEN -1
00054 
00055 static void session_set_state(SESSION_INSTANCE* session_instance, SESSION_STATE session_state)
00056 {
00057     uint64_t i;
00058 
00059     session_instance->previous_session_state = session_instance->session_state;
00060     session_instance->session_state = session_state;
00061 
00062     for (i = 0; i < session_instance->link_endpoint_count; i++)
00063     {
00064         if (session_instance->link_endpoints[i]->on_session_state_changed != NULL)
00065         {
00066             session_instance->link_endpoints[i]->on_session_state_changed(session_instance->link_endpoints[i]->callback_context, session_state, session_instance->previous_session_state);
00067         }
00068     }
00069 }
00070 
00071 static int send_end_frame(SESSION_INSTANCE* session_instance, ERROR_HANDLE error_handle)
00072 {
00073     int result;
00074     END_HANDLE end_performative;
00075 
00076     end_performative = end_create();
00077     if (end_performative == NULL)
00078     {
00079         result = __LINE__;
00080     }
00081     else
00082     {
00083         if ((error_handle != NULL) &&
00084             (end_set_error(end_performative, error_handle) != 0))
00085         {
00086             result = __LINE__;
00087         }
00088         else
00089         {
00090             AMQP_VALUE end_performative_value = amqpvalue_create_end(end_performative);
00091             if (end_performative_value == NULL)
00092             {
00093                 result = __LINE__;
00094             }
00095             else
00096             {
00097                 if (connection_encode_frame(session_instance->endpoint, end_performative_value, NULL, 0, NULL, NULL) != 0)
00098                 {
00099                     result = __LINE__;
00100                 }
00101                 else
00102                 {
00103                     result = 0;
00104                 }
00105 
00106                 amqpvalue_destroy(end_performative_value);
00107             }
00108         }
00109 
00110         end_destroy(end_performative);
00111     }
00112 
00113     return result;
00114 }
00115 
00116 static void end_session_with_error(SESSION_INSTANCE* session_instance, const char* condition_value, const char* description)
00117 {
00118     ERROR_HANDLE error_handle = error_create(condition_value);
00119     if (error_handle == NULL)
00120     {
00121         /* fatal error */
00122         session_set_state(session_instance, SESSION_STATE_DISCARDING);
00123         (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot allocate error handle to end session");
00124     }
00125     else
00126     {
00127         if ((error_set_description(error_handle, description) != 0) ||
00128             (send_end_frame(session_instance, error_handle) != 0))
00129         {
00130             /* fatal error */
00131             session_set_state(session_instance, SESSION_STATE_DISCARDING);
00132             (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot allocate error handle to end session");
00133         }
00134         else
00135         {
00136             session_set_state(session_instance, SESSION_STATE_DISCARDING);
00137         }
00138 
00139         error_destroy(error_handle);
00140     }
00141 }
00142 
00143 static int send_begin(SESSION_INSTANCE* session_instance)
00144 {
00145     int result;
00146     BEGIN_HANDLE begin = begin_create(session_instance->next_outgoing_id, session_instance->incoming_window, session_instance->outgoing_window);
00147 
00148     if (begin == NULL)
00149     {
00150         result = __LINE__;
00151     }
00152     else
00153     {
00154         uint16_t remote_channel;
00155         if (begin_set_handle_max(begin, session_instance->handle_max) != 0)
00156         {
00157             result = __LINE__;
00158         }
00159         else if ((session_instance->session_state == SESSION_STATE_BEGIN_RCVD) &&
00160             ((connection_endpoint_get_incoming_channel(session_instance->endpoint, &remote_channel) != 0) ||
00161             (begin_set_remote_channel(begin, remote_channel) != 0)))
00162         {
00163             result = __LINE__;
00164         }
00165         else
00166         {
00167             AMQP_VALUE begin_performative_value = amqpvalue_create_begin(begin);
00168             if (begin_performative_value == NULL)
00169             {
00170                 result = __LINE__;
00171             }
00172             else
00173             {
00174                 if (connection_encode_frame(session_instance->endpoint, begin_performative_value, NULL, 0, NULL, NULL) != 0)
00175                 {
00176                     result = __LINE__;
00177                 }
00178                 else
00179                 {
00180                     result = 0;
00181                 }
00182 
00183                 amqpvalue_destroy(begin_performative_value);
00184             }
00185         }
00186 
00187         begin_destroy(begin);
00188     }
00189 
00190     return result;
00191 }
00192 
00193 static int send_flow(SESSION_INSTANCE* session)
00194 {
00195     int result;
00196     if (session == NULL)
00197     {
00198         result = __LINE__;
00199     }
00200     else
00201     {
00202         FLOW_HANDLE flow = flow_create(session->incoming_window, session->next_outgoing_id, session->outgoing_window);
00203 
00204         if (flow == NULL)
00205         {
00206             result = __LINE__;
00207         }
00208         else
00209         {
00210             if (flow_set_next_incoming_id(flow, session->next_incoming_id) != 0)
00211             {
00212                 result = __LINE__;
00213             }
00214             else
00215             {
00216                 AMQP_VALUE flow_performative_value = amqpvalue_create_flow(flow);
00217                 if (flow_performative_value == NULL)
00218                 {
00219                     result = __LINE__;
00220                 }
00221                 else
00222                 {
00223                     if (connection_encode_frame(session->endpoint, flow_performative_value, NULL, 0, NULL, NULL) != 0)
00224                     {
00225                         result = __LINE__;
00226                     }
00227                     else
00228                     {
00229                         result = 0;
00230                     }
00231 
00232                     amqpvalue_destroy(flow_performative_value);
00233                 }
00234             }
00235 
00236             flow_destroy(flow);
00237         }
00238     }
00239 
00240     return result;
00241 }
00242 
00243 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_name(SESSION_INSTANCE* session, const char* name)
00244 {
00245     uint32_t i;
00246     LINK_ENDPOINT_INSTANCE* result;
00247 
00248     for (i = 0; i < session->link_endpoint_count; i++)
00249     {
00250         if (strcmp(session->link_endpoints[i]->name, name) == 0)
00251         {
00252             break;
00253         }
00254     }
00255 
00256     if (i == session->link_endpoint_count)
00257     {
00258         result = NULL;
00259     }
00260     else
00261     {
00262         result = session->link_endpoints[i];
00263     }
00264 
00265     return result;
00266 }
00267 
00268 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_input_handle(SESSION_INSTANCE* session, handle input_handle)
00269 {
00270     uint32_t i;
00271     LINK_ENDPOINT_INSTANCE* result;
00272 
00273     for (i = 0; i < session->link_endpoint_count; i++)
00274     {
00275         if (session->link_endpoints[i]->input_handle == input_handle)
00276         {
00277             break;
00278         }
00279     }
00280 
00281     if (i == session->link_endpoint_count)
00282     {
00283         result = NULL;
00284     }
00285     else
00286     {
00287         result = session->link_endpoints[i];
00288     }
00289 
00290     return result;
00291 }
00292 
00293 static LINK_ENDPOINT_INSTANCE* find_link_endpoint_by_output_handle(SESSION_INSTANCE* session, handle output_handle)
00294 {
00295     uint32_t i;
00296     LINK_ENDPOINT_INSTANCE* result;
00297 
00298     for (i = 0; i < session->link_endpoint_count; i++)
00299     {
00300         if (session->link_endpoints[i]->output_handle == output_handle)
00301         {
00302             break;
00303         }
00304     }
00305 
00306     if (i == session->link_endpoint_count)
00307     {
00308         result = NULL;
00309     }
00310     else
00311     {
00312         result = session->link_endpoints[i];
00313     }
00314 
00315     return result;
00316 }
00317 
00318 static void on_connection_state_changed(void* context, CONNECTION_STATE new_connection_state, CONNECTION_STATE previous_connection_state)
00319 {
00320     SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context;
00321 
00322     /* Codes_SRS_SESSION_01_060: [If the previous connection state is not OPENED and the new connection state is OPENED, the BEGIN frame shall be sent out and the state shall be switched to BEGIN_SENT.] */
00323     if ((new_connection_state == CONNECTION_STATE_OPENED) && (previous_connection_state != CONNECTION_STATE_OPENED) && (session_instance->session_state == SESSION_STATE_UNMAPPED))
00324     {
00325         if (send_begin(session_instance) == 0)
00326         {
00327             session_set_state(session_instance, SESSION_STATE_BEGIN_SENT);
00328         }
00329     }
00330     /* Codes_SRS_SESSION_01_061: [If the previous connection state is OPENED and the new connection state is not OPENED anymore, the state shall be switched to DISCARDING.] */
00331     else if ((new_connection_state == CONNECTION_STATE_CLOSE_RCVD) || (new_connection_state == CONNECTION_STATE_END))
00332     {
00333         session_set_state(session_instance, SESSION_STATE_DISCARDING);
00334     }
00335     /* Codes_SRS_SESSION_09_001: [If the new connection state is ERROR, the state shall be switched to ERROR.] */
00336     else if (new_connection_state == CONNECTION_STATE_ERROR)
00337     {
00338         session_set_state(session_instance, SESSION_STATE_ERROR);
00339     }
00340 }
00341 
00342 static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
00343 {
00344     SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context;
00345     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
00346 
00347     if (is_begin_type_by_descriptor(descriptor))
00348     {
00349         BEGIN_HANDLE begin_handle;
00350 
00351         if (amqpvalue_get_begin(performative, &begin_handle) != 0)
00352         {
00353             connection_close(session_instance->connection, "amqp:decode-error", "Cannot decode BEGIN frame");
00354         }
00355         else
00356         {
00357             if ((begin_get_incoming_window(begin_handle, &session_instance->remote_incoming_window) != 0) ||
00358                 (begin_get_next_outgoing_id(begin_handle, &session_instance->next_incoming_id) != 0))
00359             {
00360                 /* error */
00361                 begin_destroy(begin_handle);
00362                 session_set_state(session_instance, SESSION_STATE_DISCARDING);
00363                 connection_close(session_instance->connection, "amqp:decode-error", "Cannot get incoming windows and next outgoing id");
00364             }
00365             else
00366             {
00367                 begin_destroy(begin_handle);
00368 
00369                 if (session_instance->session_state == SESSION_STATE_BEGIN_SENT)
00370                 {
00371                     session_set_state(session_instance, SESSION_STATE_MAPPED);
00372                 }
00373                 else if(session_instance->session_state == SESSION_STATE_UNMAPPED)
00374                 {
00375                     session_set_state(session_instance, SESSION_STATE_BEGIN_RCVD);
00376                     if (send_begin(session_instance) != 0)
00377                     {
00378                         connection_close(session_instance->connection, "amqp:internal-error", "Failed sending BEGIN frame");
00379                         session_set_state(session_instance, SESSION_STATE_DISCARDING);
00380                     }
00381                     else
00382                     {
00383                         session_set_state(session_instance, SESSION_STATE_MAPPED);
00384                     }
00385                 }
00386             }
00387         }
00388     }
00389     else if (is_attach_type_by_descriptor(descriptor))
00390     {
00391         const char* name = NULL;
00392         ATTACH_HANDLE attach_handle;
00393 
00394         if (amqpvalue_get_attach(performative, &attach_handle) != 0)
00395         {
00396             end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode ATTACH frame");
00397         }
00398         else
00399         {
00400             role role;
00401             AMQP_VALUE source;
00402             AMQP_VALUE target;
00403 
00404             if ((attach_get_name(attach_handle, &name) != 0) ||
00405                 (attach_get_role(attach_handle, &role) != 0) ||
00406                 (attach_get_source(attach_handle, &source) != 0) ||
00407                 (attach_get_target(attach_handle, &target) != 0))
00408             {
00409                 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link name from ATTACH frame");
00410             }
00411             else
00412             {
00413                 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_name(session_instance, name);
00414                 if (link_endpoint == NULL)
00415                 {
00416                     /* new link attach */
00417                     if (session_instance->on_link_attached != NULL)
00418                     {
00419                         LINK_ENDPOINT_HANDLE new_link_endpoint = session_create_link_endpoint(session_instance, name);
00420                         if (new_link_endpoint == NULL)
00421                         {
00422                             end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint");
00423                         }
00424                         else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0)
00425                         {
00426                             end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
00427                         }
00428                         else
00429                         {
00430                             if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target))
00431                             {
00432                                 session_destroy_link_endpoint(new_link_endpoint);
00433                                 new_link_endpoint = NULL;
00434                             }
00435                             else
00436                             {
00437                                 if (new_link_endpoint->frame_received_callback != NULL)
00438                                 {
00439                                     new_link_endpoint->frame_received_callback(new_link_endpoint->callback_context, performative, payload_size, payload_bytes);
00440                                 }
00441                             }
00442                         }
00443                     }
00444                 }
00445                 else
00446                 {
00447                     if (attach_get_handle(attach_handle, &link_endpoint->input_handle) != 0)
00448                     {
00449                         end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
00450                     }
00451                     else
00452                     {
00453                         link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
00454                     }
00455                 }
00456             }
00457 
00458             attach_destroy(attach_handle);
00459         }
00460     }
00461     else if (is_detach_type_by_descriptor(descriptor))
00462     {
00463         DETACH_HANDLE detach_handle;
00464 
00465         if (amqpvalue_get_detach(performative, &detach_handle) != 0)
00466         {
00467             end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode DETACH frame");
00468         }
00469         else
00470         {
00471             uint32_t remote_handle;
00472             if (detach_get_handle(detach_handle, &remote_handle) != 0)
00473             {
00474                 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from DETACH frame");
00475 
00476                 detach_destroy(detach_handle);
00477             }
00478             else
00479             {
00480                 detach_destroy(detach_handle);
00481 
00482                 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
00483                 if (link_endpoint == NULL)
00484                 {
00485                     end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
00486                 }
00487                 else
00488                 {
00489                     link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
00490                 }
00491             }
00492         }
00493     }
00494     else if (is_flow_type_by_descriptor(descriptor))
00495     {
00496         FLOW_HANDLE flow_handle;
00497 
00498         if (amqpvalue_get_flow(performative, &flow_handle) != 0)
00499         {
00500             end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
00501         }
00502         else
00503         {
00504             uint32_t remote_handle;
00505             transfer_number flow_next_incoming_id;
00506             uint32_t flow_incoming_window;
00507 
00508             if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0)
00509             {
00510                 /*
00511                 If the next-incoming-id field of the flow frame is not set, 
00512                 then remote-incomingwindow is computed as follows: 
00513                 initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)
00514                 */
00515                 flow_next_incoming_id = session_instance->next_outgoing_id;
00516             }
00517 
00518             if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) ||
00519                 (flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0))
00520             {
00521                 flow_destroy(flow_handle);
00522 
00523                 end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
00524             }
00525             else
00526             {
00527                 LINK_ENDPOINT_INSTANCE* link_endpoint_instance = NULL;
00528 
00529                 session_instance->remote_incoming_window = flow_next_incoming_id + flow_incoming_window - session_instance->next_outgoing_id;
00530 
00531                 if (flow_get_handle(flow_handle, &remote_handle) == 0)
00532                 {
00533                     link_endpoint_instance = find_link_endpoint_by_input_handle(session_instance, remote_handle);
00534                 }
00535 
00536                 flow_destroy(flow_handle);
00537 
00538                 if (link_endpoint_instance != NULL)
00539                 {
00540                     link_endpoint_instance->frame_received_callback(link_endpoint_instance->callback_context, performative, payload_size, payload_bytes);
00541                 }
00542 
00543                 size_t i = 0;
00544                 while ((session_instance->remote_incoming_window > 0) && (i < session_instance->link_endpoint_count))
00545                 {
00546                     /* notify the caller that it can send here */
00547                     if (session_instance->link_endpoints[i]->on_session_flow_on != NULL)
00548                     {
00549                         session_instance->link_endpoints[i]->on_session_flow_on(session_instance->link_endpoints[i]->callback_context);
00550                     }
00551 
00552                     i++;
00553                 }
00554             }
00555         }
00556     }
00557     else if (is_transfer_type_by_descriptor(descriptor))
00558     {
00559         TRANSFER_HANDLE transfer_handle;
00560 
00561         if (amqpvalue_get_transfer(performative, &transfer_handle) != 0)
00562         {
00563             end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode TRANSFER frame");
00564         }
00565         else
00566         {
00567             uint32_t remote_handle;
00568             delivery_number delivery_id;
00569 
00570             transfer_get_delivery_id(transfer_handle, &delivery_id);
00571             if (transfer_get_handle(transfer_handle, &remote_handle) != 0)
00572             {
00573                 transfer_destroy(transfer_handle);
00574                 end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from TRANSFER frame");
00575             }
00576             else
00577             {
00578                 transfer_destroy(transfer_handle);
00579 
00580                 session_instance->next_incoming_id++;
00581                 session_instance->remote_outgoing_window--;
00582                 session_instance->incoming_window--;
00583 
00584                 LINK_ENDPOINT_INSTANCE* link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
00585                 if (link_endpoint == NULL)
00586                 {
00587                     end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
00588                 }
00589                 else
00590                 {
00591                     link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
00592                 }
00593 
00594                 if (session_instance->incoming_window == 0)
00595                 {
00596                     session_instance->incoming_window = session_instance->desired_incoming_window;
00597                     send_flow(session_instance);
00598                 }
00599             }
00600         }
00601     }
00602     else if (is_disposition_type_by_descriptor(descriptor))
00603     {
00604         uint32_t i;
00605 
00606         for (i = 0; i < session_instance->link_endpoint_count; i++)
00607         {
00608             LINK_ENDPOINT_INSTANCE* link_endpoint = session_instance->link_endpoints[i];
00609             link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
00610         }
00611     }
00612     else if (is_end_type_by_descriptor(descriptor))
00613     {
00614         END_HANDLE end_handle;
00615 
00616         if (amqpvalue_get_end(performative, &end_handle) != 0)
00617         {
00618             end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode END frame");
00619         }
00620         else
00621         {
00622             if ((session_instance->session_state != SESSION_STATE_END_RCVD) &&
00623                 (session_instance->session_state != SESSION_STATE_DISCARDING))
00624             {
00625                 session_set_state(session_instance, SESSION_STATE_END_RCVD);
00626                 if (send_end_frame(session_instance, NULL) != 0)
00627                 {
00628                     /* fatal error */
00629                     (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot send END frame.");
00630                 }
00631 
00632                 session_set_state(session_instance, SESSION_STATE_DISCARDING);
00633             }
00634         }
00635     }
00636 }
00637 
00638 SESSION_HANDLE session_create(CONNECTION_HANDLE connection, ON_LINK_ATTACHED on_link_attached, void* callback_context)
00639 {
00640     SESSION_INSTANCE* result;
00641 
00642     if (connection == NULL)
00643     {
00644         /* Codes_SRS_SESSION_01_031: [If connection is NULL, session_create shall fail and return NULL.] */
00645         result = NULL;
00646     }
00647     else
00648     {
00649         /* Codes_SRS_SESSION_01_030: [session_create shall create a new session instance and return a non-NULL handle to it.] */
00650         result = amqpalloc_malloc(sizeof(SESSION_INSTANCE));
00651         /* Codes_SRS_SESSION_01_042: [If allocating memory for the session fails, session_create shall fail and return NULL.] */
00652         if (result != NULL)
00653         {
00654             result->connection = connection;
00655             result->link_endpoints = NULL;
00656             result->link_endpoint_count = 0;
00657             result->handle_max = 4294967295u;
00658 
00659             /* Codes_SRS_SESSION_01_057: [The delivery ids shall be assigned starting at 0.] */
00660             /* Codes_SRS_SESSION_01_017: [The nextoutgoing-id MAY be initialized to an arbitrary value ] */
00661             result->next_outgoing_id = 0;
00662 
00663             result->desired_incoming_window = 1;
00664             result->incoming_window = 1;
00665             result->outgoing_window = 1;
00666             result->handle_max = 4294967295u;
00667             result->remote_incoming_window = 0;
00668             result->remote_outgoing_window = 0;
00669             result->previous_session_state = SESSION_STATE_UNMAPPED;
00670             result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
00671             result->session_state = SESSION_STATE_UNMAPPED;
00672             result->on_link_attached = on_link_attached;
00673             result->on_link_attached_callback_context = callback_context;
00674 
00675             /* Codes_SRS_SESSION_01_032: [session_create shall create a new session endpoint by calling connection_create_endpoint.] */
00676             result->endpoint = connection_create_endpoint(connection);
00677             if (result->endpoint == NULL)
00678             {
00679                 /* Codes_SRS_SESSION_01_033: [If connection_create_endpoint fails, session_create shall fail and return NULL.] */
00680                 amqpalloc_free(result);
00681                 result = NULL;
00682             }
00683             else
00684             {
00685                 session_set_state(result, SESSION_STATE_UNMAPPED);
00686             }
00687         }
00688     }
00689 
00690     return result;
00691 }
00692 
00693 SESSION_HANDLE session_create_from_endpoint(CONNECTION_HANDLE connection, ENDPOINT_HANDLE endpoint, ON_LINK_ATTACHED on_link_attached, void* callback_context)
00694 {
00695     SESSION_INSTANCE* result;
00696 
00697     if (endpoint == NULL)
00698     {
00699         result = NULL;
00700     }
00701     else
00702     {
00703         result = amqpalloc_malloc(sizeof(SESSION_INSTANCE));
00704         if (result != NULL)
00705         {
00706             result->connection = connection;
00707             result->link_endpoints = NULL;
00708             result->link_endpoint_count = 0;
00709             result->handle_max = 4294967295u;
00710 
00711             result->next_outgoing_id = 0;
00712 
00713             result->incoming_window = 1;
00714             result->outgoing_window = 1;
00715             result->handle_max = 4294967295u;
00716             result->remote_incoming_window = 0;
00717             result->remote_outgoing_window = 0;
00718             result->previous_session_state = SESSION_STATE_UNMAPPED;
00719             result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
00720             result->session_state = SESSION_STATE_UNMAPPED;
00721             result->on_link_attached = on_link_attached;
00722             result->on_link_attached_callback_context = callback_context;
00723 
00724             result->endpoint = endpoint;
00725             session_set_state(result, SESSION_STATE_UNMAPPED);
00726         }
00727     }
00728 
00729     return result;
00730 }
00731 
00732 void session_destroy(SESSION_HANDLE session)
00733 {
00734     /* Codes_SRS_SESSION_01_036: [If session is NULL, session_destroy shall do nothing.] */
00735     if (session != NULL)
00736     {
00737         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00738 
00739         session_end(session, NULL, NULL);
00740 
00741         /* Codes_SRS_SESSION_01_034: [session_destroy shall free all resources allocated by session_create.] */
00742         /* Codes_SRS_SESSION_01_035: [The endpoint created in session_create shall be freed by calling connection_destroy_endpoint.] */
00743         connection_destroy_endpoint(session_instance->endpoint);
00744         if (session_instance->link_endpoints != NULL)
00745         {
00746             amqpalloc_free(session_instance->link_endpoints);
00747         }
00748 
00749         amqpalloc_free(session);
00750     }
00751 }
00752 
00753 int session_begin(SESSION_HANDLE session)
00754 {
00755     int result;
00756 
00757     if (session == NULL)
00758     {
00759         result = __LINE__;
00760     }
00761     else
00762     {
00763         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00764 
00765         if (connection_start_endpoint(session_instance->endpoint, on_frame_received, on_connection_state_changed, session_instance) != 0)
00766         {
00767             result = __LINE__;
00768         }
00769         else
00770         {
00771             if (!session_instance->is_underlying_connection_open)
00772             {
00773                 if (connection_open(session_instance->connection) != 0)
00774                 {
00775                     session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
00776                     result = __LINE__;
00777                 }
00778                 else
00779                 {
00780                     session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_OPEN;
00781                     result = 0;
00782                 }
00783             }
00784             else
00785             {
00786                 result = 0;
00787             }
00788         }
00789     }
00790 
00791     return result;
00792 }
00793 
00794 int session_end(SESSION_HANDLE session, const char* condition_value, const char* description)
00795 {
00796     int result;
00797 
00798     if (session == NULL)
00799     {
00800         result = __LINE__;
00801     }
00802     else
00803     {
00804         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00805 
00806         if ((session_instance->session_state != SESSION_STATE_UNMAPPED) &&
00807             (session_instance->session_state != SESSION_STATE_DISCARDING))
00808         {
00809             ERROR_HANDLE error_handle = NULL;
00810             result = 0;
00811 
00812             if (condition_value != NULL)
00813             {
00814                 error_handle = error_create(condition_value);
00815                 if (error_handle == NULL)
00816                 {
00817                     result = __LINE__;
00818                 }
00819                 else
00820                 {
00821                     if (error_set_description(error_handle, description) != 0)
00822                     {
00823                         result = __LINE__;
00824                     }
00825                 }
00826             }
00827 
00828             if (result == 0)
00829             {
00830                 if (send_end_frame(session_instance, error_handle) != 0)
00831                 {
00832                     result = __LINE__;
00833                 }
00834                 else
00835                 {
00836                     session_set_state(session_instance, SESSION_STATE_DISCARDING);
00837                     result = 0;
00838                 }
00839             }
00840 
00841             if (error_handle != NULL)
00842             {
00843                 error_destroy(error_handle);
00844             }
00845         }
00846         else
00847         {
00848             result = 0;
00849         }
00850     }
00851 
00852     return result;
00853 }
00854 
00855 int session_set_incoming_window(SESSION_HANDLE session, uint32_t incoming_window)
00856 {
00857     int result;
00858 
00859     if (session == NULL)
00860     {
00861         result = __LINE__;
00862     }
00863     else
00864     {
00865         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00866 
00867         session_instance->desired_incoming_window = incoming_window;
00868         session_instance->incoming_window = incoming_window;
00869 
00870         result = 0;
00871     }
00872 
00873     return result;
00874 }
00875 
00876 int session_get_incoming_window(SESSION_HANDLE session, uint32_t* incoming_window)
00877 {
00878     int result;
00879 
00880     if ((session == NULL) ||
00881         (incoming_window == NULL))
00882     {
00883         result = __LINE__;
00884     }
00885     else
00886     {
00887         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00888 
00889         *incoming_window = session_instance->incoming_window;
00890 
00891         result = 0;
00892     }
00893 
00894     return result;
00895 }
00896 
00897 int session_set_outgoing_window(SESSION_HANDLE session, uint32_t outgoing_window)
00898 {
00899     int result;
00900 
00901     if (session == NULL)
00902     {
00903         result = __LINE__;
00904     }
00905     else
00906     {
00907         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00908 
00909         session_instance->outgoing_window = outgoing_window;
00910 
00911         result = 0;
00912     }
00913 
00914     return result;
00915 }
00916 
00917 int session_get_outgoing_window(SESSION_HANDLE session, uint32_t* outgoing_window)
00918 {
00919     int result;
00920 
00921     if ((session == NULL) ||
00922         (outgoing_window == NULL))
00923     {
00924         result = __LINE__;
00925     }
00926     else
00927     {
00928         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00929 
00930         *outgoing_window = session_instance->outgoing_window;
00931 
00932         result = 0;
00933     }
00934 
00935     return result;
00936 }
00937 
00938 int session_set_handle_max(SESSION_HANDLE session, handle handle_max)
00939 {
00940     int result;
00941 
00942     if (session == NULL)
00943     {
00944         result = __LINE__;
00945     }
00946     else
00947     {
00948         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00949 
00950         session_instance->handle_max = handle_max;
00951 
00952         result = 0;
00953     }
00954 
00955     return result;
00956 }
00957 
00958 int session_get_handle_max(SESSION_HANDLE session, handle* handle_max)
00959 {
00960     int result;
00961 
00962     if ((session == NULL) ||
00963         (handle_max == NULL))
00964     {
00965         result = __LINE__;
00966     }
00967     else
00968     {
00969         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00970 
00971         *handle_max = session_instance->handle_max;
00972 
00973         result = 0;
00974     }
00975 
00976     return result;
00977 }
00978 
00979 LINK_ENDPOINT_HANDLE session_create_link_endpoint(SESSION_HANDLE session, const char* name)
00980 {
00981     LINK_ENDPOINT_INSTANCE* result;
00982 
00983     /* Codes_SRS_SESSION_01_044: [If session, name or frame_received_callback is NULL, session_create_link_endpoint shall fail and return NULL.] */
00984     if ((session == NULL) ||
00985         (name == NULL))
00986     {
00987         result = NULL;
00988     }
00989     else
00990     {
00991         /* Codes_SRS_SESSION_01_043: [session_create_link_endpoint shall create a link endpoint associated with a given session and return a non-NULL handle to it.] */
00992         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
00993 
00994         result = (LINK_ENDPOINT_INSTANCE*)amqpalloc_malloc(sizeof(LINK_ENDPOINT_INSTANCE));
00995         /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */
00996         if (result != NULL)
00997         {
00998             /* Codes_SRS_SESSION_01_046: [An unused handle shall be assigned to the link endpoint.] */
00999             handle selected_handle = 0;
01000             size_t i;
01001 
01002             for (i = 0; i < session_instance->link_endpoint_count; i++)
01003             {
01004                 if (session_instance->link_endpoints[i]->output_handle > selected_handle)
01005                 {
01006                     break;
01007                 }
01008 
01009                 selected_handle++;
01010             }
01011 
01012             result->on_session_state_changed = NULL;
01013             result->on_session_flow_on = NULL;
01014             result->frame_received_callback = NULL;
01015             result->callback_context = NULL;
01016             result->output_handle = selected_handle;
01017             result->input_handle = 0xFFFFFFFF;
01018             result->name = amqpalloc_malloc(strlen(name) + 1);
01019             if (result->name == NULL)
01020             {
01021                 /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */
01022                 amqpalloc_free(result);
01023                 result = NULL;
01024             }
01025             else
01026             {
01027                 LINK_ENDPOINT_INSTANCE** new_link_endpoints;
01028                 strcpy(result->name, name);
01029                 result->session = session;
01030 
01031                 new_link_endpoints = amqpalloc_realloc(session_instance->link_endpoints, sizeof(LINK_ENDPOINT_INSTANCE*) * (session_instance->link_endpoint_count + 1));
01032                 if (new_link_endpoints == NULL)
01033                 {
01034                     /* Codes_SRS_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */
01035                     amqpalloc_free(result->name);
01036                     amqpalloc_free(result);
01037                     result = NULL;
01038                 }
01039                 else
01040                 {
01041                     session_instance->link_endpoints = new_link_endpoints;
01042 
01043                     if (session_instance->link_endpoint_count - selected_handle > 0)
01044                     {
01045                         (void)memmove(&session_instance->link_endpoints[selected_handle + 1], &session_instance->link_endpoints[selected_handle], (session_instance->link_endpoint_count - selected_handle) * sizeof(LINK_ENDPOINT_INSTANCE*));
01046                     }
01047 
01048                     session_instance->link_endpoints[selected_handle] = result;
01049                     session_instance->link_endpoint_count++;
01050                 }
01051             }
01052         }
01053     }
01054 
01055     return result;
01056 }
01057 
01058 void session_destroy_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint)
01059 {
01060     /* Codes_SRS_SESSION_01_050: [If link_endpoint is NULL, session_destroy_link_endpoint shall do nothing.] */
01061     if (link_endpoint != NULL)
01062     {
01063         LINK_ENDPOINT_INSTANCE* endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01064         SESSION_INSTANCE* session_instance = endpoint_instance->session;
01065         uint64_t i;
01066 
01067         /* Codes_SRS_SESSION_01_049: [session_destroy_link_endpoint shall free all resources associated with the endpoint.] */
01068         for (i = 0; i < session_instance->link_endpoint_count; i++)
01069         {
01070             if (session_instance->link_endpoints[i] == link_endpoint)
01071             {
01072                 break;
01073             }
01074         }
01075 
01076         if (i < session_instance->link_endpoint_count)
01077         {
01078             LINK_ENDPOINT_INSTANCE** new_endpoints;
01079 
01080             if (i < (session_instance->link_endpoint_count - 1))
01081             {
01082                 (void)memmove(&session_instance->link_endpoints[i], &session_instance->link_endpoints[i + 1], (session_instance->link_endpoint_count - (uint32_t)i - 1) * sizeof(LINK_ENDPOINT_INSTANCE*));
01083             }
01084 
01085             session_instance->link_endpoint_count--;
01086 
01087             if (session_instance->link_endpoint_count == 0)
01088             {
01089                 amqpalloc_free(session_instance->link_endpoints);
01090                 session_instance->link_endpoints = NULL;
01091             }
01092             else
01093             {
01094                 new_endpoints = (LINK_ENDPOINT_INSTANCE**)amqpalloc_realloc(session_instance->link_endpoints, sizeof(LINK_ENDPOINT_INSTANCE*) * session_instance->link_endpoint_count);
01095                 if (new_endpoints != NULL)
01096                 {
01097                     session_instance->link_endpoints = new_endpoints;
01098                 }
01099             }
01100         }
01101 
01102         if (endpoint_instance->name != NULL)
01103         {
01104             amqpalloc_free(endpoint_instance->name);
01105         }
01106 
01107         amqpalloc_free(endpoint_instance);
01108     }
01109 }
01110 
01111 int session_start_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint, ON_ENDPOINT_FRAME_RECEIVED frame_received_callback, ON_SESSION_STATE_CHANGED on_session_state_changed, ON_SESSION_FLOW_ON on_session_flow_on, void* context)
01112 {
01113     int result;
01114 
01115     if ((link_endpoint == NULL) ||
01116         (frame_received_callback == NULL))
01117     {
01118         result = __LINE__;
01119     }
01120     else
01121     {
01122         link_endpoint->frame_received_callback = frame_received_callback;
01123         link_endpoint->on_session_state_changed = on_session_state_changed;
01124         link_endpoint->on_session_flow_on = on_session_flow_on;
01125         link_endpoint->callback_context = context;
01126 
01127         if (link_endpoint->on_session_state_changed != NULL)
01128         {
01129             link_endpoint->on_session_state_changed(link_endpoint->callback_context, link_endpoint->session->session_state, link_endpoint->session->previous_session_state);
01130         }
01131 
01132         result = 0;
01133     }
01134 
01135     return result;
01136 }
01137 
01138 static int encode_frame(LINK_ENDPOINT_HANDLE link_endpoint, const AMQP_VALUE performative, PAYLOAD* payloads, size_t payload_count)
01139 {
01140     int result;
01141 
01142     if ((link_endpoint == NULL) ||
01143         (performative == NULL))
01144     {
01145         result = __LINE__;
01146     }
01147     else
01148     {
01149         LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01150         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session;
01151 
01152         if (connection_encode_frame(session_instance->endpoint, performative, payloads, payload_count, NULL, NULL) != 0)
01153         {
01154             result = __LINE__;
01155         }
01156         else
01157         {
01158             result = 0;
01159         }
01160     }
01161 
01162     return result;
01163 }
01164 
01165 int session_send_flow(LINK_ENDPOINT_HANDLE link_endpoint, FLOW_HANDLE flow)
01166 {
01167     int result;
01168 
01169     if ((link_endpoint == NULL) ||
01170         (flow == NULL))
01171     {
01172         result = __LINE__;
01173     }
01174     else
01175     {
01176         LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01177         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session;
01178 
01179         result = 0;
01180 
01181         if (session_instance->session_state == SESSION_STATE_BEGIN_RCVD)
01182         {
01183             if (flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0)
01184             {
01185                 result = __LINE__;
01186             }
01187         }
01188 
01189         if (result == 0)
01190         {
01191             if ((flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0) ||
01192                 (flow_set_incoming_window(flow, session_instance->incoming_window) != 0) ||
01193                 (flow_set_next_outgoing_id(flow, session_instance->next_outgoing_id) != 0) ||
01194                 (flow_set_outgoing_window(flow, session_instance->outgoing_window) != 0) ||
01195                 (flow_set_handle(flow, link_endpoint_instance->output_handle) != 0))
01196             {
01197                 result = __LINE__;
01198             }
01199             else
01200             {
01201                 AMQP_VALUE flow_performative_value = amqpvalue_create_flow(flow);
01202                 if (flow_performative_value == NULL)
01203                 {
01204                     result = __LINE__;
01205                 }
01206                 else
01207                 {
01208                     if (encode_frame(link_endpoint, flow_performative_value, NULL, 0) != 0)
01209                     {
01210                         result = __LINE__;
01211                     }
01212                     else
01213                     {
01214                         result = 0;
01215                     }
01216 
01217                     amqpvalue_destroy(flow_performative_value);
01218                 }
01219             }
01220         }
01221     }
01222 
01223     return result;
01224 }
01225 
01226 int session_send_attach(LINK_ENDPOINT_HANDLE link_endpoint, ATTACH_HANDLE attach)
01227 {
01228     int result;
01229 
01230     if ((link_endpoint == NULL) ||
01231         (attach == NULL))
01232     {
01233         result = __LINE__;
01234     }
01235     else
01236     {
01237         LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01238 
01239         if (attach_set_handle(attach, link_endpoint_instance->output_handle) != 0)
01240         {
01241             result = __LINE__;
01242         }
01243         else
01244         {
01245             AMQP_VALUE attach_performative_value = amqpvalue_create_attach(attach);
01246             if (attach_performative_value == NULL)
01247             {
01248                 result = __LINE__;
01249             }
01250             else
01251             {
01252                 if (encode_frame(link_endpoint, attach_performative_value, NULL, 0) != 0)
01253                 {
01254                     result = __LINE__;
01255                 }
01256                 else
01257                 {
01258                     result = 0;
01259                 }
01260 
01261                 amqpvalue_destroy(attach_performative_value);
01262             }
01263         }
01264     }
01265 
01266     return result;
01267 }
01268 
01269 int session_send_disposition(LINK_ENDPOINT_HANDLE link_endpoint, DISPOSITION_HANDLE disposition)
01270 {
01271     int result;
01272 
01273     if ((link_endpoint == NULL) ||
01274         (disposition == NULL))
01275     {
01276         result = __LINE__;
01277     }
01278     else
01279     {
01280         AMQP_VALUE disposition_performative_value = amqpvalue_create_disposition(disposition);
01281         if (disposition_performative_value == NULL)
01282         {
01283             result = __LINE__;
01284         }
01285         else
01286         {
01287             if (encode_frame(link_endpoint, disposition_performative_value, NULL, 0) != 0)
01288             {
01289                 result = __LINE__;
01290             }
01291             else
01292             {
01293                 result = 0;
01294             }
01295 
01296             amqpvalue_destroy(disposition_performative_value);
01297         }
01298     }
01299 
01300     return result;
01301 }
01302 
01303 int session_send_detach(LINK_ENDPOINT_HANDLE link_endpoint, DETACH_HANDLE detach)
01304 {
01305     int result;
01306 
01307     if ((link_endpoint == NULL) ||
01308         (detach == NULL))
01309     {
01310         result = __LINE__;
01311     }
01312     else
01313     {
01314         LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01315 
01316         if (detach_set_handle(detach, link_endpoint_instance->output_handle) != 0)
01317         {
01318             result = __LINE__;
01319         }
01320         else
01321         {
01322             AMQP_VALUE detach_performative_value = amqpvalue_create_detach(detach);
01323             if (detach_performative_value == NULL)
01324             {
01325                 result = __LINE__;
01326             }
01327             else
01328             {
01329                 if (encode_frame(link_endpoint, detach_performative_value, NULL, 0) != 0)
01330                 {
01331                     result = __LINE__;
01332                 }
01333                 else
01334                 {
01335                     result = 0;
01336                 }
01337 
01338                 amqpvalue_destroy(detach_performative_value);
01339             }
01340         }
01341     }
01342 
01343     return result;
01344 }
01345 
01346 /* Codes_SRS_SESSION_01_051: [session_send_transfer shall send a transfer frame with the performative indicated in the transfer argument.] */
01347 SESSION_SEND_TRANSFER_RESULT session_send_transfer(LINK_ENDPOINT_HANDLE link_endpoint, TRANSFER_HANDLE transfer, PAYLOAD* payloads, size_t payload_count, delivery_number* delivery_id, ON_SEND_COMPLETE on_send_complete, void* callback_context)
01348 {
01349     SESSION_SEND_TRANSFER_RESULT result;
01350 
01351     /* Codes_SRS_SESSION_01_054: [If link_endpoint or transfer is NULL, session_send_transfer shall fail and return a non-zero value.] */
01352     if ((link_endpoint == NULL) ||
01353         (transfer == NULL))
01354     {
01355         result = SESSION_SEND_TRANSFER_ERROR;
01356     }
01357     else
01358     {
01359         LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
01360         SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session;
01361 
01362         /* Codes_SRS_SESSION_01_059: [When session_send_transfer is called while the session is not in the MAPPED state, session_send_transfer shall fail and return a non-zero value.] */
01363         if (session_instance->session_state != SESSION_STATE_MAPPED)
01364         {
01365             result = SESSION_SEND_TRANSFER_ERROR;
01366         }
01367         else
01368         {
01369             size_t payload_size = 0;
01370             size_t i;
01371 
01372             for (i = 0; i < payload_count; i++)
01373             {
01374                 if ((payloads[i].length > UINT32_MAX) ||
01375                     (payload_size + payloads[i].length < payload_size))
01376                 {
01377                     break;
01378                 }
01379 
01380                 payload_size += payloads[i].length;
01381             }
01382 
01383             if ((i < payload_count) ||
01384                 (payload_size > UINT32_MAX))
01385             {
01386                 result = SESSION_SEND_TRANSFER_ERROR;
01387             }
01388             else
01389             {
01390                 if (session_instance->remote_incoming_window == 0)
01391                 {
01392                     result = SESSION_SEND_TRANSFER_BUSY;
01393                 }
01394                 else
01395                 {
01396                     /* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */
01397                     /* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */
01398                     *delivery_id = session_instance->next_outgoing_id;
01399                     if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) ||
01400                         (transfer_set_delivery_id(transfer, *delivery_id) != 0) ||
01401                         (transfer_set_more(transfer, false) != 0))
01402                     {
01403                         /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
01404                         result = SESSION_SEND_TRANSFER_ERROR;
01405                     }
01406                     else
01407                     {
01408                         AMQP_VALUE transfer_value;
01409 
01410                         transfer_value = amqpvalue_create_transfer(transfer);
01411                         if (transfer_value == NULL)
01412                         {
01413                             /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
01414                             result = SESSION_SEND_TRANSFER_ERROR;
01415                         }
01416                         else
01417                         {
01418                             uint32_t available_frame_size;
01419                             size_t encoded_size;
01420 
01421                             if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) ||
01422                                 (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0))
01423                             {
01424                                 result = SESSION_SEND_TRANSFER_ERROR;
01425                             }
01426                             else
01427                             {
01428                                 payload_size = 0;
01429 
01430                                 for (i = 0; i < payload_count; i++)
01431                                 {
01432                                     payload_size += payloads[i].length;
01433                                 }
01434 
01435                                 available_frame_size -= (uint32_t)encoded_size;
01436                                 available_frame_size -= 8;
01437 
01438                                 if (available_frame_size >= payload_size)
01439                                 {
01440                                     /* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */
01441                                     if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0)
01442                                     {
01443                                         /* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */
01444                                         result = SESSION_SEND_TRANSFER_ERROR;
01445                                     }
01446                                     else
01447                                     {
01448                                         /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
01449                                         session_instance->next_outgoing_id++;
01450                                         session_instance->remote_incoming_window--;
01451                                         session_instance->outgoing_window--;
01452 
01453                                         /* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */
01454                                         result = SESSION_SEND_TRANSFER_OK;
01455                                     }
01456                                 }
01457                                 else
01458                                 {
01459                                     size_t current_payload_index = 0;
01460                                     uint32_t current_payload_pos = 0;
01461 
01462                                     /* break it down into different deliveries */
01463                                     while (payload_size > 0)
01464                                     {
01465                                         uint32_t transfer_frame_payload_count = 0;
01466                                         uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size;
01467                                         uint32_t byte_counter;
01468                                         size_t temp_current_payload_index = current_payload_index;
01469                                         uint32_t temp_current_payload_pos = current_payload_pos;
01470                                         AMQP_VALUE multi_transfer_amqp_value;
01471                                         bool more;
01472 
01473                                         if (current_transfer_frame_payload_size > available_frame_size)
01474                                         {
01475                                             current_transfer_frame_payload_size = available_frame_size;
01476                                         }
01477 
01478                                         if (available_frame_size >= payload_size)
01479                                         {
01480                                             more = false;
01481                                         }
01482                                         else
01483                                         {
01484                                             more = true;
01485                                         }
01486 
01487                                         if (transfer_set_more(transfer, more) != 0)
01488                                         {
01489                                             break;
01490                                         }
01491 
01492                                         multi_transfer_amqp_value = amqpvalue_create_transfer(transfer);
01493                                         if (multi_transfer_amqp_value == NULL)
01494                                         {
01495                                             break;
01496                                         }
01497 
01498                                         byte_counter = current_transfer_frame_payload_size;
01499                                         while (byte_counter > 0)
01500                                         {
01501                                             if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter)
01502                                             {
01503                                                 /* more data than we need */
01504                                                 temp_current_payload_pos += byte_counter;
01505                                                 byte_counter = 0;
01506                                             }
01507                                             else
01508                                             {
01509                                                 byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos;
01510                                                 temp_current_payload_index++;
01511                                                 temp_current_payload_pos = 0;
01512                                             }
01513                                         }
01514 
01515                                         transfer_frame_payload_count = (uint32_t)(temp_current_payload_index - current_payload_index + 1);
01516                                         PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD));
01517                                         if (transfer_frame_payloads == NULL)
01518                                         {
01519                                             amqpvalue_destroy(multi_transfer_amqp_value);
01520                                             break;
01521                                         }
01522 
01523                                         /* copy data */
01524                                         byte_counter = current_transfer_frame_payload_size;
01525                                         transfer_frame_payload_count = 0;
01526 
01527                                         while (byte_counter > 0)
01528                                         {
01529                                             if (payloads[current_payload_index].length - current_payload_pos > byte_counter)
01530                                             {
01531                                                 /* more data than we need */
01532                                                 transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
01533                                                 transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter;
01534                                                 current_payload_pos += byte_counter;
01535                                                 byte_counter = 0;
01536                                             }
01537                                             else
01538                                             {
01539                                                 /* copy entire payload and move to the next */
01540                                                 transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
01541                                                 transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos;
01542                                                 byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos;
01543                                                 current_payload_index++;
01544                                                 current_payload_pos = 0;
01545                                             }
01546 
01547                                             transfer_frame_payload_count++;
01548                                         }
01549 
01550                                         if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0)
01551                                         {
01552                                             amqpalloc_free(transfer_frame_payloads);
01553                                             amqpvalue_destroy(multi_transfer_amqp_value);
01554                                             break;
01555                                         }
01556 
01557                                         amqpalloc_free(transfer_frame_payloads);
01558                                         amqpvalue_destroy(multi_transfer_amqp_value);
01559                                         payload_size -= current_transfer_frame_payload_size;
01560                                     }
01561 
01562                                     if (payload_size > 0)
01563                                     {
01564                                         result = SESSION_SEND_TRANSFER_ERROR;
01565                                     }
01566                                     else
01567                                     {
01568                                         /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
01569                                         session_instance->next_outgoing_id++;
01570                                         session_instance->remote_incoming_window--;
01571                                         session_instance->outgoing_window--;
01572 
01573                                         result = SESSION_SEND_TRANSFER_OK;
01574                                     }
01575                                 }
01576                             }
01577 
01578                             amqpvalue_destroy(transfer_value);
01579                         }
01580                     }
01581                 }
01582             }
01583         }
01584     }
01585 
01586     return result;
01587 }