Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

UserRevisionLine numberNew contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5
XinZhangMS 0:f7f1f0d76dd6 6 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 7 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_uamqp_c/link.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_uamqp_c/amqp_definitions.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_uamqp_c/message.h"
XinZhangMS 0:f7f1f0d76dd6 12 #include "azure_uamqp_c/message_receiver.h"
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_uamqp_c/amqpvalue.h"
XinZhangMS 0:f7f1f0d76dd6 14
XinZhangMS 0:f7f1f0d76dd6 15 typedef struct MESSAGE_RECEIVER_INSTANCE_TAG
XinZhangMS 0:f7f1f0d76dd6 16 {
XinZhangMS 0:f7f1f0d76dd6 17 LINK_HANDLE link;
XinZhangMS 0:f7f1f0d76dd6 18 ON_MESSAGE_RECEIVED on_message_received;
XinZhangMS 0:f7f1f0d76dd6 19 ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed;
XinZhangMS 0:f7f1f0d76dd6 20 MESSAGE_RECEIVER_STATE message_receiver_state;
XinZhangMS 0:f7f1f0d76dd6 21 const void* on_message_receiver_state_changed_context;
XinZhangMS 0:f7f1f0d76dd6 22 const void* callback_context;
XinZhangMS 0:f7f1f0d76dd6 23 MESSAGE_HANDLE decoded_message;
XinZhangMS 0:f7f1f0d76dd6 24 bool decode_error;
XinZhangMS 0:f7f1f0d76dd6 25 } MESSAGE_RECEIVER_INSTANCE;
XinZhangMS 0:f7f1f0d76dd6 26
XinZhangMS 0:f7f1f0d76dd6 27 static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state)
XinZhangMS 0:f7f1f0d76dd6 28 {
XinZhangMS 0:f7f1f0d76dd6 29 MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state;
XinZhangMS 0:f7f1f0d76dd6 30 message_receiver->message_receiver_state = new_state;
XinZhangMS 0:f7f1f0d76dd6 31 if (message_receiver->on_message_receiver_state_changed != NULL)
XinZhangMS 0:f7f1f0d76dd6 32 {
XinZhangMS 0:f7f1f0d76dd6 33 message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state);
XinZhangMS 0:f7f1f0d76dd6 34 }
XinZhangMS 0:f7f1f0d76dd6 35 }
XinZhangMS 0:f7f1f0d76dd6 36
XinZhangMS 0:f7f1f0d76dd6 37 static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
XinZhangMS 0:f7f1f0d76dd6 38 {
XinZhangMS 0:f7f1f0d76dd6 39 MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 40 MESSAGE_HANDLE decoded_message = message_receiver->decoded_message;
XinZhangMS 0:f7f1f0d76dd6 41 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 42
XinZhangMS 0:f7f1f0d76dd6 43 if (is_application_properties_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 44 {
XinZhangMS 0:f7f1f0d76dd6 45 if (message_set_application_properties(decoded_message, decoded_value) != 0)
XinZhangMS 0:f7f1f0d76dd6 46 {
XinZhangMS 0:f7f1f0d76dd6 47 LogError("Error setting application properties on received message");
XinZhangMS 0:f7f1f0d76dd6 48 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 49 }
XinZhangMS 0:f7f1f0d76dd6 50 }
XinZhangMS 0:f7f1f0d76dd6 51 else if (is_properties_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 52 {
XinZhangMS 0:f7f1f0d76dd6 53 PROPERTIES_HANDLE properties;
XinZhangMS 0:f7f1f0d76dd6 54 if (amqpvalue_get_properties(decoded_value, &properties) != 0)
XinZhangMS 0:f7f1f0d76dd6 55 {
XinZhangMS 0:f7f1f0d76dd6 56 LogError("Error getting message properties");
XinZhangMS 0:f7f1f0d76dd6 57 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 58 }
XinZhangMS 0:f7f1f0d76dd6 59 else
XinZhangMS 0:f7f1f0d76dd6 60 {
XinZhangMS 0:f7f1f0d76dd6 61 if (message_set_properties(decoded_message, properties) != 0)
XinZhangMS 0:f7f1f0d76dd6 62 {
XinZhangMS 0:f7f1f0d76dd6 63 LogError("Error setting message properties on received message");
XinZhangMS 0:f7f1f0d76dd6 64 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 65 }
XinZhangMS 0:f7f1f0d76dd6 66
XinZhangMS 0:f7f1f0d76dd6 67 properties_destroy(properties);
XinZhangMS 0:f7f1f0d76dd6 68 }
XinZhangMS 0:f7f1f0d76dd6 69 }
XinZhangMS 0:f7f1f0d76dd6 70 else if (is_delivery_annotations_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 71 {
XinZhangMS 0:f7f1f0d76dd6 72 annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 73 if (delivery_annotations == NULL)
XinZhangMS 0:f7f1f0d76dd6 74 {
XinZhangMS 0:f7f1f0d76dd6 75 LogError("Error getting delivery annotations");
XinZhangMS 0:f7f1f0d76dd6 76 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 77 }
XinZhangMS 0:f7f1f0d76dd6 78 else
XinZhangMS 0:f7f1f0d76dd6 79 {
XinZhangMS 0:f7f1f0d76dd6 80 if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)
XinZhangMS 0:f7f1f0d76dd6 81 {
XinZhangMS 0:f7f1f0d76dd6 82 LogError("Error setting delivery annotations on received message");
XinZhangMS 0:f7f1f0d76dd6 83 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 84 }
XinZhangMS 0:f7f1f0d76dd6 85 }
XinZhangMS 0:f7f1f0d76dd6 86 }
XinZhangMS 0:f7f1f0d76dd6 87 else if (is_message_annotations_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 88 {
XinZhangMS 0:f7f1f0d76dd6 89 annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 90 if (message_annotations == NULL)
XinZhangMS 0:f7f1f0d76dd6 91 {
XinZhangMS 0:f7f1f0d76dd6 92 LogError("Error getting message annotations");
XinZhangMS 0:f7f1f0d76dd6 93 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 94 }
XinZhangMS 0:f7f1f0d76dd6 95 else
XinZhangMS 0:f7f1f0d76dd6 96 {
XinZhangMS 0:f7f1f0d76dd6 97 if (message_set_message_annotations(decoded_message, message_annotations) != 0)
XinZhangMS 0:f7f1f0d76dd6 98 {
XinZhangMS 0:f7f1f0d76dd6 99 LogError("Error setting message annotations on received message");
XinZhangMS 0:f7f1f0d76dd6 100 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 101 }
XinZhangMS 0:f7f1f0d76dd6 102 }
XinZhangMS 0:f7f1f0d76dd6 103 }
XinZhangMS 0:f7f1f0d76dd6 104 else if (is_header_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 105 {
XinZhangMS 0:f7f1f0d76dd6 106 HEADER_HANDLE header;
XinZhangMS 0:f7f1f0d76dd6 107 if (amqpvalue_get_header(decoded_value, &header) != 0)
XinZhangMS 0:f7f1f0d76dd6 108 {
XinZhangMS 0:f7f1f0d76dd6 109 LogError("Error getting message header");
XinZhangMS 0:f7f1f0d76dd6 110 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 111 }
XinZhangMS 0:f7f1f0d76dd6 112 else
XinZhangMS 0:f7f1f0d76dd6 113 {
XinZhangMS 0:f7f1f0d76dd6 114 if (message_set_header(decoded_message, header) != 0)
XinZhangMS 0:f7f1f0d76dd6 115 {
XinZhangMS 0:f7f1f0d76dd6 116 LogError("Error setting message header on received message");
XinZhangMS 0:f7f1f0d76dd6 117 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 118 }
XinZhangMS 0:f7f1f0d76dd6 119
XinZhangMS 0:f7f1f0d76dd6 120 header_destroy(header);
XinZhangMS 0:f7f1f0d76dd6 121 }
XinZhangMS 0:f7f1f0d76dd6 122 }
XinZhangMS 0:f7f1f0d76dd6 123 else if (is_footer_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 124 {
XinZhangMS 0:f7f1f0d76dd6 125 annotations footer = amqpvalue_get_inplace_described_value(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 126 if (footer == NULL)
XinZhangMS 0:f7f1f0d76dd6 127 {
XinZhangMS 0:f7f1f0d76dd6 128 LogError("Error getting message footer");
XinZhangMS 0:f7f1f0d76dd6 129 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 130 }
XinZhangMS 0:f7f1f0d76dd6 131 else
XinZhangMS 0:f7f1f0d76dd6 132 {
XinZhangMS 0:f7f1f0d76dd6 133 if (message_set_footer(decoded_message, footer) != 0)
XinZhangMS 0:f7f1f0d76dd6 134 {
XinZhangMS 0:f7f1f0d76dd6 135 LogError("Error setting message footer on received message");
XinZhangMS 0:f7f1f0d76dd6 136 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 137 }
XinZhangMS 0:f7f1f0d76dd6 138 }
XinZhangMS 0:f7f1f0d76dd6 139 }
XinZhangMS 0:f7f1f0d76dd6 140 else if (is_amqp_value_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 141 {
XinZhangMS 0:f7f1f0d76dd6 142 MESSAGE_BODY_TYPE body_type;
XinZhangMS 0:f7f1f0d76dd6 143 if (message_get_body_type(decoded_message, &body_type) != 0)
XinZhangMS 0:f7f1f0d76dd6 144 {
XinZhangMS 0:f7f1f0d76dd6 145 LogError("Error getting message body type");
XinZhangMS 0:f7f1f0d76dd6 146 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 147 }
XinZhangMS 0:f7f1f0d76dd6 148 else
XinZhangMS 0:f7f1f0d76dd6 149 {
XinZhangMS 0:f7f1f0d76dd6 150 if (body_type != MESSAGE_BODY_TYPE_NONE)
XinZhangMS 0:f7f1f0d76dd6 151 {
XinZhangMS 0:f7f1f0d76dd6 152 LogError("Body already set on received message");
XinZhangMS 0:f7f1f0d76dd6 153 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 154 }
XinZhangMS 0:f7f1f0d76dd6 155 else
XinZhangMS 0:f7f1f0d76dd6 156 {
XinZhangMS 0:f7f1f0d76dd6 157 AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 158 if (body_amqp_value == NULL)
XinZhangMS 0:f7f1f0d76dd6 159 {
XinZhangMS 0:f7f1f0d76dd6 160 LogError("Error getting body AMQP value");
XinZhangMS 0:f7f1f0d76dd6 161 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 162 }
XinZhangMS 0:f7f1f0d76dd6 163 else
XinZhangMS 0:f7f1f0d76dd6 164 {
XinZhangMS 0:f7f1f0d76dd6 165 if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)
XinZhangMS 0:f7f1f0d76dd6 166 {
XinZhangMS 0:f7f1f0d76dd6 167 LogError("Error setting body AMQP value on received message");
XinZhangMS 0:f7f1f0d76dd6 168 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 169 }
XinZhangMS 0:f7f1f0d76dd6 170 }
XinZhangMS 0:f7f1f0d76dd6 171 }
XinZhangMS 0:f7f1f0d76dd6 172 }
XinZhangMS 0:f7f1f0d76dd6 173 }
XinZhangMS 0:f7f1f0d76dd6 174 else if (is_data_type_by_descriptor(descriptor))
XinZhangMS 0:f7f1f0d76dd6 175 {
XinZhangMS 0:f7f1f0d76dd6 176 MESSAGE_BODY_TYPE body_type;
XinZhangMS 0:f7f1f0d76dd6 177 if (message_get_body_type(decoded_message, &body_type) != 0)
XinZhangMS 0:f7f1f0d76dd6 178 {
XinZhangMS 0:f7f1f0d76dd6 179 LogError("Error getting message body type");
XinZhangMS 0:f7f1f0d76dd6 180 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 181 }
XinZhangMS 0:f7f1f0d76dd6 182 else
XinZhangMS 0:f7f1f0d76dd6 183 {
XinZhangMS 0:f7f1f0d76dd6 184 if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
XinZhangMS 0:f7f1f0d76dd6 185 (body_type != MESSAGE_BODY_TYPE_DATA))
XinZhangMS 0:f7f1f0d76dd6 186 {
XinZhangMS 0:f7f1f0d76dd6 187 LogError("Message body type already set to something different than AMQP DATA");
XinZhangMS 0:f7f1f0d76dd6 188 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 189 }
XinZhangMS 0:f7f1f0d76dd6 190 else
XinZhangMS 0:f7f1f0d76dd6 191 {
XinZhangMS 0:f7f1f0d76dd6 192 AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 193 if (body_data_value == NULL)
XinZhangMS 0:f7f1f0d76dd6 194 {
XinZhangMS 0:f7f1f0d76dd6 195 LogError("Error getting body DATA value");
XinZhangMS 0:f7f1f0d76dd6 196 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 197 }
XinZhangMS 0:f7f1f0d76dd6 198 else
XinZhangMS 0:f7f1f0d76dd6 199 {
XinZhangMS 0:f7f1f0d76dd6 200 data data_value;
XinZhangMS 0:f7f1f0d76dd6 201 if (amqpvalue_get_data(body_data_value, &data_value) != 0)
XinZhangMS 0:f7f1f0d76dd6 202 {
XinZhangMS 0:f7f1f0d76dd6 203 LogError("Error getting body DATA AMQP value");
XinZhangMS 0:f7f1f0d76dd6 204 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 205 }
XinZhangMS 0:f7f1f0d76dd6 206 else
XinZhangMS 0:f7f1f0d76dd6 207 {
XinZhangMS 0:f7f1f0d76dd6 208 BINARY_DATA binary_data;
XinZhangMS 0:f7f1f0d76dd6 209 binary_data.bytes = (const unsigned char*)data_value.bytes;
XinZhangMS 0:f7f1f0d76dd6 210 binary_data.length = data_value.length;
XinZhangMS 0:f7f1f0d76dd6 211 if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
XinZhangMS 0:f7f1f0d76dd6 212 {
XinZhangMS 0:f7f1f0d76dd6 213 LogError("Error adding body DATA to received message");
XinZhangMS 0:f7f1f0d76dd6 214 message_receiver->decode_error = true;
XinZhangMS 0:f7f1f0d76dd6 215 }
XinZhangMS 0:f7f1f0d76dd6 216 }
XinZhangMS 0:f7f1f0d76dd6 217 }
XinZhangMS 0:f7f1f0d76dd6 218 }
XinZhangMS 0:f7f1f0d76dd6 219 }
XinZhangMS 0:f7f1f0d76dd6 220 }
XinZhangMS 0:f7f1f0d76dd6 221 }
XinZhangMS 0:f7f1f0d76dd6 222
XinZhangMS 0:f7f1f0d76dd6 223 static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes)
XinZhangMS 0:f7f1f0d76dd6 224 {
XinZhangMS 0:f7f1f0d76dd6 225 AMQP_VALUE result = NULL;
XinZhangMS 0:f7f1f0d76dd6 226 MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 227
XinZhangMS 0:f7f1f0d76dd6 228 (void)transfer;
XinZhangMS 0:f7f1f0d76dd6 229 if (message_receiver->on_message_received != NULL)
XinZhangMS 0:f7f1f0d76dd6 230 {
XinZhangMS 0:f7f1f0d76dd6 231 MESSAGE_HANDLE message = message_create();
XinZhangMS 0:f7f1f0d76dd6 232 if (message == NULL)
XinZhangMS 0:f7f1f0d76dd6 233 {
XinZhangMS 0:f7f1f0d76dd6 234 LogError("Cannot create message");
XinZhangMS 0:f7f1f0d76dd6 235 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 236 }
XinZhangMS 0:f7f1f0d76dd6 237 else
XinZhangMS 0:f7f1f0d76dd6 238 {
XinZhangMS 0:f7f1f0d76dd6 239 AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
XinZhangMS 0:f7f1f0d76dd6 240 if (amqpvalue_decoder == NULL)
XinZhangMS 0:f7f1f0d76dd6 241 {
XinZhangMS 0:f7f1f0d76dd6 242 LogError("Cannot create AMQP value decoder");
XinZhangMS 0:f7f1f0d76dd6 243 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 244 }
XinZhangMS 0:f7f1f0d76dd6 245 else
XinZhangMS 0:f7f1f0d76dd6 246 {
XinZhangMS 0:f7f1f0d76dd6 247 message_receiver->decoded_message = message;
XinZhangMS 0:f7f1f0d76dd6 248 message_receiver->decode_error = false;
XinZhangMS 0:f7f1f0d76dd6 249 if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 250 {
XinZhangMS 0:f7f1f0d76dd6 251 LogError("Cannot decode bytes");
XinZhangMS 0:f7f1f0d76dd6 252 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 253 }
XinZhangMS 0:f7f1f0d76dd6 254 else
XinZhangMS 0:f7f1f0d76dd6 255 {
XinZhangMS 0:f7f1f0d76dd6 256 if (message_receiver->decode_error)
XinZhangMS 0:f7f1f0d76dd6 257 {
XinZhangMS 0:f7f1f0d76dd6 258 LogError("Error decoding message");
XinZhangMS 0:f7f1f0d76dd6 259 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 260 }
XinZhangMS 0:f7f1f0d76dd6 261 else
XinZhangMS 0:f7f1f0d76dd6 262 {
XinZhangMS 0:f7f1f0d76dd6 263 result = message_receiver->on_message_received(message_receiver->callback_context, message);
XinZhangMS 0:f7f1f0d76dd6 264 }
XinZhangMS 0:f7f1f0d76dd6 265 }
XinZhangMS 0:f7f1f0d76dd6 266
XinZhangMS 0:f7f1f0d76dd6 267 amqpvalue_decoder_destroy(amqpvalue_decoder);
XinZhangMS 0:f7f1f0d76dd6 268 }
XinZhangMS 0:f7f1f0d76dd6 269
XinZhangMS 0:f7f1f0d76dd6 270 message_destroy(message);
XinZhangMS 0:f7f1f0d76dd6 271 }
XinZhangMS 0:f7f1f0d76dd6 272 }
XinZhangMS 0:f7f1f0d76dd6 273
XinZhangMS 0:f7f1f0d76dd6 274 return result;
XinZhangMS 0:f7f1f0d76dd6 275 }
XinZhangMS 0:f7f1f0d76dd6 276
XinZhangMS 0:f7f1f0d76dd6 277 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
XinZhangMS 0:f7f1f0d76dd6 278 {
XinZhangMS 0:f7f1f0d76dd6 279 MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
XinZhangMS 0:f7f1f0d76dd6 280 (void)previous_link_state;
XinZhangMS 0:f7f1f0d76dd6 281
XinZhangMS 0:f7f1f0d76dd6 282 switch (new_link_state)
XinZhangMS 0:f7f1f0d76dd6 283 {
XinZhangMS 0:f7f1f0d76dd6 284 default:
XinZhangMS 0:f7f1f0d76dd6 285 break;
XinZhangMS 0:f7f1f0d76dd6 286
XinZhangMS 0:f7f1f0d76dd6 287 case LINK_STATE_ATTACHED:
XinZhangMS 0:f7f1f0d76dd6 288 if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
XinZhangMS 0:f7f1f0d76dd6 289 {
XinZhangMS 0:f7f1f0d76dd6 290 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN);
XinZhangMS 0:f7f1f0d76dd6 291 }
XinZhangMS 0:f7f1f0d76dd6 292 break;
XinZhangMS 0:f7f1f0d76dd6 293 case LINK_STATE_DETACHED:
XinZhangMS 0:f7f1f0d76dd6 294 if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
XinZhangMS 0:f7f1f0d76dd6 295 (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
XinZhangMS 0:f7f1f0d76dd6 296 {
XinZhangMS 0:f7f1f0d76dd6 297 /* User initiated transition, we should be good */
XinZhangMS 0:f7f1f0d76dd6 298 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE);
XinZhangMS 0:f7f1f0d76dd6 299 }
XinZhangMS 0:f7f1f0d76dd6 300 else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
XinZhangMS 0:f7f1f0d76dd6 301 {
XinZhangMS 0:f7f1f0d76dd6 302 /* Any other transition must be an error */
XinZhangMS 0:f7f1f0d76dd6 303 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 304 }
XinZhangMS 0:f7f1f0d76dd6 305 break;
XinZhangMS 0:f7f1f0d76dd6 306 case LINK_STATE_ERROR:
XinZhangMS 0:f7f1f0d76dd6 307 if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
XinZhangMS 0:f7f1f0d76dd6 308 {
XinZhangMS 0:f7f1f0d76dd6 309 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 310 }
XinZhangMS 0:f7f1f0d76dd6 311 break;
XinZhangMS 0:f7f1f0d76dd6 312 }
XinZhangMS 0:f7f1f0d76dd6 313 }
XinZhangMS 0:f7f1f0d76dd6 314
XinZhangMS 0:f7f1f0d76dd6 315 MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context)
XinZhangMS 0:f7f1f0d76dd6 316 {
XinZhangMS 0:f7f1f0d76dd6 317 MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
XinZhangMS 0:f7f1f0d76dd6 318 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 319 {
XinZhangMS 0:f7f1f0d76dd6 320 LogError("Error creating message receiver");
XinZhangMS 0:f7f1f0d76dd6 321 }
XinZhangMS 0:f7f1f0d76dd6 322 else
XinZhangMS 0:f7f1f0d76dd6 323 {
XinZhangMS 0:f7f1f0d76dd6 324 message_receiver->link = link;
XinZhangMS 0:f7f1f0d76dd6 325 message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed;
XinZhangMS 0:f7f1f0d76dd6 326 message_receiver->on_message_receiver_state_changed_context = context;
XinZhangMS 0:f7f1f0d76dd6 327 message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
XinZhangMS 0:f7f1f0d76dd6 328 }
XinZhangMS 0:f7f1f0d76dd6 329
XinZhangMS 0:f7f1f0d76dd6 330 return message_receiver;
XinZhangMS 0:f7f1f0d76dd6 331 }
XinZhangMS 0:f7f1f0d76dd6 332
XinZhangMS 0:f7f1f0d76dd6 333 void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver)
XinZhangMS 0:f7f1f0d76dd6 334 {
XinZhangMS 0:f7f1f0d76dd6 335 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 336 {
XinZhangMS 0:f7f1f0d76dd6 337 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 338 }
XinZhangMS 0:f7f1f0d76dd6 339 else
XinZhangMS 0:f7f1f0d76dd6 340 {
XinZhangMS 0:f7f1f0d76dd6 341 (void)messagereceiver_close(message_receiver);
XinZhangMS 0:f7f1f0d76dd6 342 free(message_receiver);
XinZhangMS 0:f7f1f0d76dd6 343 }
XinZhangMS 0:f7f1f0d76dd6 344 }
XinZhangMS 0:f7f1f0d76dd6 345
XinZhangMS 0:f7f1f0d76dd6 346 int messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, ON_MESSAGE_RECEIVED on_message_received, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 347 {
XinZhangMS 0:f7f1f0d76dd6 348 int result;
XinZhangMS 0:f7f1f0d76dd6 349
XinZhangMS 0:f7f1f0d76dd6 350 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 351 {
XinZhangMS 0:f7f1f0d76dd6 352 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 353 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 354 }
XinZhangMS 0:f7f1f0d76dd6 355 else
XinZhangMS 0:f7f1f0d76dd6 356 {
XinZhangMS 0:f7f1f0d76dd6 357 if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
XinZhangMS 0:f7f1f0d76dd6 358 {
XinZhangMS 0:f7f1f0d76dd6 359 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING);
XinZhangMS 0:f7f1f0d76dd6 360 if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0)
XinZhangMS 0:f7f1f0d76dd6 361 {
XinZhangMS 0:f7f1f0d76dd6 362 LogError("Link attach failed");
XinZhangMS 0:f7f1f0d76dd6 363 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 364 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 365 }
XinZhangMS 0:f7f1f0d76dd6 366 else
XinZhangMS 0:f7f1f0d76dd6 367 {
XinZhangMS 0:f7f1f0d76dd6 368 message_receiver->on_message_received = on_message_received;
XinZhangMS 0:f7f1f0d76dd6 369 message_receiver->callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 370
XinZhangMS 0:f7f1f0d76dd6 371 result = 0;
XinZhangMS 0:f7f1f0d76dd6 372 }
XinZhangMS 0:f7f1f0d76dd6 373 }
XinZhangMS 0:f7f1f0d76dd6 374 else
XinZhangMS 0:f7f1f0d76dd6 375 {
XinZhangMS 0:f7f1f0d76dd6 376 result = 0;
XinZhangMS 0:f7f1f0d76dd6 377 }
XinZhangMS 0:f7f1f0d76dd6 378 }
XinZhangMS 0:f7f1f0d76dd6 379
XinZhangMS 0:f7f1f0d76dd6 380 return result;
XinZhangMS 0:f7f1f0d76dd6 381 }
XinZhangMS 0:f7f1f0d76dd6 382
XinZhangMS 0:f7f1f0d76dd6 383 int messagereceiver_close(MESSAGE_RECEIVER_HANDLE message_receiver)
XinZhangMS 0:f7f1f0d76dd6 384 {
XinZhangMS 0:f7f1f0d76dd6 385 int result;
XinZhangMS 0:f7f1f0d76dd6 386
XinZhangMS 0:f7f1f0d76dd6 387 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 388 {
XinZhangMS 0:f7f1f0d76dd6 389 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 390 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 391 }
XinZhangMS 0:f7f1f0d76dd6 392 else
XinZhangMS 0:f7f1f0d76dd6 393 {
XinZhangMS 0:f7f1f0d76dd6 394 if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
XinZhangMS 0:f7f1f0d76dd6 395 (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
XinZhangMS 0:f7f1f0d76dd6 396 {
XinZhangMS 0:f7f1f0d76dd6 397 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING);
XinZhangMS 0:f7f1f0d76dd6 398
XinZhangMS 0:f7f1f0d76dd6 399 if (link_detach(message_receiver->link, true, NULL, NULL, NULL) != 0)
XinZhangMS 0:f7f1f0d76dd6 400 {
XinZhangMS 0:f7f1f0d76dd6 401 LogError("link detach failed");
XinZhangMS 0:f7f1f0d76dd6 402 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 403 set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
XinZhangMS 0:f7f1f0d76dd6 404 }
XinZhangMS 0:f7f1f0d76dd6 405 else
XinZhangMS 0:f7f1f0d76dd6 406 {
XinZhangMS 0:f7f1f0d76dd6 407 result = 0;
XinZhangMS 0:f7f1f0d76dd6 408 }
XinZhangMS 0:f7f1f0d76dd6 409 }
XinZhangMS 0:f7f1f0d76dd6 410 else
XinZhangMS 0:f7f1f0d76dd6 411 {
XinZhangMS 0:f7f1f0d76dd6 412 result = 0;
XinZhangMS 0:f7f1f0d76dd6 413 }
XinZhangMS 0:f7f1f0d76dd6 414 }
XinZhangMS 0:f7f1f0d76dd6 415
XinZhangMS 0:f7f1f0d76dd6 416 return result;
XinZhangMS 0:f7f1f0d76dd6 417 }
XinZhangMS 0:f7f1f0d76dd6 418
XinZhangMS 0:f7f1f0d76dd6 419 int messagereceiver_get_link_name(MESSAGE_RECEIVER_HANDLE message_receiver, const char** link_name)
XinZhangMS 0:f7f1f0d76dd6 420 {
XinZhangMS 0:f7f1f0d76dd6 421 int result;
XinZhangMS 0:f7f1f0d76dd6 422
XinZhangMS 0:f7f1f0d76dd6 423 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 424 {
XinZhangMS 0:f7f1f0d76dd6 425 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 426 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 427 }
XinZhangMS 0:f7f1f0d76dd6 428 else
XinZhangMS 0:f7f1f0d76dd6 429 {
XinZhangMS 0:f7f1f0d76dd6 430 if (link_get_name(message_receiver->link, link_name) != 0)
XinZhangMS 0:f7f1f0d76dd6 431 {
XinZhangMS 0:f7f1f0d76dd6 432 LogError("Getting link name failed");
XinZhangMS 0:f7f1f0d76dd6 433 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 434 }
XinZhangMS 0:f7f1f0d76dd6 435 else
XinZhangMS 0:f7f1f0d76dd6 436 {
XinZhangMS 0:f7f1f0d76dd6 437 result = 0;
XinZhangMS 0:f7f1f0d76dd6 438 }
XinZhangMS 0:f7f1f0d76dd6 439 }
XinZhangMS 0:f7f1f0d76dd6 440
XinZhangMS 0:f7f1f0d76dd6 441 return result;
XinZhangMS 0:f7f1f0d76dd6 442 }
XinZhangMS 0:f7f1f0d76dd6 443
XinZhangMS 0:f7f1f0d76dd6 444 int messagereceiver_get_received_message_id(MESSAGE_RECEIVER_HANDLE message_receiver, delivery_number* message_id)
XinZhangMS 0:f7f1f0d76dd6 445 {
XinZhangMS 0:f7f1f0d76dd6 446 int result;
XinZhangMS 0:f7f1f0d76dd6 447
XinZhangMS 0:f7f1f0d76dd6 448 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 449 {
XinZhangMS 0:f7f1f0d76dd6 450 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 451 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 452 }
XinZhangMS 0:f7f1f0d76dd6 453 else
XinZhangMS 0:f7f1f0d76dd6 454 {
XinZhangMS 0:f7f1f0d76dd6 455 if (link_get_received_message_id(message_receiver->link, message_id) != 0)
XinZhangMS 0:f7f1f0d76dd6 456 {
XinZhangMS 0:f7f1f0d76dd6 457 LogError("Failed getting received message Id");
XinZhangMS 0:f7f1f0d76dd6 458 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 459 }
XinZhangMS 0:f7f1f0d76dd6 460 else
XinZhangMS 0:f7f1f0d76dd6 461 {
XinZhangMS 0:f7f1f0d76dd6 462 result = 0;
XinZhangMS 0:f7f1f0d76dd6 463 }
XinZhangMS 0:f7f1f0d76dd6 464 }
XinZhangMS 0:f7f1f0d76dd6 465
XinZhangMS 0:f7f1f0d76dd6 466 return result;
XinZhangMS 0:f7f1f0d76dd6 467 }
XinZhangMS 0:f7f1f0d76dd6 468
XinZhangMS 0:f7f1f0d76dd6 469 int messagereceiver_send_message_disposition(MESSAGE_RECEIVER_HANDLE message_receiver, const char* link_name, delivery_number message_number, AMQP_VALUE delivery_state)
XinZhangMS 0:f7f1f0d76dd6 470 {
XinZhangMS 0:f7f1f0d76dd6 471 int result;
XinZhangMS 0:f7f1f0d76dd6 472
XinZhangMS 0:f7f1f0d76dd6 473 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 474 {
XinZhangMS 0:f7f1f0d76dd6 475 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 476 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 477 }
XinZhangMS 0:f7f1f0d76dd6 478 else
XinZhangMS 0:f7f1f0d76dd6 479 {
XinZhangMS 0:f7f1f0d76dd6 480 if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
XinZhangMS 0:f7f1f0d76dd6 481 {
XinZhangMS 0:f7f1f0d76dd6 482 LogError("Message received not open");
XinZhangMS 0:f7f1f0d76dd6 483 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 484 }
XinZhangMS 0:f7f1f0d76dd6 485 else
XinZhangMS 0:f7f1f0d76dd6 486 {
XinZhangMS 0:f7f1f0d76dd6 487 const char* my_name;
XinZhangMS 0:f7f1f0d76dd6 488 if (link_get_name(message_receiver->link, &my_name) != 0)
XinZhangMS 0:f7f1f0d76dd6 489 {
XinZhangMS 0:f7f1f0d76dd6 490 LogError("Failed getting link name");
XinZhangMS 0:f7f1f0d76dd6 491 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 492 }
XinZhangMS 0:f7f1f0d76dd6 493 else
XinZhangMS 0:f7f1f0d76dd6 494 {
XinZhangMS 0:f7f1f0d76dd6 495 if (strcmp(link_name, my_name) != 0)
XinZhangMS 0:f7f1f0d76dd6 496 {
XinZhangMS 0:f7f1f0d76dd6 497 LogError("Link name does not match");
XinZhangMS 0:f7f1f0d76dd6 498 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 499 }
XinZhangMS 0:f7f1f0d76dd6 500 else
XinZhangMS 0:f7f1f0d76dd6 501 {
XinZhangMS 0:f7f1f0d76dd6 502 if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0)
XinZhangMS 0:f7f1f0d76dd6 503 {
XinZhangMS 0:f7f1f0d76dd6 504 LogError("Seding disposition failed");
XinZhangMS 0:f7f1f0d76dd6 505 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 506 }
XinZhangMS 0:f7f1f0d76dd6 507 else
XinZhangMS 0:f7f1f0d76dd6 508 {
XinZhangMS 0:f7f1f0d76dd6 509 result = 0;
XinZhangMS 0:f7f1f0d76dd6 510 }
XinZhangMS 0:f7f1f0d76dd6 511 }
XinZhangMS 0:f7f1f0d76dd6 512 }
XinZhangMS 0:f7f1f0d76dd6 513 }
XinZhangMS 0:f7f1f0d76dd6 514 }
XinZhangMS 0:f7f1f0d76dd6 515
XinZhangMS 0:f7f1f0d76dd6 516 return result;
XinZhangMS 0:f7f1f0d76dd6 517 }
XinZhangMS 0:f7f1f0d76dd6 518
XinZhangMS 0:f7f1f0d76dd6 519 void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on)
XinZhangMS 0:f7f1f0d76dd6 520 {
XinZhangMS 0:f7f1f0d76dd6 521 if (message_receiver == NULL)
XinZhangMS 0:f7f1f0d76dd6 522 {
XinZhangMS 0:f7f1f0d76dd6 523 LogError("NULL message_receiver");
XinZhangMS 0:f7f1f0d76dd6 524 }
XinZhangMS 0:f7f1f0d76dd6 525 else
XinZhangMS 0:f7f1f0d76dd6 526 {
XinZhangMS 0:f7f1f0d76dd6 527 /* No tracing is yet implemented for message receiver */
XinZhangMS 0:f7f1f0d76dd6 528 (void)trace_on;
XinZhangMS 0:f7f1f0d76dd6 529 }
XinZhangMS 0:f7f1f0d76dd6 530 }