Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

User | Revision | Line number | New contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5 #include <string.h>
XinZhangMS 0:f7f1f0d76dd6 6 #include <stdint.h>
XinZhangMS 0:f7f1f0d76dd6 7 #include <stdbool.h>
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_c_shared_utility/singlylinkedlist.h"
XinZhangMS 0:f7f1f0d76dd6 12 #include "azure_c_shared_utility/tickcounter.h"
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_uamqp_c/link.h"
XinZhangMS 0:f7f1f0d76dd6 14 #include "azure_uamqp_c/session.h"
XinZhangMS 0:f7f1f0d76dd6 15 #include "azure_uamqp_c/amqpvalue.h"
XinZhangMS 0:f7f1f0d76dd6 16 #include "azure_uamqp_c/amqp_definitions.h"
XinZhangMS 0:f7f1f0d76dd6 17 #include "azure_uamqp_c/amqp_frame_codec.h"
XinZhangMS 0:f7f1f0d76dd6 18 #include "azure_uamqp_c/async_operation.h"
XinZhangMS 0:f7f1f0d76dd6 19
XinZhangMS 0:f7f1f0d76dd6 20 #define DEFAULT_LINK_CREDIT 10000
XinZhangMS 0:f7f1f0d76dd6 21
XinZhangMS 0:f7f1f0d76dd6 22 typedef struct DELIVERY_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 23 {
XinZhangMS 0:f7f1f0d76dd6 24 delivery_number delivery_id;
XinZhangMS 0:f7f1f0d76dd6 25 ON_DELIVERY_SETTLED on_delivery_settled;
XinZhangMS 0:f7f1f0d76dd6 26 void* callback_context;
XinZhangMS 0:f7f1f0d76dd6 27 void* link;
XinZhangMS 0:f7f1f0d76dd6 28 tickcounter_ms_t start_tick;
XinZhangMS 0:f7f1f0d76dd6 29 tickcounter_ms_t timeout;
XinZhangMS 0:f7f1f0d76dd6 30 } DELIVERY_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 31
XinZhangMS 0:f7f1f0d76dd6 32 typedef struct ON_LINK_DETACH_EVENT_SUBSCRIPTION_TAG
XinZhangMS 0:f7f1f0d76dd6 33 {
XinZhangMS 0:f7f1f0d76dd6 34 ON_LINK_DETACH_RECEIVED on_link_detach_received;
XinZhangMS 0:f7f1f0d76dd6 35 void* context;
XinZhangMS 0:f7f1f0d76dd6 36 } ON_LINK_DETACH_EVENT_SUBSCRIPTION;
XinZhangMS 0:f7f1f0d76dd6 37
XinZhangMS 0:f7f1f0d76dd6 38 typedef struct LINK_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 39 {
XinZhangMS 0:f7f1f0d76dd6 40 SESSION_HANDLE session;
XinZhangMS 0:f7f1f0d76dd6 41 LINK_STATE link_state;
XinZhangMS 0:f7f1f0d76dd6 42 LINK_STATE previous_link_state;
XinZhangMS 0:f7f1f0d76dd6 43 AMQP_VALUE source;
XinZhangMS 0:f7f1f0d76dd6 44 AMQP_VALUE target;
XinZhangMS 0:f7f1f0d76dd6 45 handle handle;
XinZhangMS 0:f7f1f0d76dd6 46 LINK_ENDPOINT_HANDLE link_endpoint;
XinZhangMS 0:f7f1f0d76dd6 47 char* name;
XinZhangMS 0:f7f1f0d76dd6 48 SINGLYLINKEDLIST_HANDLE pending_deliveries;
XinZhangMS 0:f7f1f0d76dd6 49 sequence_no delivery_count;
XinZhangMS 0:f7f1f0d76dd6 50 role role;
XinZhangMS 0:f7f1f0d76dd6 51 ON_LINK_STATE_CHANGED on_link_state_changed;
XinZhangMS 0:f7f1f0d76dd6 52 ON_LINK_FLOW_ON on_link_flow_on;
XinZhangMS 0:f7f1f0d76dd6 53 ON_TRANSFER_RECEIVED on_transfer_received;
XinZhangMS 0:f7f1f0d76dd6 54 void* callback_context;
XinZhangMS 0:f7f1f0d76dd6 55 sender_settle_mode snd_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 56 receiver_settle_mode rcv_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 57 sequence_no initial_delivery_count;
XinZhangMS 0:f7f1f0d76dd6 58 uint64_t max_message_size;
XinZhangMS 0:f7f1f0d76dd6 59 uint64_t peer_max_message_size;
XinZhangMS 0:f7f1f0d76dd6 60 uint32_t current_link_credit;
XinZhangMS 0:f7f1f0d76dd6 61 uint32_t max_link_credit;
XinZhangMS 0:f7f1f0d76dd6 62 uint32_t available;
XinZhangMS 0:f7f1f0d76dd6 63 fields attach_properties;
XinZhangMS 0:f7f1f0d76dd6 64 bool is_underlying_session_begun;
XinZhangMS 0:f7f1f0d76dd6 65 bool is_closed;
XinZhangMS 0:f7f1f0d76dd6 66 unsigned char* received_payload;
XinZhangMS 0:f7f1f0d76dd6 67 uint32_t received_payload_size;
XinZhangMS 0:f7f1f0d76dd6 68 delivery_number received_delivery_id;
XinZhangMS 0:f7f1f0d76dd6 69 TICK_COUNTER_HANDLE tick_counter;
XinZhangMS 0:f7f1f0d76dd6 70 ON_LINK_DETACH_EVENT_SUBSCRIPTION on_link_detach_received_event_subscription;
XinZhangMS 0:f7f1f0d76dd6 71 } LINK_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 72
/* Declares the async-operation context flavour for DELIVERY_INSTANCE; the code below retrieves that context from an ASYNC_OPERATION_HANDLE with GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, ...). NOTE(review): the exact expansion lives in async_operation.h - confirm there. */
XinZhangMS 0:f7f1f0d76dd6 73 DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE);
XinZhangMS 0:f7f1f0d76dd6 74
XinZhangMS 0:f7f1f0d76dd6 75 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
XinZhangMS 0:f7f1f0d76dd6 76 {
XinZhangMS 0:f7f1f0d76dd6 77 link_instance->previous_link_state = link_instance->link_state;
XinZhangMS 0:f7f1f0d76dd6 78 link_instance->link_state = link_state;
XinZhangMS 0:f7f1f0d76dd6 79
XinZhangMS 0:f7f1f0d76dd6 80 if (link_instance->on_link_state_changed != NULL)
XinZhangMS 0:f7f1f0d76dd6 81 {
XinZhangMS 0:f7f1f0d76dd6 82 link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state);
XinZhangMS 0:f7f1f0d76dd6 83 }
XinZhangMS 0:f7f1f0d76dd6 84 }
XinZhangMS 0:f7f1f0d76dd6 85
XinZhangMS 0:f7f1f0d76dd6 86 static void remove_all_pending_deliveries(LINK_INSTANCE* link, bool indicate_settled)
XinZhangMS 0:f7f1f0d76dd6 87 {
XinZhangMS 0:f7f1f0d76dd6 88 if (link->pending_deliveries != NULL)
XinZhangMS 0:f7f1f0d76dd6 89 {
XinZhangMS 0:f7f1f0d76dd6 90 LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 91 while (item != NULL)
XinZhangMS 0:f7f1f0d76dd6 92 {
XinZhangMS 0:f7f1f0d76dd6 93 LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
XinZhangMS 0:f7f1f0d76dd6 94 ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item);
XinZhangMS 0:f7f1f0d76dd6 95 if (pending_delivery_operation != NULL)
XinZhangMS 0:f7f1f0d76dd6 96 {
XinZhangMS 0:f7f1f0d76dd6 97 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 98 if (indicate_settled && (delivery_instance->on_delivery_settled != NULL))
XinZhangMS 0:f7f1f0d76dd6 99 {
XinZhangMS 0:f7f1f0d76dd6 100 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, NULL);
XinZhangMS 0:f7f1f0d76dd6 101 }
XinZhangMS 0:f7f1f0d76dd6 102
XinZhangMS 0:f7f1f0d76dd6 103 async_operation_destroy(pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 104 }
XinZhangMS 0:f7f1f0d76dd6 105
XinZhangMS 0:f7f1f0d76dd6 106 item = next_item;
XinZhangMS 0:f7f1f0d76dd6 107 }
XinZhangMS 0:f7f1f0d76dd6 108
XinZhangMS 0:f7f1f0d76dd6 109 singlylinkedlist_destroy(link->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 110 link->pending_deliveries = NULL;
XinZhangMS 0:f7f1f0d76dd6 111 }
XinZhangMS 0:f7f1f0d76dd6 112 }
XinZhangMS 0:f7f1f0d76dd6 113
XinZhangMS 0:f7f1f0d76dd6 114 static int send_flow(LINK_INSTANCE* link)
XinZhangMS 0:f7f1f0d76dd6 115 {
XinZhangMS 0:f7f1f0d76dd6 116 int result;
XinZhangMS 0:f7f1f0d76dd6 117 FLOW_HANDLE flow = flow_create(0, 0, 0);
XinZhangMS 0:f7f1f0d76dd6 118
XinZhangMS 0:f7f1f0d76dd6 119 if (flow == NULL)
XinZhangMS 0:f7f1f0d76dd6 120 {
XinZhangMS 0:f7f1f0d76dd6 121 LogError("NULL flow performative");
XinZhangMS 0:f7f1f0d76dd6 122 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 123 }
XinZhangMS 0:f7f1f0d76dd6 124 else
XinZhangMS 0:f7f1f0d76dd6 125 {
XinZhangMS 0:f7f1f0d76dd6 126 if (flow_set_link_credit(flow, link->current_link_credit) != 0)
XinZhangMS 0:f7f1f0d76dd6 127 {
XinZhangMS 0:f7f1f0d76dd6 128 LogError("Cannot set link credit on flow performative");
XinZhangMS 0:f7f1f0d76dd6 129 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 130 }
XinZhangMS 0:f7f1f0d76dd6 131 else if (flow_set_handle(flow, link->handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 132 {
XinZhangMS 0:f7f1f0d76dd6 133 LogError("Cannot set handle on flow performative");
XinZhangMS 0:f7f1f0d76dd6 134 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 135 }
XinZhangMS 0:f7f1f0d76dd6 136 else if (flow_set_delivery_count(flow, link->delivery_count) != 0)
XinZhangMS 0:f7f1f0d76dd6 137 {
XinZhangMS 0:f7f1f0d76dd6 138 LogError("Cannot set delivery count on flow performative");
XinZhangMS 0:f7f1f0d76dd6 139 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 140 }
XinZhangMS 0:f7f1f0d76dd6 141 else
XinZhangMS 0:f7f1f0d76dd6 142 {
XinZhangMS 0:f7f1f0d76dd6 143 if (session_send_flow(link->link_endpoint, flow) != 0)
XinZhangMS 0:f7f1f0d76dd6 144 {
XinZhangMS 0:f7f1f0d76dd6 145 LogError("Sending flow frame failed in session send");
XinZhangMS 0:f7f1f0d76dd6 146 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 147 }
XinZhangMS 0:f7f1f0d76dd6 148 else
XinZhangMS 0:f7f1f0d76dd6 149 {
XinZhangMS 0:f7f1f0d76dd6 150 result = 0;
XinZhangMS 0:f7f1f0d76dd6 151 }
XinZhangMS 0:f7f1f0d76dd6 152 }
XinZhangMS 0:f7f1f0d76dd6 153
XinZhangMS 0:f7f1f0d76dd6 154 flow_destroy(flow);
XinZhangMS 0:f7f1f0d76dd6 155 }
XinZhangMS 0:f7f1f0d76dd6 156
XinZhangMS 0:f7f1f0d76dd6 157 return result;
XinZhangMS 0:f7f1f0d76dd6 158 }
XinZhangMS 0:f7f1f0d76dd6 159
XinZhangMS 0:f7f1f0d76dd6 160 static int send_disposition(LINK_INSTANCE* link_instance, delivery_number delivery_number, AMQP_VALUE delivery_state)
XinZhangMS 0:f7f1f0d76dd6 161 {
XinZhangMS 0:f7f1f0d76dd6 162 int result;
XinZhangMS 0:f7f1f0d76dd6 163
XinZhangMS 0:f7f1f0d76dd6 164 DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number);
XinZhangMS 0:f7f1f0d76dd6 165 if (disposition == NULL)
XinZhangMS 0:f7f1f0d76dd6 166 {
XinZhangMS 0:f7f1f0d76dd6 167 LogError("NULL disposition performative");
XinZhangMS 0:f7f1f0d76dd6 168 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 169 }
XinZhangMS 0:f7f1f0d76dd6 170 else
XinZhangMS 0:f7f1f0d76dd6 171 {
XinZhangMS 0:f7f1f0d76dd6 172 if (disposition_set_last(disposition, delivery_number) != 0)
XinZhangMS 0:f7f1f0d76dd6 173 {
XinZhangMS 0:f7f1f0d76dd6 174 LogError("Failed setting last on disposition performative");
XinZhangMS 0:f7f1f0d76dd6 175 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 176 }
XinZhangMS 0:f7f1f0d76dd6 177 else if (disposition_set_settled(disposition, true) != 0)
XinZhangMS 0:f7f1f0d76dd6 178 {
XinZhangMS 0:f7f1f0d76dd6 179 LogError("Failed setting settled on disposition performative");
XinZhangMS 0:f7f1f0d76dd6 180 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 181 }
XinZhangMS 0:f7f1f0d76dd6 182 else if ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0))
XinZhangMS 0:f7f1f0d76dd6 183 {
XinZhangMS 0:f7f1f0d76dd6 184 LogError("Failed setting state on disposition performative");
XinZhangMS 0:f7f1f0d76dd6 185 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 186 }
XinZhangMS 0:f7f1f0d76dd6 187 else
XinZhangMS 0:f7f1f0d76dd6 188 {
XinZhangMS 0:f7f1f0d76dd6 189 if (session_send_disposition(link_instance->link_endpoint, disposition) != 0)
XinZhangMS 0:f7f1f0d76dd6 190 {
XinZhangMS 0:f7f1f0d76dd6 191 LogError("Sending disposition failed in session send");
XinZhangMS 0:f7f1f0d76dd6 192 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 193 }
XinZhangMS 0:f7f1f0d76dd6 194 else
XinZhangMS 0:f7f1f0d76dd6 195 {
XinZhangMS 0:f7f1f0d76dd6 196 result = 0;
XinZhangMS 0:f7f1f0d76dd6 197 }
XinZhangMS 0:f7f1f0d76dd6 198 }
XinZhangMS 0:f7f1f0d76dd6 199
XinZhangMS 0:f7f1f0d76dd6 200 disposition_destroy(disposition);
XinZhangMS 0:f7f1f0d76dd6 201 }
XinZhangMS 0:f7f1f0d76dd6 202
XinZhangMS 0:f7f1f0d76dd6 203 return result;
XinZhangMS 0:f7f1f0d76dd6 204 }
XinZhangMS 0:f7f1f0d76dd6 205
XinZhangMS 0:f7f1f0d76dd6 206 static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle)
XinZhangMS 0:f7f1f0d76dd6 207 {
XinZhangMS 0:f7f1f0d76dd6 208 int result;
XinZhangMS 0:f7f1f0d76dd6 209 DETACH_HANDLE detach_performative;
XinZhangMS 0:f7f1f0d76dd6 210
XinZhangMS 0:f7f1f0d76dd6 211 detach_performative = detach_create(0);
XinZhangMS 0:f7f1f0d76dd6 212 if (detach_performative == NULL)
XinZhangMS 0:f7f1f0d76dd6 213 {
XinZhangMS 0:f7f1f0d76dd6 214 LogError("NULL detach performative");
XinZhangMS 0:f7f1f0d76dd6 215 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 216 }
XinZhangMS 0:f7f1f0d76dd6 217 else
XinZhangMS 0:f7f1f0d76dd6 218 {
XinZhangMS 0:f7f1f0d76dd6 219 if ((error_handle != NULL) &&
XinZhangMS 0:f7f1f0d76dd6 220 (detach_set_error(detach_performative, error_handle) != 0))
XinZhangMS 0:f7f1f0d76dd6 221 {
XinZhangMS 0:f7f1f0d76dd6 222 LogError("Failed setting error on detach frame");
XinZhangMS 0:f7f1f0d76dd6 223 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 224 }
XinZhangMS 0:f7f1f0d76dd6 225 else if (close &&
XinZhangMS 0:f7f1f0d76dd6 226 (detach_set_closed(detach_performative, true) != 0))
XinZhangMS 0:f7f1f0d76dd6 227 {
XinZhangMS 0:f7f1f0d76dd6 228 LogError("Failed setting closed field on detach frame");
XinZhangMS 0:f7f1f0d76dd6 229 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 230 }
XinZhangMS 0:f7f1f0d76dd6 231 else
XinZhangMS 0:f7f1f0d76dd6 232 {
XinZhangMS 0:f7f1f0d76dd6 233 if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0)
XinZhangMS 0:f7f1f0d76dd6 234 {
XinZhangMS 0:f7f1f0d76dd6 235 LogError("Sending detach frame failed in session send");
XinZhangMS 0:f7f1f0d76dd6 236 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 237 }
XinZhangMS 0:f7f1f0d76dd6 238 else
XinZhangMS 0:f7f1f0d76dd6 239 {
XinZhangMS 0:f7f1f0d76dd6 240 if (close)
XinZhangMS 0:f7f1f0d76dd6 241 {
XinZhangMS 0:f7f1f0d76dd6 242 /* Declare link to be closed */
XinZhangMS 0:f7f1f0d76dd6 243 link_instance->is_closed = true;
XinZhangMS 0:f7f1f0d76dd6 244 }
XinZhangMS 0:f7f1f0d76dd6 245
XinZhangMS 0:f7f1f0d76dd6 246 result = 0;
XinZhangMS 0:f7f1f0d76dd6 247 }
XinZhangMS 0:f7f1f0d76dd6 248 }
XinZhangMS 0:f7f1f0d76dd6 249
XinZhangMS 0:f7f1f0d76dd6 250 detach_destroy(detach_performative);
XinZhangMS 0:f7f1f0d76dd6 251 }
XinZhangMS 0:f7f1f0d76dd6 252
XinZhangMS 0:f7f1f0d76dd6 253 return result;
XinZhangMS 0:f7f1f0d76dd6 254 }
XinZhangMS 0:f7f1f0d76dd6 255
XinZhangMS 0:f7f1f0d76dd6 256 static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role)
XinZhangMS 0:f7f1f0d76dd6 257 {
XinZhangMS 0:f7f1f0d76dd6 258 int result;
XinZhangMS 0:f7f1f0d76dd6 259 ATTACH_HANDLE attach = attach_create(name, handle, role);
XinZhangMS 0:f7f1f0d76dd6 260
XinZhangMS 0:f7f1f0d76dd6 261 if (attach == NULL)
XinZhangMS 0:f7f1f0d76dd6 262 {
XinZhangMS 0:f7f1f0d76dd6 263 LogError("NULL attach performative");
XinZhangMS 0:f7f1f0d76dd6 264 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 265 }
XinZhangMS 0:f7f1f0d76dd6 266 else
XinZhangMS 0:f7f1f0d76dd6 267 {
XinZhangMS 0:f7f1f0d76dd6 268 result = 0;
XinZhangMS 0:f7f1f0d76dd6 269
XinZhangMS 0:f7f1f0d76dd6 270 link->delivery_count = link->initial_delivery_count;
XinZhangMS 0:f7f1f0d76dd6 271
XinZhangMS 0:f7f1f0d76dd6 272 attach_set_snd_settle_mode(attach, link->snd_settle_mode);
XinZhangMS 0:f7f1f0d76dd6 273 attach_set_rcv_settle_mode(attach, link->rcv_settle_mode);
XinZhangMS 0:f7f1f0d76dd6 274 attach_set_role(attach, role);
XinZhangMS 0:f7f1f0d76dd6 275 attach_set_source(attach, link->source);
XinZhangMS 0:f7f1f0d76dd6 276 attach_set_target(attach, link->target);
XinZhangMS 0:f7f1f0d76dd6 277 if (link->attach_properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 278 {
XinZhangMS 0:f7f1f0d76dd6 279 (void)attach_set_properties(attach, link->attach_properties);
XinZhangMS 0:f7f1f0d76dd6 280 }
XinZhangMS 0:f7f1f0d76dd6 281
XinZhangMS 0:f7f1f0d76dd6 282 if (role == role_sender)
XinZhangMS 0:f7f1f0d76dd6 283 {
XinZhangMS 0:f7f1f0d76dd6 284 if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0)
XinZhangMS 0:f7f1f0d76dd6 285 {
XinZhangMS 0:f7f1f0d76dd6 286 LogError("Cannot set attach initial delivery count");
XinZhangMS 0:f7f1f0d76dd6 287 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 288 }
XinZhangMS 0:f7f1f0d76dd6 289 }
XinZhangMS 0:f7f1f0d76dd6 290
XinZhangMS 0:f7f1f0d76dd6 291 if (result == 0)
XinZhangMS 0:f7f1f0d76dd6 292 {
XinZhangMS 0:f7f1f0d76dd6 293 if (attach_set_max_message_size(attach, link->max_message_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 294 {
XinZhangMS 0:f7f1f0d76dd6 295 LogError("Cannot set max message size");
XinZhangMS 0:f7f1f0d76dd6 296 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 297 }
XinZhangMS 0:f7f1f0d76dd6 298 else if (session_send_attach(link->link_endpoint, attach) != 0)
XinZhangMS 0:f7f1f0d76dd6 299 {
XinZhangMS 0:f7f1f0d76dd6 300 LogError("Sending attach failed in session send");
XinZhangMS 0:f7f1f0d76dd6 301 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 302 }
XinZhangMS 0:f7f1f0d76dd6 303 else
XinZhangMS 0:f7f1f0d76dd6 304 {
XinZhangMS 0:f7f1f0d76dd6 305 result = 0;
XinZhangMS 0:f7f1f0d76dd6 306 }
XinZhangMS 0:f7f1f0d76dd6 307 }
XinZhangMS 0:f7f1f0d76dd6 308
XinZhangMS 0:f7f1f0d76dd6 309 attach_destroy(attach);
XinZhangMS 0:f7f1f0d76dd6 310 }
XinZhangMS 0:f7f1f0d76dd6 311
XinZhangMS 0:f7f1f0d76dd6 312 return result;
XinZhangMS 0:f7f1f0d76dd6 313 }
XinZhangMS 0:f7f1f0d76dd6 314
XinZhangMS 0:f7f1f0d76dd6 315 static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
XinZhangMS 0:f7f1f0d76dd6 316 {
XinZhangMS 0:f7f1f0d76dd6 317 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 318 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
XinZhangMS 0:f7f1f0d76dd6 319
XinZhangMS 0:f7f1f0d76dd6 320 if (is_attach_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 321 {
XinZhangMS 0:f7f1f0d76dd6 322 ATTACH_HANDLE attach_handle;
XinZhangMS 0:f7f1f0d76dd6 323
XinZhangMS 0:f7f1f0d76dd6 324 if (amqpvalue_get_attach(performative, &attach_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 325 {
XinZhangMS 0:f7f1f0d76dd6 326 LogError("Cannot get attach performative");
XinZhangMS 0:f7f1f0d76dd6 327 }
XinZhangMS 0:f7f1f0d76dd6 328 else
XinZhangMS 0:f7f1f0d76dd6 329 {
XinZhangMS 0:f7f1f0d76dd6 330 if ((link_instance->role == role_receiver) &&
XinZhangMS 0:f7f1f0d76dd6 331 (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0))
XinZhangMS 0:f7f1f0d76dd6 332 {
XinZhangMS 0:f7f1f0d76dd6 333 LogError("Cannot get initial delivery count");
XinZhangMS 0:f7f1f0d76dd6 334 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 335 set_link_state(link_instance, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 336 }
XinZhangMS 0:f7f1f0d76dd6 337 else
XinZhangMS 0:f7f1f0d76dd6 338 {
XinZhangMS 0:f7f1f0d76dd6 339 if (attach_get_max_message_size(attach_handle, &link_instance->peer_max_message_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 340 {
XinZhangMS 0:f7f1f0d76dd6 341 LogError("Could not retrieve peer_max_message_size from attach frame");
XinZhangMS 0:f7f1f0d76dd6 342 }
XinZhangMS 0:f7f1f0d76dd6 343
XinZhangMS 0:f7f1f0d76dd6 344 if ((link_instance->link_state == LINK_STATE_DETACHED) ||
XinZhangMS 0:f7f1f0d76dd6 345 (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT))
XinZhangMS 0:f7f1f0d76dd6 346 {
XinZhangMS 0:f7f1f0d76dd6 347 if (link_instance->role == role_receiver)
XinZhangMS 0:f7f1f0d76dd6 348 {
XinZhangMS 0:f7f1f0d76dd6 349 link_instance->current_link_credit = link_instance->max_link_credit;
XinZhangMS 0:f7f1f0d76dd6 350 send_flow(link_instance);
XinZhangMS 0:f7f1f0d76dd6 351 }
XinZhangMS 0:f7f1f0d76dd6 352 else
XinZhangMS 0:f7f1f0d76dd6 353 {
XinZhangMS 0:f7f1f0d76dd6 354 link_instance->current_link_credit = 0;
XinZhangMS 0:f7f1f0d76dd6 355 }
XinZhangMS 0:f7f1f0d76dd6 356
XinZhangMS 0:f7f1f0d76dd6 357 if (link_instance->link_state == LINK_STATE_DETACHED)
XinZhangMS 0:f7f1f0d76dd6 358 {
XinZhangMS 0:f7f1f0d76dd6 359 set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED);
XinZhangMS 0:f7f1f0d76dd6 360 }
XinZhangMS 0:f7f1f0d76dd6 361 else
XinZhangMS 0:f7f1f0d76dd6 362 {
XinZhangMS 0:f7f1f0d76dd6 363 set_link_state(link_instance, LINK_STATE_ATTACHED);
XinZhangMS 0:f7f1f0d76dd6 364 }
XinZhangMS 0:f7f1f0d76dd6 365 }
XinZhangMS 0:f7f1f0d76dd6 366 }
XinZhangMS 0:f7f1f0d76dd6 367
XinZhangMS 0:f7f1f0d76dd6 368 attach_destroy(attach_handle);
XinZhangMS 0:f7f1f0d76dd6 369 }
XinZhangMS 0:f7f1f0d76dd6 370 }
XinZhangMS 0:f7f1f0d76dd6 371 else if (is_flow_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 372 {
XinZhangMS 0:f7f1f0d76dd6 373 FLOW_HANDLE flow_handle;
XinZhangMS 0:f7f1f0d76dd6 374 if (amqpvalue_get_flow(performative, &flow_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 375 {
XinZhangMS 0:f7f1f0d76dd6 376 LogError("Cannot get flow performative");
XinZhangMS 0:f7f1f0d76dd6 377 }
XinZhangMS 0:f7f1f0d76dd6 378 else
XinZhangMS 0:f7f1f0d76dd6 379 {
XinZhangMS 0:f7f1f0d76dd6 380 if (link_instance->role == role_sender)
XinZhangMS 0:f7f1f0d76dd6 381 {
XinZhangMS 0:f7f1f0d76dd6 382 delivery_number rcv_delivery_count;
XinZhangMS 0:f7f1f0d76dd6 383 uint32_t rcv_link_credit;
XinZhangMS 0:f7f1f0d76dd6 384
XinZhangMS 0:f7f1f0d76dd6 385 if (flow_get_link_credit(flow_handle, &rcv_link_credit) != 0)
XinZhangMS 0:f7f1f0d76dd6 386 {
XinZhangMS 0:f7f1f0d76dd6 387 LogError("Cannot get link credit");
XinZhangMS 0:f7f1f0d76dd6 388 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 389 set_link_state(link_instance, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 390 }
XinZhangMS 0:f7f1f0d76dd6 391 else if (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)
XinZhangMS 0:f7f1f0d76dd6 392 {
XinZhangMS 0:f7f1f0d76dd6 393 LogError("Cannot get delivery count");
XinZhangMS 0:f7f1f0d76dd6 394 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 395 set_link_state(link_instance, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 396 }
XinZhangMS 0:f7f1f0d76dd6 397 else
XinZhangMS 0:f7f1f0d76dd6 398 {
XinZhangMS 0:f7f1f0d76dd6 399 link_instance->current_link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count;
XinZhangMS 0:f7f1f0d76dd6 400 if (link_instance->current_link_credit > 0)
XinZhangMS 0:f7f1f0d76dd6 401 {
XinZhangMS 0:f7f1f0d76dd6 402 link_instance->on_link_flow_on(link_instance->callback_context);
XinZhangMS 0:f7f1f0d76dd6 403 }
XinZhangMS 0:f7f1f0d76dd6 404 }
XinZhangMS 0:f7f1f0d76dd6 405 }
XinZhangMS 0:f7f1f0d76dd6 406 }
XinZhangMS 0:f7f1f0d76dd6 407
XinZhangMS 0:f7f1f0d76dd6 408 flow_destroy(flow_handle);
XinZhangMS 0:f7f1f0d76dd6 409 }
XinZhangMS 0:f7f1f0d76dd6 410 else if (is_transfer_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 411 {
XinZhangMS 0:f7f1f0d76dd6 412 if (link_instance->on_transfer_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 413 {
XinZhangMS 0:f7f1f0d76dd6 414 TRANSFER_HANDLE transfer_handle;
XinZhangMS 0:f7f1f0d76dd6 415 if (amqpvalue_get_transfer(performative, &transfer_handle) != 0)
XinZhangMS 0:f7f1f0d76dd6 416 {
XinZhangMS 0:f7f1f0d76dd6 417 LogError("Cannot get transfer performative");
XinZhangMS 0:f7f1f0d76dd6 418 }
XinZhangMS 0:f7f1f0d76dd6 419 else
XinZhangMS 0:f7f1f0d76dd6 420 {
XinZhangMS 0:f7f1f0d76dd6 421 AMQP_VALUE delivery_state;
XinZhangMS 0:f7f1f0d76dd6 422 bool more;
XinZhangMS 0:f7f1f0d76dd6 423 bool is_error;
XinZhangMS 0:f7f1f0d76dd6 424
XinZhangMS 0:f7f1f0d76dd6 425 link_instance->current_link_credit--;
XinZhangMS 0:f7f1f0d76dd6 426 link_instance->delivery_count++;
XinZhangMS 0:f7f1f0d76dd6 427 if (link_instance->current_link_credit == 0)
XinZhangMS 0:f7f1f0d76dd6 428 {
XinZhangMS 0:f7f1f0d76dd6 429 link_instance->current_link_credit = link_instance->max_link_credit;
XinZhangMS 0:f7f1f0d76dd6 430 send_flow(link_instance);
XinZhangMS 0:f7f1f0d76dd6 431 }
XinZhangMS 0:f7f1f0d76dd6 432
XinZhangMS 0:f7f1f0d76dd6 433 more = false;
XinZhangMS 0:f7f1f0d76dd6 434 /* Attempt to get more flag, default to false */
XinZhangMS 0:f7f1f0d76dd6 435 (void)transfer_get_more(transfer_handle, &more);
XinZhangMS 0:f7f1f0d76dd6 436 is_error = false;
XinZhangMS 0:f7f1f0d76dd6 437
XinZhangMS 0:f7f1f0d76dd6 438 if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0)
XinZhangMS 0:f7f1f0d76dd6 439 {
XinZhangMS 0:f7f1f0d76dd6 440 /* is this not a continuation transfer? */
XinZhangMS 0:f7f1f0d76dd6 441 if (link_instance->received_payload_size == 0)
XinZhangMS 0:f7f1f0d76dd6 442 {
XinZhangMS 0:f7f1f0d76dd6 443 LogError("Could not get the delivery Id from the transfer performative");
XinZhangMS 0:f7f1f0d76dd6 444 is_error = true;
XinZhangMS 0:f7f1f0d76dd6 445 }
XinZhangMS 0:f7f1f0d76dd6 446 }
XinZhangMS 0:f7f1f0d76dd6 447
XinZhangMS 0:f7f1f0d76dd6 448 if (!is_error)
XinZhangMS 0:f7f1f0d76dd6 449 {
XinZhangMS 0:f7f1f0d76dd6 450 /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */
XinZhangMS 0:f7f1f0d76dd6 451 if ((link_instance->received_payload_size > 0) || more)
XinZhangMS 0:f7f1f0d76dd6 452 {
XinZhangMS 0:f7f1f0d76dd6 453 unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size);
XinZhangMS 0:f7f1f0d76dd6 454 if (new_received_payload == NULL)
XinZhangMS 0:f7f1f0d76dd6 455 {
XinZhangMS 0:f7f1f0d76dd6 456 LogError("Could not allocate memory for the received payload");
XinZhangMS 0:f7f1f0d76dd6 457 }
XinZhangMS 0:f7f1f0d76dd6 458 else
XinZhangMS 0:f7f1f0d76dd6 459 {
XinZhangMS 0:f7f1f0d76dd6 460 link_instance->received_payload = new_received_payload;
XinZhangMS 0:f7f1f0d76dd6 461 (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size);
XinZhangMS 0:f7f1f0d76dd6 462 link_instance->received_payload_size += payload_size;
XinZhangMS 0:f7f1f0d76dd6 463 }
XinZhangMS 0:f7f1f0d76dd6 464 }
XinZhangMS 0:f7f1f0d76dd6 465
XinZhangMS 0:f7f1f0d76dd6 466 if (!more)
XinZhangMS 0:f7f1f0d76dd6 467 {
XinZhangMS 0:f7f1f0d76dd6 468 const unsigned char* indicate_payload_bytes;
XinZhangMS 0:f7f1f0d76dd6 469 uint32_t indicate_payload_size;
XinZhangMS 0:f7f1f0d76dd6 470
XinZhangMS 0:f7f1f0d76dd6 471 /* if no previously stored chunks then simply report the current payload */
XinZhangMS 0:f7f1f0d76dd6 472 if (link_instance->received_payload_size > 0)
XinZhangMS 0:f7f1f0d76dd6 473 {
XinZhangMS 0:f7f1f0d76dd6 474 indicate_payload_size = link_instance->received_payload_size;
XinZhangMS 0:f7f1f0d76dd6 475 indicate_payload_bytes = link_instance->received_payload;
XinZhangMS 0:f7f1f0d76dd6 476 }
XinZhangMS 0:f7f1f0d76dd6 477 else
XinZhangMS 0:f7f1f0d76dd6 478 {
XinZhangMS 0:f7f1f0d76dd6 479 indicate_payload_size = payload_size;
XinZhangMS 0:f7f1f0d76dd6 480 indicate_payload_bytes = payload_bytes;
XinZhangMS 0:f7f1f0d76dd6 481 }
XinZhangMS 0:f7f1f0d76dd6 482
XinZhangMS 0:f7f1f0d76dd6 483 delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes);
XinZhangMS 0:f7f1f0d76dd6 484
XinZhangMS 0:f7f1f0d76dd6 485 if (link_instance->received_payload_size > 0)
XinZhangMS 0:f7f1f0d76dd6 486 {
XinZhangMS 0:f7f1f0d76dd6 487 free(link_instance->received_payload);
XinZhangMS 0:f7f1f0d76dd6 488 link_instance->received_payload = NULL;
XinZhangMS 0:f7f1f0d76dd6 489 link_instance->received_payload_size = 0;
XinZhangMS 0:f7f1f0d76dd6 490 }
XinZhangMS 0:f7f1f0d76dd6 491
XinZhangMS 0:f7f1f0d76dd6 492 if (delivery_state != NULL)
XinZhangMS 0:f7f1f0d76dd6 493 {
XinZhangMS 0:f7f1f0d76dd6 494 if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0)
XinZhangMS 0:f7f1f0d76dd6 495 {
XinZhangMS 0:f7f1f0d76dd6 496 LogError("Cannot send disposition frame");
XinZhangMS 0:f7f1f0d76dd6 497 }
XinZhangMS 0:f7f1f0d76dd6 498
XinZhangMS 0:f7f1f0d76dd6 499 amqpvalue_destroy(delivery_state);
XinZhangMS 0:f7f1f0d76dd6 500 }
XinZhangMS 0:f7f1f0d76dd6 501 }
XinZhangMS 0:f7f1f0d76dd6 502 }
XinZhangMS 0:f7f1f0d76dd6 503
XinZhangMS 0:f7f1f0d76dd6 504 transfer_destroy(transfer_handle);
XinZhangMS 0:f7f1f0d76dd6 505 }
XinZhangMS 0:f7f1f0d76dd6 506 }
XinZhangMS 0:f7f1f0d76dd6 507 }
XinZhangMS 0:f7f1f0d76dd6 508 else if (is_disposition_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 509 {
XinZhangMS 0:f7f1f0d76dd6 510 DISPOSITION_HANDLE disposition;
XinZhangMS 0:f7f1f0d76dd6 511 if (amqpvalue_get_disposition(performative, &disposition) != 0)
XinZhangMS 0:f7f1f0d76dd6 512 {
XinZhangMS 0:f7f1f0d76dd6 513 LogError("Cannot get disposition performative");
XinZhangMS 0:f7f1f0d76dd6 514 }
XinZhangMS 0:f7f1f0d76dd6 515 else
XinZhangMS 0:f7f1f0d76dd6 516 {
XinZhangMS 0:f7f1f0d76dd6 517 delivery_number first;
XinZhangMS 0:f7f1f0d76dd6 518 delivery_number last;
XinZhangMS 0:f7f1f0d76dd6 519
XinZhangMS 0:f7f1f0d76dd6 520 if (disposition_get_first(disposition, &first) != 0)
XinZhangMS 0:f7f1f0d76dd6 521 {
XinZhangMS 0:f7f1f0d76dd6 522 LogError("Cannot get first field");
XinZhangMS 0:f7f1f0d76dd6 523 }
XinZhangMS 0:f7f1f0d76dd6 524 else
XinZhangMS 0:f7f1f0d76dd6 525 {
XinZhangMS 0:f7f1f0d76dd6 526 bool settled;
XinZhangMS 0:f7f1f0d76dd6 527
XinZhangMS 0:f7f1f0d76dd6 528 if (disposition_get_last(disposition, &last) != 0)
XinZhangMS 0:f7f1f0d76dd6 529 {
XinZhangMS 0:f7f1f0d76dd6 530 last = first;
XinZhangMS 0:f7f1f0d76dd6 531 }
XinZhangMS 0:f7f1f0d76dd6 532
XinZhangMS 0:f7f1f0d76dd6 533 if (disposition_get_settled(disposition, &settled) != 0)
XinZhangMS 0:f7f1f0d76dd6 534 {
XinZhangMS 0:f7f1f0d76dd6 535 settled = false;
XinZhangMS 0:f7f1f0d76dd6 536 }
XinZhangMS 0:f7f1f0d76dd6 537
XinZhangMS 0:f7f1f0d76dd6 538 if (settled)
XinZhangMS 0:f7f1f0d76dd6 539 {
XinZhangMS 0:f7f1f0d76dd6 540 LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 541 while (pending_delivery != NULL)
XinZhangMS 0:f7f1f0d76dd6 542 {
XinZhangMS 0:f7f1f0d76dd6 543 LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery);
XinZhangMS 0:f7f1f0d76dd6 544 ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery);
XinZhangMS 0:f7f1f0d76dd6 545 if (pending_delivery_operation == NULL)
XinZhangMS 0:f7f1f0d76dd6 546 {
XinZhangMS 0:f7f1f0d76dd6 547 LogError("Cannot obtain pending delivery");
XinZhangMS 0:f7f1f0d76dd6 548 break;
XinZhangMS 0:f7f1f0d76dd6 549 }
XinZhangMS 0:f7f1f0d76dd6 550 else
XinZhangMS 0:f7f1f0d76dd6 551 {
XinZhangMS 0:f7f1f0d76dd6 552 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 553
XinZhangMS 0:f7f1f0d76dd6 554 if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last))
XinZhangMS 0:f7f1f0d76dd6 555 {
XinZhangMS 0:f7f1f0d76dd6 556 AMQP_VALUE delivery_state;
XinZhangMS 0:f7f1f0d76dd6 557 if (disposition_get_state(disposition, &delivery_state) == 0)
XinZhangMS 0:f7f1f0d76dd6 558 {
XinZhangMS 0:f7f1f0d76dd6 559 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state);
XinZhangMS 0:f7f1f0d76dd6 560 async_operation_destroy(pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 561 if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0)
XinZhangMS 0:f7f1f0d76dd6 562 {
XinZhangMS 0:f7f1f0d76dd6 563 LogError("Cannot remove pending delivery");
XinZhangMS 0:f7f1f0d76dd6 564 break;
XinZhangMS 0:f7f1f0d76dd6 565 }
XinZhangMS 0:f7f1f0d76dd6 566
XinZhangMS 0:f7f1f0d76dd6 567 pending_delivery = next_pending_delivery;
XinZhangMS 0:f7f1f0d76dd6 568 }
XinZhangMS 0:f7f1f0d76dd6 569 }
XinZhangMS 0:f7f1f0d76dd6 570 else
XinZhangMS 0:f7f1f0d76dd6 571 {
XinZhangMS 0:f7f1f0d76dd6 572 pending_delivery = next_pending_delivery;
XinZhangMS 0:f7f1f0d76dd6 573 }
XinZhangMS 0:f7f1f0d76dd6 574 }
XinZhangMS 0:f7f1f0d76dd6 575 }
XinZhangMS 0:f7f1f0d76dd6 576 }
XinZhangMS 0:f7f1f0d76dd6 577 }
XinZhangMS 0:f7f1f0d76dd6 578
XinZhangMS 0:f7f1f0d76dd6 579 disposition_destroy(disposition);
XinZhangMS 0:f7f1f0d76dd6 580 }
XinZhangMS 0:f7f1f0d76dd6 581 }
XinZhangMS 0:f7f1f0d76dd6 582 else if (is_detach_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 583 {
XinZhangMS 0:f7f1f0d76dd6 584 DETACH_HANDLE detach;
XinZhangMS 0:f7f1f0d76dd6 585
XinZhangMS 0:f7f1f0d76dd6 586 /* Set link state appropriately based on whether we received detach condition */
XinZhangMS 0:f7f1f0d76dd6 587 if (amqpvalue_get_detach(performative, &detach) != 0)
XinZhangMS 0:f7f1f0d76dd6 588 {
XinZhangMS 0:f7f1f0d76dd6 589 LogError("Cannot get detach performative");
XinZhangMS 0:f7f1f0d76dd6 590 }
XinZhangMS 0:f7f1f0d76dd6 591 else
XinZhangMS 0:f7f1f0d76dd6 592 {
XinZhangMS 0:f7f1f0d76dd6 593 bool closed = false;
XinZhangMS 0:f7f1f0d76dd6 594 ERROR_HANDLE error;
XinZhangMS 0:f7f1f0d76dd6 595
XinZhangMS 0:f7f1f0d76dd6 596 (void)detach_get_closed(detach, &closed);
XinZhangMS 0:f7f1f0d76dd6 597
XinZhangMS 0:f7f1f0d76dd6 598 /* Received a detach while attached */
XinZhangMS 0:f7f1f0d76dd6 599 if (link_instance->link_state == LINK_STATE_ATTACHED)
XinZhangMS 0:f7f1f0d76dd6 600 {
XinZhangMS 0:f7f1f0d76dd6 601 /* Respond with ack */
XinZhangMS 0:f7f1f0d76dd6 602 if (send_detach(link_instance, closed, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 603 {
XinZhangMS 0:f7f1f0d76dd6 604 LogError("Failed sending detach frame");
XinZhangMS 0:f7f1f0d76dd6 605 }
XinZhangMS 0:f7f1f0d76dd6 606 }
XinZhangMS 0:f7f1f0d76dd6 607 /* Received a closing detach after we sent a non-closing detach. */
XinZhangMS 0:f7f1f0d76dd6 608 else if (closed &&
XinZhangMS 0:f7f1f0d76dd6 609 ((link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT) || (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) &&
XinZhangMS 0:f7f1f0d76dd6 610 !link_instance->is_closed)
XinZhangMS 0:f7f1f0d76dd6 611 {
XinZhangMS 0:f7f1f0d76dd6 612
XinZhangMS 0:f7f1f0d76dd6 613 /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/
XinZhangMS 0:f7f1f0d76dd6 614 if (send_attach(link_instance, link_instance->name, 0, link_instance->role) != 0)
XinZhangMS 0:f7f1f0d76dd6 615 {
XinZhangMS 0:f7f1f0d76dd6 616 LogError("Failed sending attach frame");
XinZhangMS 0:f7f1f0d76dd6 617 }
XinZhangMS 0:f7f1f0d76dd6 618
XinZhangMS 0:f7f1f0d76dd6 619 if (send_detach(link_instance, true, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 620 {
XinZhangMS 0:f7f1f0d76dd6 621 LogError("Failed sending detach frame");
XinZhangMS 0:f7f1f0d76dd6 622 }
XinZhangMS 0:f7f1f0d76dd6 623 }
XinZhangMS 0:f7f1f0d76dd6 624
XinZhangMS 0:f7f1f0d76dd6 625 if (detach_get_error(detach, &error) != 0)
XinZhangMS 0:f7f1f0d76dd6 626 {
XinZhangMS 0:f7f1f0d76dd6 627 error = NULL;
XinZhangMS 0:f7f1f0d76dd6 628 }
XinZhangMS 0:f7f1f0d76dd6 629 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 630 // signal link detach received in order to handle cases like redirect
XinZhangMS 0:f7f1f0d76dd6 631 if (link_instance->on_link_detach_received_event_subscription.on_link_detach_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 632 {
XinZhangMS 0:f7f1f0d76dd6 633 link_instance->on_link_detach_received_event_subscription.on_link_detach_received(link_instance->on_link_detach_received_event_subscription.context, error);
XinZhangMS 0:f7f1f0d76dd6 634 }
XinZhangMS 0:f7f1f0d76dd6 635
XinZhangMS 0:f7f1f0d76dd6 636 if (error != NULL)
XinZhangMS 0:f7f1f0d76dd6 637 {
XinZhangMS 0:f7f1f0d76dd6 638 set_link_state(link_instance, LINK_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 639 error_destroy(error);
XinZhangMS 0:f7f1f0d76dd6 640 }
XinZhangMS 0:f7f1f0d76dd6 641 else
XinZhangMS 0:f7f1f0d76dd6 642 {
XinZhangMS 0:f7f1f0d76dd6 643 set_link_state(link_instance, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 644 }
XinZhangMS 0:f7f1f0d76dd6 645
XinZhangMS 0:f7f1f0d76dd6 646 detach_destroy(detach);
XinZhangMS 0:f7f1f0d76dd6 647 }
XinZhangMS 0:f7f1f0d76dd6 648 }
XinZhangMS 0:f7f1f0d76dd6 649 }
XinZhangMS 0:f7f1f0d76dd6 650
XinZhangMS 0:f7f1f0d76dd6 651 static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state)
XinZhangMS 0:f7f1f0d76dd6 652 {
XinZhangMS 0:f7f1f0d76dd6 653 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 654 (void)previous_session_state;
XinZhangMS 0:f7f1f0d76dd6 655
XinZhangMS 0:f7f1f0d76dd6 656 if (new_session_state == SESSION_STATE_MAPPED)
XinZhangMS 0:f7f1f0d76dd6 657 {
XinZhangMS 0:f7f1f0d76dd6 658 if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed))
XinZhangMS 0:f7f1f0d76dd6 659 {
XinZhangMS 0:f7f1f0d76dd6 660 if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0)
XinZhangMS 0:f7f1f0d76dd6 661 {
XinZhangMS 0:f7f1f0d76dd6 662 set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_SENT);
XinZhangMS 0:f7f1f0d76dd6 663 }
XinZhangMS 0:f7f1f0d76dd6 664 }
XinZhangMS 0:f7f1f0d76dd6 665 }
XinZhangMS 0:f7f1f0d76dd6 666 else if (new_session_state == SESSION_STATE_DISCARDING)
XinZhangMS 0:f7f1f0d76dd6 667 {
XinZhangMS 0:f7f1f0d76dd6 668 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 669 set_link_state(link_instance, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 670 }
XinZhangMS 0:f7f1f0d76dd6 671 else if (new_session_state == SESSION_STATE_ERROR)
XinZhangMS 0:f7f1f0d76dd6 672 {
XinZhangMS 0:f7f1f0d76dd6 673 remove_all_pending_deliveries(link_instance, true);
XinZhangMS 0:f7f1f0d76dd6 674 set_link_state(link_instance, LINK_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 675 }
XinZhangMS 0:f7f1f0d76dd6 676 }
XinZhangMS 0:f7f1f0d76dd6 677
XinZhangMS 0:f7f1f0d76dd6 678 static void on_session_flow_on(void* context)
XinZhangMS 0:f7f1f0d76dd6 679 {
XinZhangMS 0:f7f1f0d76dd6 680 LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 681 if (link_instance->role == role_sender)
XinZhangMS 0:f7f1f0d76dd6 682 {
XinZhangMS 0:f7f1f0d76dd6 683 link_instance->on_link_flow_on(link_instance->callback_context);
XinZhangMS 0:f7f1f0d76dd6 684 }
XinZhangMS 0:f7f1f0d76dd6 685 }
XinZhangMS 0:f7f1f0d76dd6 686
XinZhangMS 0:f7f1f0d76dd6 687 static void on_send_complete(void* context, IO_SEND_RESULT send_result)
XinZhangMS 0:f7f1f0d76dd6 688 {
XinZhangMS 0:f7f1f0d76dd6 689 LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 690 ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(delivery_instance_list_item);
XinZhangMS 0:f7f1f0d76dd6 691 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 692 LINK_HANDLE link = (LINK_HANDLE)delivery_instance->link;
XinZhangMS 0:f7f1f0d76dd6 693 (void)send_result;
XinZhangMS 0:f7f1f0d76dd6 694 if (link->snd_settle_mode == sender_settle_mode_settled)
XinZhangMS 0:f7f1f0d76dd6 695 {
XinZhangMS 0:f7f1f0d76dd6 696 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL);
XinZhangMS 0:f7f1f0d76dd6 697 async_operation_destroy(pending_delivery_operation);
XinZhangMS 0:f7f1f0d76dd6 698 (void)singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
XinZhangMS 0:f7f1f0d76dd6 699 }
XinZhangMS 0:f7f1f0d76dd6 700 }
XinZhangMS 0:f7f1f0d76dd6 701
XinZhangMS 0:f7f1f0d76dd6 702 LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target)
XinZhangMS 0:f7f1f0d76dd6 703 {
XinZhangMS 0:f7f1f0d76dd6 704 LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 705 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 706 {
XinZhangMS 0:f7f1f0d76dd6 707 LogError("Cannot create link");
XinZhangMS 0:f7f1f0d76dd6 708 }
XinZhangMS 0:f7f1f0d76dd6 709 else
XinZhangMS 0:f7f1f0d76dd6 710 {
XinZhangMS 0:f7f1f0d76dd6 711 result->link_state = LINK_STATE_DETACHED;
XinZhangMS 0:f7f1f0d76dd6 712 result->previous_link_state = LINK_STATE_DETACHED;
XinZhangMS 0:f7f1f0d76dd6 713 result->role = role;
XinZhangMS 0:f7f1f0d76dd6 714 result->source = amqpvalue_clone(source);
XinZhangMS 0:f7f1f0d76dd6 715 result->target = amqpvalue_clone(target);
XinZhangMS 0:f7f1f0d76dd6 716 result->session = session;
XinZhangMS 0:f7f1f0d76dd6 717 result->handle = 0;
XinZhangMS 0:f7f1f0d76dd6 718 result->snd_settle_mode = sender_settle_mode_unsettled;
XinZhangMS 0:f7f1f0d76dd6 719 result->rcv_settle_mode = receiver_settle_mode_first;
XinZhangMS 0:f7f1f0d76dd6 720 result->delivery_count = 0;
XinZhangMS 0:f7f1f0d76dd6 721 result->initial_delivery_count = 0;
XinZhangMS 0:f7f1f0d76dd6 722 result->max_message_size = 0;
XinZhangMS 0:f7f1f0d76dd6 723 result->max_link_credit = DEFAULT_LINK_CREDIT;
XinZhangMS 0:f7f1f0d76dd6 724 result->peer_max_message_size = 0;
XinZhangMS 0:f7f1f0d76dd6 725 result->is_underlying_session_begun = false;
XinZhangMS 0:f7f1f0d76dd6 726 result->is_closed = false;
XinZhangMS 0:f7f1f0d76dd6 727 result->attach_properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 728 result->received_payload = NULL;
XinZhangMS 0:f7f1f0d76dd6 729 result->received_payload_size = 0;
XinZhangMS 0:f7f1f0d76dd6 730 result->received_delivery_id = 0;
XinZhangMS 0:f7f1f0d76dd6 731 result->on_link_detach_received_event_subscription.on_link_detach_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 732 result->on_link_detach_received_event_subscription.context = NULL;
XinZhangMS 0:f7f1f0d76dd6 733
XinZhangMS 0:f7f1f0d76dd6 734 result->tick_counter = tickcounter_create();
XinZhangMS 0:f7f1f0d76dd6 735 if (result->tick_counter == NULL)
XinZhangMS 0:f7f1f0d76dd6 736 {
XinZhangMS 0:f7f1f0d76dd6 737 LogError("Cannot create tick counter for link");
XinZhangMS 0:f7f1f0d76dd6 738 free(result);
XinZhangMS 0:f7f1f0d76dd6 739 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 740 }
XinZhangMS 0:f7f1f0d76dd6 741 else
XinZhangMS 0:f7f1f0d76dd6 742 {
XinZhangMS 0:f7f1f0d76dd6 743 result->pending_deliveries = singlylinkedlist_create();
XinZhangMS 0:f7f1f0d76dd6 744 if (result->pending_deliveries == NULL)
XinZhangMS 0:f7f1f0d76dd6 745 {
XinZhangMS 0:f7f1f0d76dd6 746 LogError("Cannot create pending deliveries list");
XinZhangMS 0:f7f1f0d76dd6 747 tickcounter_destroy(result->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 748 free(result);
XinZhangMS 0:f7f1f0d76dd6 749 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 750 }
XinZhangMS 0:f7f1f0d76dd6 751 else
XinZhangMS 0:f7f1f0d76dd6 752 {
XinZhangMS 0:f7f1f0d76dd6 753 size_t name_length = strlen(name);
XinZhangMS 0:f7f1f0d76dd6 754 result->name = (char*)malloc(name_length + 1);
XinZhangMS 0:f7f1f0d76dd6 755 if (result->name == NULL)
XinZhangMS 0:f7f1f0d76dd6 756 {
XinZhangMS 0:f7f1f0d76dd6 757 LogError("Cannot allocate memory for link name");
XinZhangMS 0:f7f1f0d76dd6 758 tickcounter_destroy(result->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 759 singlylinkedlist_destroy(result->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 760 free(result);
XinZhangMS 0:f7f1f0d76dd6 761 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 762 }
XinZhangMS 0:f7f1f0d76dd6 763 else
XinZhangMS 0:f7f1f0d76dd6 764 {
XinZhangMS 0:f7f1f0d76dd6 765 result->on_link_state_changed = NULL;
XinZhangMS 0:f7f1f0d76dd6 766 result->callback_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 767 set_link_state(result, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 768
XinZhangMS 0:f7f1f0d76dd6 769 (void)memcpy(result->name, name, name_length + 1);
XinZhangMS 0:f7f1f0d76dd6 770 result->link_endpoint = session_create_link_endpoint(session, name);
XinZhangMS 0:f7f1f0d76dd6 771 if (result->link_endpoint == NULL)
XinZhangMS 0:f7f1f0d76dd6 772 {
XinZhangMS 0:f7f1f0d76dd6 773 LogError("Cannot create link endpoint");
XinZhangMS 0:f7f1f0d76dd6 774 tickcounter_destroy(result->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 775 singlylinkedlist_destroy(result->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 776 free(result->name);
XinZhangMS 0:f7f1f0d76dd6 777 free(result);
XinZhangMS 0:f7f1f0d76dd6 778 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 779 }
XinZhangMS 0:f7f1f0d76dd6 780 }
XinZhangMS 0:f7f1f0d76dd6 781 }
XinZhangMS 0:f7f1f0d76dd6 782 }
XinZhangMS 0:f7f1f0d76dd6 783 }
XinZhangMS 0:f7f1f0d76dd6 784
XinZhangMS 0:f7f1f0d76dd6 785 return result;
XinZhangMS 0:f7f1f0d76dd6 786 }
XinZhangMS 0:f7f1f0d76dd6 787
XinZhangMS 0:f7f1f0d76dd6 788 LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target)
XinZhangMS 0:f7f1f0d76dd6 789 {
XinZhangMS 0:f7f1f0d76dd6 790 LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 791 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 792 {
XinZhangMS 0:f7f1f0d76dd6 793 LogError("Cannot create link");
XinZhangMS 0:f7f1f0d76dd6 794 }
XinZhangMS 0:f7f1f0d76dd6 795 else
XinZhangMS 0:f7f1f0d76dd6 796 {
XinZhangMS 0:f7f1f0d76dd6 797 result->link_state = LINK_STATE_DETACHED;
XinZhangMS 0:f7f1f0d76dd6 798 result->previous_link_state = LINK_STATE_DETACHED;
XinZhangMS 0:f7f1f0d76dd6 799 result->session = session;
XinZhangMS 0:f7f1f0d76dd6 800 result->handle = 0;
XinZhangMS 0:f7f1f0d76dd6 801 result->snd_settle_mode = sender_settle_mode_unsettled;
XinZhangMS 0:f7f1f0d76dd6 802 result->rcv_settle_mode = receiver_settle_mode_first;
XinZhangMS 0:f7f1f0d76dd6 803 result->delivery_count = 0;
XinZhangMS 0:f7f1f0d76dd6 804 result->initial_delivery_count = 0;
XinZhangMS 0:f7f1f0d76dd6 805 result->max_message_size = 0;
XinZhangMS 0:f7f1f0d76dd6 806 result->max_link_credit = DEFAULT_LINK_CREDIT;
XinZhangMS 0:f7f1f0d76dd6 807 result->peer_max_message_size = 0;
XinZhangMS 0:f7f1f0d76dd6 808 result->is_underlying_session_begun = false;
XinZhangMS 0:f7f1f0d76dd6 809 result->is_closed = false;
XinZhangMS 0:f7f1f0d76dd6 810 result->attach_properties = NULL;
XinZhangMS 0:f7f1f0d76dd6 811 result->received_payload = NULL;
XinZhangMS 0:f7f1f0d76dd6 812 result->received_payload_size = 0;
XinZhangMS 0:f7f1f0d76dd6 813 result->received_delivery_id = 0;
XinZhangMS 0:f7f1f0d76dd6 814 result->source = amqpvalue_clone(target);
XinZhangMS 0:f7f1f0d76dd6 815 result->target = amqpvalue_clone(source);
XinZhangMS 0:f7f1f0d76dd6 816 result->on_link_detach_received_event_subscription.on_link_detach_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 817 result->on_link_detach_received_event_subscription.context = NULL;
XinZhangMS 0:f7f1f0d76dd6 818
XinZhangMS 0:f7f1f0d76dd6 819 if (role == role_sender)
XinZhangMS 0:f7f1f0d76dd6 820 {
XinZhangMS 0:f7f1f0d76dd6 821 result->role = role_receiver;
XinZhangMS 0:f7f1f0d76dd6 822 }
XinZhangMS 0:f7f1f0d76dd6 823 else
XinZhangMS 0:f7f1f0d76dd6 824 {
XinZhangMS 0:f7f1f0d76dd6 825 result->role = role_sender;
XinZhangMS 0:f7f1f0d76dd6 826 }
XinZhangMS 0:f7f1f0d76dd6 827
XinZhangMS 0:f7f1f0d76dd6 828 result->tick_counter = tickcounter_create();
XinZhangMS 0:f7f1f0d76dd6 829 if (result->tick_counter == NULL)
XinZhangMS 0:f7f1f0d76dd6 830 {
XinZhangMS 0:f7f1f0d76dd6 831 LogError("Cannot create tick counter for link");
XinZhangMS 0:f7f1f0d76dd6 832 free(result);
XinZhangMS 0:f7f1f0d76dd6 833 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 834 }
XinZhangMS 0:f7f1f0d76dd6 835 else
XinZhangMS 0:f7f1f0d76dd6 836 {
XinZhangMS 0:f7f1f0d76dd6 837 result->pending_deliveries = singlylinkedlist_create();
XinZhangMS 0:f7f1f0d76dd6 838 if (result->pending_deliveries == NULL)
XinZhangMS 0:f7f1f0d76dd6 839 {
XinZhangMS 0:f7f1f0d76dd6 840 LogError("Cannot create pending deliveries list");
XinZhangMS 0:f7f1f0d76dd6 841 tickcounter_destroy(result->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 842 free(result);
XinZhangMS 0:f7f1f0d76dd6 843 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 844 }
XinZhangMS 0:f7f1f0d76dd6 845 else
XinZhangMS 0:f7f1f0d76dd6 846 {
XinZhangMS 0:f7f1f0d76dd6 847 size_t name_length = strlen(name);
XinZhangMS 0:f7f1f0d76dd6 848 result->name = (char*)malloc(name_length + 1);
XinZhangMS 0:f7f1f0d76dd6 849 if (result->name == NULL)
XinZhangMS 0:f7f1f0d76dd6 850 {
XinZhangMS 0:f7f1f0d76dd6 851 LogError("Cannot allocate memory for link name");
XinZhangMS 0:f7f1f0d76dd6 852 tickcounter_destroy(result->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 853 singlylinkedlist_destroy(result->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 854 free(result);
XinZhangMS 0:f7f1f0d76dd6 855 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 856 }
XinZhangMS 0:f7f1f0d76dd6 857 else
XinZhangMS 0:f7f1f0d76dd6 858 {
XinZhangMS 0:f7f1f0d76dd6 859 (void)memcpy(result->name, name, name_length + 1);
XinZhangMS 0:f7f1f0d76dd6 860 result->on_link_state_changed = NULL;
XinZhangMS 0:f7f1f0d76dd6 861 result->callback_context = NULL;
XinZhangMS 0:f7f1f0d76dd6 862 result->link_endpoint = link_endpoint;
XinZhangMS 0:f7f1f0d76dd6 863 }
XinZhangMS 0:f7f1f0d76dd6 864 }
XinZhangMS 0:f7f1f0d76dd6 865 }
XinZhangMS 0:f7f1f0d76dd6 866 }
XinZhangMS 0:f7f1f0d76dd6 867
XinZhangMS 0:f7f1f0d76dd6 868 return result;
XinZhangMS 0:f7f1f0d76dd6 869 }
XinZhangMS 0:f7f1f0d76dd6 870
XinZhangMS 0:f7f1f0d76dd6 871 void link_destroy(LINK_HANDLE link)
XinZhangMS 0:f7f1f0d76dd6 872 {
XinZhangMS 0:f7f1f0d76dd6 873 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 874 {
XinZhangMS 0:f7f1f0d76dd6 875 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 876 }
XinZhangMS 0:f7f1f0d76dd6 877 else
XinZhangMS 0:f7f1f0d76dd6 878 {
XinZhangMS 0:f7f1f0d76dd6 879 remove_all_pending_deliveries((LINK_INSTANCE*)link, false);
XinZhangMS 0:f7f1f0d76dd6 880 tickcounter_destroy(link->tick_counter);
XinZhangMS 0:f7f1f0d76dd6 881
XinZhangMS 0:f7f1f0d76dd6 882 link->on_link_state_changed = NULL;
XinZhangMS 0:f7f1f0d76dd6 883 (void)link_detach(link, true, NULL, NULL, NULL);
XinZhangMS 0:f7f1f0d76dd6 884 session_destroy_link_endpoint(link->link_endpoint);
XinZhangMS 0:f7f1f0d76dd6 885 amqpvalue_destroy(link->source);
XinZhangMS 0:f7f1f0d76dd6 886 amqpvalue_destroy(link->target);
XinZhangMS 0:f7f1f0d76dd6 887
XinZhangMS 0:f7f1f0d76dd6 888 if (link->name != NULL)
XinZhangMS 0:f7f1f0d76dd6 889 {
XinZhangMS 0:f7f1f0d76dd6 890 free(link->name);
XinZhangMS 0:f7f1f0d76dd6 891 }
XinZhangMS 0:f7f1f0d76dd6 892
XinZhangMS 0:f7f1f0d76dd6 893 if (link->attach_properties != NULL)
XinZhangMS 0:f7f1f0d76dd6 894 {
XinZhangMS 0:f7f1f0d76dd6 895 amqpvalue_destroy(link->attach_properties);
XinZhangMS 0:f7f1f0d76dd6 896 }
XinZhangMS 0:f7f1f0d76dd6 897
XinZhangMS 0:f7f1f0d76dd6 898 if (link->received_payload != NULL)
XinZhangMS 0:f7f1f0d76dd6 899 {
XinZhangMS 0:f7f1f0d76dd6 900 free(link->received_payload);
XinZhangMS 0:f7f1f0d76dd6 901 }
XinZhangMS 0:f7f1f0d76dd6 902
XinZhangMS 0:f7f1f0d76dd6 903 free(link);
XinZhangMS 0:f7f1f0d76dd6 904 }
XinZhangMS 0:f7f1f0d76dd6 905 }
XinZhangMS 0:f7f1f0d76dd6 906
XinZhangMS 0:f7f1f0d76dd6 907 int link_set_snd_settle_mode(LINK_HANDLE link, sender_settle_mode snd_settle_mode)
XinZhangMS 0:f7f1f0d76dd6 908 {
XinZhangMS 0:f7f1f0d76dd6 909 int result;
XinZhangMS 0:f7f1f0d76dd6 910
XinZhangMS 0:f7f1f0d76dd6 911 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 912 {
XinZhangMS 0:f7f1f0d76dd6 913 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 914 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 915 }
XinZhangMS 0:f7f1f0d76dd6 916 else
XinZhangMS 0:f7f1f0d76dd6 917 {
XinZhangMS 0:f7f1f0d76dd6 918 link->snd_settle_mode = snd_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 919 result = 0;
XinZhangMS 0:f7f1f0d76dd6 920 }
XinZhangMS 0:f7f1f0d76dd6 921
XinZhangMS 0:f7f1f0d76dd6 922 return result;
XinZhangMS 0:f7f1f0d76dd6 923 }
XinZhangMS 0:f7f1f0d76dd6 924
XinZhangMS 0:f7f1f0d76dd6 925 int link_get_snd_settle_mode(LINK_HANDLE link, sender_settle_mode* snd_settle_mode)
XinZhangMS 0:f7f1f0d76dd6 926 {
XinZhangMS 0:f7f1f0d76dd6 927 int result;
XinZhangMS 0:f7f1f0d76dd6 928
XinZhangMS 0:f7f1f0d76dd6 929 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 930 (snd_settle_mode == NULL))
XinZhangMS 0:f7f1f0d76dd6 931 {
XinZhangMS 0:f7f1f0d76dd6 932 LogError("Bad arguments: link = %p, snd_settle_mode = %p",
XinZhangMS 0:f7f1f0d76dd6 933 link, snd_settle_mode);
XinZhangMS 0:f7f1f0d76dd6 934 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 935 }
XinZhangMS 0:f7f1f0d76dd6 936 else
XinZhangMS 0:f7f1f0d76dd6 937 {
XinZhangMS 0:f7f1f0d76dd6 938 *snd_settle_mode = link->snd_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 939
XinZhangMS 0:f7f1f0d76dd6 940 result = 0;
XinZhangMS 0:f7f1f0d76dd6 941 }
XinZhangMS 0:f7f1f0d76dd6 942
XinZhangMS 0:f7f1f0d76dd6 943 return result;
XinZhangMS 0:f7f1f0d76dd6 944 }
XinZhangMS 0:f7f1f0d76dd6 945
XinZhangMS 0:f7f1f0d76dd6 946 int link_set_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode rcv_settle_mode)
XinZhangMS 0:f7f1f0d76dd6 947 {
XinZhangMS 0:f7f1f0d76dd6 948 int result;
XinZhangMS 0:f7f1f0d76dd6 949
XinZhangMS 0:f7f1f0d76dd6 950 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 951 {
XinZhangMS 0:f7f1f0d76dd6 952 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 953 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 954 }
XinZhangMS 0:f7f1f0d76dd6 955 else
XinZhangMS 0:f7f1f0d76dd6 956 {
XinZhangMS 0:f7f1f0d76dd6 957 link->rcv_settle_mode = rcv_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 958 result = 0;
XinZhangMS 0:f7f1f0d76dd6 959 }
XinZhangMS 0:f7f1f0d76dd6 960
XinZhangMS 0:f7f1f0d76dd6 961 return result;
XinZhangMS 0:f7f1f0d76dd6 962 }
XinZhangMS 0:f7f1f0d76dd6 963
XinZhangMS 0:f7f1f0d76dd6 964 int link_get_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode* rcv_settle_mode)
XinZhangMS 0:f7f1f0d76dd6 965 {
XinZhangMS 0:f7f1f0d76dd6 966 int result;
XinZhangMS 0:f7f1f0d76dd6 967
XinZhangMS 0:f7f1f0d76dd6 968 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 969 (rcv_settle_mode == NULL))
XinZhangMS 0:f7f1f0d76dd6 970 {
XinZhangMS 0:f7f1f0d76dd6 971 LogError("Bad arguments: link = %p, rcv_settle_mode = %p",
XinZhangMS 0:f7f1f0d76dd6 972 link, rcv_settle_mode);
XinZhangMS 0:f7f1f0d76dd6 973 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 974 }
XinZhangMS 0:f7f1f0d76dd6 975 else
XinZhangMS 0:f7f1f0d76dd6 976 {
XinZhangMS 0:f7f1f0d76dd6 977 *rcv_settle_mode = link->rcv_settle_mode;
XinZhangMS 0:f7f1f0d76dd6 978 result = 0;
XinZhangMS 0:f7f1f0d76dd6 979 }
XinZhangMS 0:f7f1f0d76dd6 980
XinZhangMS 0:f7f1f0d76dd6 981 return result;
XinZhangMS 0:f7f1f0d76dd6 982 }
XinZhangMS 0:f7f1f0d76dd6 983
XinZhangMS 0:f7f1f0d76dd6 984 int link_set_initial_delivery_count(LINK_HANDLE link, sequence_no initial_delivery_count)
XinZhangMS 0:f7f1f0d76dd6 985 {
XinZhangMS 0:f7f1f0d76dd6 986 int result;
XinZhangMS 0:f7f1f0d76dd6 987
XinZhangMS 0:f7f1f0d76dd6 988 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 989 {
XinZhangMS 0:f7f1f0d76dd6 990 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 991 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 992 }
XinZhangMS 0:f7f1f0d76dd6 993 else
XinZhangMS 0:f7f1f0d76dd6 994 {
XinZhangMS 0:f7f1f0d76dd6 995 link->initial_delivery_count = initial_delivery_count;
XinZhangMS 0:f7f1f0d76dd6 996 result = 0;
XinZhangMS 0:f7f1f0d76dd6 997 }
XinZhangMS 0:f7f1f0d76dd6 998
XinZhangMS 0:f7f1f0d76dd6 999 return result;
XinZhangMS 0:f7f1f0d76dd6 1000 }
XinZhangMS 0:f7f1f0d76dd6 1001
XinZhangMS 0:f7f1f0d76dd6 1002 int link_get_initial_delivery_count(LINK_HANDLE link, sequence_no* initial_delivery_count)
XinZhangMS 0:f7f1f0d76dd6 1003 {
XinZhangMS 0:f7f1f0d76dd6 1004 int result;
XinZhangMS 0:f7f1f0d76dd6 1005
XinZhangMS 0:f7f1f0d76dd6 1006 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1007 (initial_delivery_count == NULL))
XinZhangMS 0:f7f1f0d76dd6 1008 {
XinZhangMS 0:f7f1f0d76dd6 1009 LogError("Bad arguments: link = %p, initial_delivery_count = %p",
XinZhangMS 0:f7f1f0d76dd6 1010 link, initial_delivery_count);
XinZhangMS 0:f7f1f0d76dd6 1011 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1012 }
XinZhangMS 0:f7f1f0d76dd6 1013 else
XinZhangMS 0:f7f1f0d76dd6 1014 {
XinZhangMS 0:f7f1f0d76dd6 1015 *initial_delivery_count = link->initial_delivery_count;
XinZhangMS 0:f7f1f0d76dd6 1016 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1017 }
XinZhangMS 0:f7f1f0d76dd6 1018
XinZhangMS 0:f7f1f0d76dd6 1019 return result;
XinZhangMS 0:f7f1f0d76dd6 1020 }
XinZhangMS 0:f7f1f0d76dd6 1021
XinZhangMS 0:f7f1f0d76dd6 1022 int link_set_max_message_size(LINK_HANDLE link, uint64_t max_message_size)
XinZhangMS 0:f7f1f0d76dd6 1023 {
XinZhangMS 0:f7f1f0d76dd6 1024 int result;
XinZhangMS 0:f7f1f0d76dd6 1025
XinZhangMS 0:f7f1f0d76dd6 1026 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1027 {
XinZhangMS 0:f7f1f0d76dd6 1028 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1029 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1030 }
XinZhangMS 0:f7f1f0d76dd6 1031 else
XinZhangMS 0:f7f1f0d76dd6 1032 {
XinZhangMS 0:f7f1f0d76dd6 1033 link->max_message_size = max_message_size;
XinZhangMS 0:f7f1f0d76dd6 1034 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1035 }
XinZhangMS 0:f7f1f0d76dd6 1036
XinZhangMS 0:f7f1f0d76dd6 1037 return result;
XinZhangMS 0:f7f1f0d76dd6 1038 }
XinZhangMS 0:f7f1f0d76dd6 1039
XinZhangMS 0:f7f1f0d76dd6 1040 int link_get_max_message_size(LINK_HANDLE link, uint64_t* max_message_size)
XinZhangMS 0:f7f1f0d76dd6 1041 {
XinZhangMS 0:f7f1f0d76dd6 1042 int result;
XinZhangMS 0:f7f1f0d76dd6 1043
XinZhangMS 0:f7f1f0d76dd6 1044 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1045 (max_message_size == NULL))
XinZhangMS 0:f7f1f0d76dd6 1046 {
XinZhangMS 0:f7f1f0d76dd6 1047 LogError("Bad arguments: link = %p, max_message_size = %p",
XinZhangMS 0:f7f1f0d76dd6 1048 link, max_message_size);
XinZhangMS 0:f7f1f0d76dd6 1049 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1050 }
XinZhangMS 0:f7f1f0d76dd6 1051 else
XinZhangMS 0:f7f1f0d76dd6 1052 {
XinZhangMS 0:f7f1f0d76dd6 1053 *max_message_size = link->max_message_size;
XinZhangMS 0:f7f1f0d76dd6 1054 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1055 }
XinZhangMS 0:f7f1f0d76dd6 1056
XinZhangMS 0:f7f1f0d76dd6 1057 return result;
XinZhangMS 0:f7f1f0d76dd6 1058 }
XinZhangMS 0:f7f1f0d76dd6 1059
XinZhangMS 0:f7f1f0d76dd6 1060 int link_get_peer_max_message_size(LINK_HANDLE link, uint64_t* peer_max_message_size)
XinZhangMS 0:f7f1f0d76dd6 1061 {
XinZhangMS 0:f7f1f0d76dd6 1062 int result;
XinZhangMS 0:f7f1f0d76dd6 1063
XinZhangMS 0:f7f1f0d76dd6 1064 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1065 (peer_max_message_size == NULL))
XinZhangMS 0:f7f1f0d76dd6 1066 {
XinZhangMS 0:f7f1f0d76dd6 1067 LogError("Bad arguments: link = %p, peer_max_message_size = %p",
XinZhangMS 0:f7f1f0d76dd6 1068 link, peer_max_message_size);
XinZhangMS 0:f7f1f0d76dd6 1069 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1070 }
XinZhangMS 0:f7f1f0d76dd6 1071 else if ((link->link_state != LINK_STATE_ATTACHED) &&
XinZhangMS 0:f7f1f0d76dd6 1072 (link->link_state != LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED))
XinZhangMS 0:f7f1f0d76dd6 1073 {
XinZhangMS 0:f7f1f0d76dd6 1074 LogError("Attempting to read peer max message size before it was received");
XinZhangMS 0:f7f1f0d76dd6 1075 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1076 }
XinZhangMS 0:f7f1f0d76dd6 1077 else
XinZhangMS 0:f7f1f0d76dd6 1078 {
XinZhangMS 0:f7f1f0d76dd6 1079 *peer_max_message_size = link->peer_max_message_size;
XinZhangMS 0:f7f1f0d76dd6 1080 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1081 }
XinZhangMS 0:f7f1f0d76dd6 1082
XinZhangMS 0:f7f1f0d76dd6 1083 return result;
XinZhangMS 0:f7f1f0d76dd6 1084 }
XinZhangMS 0:f7f1f0d76dd6 1085
XinZhangMS 0:f7f1f0d76dd6 1086 int link_set_attach_properties(LINK_HANDLE link, fields attach_properties)
XinZhangMS 0:f7f1f0d76dd6 1087 {
XinZhangMS 0:f7f1f0d76dd6 1088 int result;
XinZhangMS 0:f7f1f0d76dd6 1089
XinZhangMS 0:f7f1f0d76dd6 1090 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1091 {
XinZhangMS 0:f7f1f0d76dd6 1092 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1093 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1094 }
XinZhangMS 0:f7f1f0d76dd6 1095 else
XinZhangMS 0:f7f1f0d76dd6 1096 {
XinZhangMS 0:f7f1f0d76dd6 1097 link->attach_properties = amqpvalue_clone(attach_properties);
XinZhangMS 0:f7f1f0d76dd6 1098 if (link->attach_properties == NULL)
XinZhangMS 0:f7f1f0d76dd6 1099 {
XinZhangMS 0:f7f1f0d76dd6 1100 LogError("Failed cloning attach properties");
XinZhangMS 0:f7f1f0d76dd6 1101 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1102 }
XinZhangMS 0:f7f1f0d76dd6 1103 else
XinZhangMS 0:f7f1f0d76dd6 1104 {
XinZhangMS 0:f7f1f0d76dd6 1105 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1106 }
XinZhangMS 0:f7f1f0d76dd6 1107 }
XinZhangMS 0:f7f1f0d76dd6 1108
XinZhangMS 0:f7f1f0d76dd6 1109 return result;
XinZhangMS 0:f7f1f0d76dd6 1110 }
XinZhangMS 0:f7f1f0d76dd6 1111
XinZhangMS 0:f7f1f0d76dd6 1112 int link_set_max_link_credit(LINK_HANDLE link, uint32_t max_link_credit)
XinZhangMS 0:f7f1f0d76dd6 1113 {
XinZhangMS 0:f7f1f0d76dd6 1114 int result;
XinZhangMS 0:f7f1f0d76dd6 1115
XinZhangMS 0:f7f1f0d76dd6 1116 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1117 {
XinZhangMS 0:f7f1f0d76dd6 1118 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1119 }
XinZhangMS 0:f7f1f0d76dd6 1120 else
XinZhangMS 0:f7f1f0d76dd6 1121 {
XinZhangMS 0:f7f1f0d76dd6 1122 link->max_link_credit = max_link_credit;
XinZhangMS 0:f7f1f0d76dd6 1123 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1124 }
XinZhangMS 0:f7f1f0d76dd6 1125
XinZhangMS 0:f7f1f0d76dd6 1126 return result;
XinZhangMS 0:f7f1f0d76dd6 1127 }
XinZhangMS 0:f7f1f0d76dd6 1128
XinZhangMS 0:f7f1f0d76dd6 1129 int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 1130 {
XinZhangMS 0:f7f1f0d76dd6 1131 int result;
XinZhangMS 0:f7f1f0d76dd6 1132
XinZhangMS 0:f7f1f0d76dd6 1133 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1134 {
XinZhangMS 0:f7f1f0d76dd6 1135 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1136 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1137 }
XinZhangMS 0:f7f1f0d76dd6 1138 else if (link->is_closed)
XinZhangMS 0:f7f1f0d76dd6 1139 {
XinZhangMS 0:f7f1f0d76dd6 1140 LogError("Already attached");
XinZhangMS 0:f7f1f0d76dd6 1141 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1142 }
XinZhangMS 0:f7f1f0d76dd6 1143 else
XinZhangMS 0:f7f1f0d76dd6 1144 {
XinZhangMS 0:f7f1f0d76dd6 1145 if (!link->is_underlying_session_begun)
XinZhangMS 0:f7f1f0d76dd6 1146 {
XinZhangMS 0:f7f1f0d76dd6 1147 link->on_link_state_changed = on_link_state_changed;
XinZhangMS 0:f7f1f0d76dd6 1148 link->on_transfer_received = on_transfer_received;
XinZhangMS 0:f7f1f0d76dd6 1149 link->on_link_flow_on = on_link_flow_on;
XinZhangMS 0:f7f1f0d76dd6 1150 link->callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 1151
XinZhangMS 0:f7f1f0d76dd6 1152 if (session_begin(link->session) != 0)
XinZhangMS 0:f7f1f0d76dd6 1153 {
XinZhangMS 0:f7f1f0d76dd6 1154 LogError("Begin session failed");
XinZhangMS 0:f7f1f0d76dd6 1155 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1156 }
XinZhangMS 0:f7f1f0d76dd6 1157 else
XinZhangMS 0:f7f1f0d76dd6 1158 {
XinZhangMS 0:f7f1f0d76dd6 1159 link->is_underlying_session_begun = true;
XinZhangMS 0:f7f1f0d76dd6 1160
XinZhangMS 0:f7f1f0d76dd6 1161 if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0)
XinZhangMS 0:f7f1f0d76dd6 1162 {
XinZhangMS 0:f7f1f0d76dd6 1163 LogError("Binding link endpoint to session failed");
XinZhangMS 0:f7f1f0d76dd6 1164 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1165 }
XinZhangMS 0:f7f1f0d76dd6 1166 else
XinZhangMS 0:f7f1f0d76dd6 1167 {
XinZhangMS 0:f7f1f0d76dd6 1168 link->received_payload_size = 0;
XinZhangMS 0:f7f1f0d76dd6 1169
XinZhangMS 0:f7f1f0d76dd6 1170 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1171 }
XinZhangMS 0:f7f1f0d76dd6 1172 }
XinZhangMS 0:f7f1f0d76dd6 1173 }
XinZhangMS 0:f7f1f0d76dd6 1174 else
XinZhangMS 0:f7f1f0d76dd6 1175 {
XinZhangMS 0:f7f1f0d76dd6 1176 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1177 }
XinZhangMS 0:f7f1f0d76dd6 1178 }
XinZhangMS 0:f7f1f0d76dd6 1179
XinZhangMS 0:f7f1f0d76dd6 1180 return result;
XinZhangMS 0:f7f1f0d76dd6 1181 }
XinZhangMS 0:f7f1f0d76dd6 1182
XinZhangMS 0:f7f1f0d76dd6 1183 int link_detach(LINK_HANDLE link, bool close, const char* error_condition, const char* error_description, AMQP_VALUE info)
XinZhangMS 0:f7f1f0d76dd6 1184 {
XinZhangMS 0:f7f1f0d76dd6 1185 int result;
XinZhangMS 0:f7f1f0d76dd6 1186
XinZhangMS 0:f7f1f0d76dd6 1187 (void)error_condition;
XinZhangMS 0:f7f1f0d76dd6 1188 (void)error_description;
XinZhangMS 0:f7f1f0d76dd6 1189 (void)info;
XinZhangMS 0:f7f1f0d76dd6 1190
XinZhangMS 0:f7f1f0d76dd6 1191 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1192 {
XinZhangMS 0:f7f1f0d76dd6 1193 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1194 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1195 }
XinZhangMS 0:f7f1f0d76dd6 1196 else if (link->is_closed)
XinZhangMS 0:f7f1f0d76dd6 1197 {
XinZhangMS 0:f7f1f0d76dd6 1198 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1199 }
XinZhangMS 0:f7f1f0d76dd6 1200 else
XinZhangMS 0:f7f1f0d76dd6 1201 {
XinZhangMS 0:f7f1f0d76dd6 1202 ERROR_HANDLE error;
XinZhangMS 0:f7f1f0d76dd6 1203
XinZhangMS 0:f7f1f0d76dd6 1204 if (error_condition != NULL)
XinZhangMS 0:f7f1f0d76dd6 1205 {
XinZhangMS 0:f7f1f0d76dd6 1206 error = error_create(error_condition);
XinZhangMS 0:f7f1f0d76dd6 1207 if (error == NULL)
XinZhangMS 0:f7f1f0d76dd6 1208 {
XinZhangMS 0:f7f1f0d76dd6 1209 LogInfo("Cannot create error for detach, detaching without error anyhow");
XinZhangMS 0:f7f1f0d76dd6 1210 }
XinZhangMS 0:f7f1f0d76dd6 1211 else
XinZhangMS 0:f7f1f0d76dd6 1212 {
XinZhangMS 0:f7f1f0d76dd6 1213 if (error_description != NULL)
XinZhangMS 0:f7f1f0d76dd6 1214 {
XinZhangMS 0:f7f1f0d76dd6 1215 if (error_set_description(error, error_description) != 0)
XinZhangMS 0:f7f1f0d76dd6 1216 {
XinZhangMS 0:f7f1f0d76dd6 1217 LogInfo("Cannot set error description on detach error, detaching anyhow");
XinZhangMS 0:f7f1f0d76dd6 1218 }
XinZhangMS 0:f7f1f0d76dd6 1219 }
XinZhangMS 0:f7f1f0d76dd6 1220
XinZhangMS 0:f7f1f0d76dd6 1221 if (info != NULL)
XinZhangMS 0:f7f1f0d76dd6 1222 {
XinZhangMS 0:f7f1f0d76dd6 1223 if (error_set_info(error, info) != 0)
XinZhangMS 0:f7f1f0d76dd6 1224 {
XinZhangMS 0:f7f1f0d76dd6 1225 LogInfo("Cannot set info map on detach error, detaching anyhow");
XinZhangMS 0:f7f1f0d76dd6 1226 }
XinZhangMS 0:f7f1f0d76dd6 1227 }
XinZhangMS 0:f7f1f0d76dd6 1228 }
XinZhangMS 0:f7f1f0d76dd6 1229 }
XinZhangMS 0:f7f1f0d76dd6 1230 else
XinZhangMS 0:f7f1f0d76dd6 1231 {
XinZhangMS 0:f7f1f0d76dd6 1232 error = NULL;
XinZhangMS 0:f7f1f0d76dd6 1233 }
XinZhangMS 0:f7f1f0d76dd6 1234
XinZhangMS 0:f7f1f0d76dd6 1235 switch (link->link_state)
XinZhangMS 0:f7f1f0d76dd6 1236 {
XinZhangMS 0:f7f1f0d76dd6 1237 case LINK_STATE_HALF_ATTACHED_ATTACH_SENT:
XinZhangMS 0:f7f1f0d76dd6 1238 case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED:
XinZhangMS 0:f7f1f0d76dd6 1239 /* Sending detach when remote is not yet attached */
XinZhangMS 0:f7f1f0d76dd6 1240 if (send_detach(link, close, error) != 0)
XinZhangMS 0:f7f1f0d76dd6 1241 {
XinZhangMS 0:f7f1f0d76dd6 1242 LogError("Sending detach frame failed");
XinZhangMS 0:f7f1f0d76dd6 1243 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1244 }
XinZhangMS 0:f7f1f0d76dd6 1245 else
XinZhangMS 0:f7f1f0d76dd6 1246 {
XinZhangMS 0:f7f1f0d76dd6 1247 set_link_state(link, LINK_STATE_DETACHED);
XinZhangMS 0:f7f1f0d76dd6 1248 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1249 }
XinZhangMS 0:f7f1f0d76dd6 1250 break;
XinZhangMS 0:f7f1f0d76dd6 1251
XinZhangMS 0:f7f1f0d76dd6 1252 case LINK_STATE_ATTACHED:
XinZhangMS 0:f7f1f0d76dd6 1253 /* Send detach and wait for remote to respond */
XinZhangMS 0:f7f1f0d76dd6 1254 if (send_detach(link, close, error) != 0)
XinZhangMS 0:f7f1f0d76dd6 1255 {
XinZhangMS 0:f7f1f0d76dd6 1256 LogError("Sending detach frame failed");
XinZhangMS 0:f7f1f0d76dd6 1257 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1258 }
XinZhangMS 0:f7f1f0d76dd6 1259 else
XinZhangMS 0:f7f1f0d76dd6 1260 {
XinZhangMS 0:f7f1f0d76dd6 1261 set_link_state(link, LINK_STATE_HALF_ATTACHED_ATTACH_SENT);
XinZhangMS 0:f7f1f0d76dd6 1262 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1263 }
XinZhangMS 0:f7f1f0d76dd6 1264 break;
XinZhangMS 0:f7f1f0d76dd6 1265
XinZhangMS 0:f7f1f0d76dd6 1266 case LINK_STATE_DETACHED:
XinZhangMS 0:f7f1f0d76dd6 1267 /* Already detached */
XinZhangMS 0:f7f1f0d76dd6 1268 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1269 break;
XinZhangMS 0:f7f1f0d76dd6 1270
XinZhangMS 0:f7f1f0d76dd6 1271 default:
XinZhangMS 0:f7f1f0d76dd6 1272 case LINK_STATE_ERROR:
XinZhangMS 0:f7f1f0d76dd6 1273 /* Already detached and in error state */
XinZhangMS 0:f7f1f0d76dd6 1274 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1275 break;
XinZhangMS 0:f7f1f0d76dd6 1276 }
XinZhangMS 0:f7f1f0d76dd6 1277
XinZhangMS 0:f7f1f0d76dd6 1278 if (error != NULL)
XinZhangMS 0:f7f1f0d76dd6 1279 {
XinZhangMS 0:f7f1f0d76dd6 1280 error_destroy(error);
XinZhangMS 0:f7f1f0d76dd6 1281 }
XinZhangMS 0:f7f1f0d76dd6 1282 }
XinZhangMS 0:f7f1f0d76dd6 1283
XinZhangMS 0:f7f1f0d76dd6 1284 return result;
XinZhangMS 0:f7f1f0d76dd6 1285 }
XinZhangMS 0:f7f1f0d76dd6 1286
/*
 * List-removal predicate: selects the entry whose stored value is the very
 * same pointer as match_context.
 *
 * Returns true for the matching entry and false otherwise.
 * *continue_processing is set to false once the match is found and to true
 * for every non-matching entry.
 */
static bool remove_pending_delivery_condition_function(const void* item, const void* match_context, bool* continue_processing)
{
    const bool is_match = (item == match_context);

    /* Keep walking the list only while the wanted entry has not been seen */
    *continue_processing = !is_match;

    return is_match;
}
XinZhangMS 0:f7f1f0d76dd6 1304
XinZhangMS 0:f7f1f0d76dd6 1305 static void link_transfer_cancel_handler(ASYNC_OPERATION_HANDLE link_transfer_operation)
XinZhangMS 0:f7f1f0d76dd6 1306 {
XinZhangMS 0:f7f1f0d76dd6 1307 DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, link_transfer_operation);
XinZhangMS 0:f7f1f0d76dd6 1308 if (pending_delivery->on_delivery_settled != NULL)
XinZhangMS 0:f7f1f0d76dd6 1309 {
XinZhangMS 0:f7f1f0d76dd6 1310 pending_delivery->on_delivery_settled(pending_delivery->callback_context, pending_delivery->delivery_id, LINK_DELIVERY_SETTLE_REASON_CANCELLED, NULL);
XinZhangMS 0:f7f1f0d76dd6 1311 }
XinZhangMS 0:f7f1f0d76dd6 1312
XinZhangMS 0:f7f1f0d76dd6 1313 (void)singlylinkedlist_remove_if(((LINK_HANDLE)pending_delivery->link)->pending_deliveries, remove_pending_delivery_condition_function, pending_delivery);
XinZhangMS 0:f7f1f0d76dd6 1314
XinZhangMS 0:f7f1f0d76dd6 1315 async_operation_destroy(link_transfer_operation);
XinZhangMS 0:f7f1f0d76dd6 1316 }
XinZhangMS 0:f7f1f0d76dd6 1317
XinZhangMS 0:f7f1f0d76dd6 1318 ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout)
XinZhangMS 0:f7f1f0d76dd6 1319 {
XinZhangMS 0:f7f1f0d76dd6 1320 ASYNC_OPERATION_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 1321
XinZhangMS 0:f7f1f0d76dd6 1322 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1323 (link_transfer_error == NULL))
XinZhangMS 0:f7f1f0d76dd6 1324 {
XinZhangMS 0:f7f1f0d76dd6 1325 if (link_transfer_error != NULL)
XinZhangMS 0:f7f1f0d76dd6 1326 {
XinZhangMS 0:f7f1f0d76dd6 1327 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1328 }
XinZhangMS 0:f7f1f0d76dd6 1329
XinZhangMS 0:f7f1f0d76dd6 1330 LogError("Invalid arguments: link = %p, link_transfer_error = %p",
XinZhangMS 0:f7f1f0d76dd6 1331 link, link_transfer_error);
XinZhangMS 0:f7f1f0d76dd6 1332 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1333 }
XinZhangMS 0:f7f1f0d76dd6 1334 else
XinZhangMS 0:f7f1f0d76dd6 1335 {
XinZhangMS 0:f7f1f0d76dd6 1336 if (link->role != role_sender)
XinZhangMS 0:f7f1f0d76dd6 1337 {
XinZhangMS 0:f7f1f0d76dd6 1338 LogError("Link is not a sender link");
XinZhangMS 0:f7f1f0d76dd6 1339 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1340 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1341 }
XinZhangMS 0:f7f1f0d76dd6 1342 else if (link->link_state != LINK_STATE_ATTACHED)
XinZhangMS 0:f7f1f0d76dd6 1343 {
XinZhangMS 0:f7f1f0d76dd6 1344 LogError("Link is not attached");
XinZhangMS 0:f7f1f0d76dd6 1345 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1346 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1347 }
XinZhangMS 0:f7f1f0d76dd6 1348 else if (link->current_link_credit == 0)
XinZhangMS 0:f7f1f0d76dd6 1349 {
XinZhangMS 0:f7f1f0d76dd6 1350 *link_transfer_error = LINK_TRANSFER_BUSY;
XinZhangMS 0:f7f1f0d76dd6 1351 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1352 }
XinZhangMS 0:f7f1f0d76dd6 1353 else
XinZhangMS 0:f7f1f0d76dd6 1354 {
XinZhangMS 0:f7f1f0d76dd6 1355 result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler);
XinZhangMS 0:f7f1f0d76dd6 1356 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 1357 {
XinZhangMS 0:f7f1f0d76dd6 1358 LogError("Error creating async operation");
XinZhangMS 0:f7f1f0d76dd6 1359 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1360 }
XinZhangMS 0:f7f1f0d76dd6 1361 else
XinZhangMS 0:f7f1f0d76dd6 1362 {
XinZhangMS 0:f7f1f0d76dd6 1363 TRANSFER_HANDLE transfer = transfer_create(0);
XinZhangMS 0:f7f1f0d76dd6 1364 if (transfer == NULL)
XinZhangMS 0:f7f1f0d76dd6 1365 {
XinZhangMS 0:f7f1f0d76dd6 1366 LogError("Error creating transfer");
XinZhangMS 0:f7f1f0d76dd6 1367 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1368 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1369 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1370 }
XinZhangMS 0:f7f1f0d76dd6 1371 else
XinZhangMS 0:f7f1f0d76dd6 1372 {
XinZhangMS 0:f7f1f0d76dd6 1373 sequence_no delivery_count = link->delivery_count + 1;
XinZhangMS 0:f7f1f0d76dd6 1374 unsigned char delivery_tag_bytes[sizeof(delivery_count)];
XinZhangMS 0:f7f1f0d76dd6 1375 delivery_tag delivery_tag;
XinZhangMS 0:f7f1f0d76dd6 1376 bool settled;
XinZhangMS 0:f7f1f0d76dd6 1377
XinZhangMS 0:f7f1f0d76dd6 1378 (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));
XinZhangMS 0:f7f1f0d76dd6 1379
XinZhangMS 0:f7f1f0d76dd6 1380 delivery_tag.bytes = &delivery_tag_bytes;
XinZhangMS 0:f7f1f0d76dd6 1381 delivery_tag.length = sizeof(delivery_tag_bytes);
XinZhangMS 0:f7f1f0d76dd6 1382
XinZhangMS 0:f7f1f0d76dd6 1383 if (link->snd_settle_mode == sender_settle_mode_unsettled)
XinZhangMS 0:f7f1f0d76dd6 1384 {
XinZhangMS 0:f7f1f0d76dd6 1385 settled = false;
XinZhangMS 0:f7f1f0d76dd6 1386 }
XinZhangMS 0:f7f1f0d76dd6 1387 else
XinZhangMS 0:f7f1f0d76dd6 1388 {
XinZhangMS 0:f7f1f0d76dd6 1389 settled = true;
XinZhangMS 0:f7f1f0d76dd6 1390 }
XinZhangMS 0:f7f1f0d76dd6 1391
XinZhangMS 0:f7f1f0d76dd6 1392 if (transfer_set_delivery_tag(transfer, delivery_tag) != 0)
XinZhangMS 0:f7f1f0d76dd6 1393 {
XinZhangMS 0:f7f1f0d76dd6 1394 LogError("Failed setting delivery tag");
XinZhangMS 0:f7f1f0d76dd6 1395 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1396 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1397 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1398 }
XinZhangMS 0:f7f1f0d76dd6 1399 else if (transfer_set_message_format(transfer, message_format) != 0)
XinZhangMS 0:f7f1f0d76dd6 1400 {
XinZhangMS 0:f7f1f0d76dd6 1401 LogError("Failed setting message format");
XinZhangMS 0:f7f1f0d76dd6 1402 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1403 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1404 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1405 }
XinZhangMS 0:f7f1f0d76dd6 1406 else if (transfer_set_settled(transfer, settled) != 0)
XinZhangMS 0:f7f1f0d76dd6 1407 {
XinZhangMS 0:f7f1f0d76dd6 1408 LogError("Failed setting settled flag");
XinZhangMS 0:f7f1f0d76dd6 1409 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1410 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1411 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1412 }
XinZhangMS 0:f7f1f0d76dd6 1413 else
XinZhangMS 0:f7f1f0d76dd6 1414 {
XinZhangMS 0:f7f1f0d76dd6 1415 AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
XinZhangMS 0:f7f1f0d76dd6 1416 if (transfer_value == NULL)
XinZhangMS 0:f7f1f0d76dd6 1417 {
XinZhangMS 0:f7f1f0d76dd6 1418 LogError("Failed creating transfer performative AMQP value");
XinZhangMS 0:f7f1f0d76dd6 1419 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1420 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1421 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1422 }
XinZhangMS 0:f7f1f0d76dd6 1423 else
XinZhangMS 0:f7f1f0d76dd6 1424 {
XinZhangMS 0:f7f1f0d76dd6 1425 DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result);
XinZhangMS 0:f7f1f0d76dd6 1426 if (pending_delivery == NULL)
XinZhangMS 0:f7f1f0d76dd6 1427 {
XinZhangMS 0:f7f1f0d76dd6 1428 LogError("Failed getting pending delivery");
XinZhangMS 0:f7f1f0d76dd6 1429 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1430 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1431 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1432 }
XinZhangMS 0:f7f1f0d76dd6 1433 else
XinZhangMS 0:f7f1f0d76dd6 1434 {
XinZhangMS 0:f7f1f0d76dd6 1435 if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
XinZhangMS 0:f7f1f0d76dd6 1436 {
XinZhangMS 0:f7f1f0d76dd6 1437 LogError("Failed getting current tick");
XinZhangMS 0:f7f1f0d76dd6 1438 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1439 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1440 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1441 }
XinZhangMS 0:f7f1f0d76dd6 1442 else
XinZhangMS 0:f7f1f0d76dd6 1443 {
XinZhangMS 0:f7f1f0d76dd6 1444 LIST_ITEM_HANDLE delivery_instance_list_item;
XinZhangMS 0:f7f1f0d76dd6 1445 pending_delivery->timeout = timeout;
XinZhangMS 0:f7f1f0d76dd6 1446 pending_delivery->on_delivery_settled = on_delivery_settled;
XinZhangMS 0:f7f1f0d76dd6 1447 pending_delivery->callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 1448 pending_delivery->link = link;
XinZhangMS 0:f7f1f0d76dd6 1449 delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result);
XinZhangMS 0:f7f1f0d76dd6 1450
XinZhangMS 0:f7f1f0d76dd6 1451 if (delivery_instance_list_item == NULL)
XinZhangMS 0:f7f1f0d76dd6 1452 {
XinZhangMS 0:f7f1f0d76dd6 1453 LogError("Failed adding delivery to list");
XinZhangMS 0:f7f1f0d76dd6 1454 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1455 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1456 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1457 }
XinZhangMS 0:f7f1f0d76dd6 1458 else
XinZhangMS 0:f7f1f0d76dd6 1459 {
XinZhangMS 0:f7f1f0d76dd6 1460 /* here we should feed data to the transfer frame */
XinZhangMS 0:f7f1f0d76dd6 1461 switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
XinZhangMS 0:f7f1f0d76dd6 1462 {
XinZhangMS 0:f7f1f0d76dd6 1463 default:
XinZhangMS 0:f7f1f0d76dd6 1464 case SESSION_SEND_TRANSFER_ERROR:
XinZhangMS 0:f7f1f0d76dd6 1465 LogError("Failed session send transfer");
XinZhangMS 0:f7f1f0d76dd6 1466 if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0)
XinZhangMS 0:f7f1f0d76dd6 1467 {
XinZhangMS 0:f7f1f0d76dd6 1468 LogError("Error removing pending delivery from the list");
XinZhangMS 0:f7f1f0d76dd6 1469 }
XinZhangMS 0:f7f1f0d76dd6 1470
XinZhangMS 0:f7f1f0d76dd6 1471 *link_transfer_error = LINK_TRANSFER_ERROR;
XinZhangMS 0:f7f1f0d76dd6 1472 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1473 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1474 break;
XinZhangMS 0:f7f1f0d76dd6 1475
XinZhangMS 0:f7f1f0d76dd6 1476 case SESSION_SEND_TRANSFER_BUSY:
XinZhangMS 0:f7f1f0d76dd6 1477 /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
XinZhangMS 0:f7f1f0d76dd6 1478 LogError("Failed session send transfer");
XinZhangMS 0:f7f1f0d76dd6 1479 if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) != 0)
XinZhangMS 0:f7f1f0d76dd6 1480 {
XinZhangMS 0:f7f1f0d76dd6 1481 LogError("Error removing pending delivery from the list");
XinZhangMS 0:f7f1f0d76dd6 1482 }
XinZhangMS 0:f7f1f0d76dd6 1483
XinZhangMS 0:f7f1f0d76dd6 1484 *link_transfer_error = LINK_TRANSFER_BUSY;
XinZhangMS 0:f7f1f0d76dd6 1485 async_operation_destroy(result);
XinZhangMS 0:f7f1f0d76dd6 1486 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1487 break;
XinZhangMS 0:f7f1f0d76dd6 1488
XinZhangMS 0:f7f1f0d76dd6 1489 case SESSION_SEND_TRANSFER_OK:
XinZhangMS 0:f7f1f0d76dd6 1490 link->delivery_count = delivery_count;
XinZhangMS 0:f7f1f0d76dd6 1491 link->current_link_credit--;
XinZhangMS 0:f7f1f0d76dd6 1492 break;
XinZhangMS 0:f7f1f0d76dd6 1493 }
XinZhangMS 0:f7f1f0d76dd6 1494 }
XinZhangMS 0:f7f1f0d76dd6 1495 }
XinZhangMS 0:f7f1f0d76dd6 1496 }
XinZhangMS 0:f7f1f0d76dd6 1497
XinZhangMS 0:f7f1f0d76dd6 1498 amqpvalue_destroy(transfer_value);
XinZhangMS 0:f7f1f0d76dd6 1499 }
XinZhangMS 0:f7f1f0d76dd6 1500 }
XinZhangMS 0:f7f1f0d76dd6 1501
XinZhangMS 0:f7f1f0d76dd6 1502 transfer_destroy(transfer);
XinZhangMS 0:f7f1f0d76dd6 1503 }
XinZhangMS 0:f7f1f0d76dd6 1504 }
XinZhangMS 0:f7f1f0d76dd6 1505 }
XinZhangMS 0:f7f1f0d76dd6 1506 }
XinZhangMS 0:f7f1f0d76dd6 1507
XinZhangMS 0:f7f1f0d76dd6 1508 return result;
XinZhangMS 0:f7f1f0d76dd6 1509 }
XinZhangMS 0:f7f1f0d76dd6 1510
XinZhangMS 0:f7f1f0d76dd6 1511 int link_get_name(LINK_HANDLE link, const char** link_name)
XinZhangMS 0:f7f1f0d76dd6 1512 {
XinZhangMS 0:f7f1f0d76dd6 1513 int result;
XinZhangMS 0:f7f1f0d76dd6 1514
XinZhangMS 0:f7f1f0d76dd6 1515 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1516 {
XinZhangMS 0:f7f1f0d76dd6 1517 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1518 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1519 }
XinZhangMS 0:f7f1f0d76dd6 1520 else
XinZhangMS 0:f7f1f0d76dd6 1521 {
XinZhangMS 0:f7f1f0d76dd6 1522 *link_name = link->name;
XinZhangMS 0:f7f1f0d76dd6 1523 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1524 }
XinZhangMS 0:f7f1f0d76dd6 1525
XinZhangMS 0:f7f1f0d76dd6 1526 return result;
XinZhangMS 0:f7f1f0d76dd6 1527 }
XinZhangMS 0:f7f1f0d76dd6 1528
XinZhangMS 0:f7f1f0d76dd6 1529 int link_get_received_message_id(LINK_HANDLE link, delivery_number* message_id)
XinZhangMS 0:f7f1f0d76dd6 1530 {
XinZhangMS 0:f7f1f0d76dd6 1531 int result;
XinZhangMS 0:f7f1f0d76dd6 1532
XinZhangMS 0:f7f1f0d76dd6 1533 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1534 {
XinZhangMS 0:f7f1f0d76dd6 1535 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1536 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1537 }
XinZhangMS 0:f7f1f0d76dd6 1538 else
XinZhangMS 0:f7f1f0d76dd6 1539 {
XinZhangMS 0:f7f1f0d76dd6 1540 *message_id = link->received_delivery_id;
XinZhangMS 0:f7f1f0d76dd6 1541 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1542 }
XinZhangMS 0:f7f1f0d76dd6 1543
XinZhangMS 0:f7f1f0d76dd6 1544 return result;
XinZhangMS 0:f7f1f0d76dd6 1545 }
XinZhangMS 0:f7f1f0d76dd6 1546
XinZhangMS 0:f7f1f0d76dd6 1547 int link_send_disposition(LINK_HANDLE link, delivery_number message_id, AMQP_VALUE delivery_state)
XinZhangMS 0:f7f1f0d76dd6 1548 {
XinZhangMS 0:f7f1f0d76dd6 1549 int result;
XinZhangMS 0:f7f1f0d76dd6 1550
XinZhangMS 0:f7f1f0d76dd6 1551 if (delivery_state == NULL)
XinZhangMS 0:f7f1f0d76dd6 1552 {
XinZhangMS 0:f7f1f0d76dd6 1553 result = 0;
XinZhangMS 0:f7f1f0d76dd6 1554 }
XinZhangMS 0:f7f1f0d76dd6 1555 else
XinZhangMS 0:f7f1f0d76dd6 1556 {
XinZhangMS 0:f7f1f0d76dd6 1557 result = send_disposition(link, message_id, delivery_state);
XinZhangMS 0:f7f1f0d76dd6 1558 if (result != 0)
XinZhangMS 0:f7f1f0d76dd6 1559 {
XinZhangMS 0:f7f1f0d76dd6 1560 LogError("Cannot send disposition frame");
XinZhangMS 0:f7f1f0d76dd6 1561 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 1562 }
XinZhangMS 0:f7f1f0d76dd6 1563 }
XinZhangMS 0:f7f1f0d76dd6 1564
XinZhangMS 0:f7f1f0d76dd6 1565 return result;
XinZhangMS 0:f7f1f0d76dd6 1566 }
XinZhangMS 0:f7f1f0d76dd6 1567
XinZhangMS 0:f7f1f0d76dd6 1568 void link_dowork(LINK_HANDLE link)
XinZhangMS 0:f7f1f0d76dd6 1569 {
XinZhangMS 0:f7f1f0d76dd6 1570 if (link == NULL)
XinZhangMS 0:f7f1f0d76dd6 1571 {
XinZhangMS 0:f7f1f0d76dd6 1572 LogError("NULL link");
XinZhangMS 0:f7f1f0d76dd6 1573 }
XinZhangMS 0:f7f1f0d76dd6 1574 else
XinZhangMS 0:f7f1f0d76dd6 1575 {
XinZhangMS 0:f7f1f0d76dd6 1576 tickcounter_ms_t current_tick;
XinZhangMS 0:f7f1f0d76dd6 1577
XinZhangMS 0:f7f1f0d76dd6 1578 if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
XinZhangMS 0:f7f1f0d76dd6 1579 {
XinZhangMS 0:f7f1f0d76dd6 1580 LogError("Cannot get tick counter value");
XinZhangMS 0:f7f1f0d76dd6 1581 }
XinZhangMS 0:f7f1f0d76dd6 1582 else
XinZhangMS 0:f7f1f0d76dd6 1583 {
XinZhangMS 0:f7f1f0d76dd6 1584 // go through all and find timed out deliveries
XinZhangMS 0:f7f1f0d76dd6 1585 LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
XinZhangMS 0:f7f1f0d76dd6 1586 while (item != NULL)
XinZhangMS 0:f7f1f0d76dd6 1587 {
XinZhangMS 0:f7f1f0d76dd6 1588 LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
XinZhangMS 0:f7f1f0d76dd6 1589 ASYNC_OPERATION_HANDLE delivery_instance_async_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item);
XinZhangMS 0:f7f1f0d76dd6 1590 DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, delivery_instance_async_operation);
XinZhangMS 0:f7f1f0d76dd6 1591
XinZhangMS 0:f7f1f0d76dd6 1592 if ((delivery_instance->timeout != 0) &&
XinZhangMS 0:f7f1f0d76dd6 1593 (current_tick - delivery_instance->start_tick >= delivery_instance->timeout))
XinZhangMS 0:f7f1f0d76dd6 1594 {
XinZhangMS 0:f7f1f0d76dd6 1595 if (delivery_instance->on_delivery_settled != NULL)
XinZhangMS 0:f7f1f0d76dd6 1596 {
XinZhangMS 0:f7f1f0d76dd6 1597 delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL);
XinZhangMS 0:f7f1f0d76dd6 1598 }
XinZhangMS 0:f7f1f0d76dd6 1599
XinZhangMS 0:f7f1f0d76dd6 1600 if (singlylinkedlist_remove(link->pending_deliveries, item) != 0)
XinZhangMS 0:f7f1f0d76dd6 1601 {
XinZhangMS 0:f7f1f0d76dd6 1602 LogError("Cannot remove item from list");
XinZhangMS 0:f7f1f0d76dd6 1603 }
XinZhangMS 0:f7f1f0d76dd6 1604
XinZhangMS 0:f7f1f0d76dd6 1605 async_operation_destroy(delivery_instance_async_operation);
XinZhangMS 0:f7f1f0d76dd6 1606 }
XinZhangMS 0:f7f1f0d76dd6 1607
XinZhangMS 0:f7f1f0d76dd6 1608 item = next_item;
XinZhangMS 0:f7f1f0d76dd6 1609 }
XinZhangMS 0:f7f1f0d76dd6 1610 }
XinZhangMS 0:f7f1f0d76dd6 1611 }
XinZhangMS 0:f7f1f0d76dd6 1612 }
XinZhangMS 0:f7f1f0d76dd6 1613
XinZhangMS 0:f7f1f0d76dd6 1614 ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE link_subscribe_on_link_detach_received(LINK_HANDLE link, ON_LINK_DETACH_RECEIVED on_link_detach_received, void* context)
XinZhangMS 0:f7f1f0d76dd6 1615 {
XinZhangMS 0:f7f1f0d76dd6 1616 ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 1617
XinZhangMS 0:f7f1f0d76dd6 1618 if ((link == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 1619 (on_link_detach_received == NULL))
XinZhangMS 0:f7f1f0d76dd6 1620 {
XinZhangMS 0:f7f1f0d76dd6 1621 LogError("Invalid arguments: link = %p, on_link_detach_received = %p, context = %p",
XinZhangMS 0:f7f1f0d76dd6 1622 link, on_link_detach_received, context);
XinZhangMS 0:f7f1f0d76dd6 1623 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1624 }
XinZhangMS 0:f7f1f0d76dd6 1625 else
XinZhangMS 0:f7f1f0d76dd6 1626 {
XinZhangMS 0:f7f1f0d76dd6 1627 if (link->on_link_detach_received_event_subscription.on_link_detach_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 1628 {
XinZhangMS 0:f7f1f0d76dd6 1629 LogError("Already subscribed for on_link_detach_received events");
XinZhangMS 0:f7f1f0d76dd6 1630 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 1631 }
XinZhangMS 0:f7f1f0d76dd6 1632 else
XinZhangMS 0:f7f1f0d76dd6 1633 {
XinZhangMS 0:f7f1f0d76dd6 1634 link->on_link_detach_received_event_subscription.on_link_detach_received = on_link_detach_received;
XinZhangMS 0:f7f1f0d76dd6 1635 link->on_link_detach_received_event_subscription.context = context;
XinZhangMS 0:f7f1f0d76dd6 1636
XinZhangMS 0:f7f1f0d76dd6 1637 result = &link->on_link_detach_received_event_subscription;
XinZhangMS 0:f7f1f0d76dd6 1638 }
XinZhangMS 0:f7f1f0d76dd6 1639 }
XinZhangMS 0:f7f1f0d76dd6 1640
XinZhangMS 0:f7f1f0d76dd6 1641 return result;
XinZhangMS 0:f7f1f0d76dd6 1642 }
XinZhangMS 0:f7f1f0d76dd6 1643
XinZhangMS 0:f7f1f0d76dd6 1644 void link_unsubscribe_on_link_detach_received(ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE event_subscription)
XinZhangMS 0:f7f1f0d76dd6 1645 {
XinZhangMS 0:f7f1f0d76dd6 1646 if (event_subscription == NULL)
XinZhangMS 0:f7f1f0d76dd6 1647 {
XinZhangMS 0:f7f1f0d76dd6 1648 LogError("NULL event_subscription");
XinZhangMS 0:f7f1f0d76dd6 1649 }
XinZhangMS 0:f7f1f0d76dd6 1650 else
XinZhangMS 0:f7f1f0d76dd6 1651 {
XinZhangMS 0:f7f1f0d76dd6 1652 event_subscription->on_link_detach_received = NULL;
XinZhangMS 0:f7f1f0d76dd6 1653 event_subscription->context = NULL;
XinZhangMS 0:f7f1f0d76dd6 1654 }
XinZhangMS 0:f7f1f0d76dd6 1655 }