A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Committer:
AzureIoTClient
Date:
Sun Apr 24 16:40:31 2016 -0700
Revision:
1:eab586236bfe
Parent:
0:6ae2f7bca550
Child:
5:ae49385aff34
1.0.5

Who changed what in which revision?

UserRevisionLine numberNew contents of line
Azure.IoT Build 0:6ae2f7bca550 1 // Copyright (c) Microsoft. All rights reserved.
Azure.IoT Build 0:6ae2f7bca550 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
Azure.IoT Build 0:6ae2f7bca550 3
Azure.IoT Build 0:6ae2f7bca550 4 #include <stdlib.h>
Azure.IoT Build 0:6ae2f7bca550 5 #ifdef _CRTDBG_MAP_ALLOC
Azure.IoT Build 0:6ae2f7bca550 6 #include <crtdbg.h>
Azure.IoT Build 0:6ae2f7bca550 7 #endif
Azure.IoT Build 0:6ae2f7bca550 8
Azure.IoT Build 0:6ae2f7bca550 9 #include <stdio.h>
Azure.IoT Build 0:6ae2f7bca550 10 #include <string.h>
Azure.IoT Build 0:6ae2f7bca550 11 #include "azure_uamqp_c/amqp_management.h"
Azure.IoT Build 0:6ae2f7bca550 12 #include "azure_uamqp_c/link.h"
Azure.IoT Build 0:6ae2f7bca550 13 #include "azure_uamqp_c/amqpalloc.h"
Azure.IoT Build 0:6ae2f7bca550 14 #include "azure_uamqp_c/message_sender.h"
Azure.IoT Build 0:6ae2f7bca550 15 #include "azure_uamqp_c/message_receiver.h"
Azure.IoT Build 0:6ae2f7bca550 16 #include "azure_uamqp_c/messaging.h"
Azure.IoT Build 0:6ae2f7bca550 17 #include "azure_uamqp_c/amqpvalue_to_string.h"
Azure.IoT Build 0:6ae2f7bca550 18 #include "azure_uamqp_c/consolelogger.h"
Azure.IoT Build 0:6ae2f7bca550 19
Azure.IoT Build 0:6ae2f7bca550 20 typedef enum OPERATION_STATE_TAG
Azure.IoT Build 0:6ae2f7bca550 21 {
Azure.IoT Build 0:6ae2f7bca550 22 OPERATION_STATE_NOT_SENT,
Azure.IoT Build 0:6ae2f7bca550 23 OPERATION_STATE_AWAIT_REPLY
Azure.IoT Build 0:6ae2f7bca550 24 } OPERATION_STATE;
Azure.IoT Build 0:6ae2f7bca550 25
Azure.IoT Build 0:6ae2f7bca550 26 typedef struct OPERATION_MESSAGE_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 27 {
Azure.IoT Build 0:6ae2f7bca550 28 MESSAGE_HANDLE message;
Azure.IoT Build 0:6ae2f7bca550 29 OPERATION_STATE operation_state;
Azure.IoT Build 0:6ae2f7bca550 30 ON_OPERATION_COMPLETE on_operation_complete;
Azure.IoT Build 0:6ae2f7bca550 31 void* callback_context;
Azure.IoT Build 0:6ae2f7bca550 32 unsigned long message_id;
Azure.IoT Build 0:6ae2f7bca550 33 } OPERATION_MESSAGE_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 34
Azure.IoT Build 0:6ae2f7bca550 35 typedef struct AMQP_MANAGEMENT_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 36 {
Azure.IoT Build 0:6ae2f7bca550 37 SESSION_HANDLE session;
Azure.IoT Build 0:6ae2f7bca550 38 LINK_HANDLE sender_link;
Azure.IoT Build 0:6ae2f7bca550 39 LINK_HANDLE receiver_link;
Azure.IoT Build 0:6ae2f7bca550 40 MESSAGE_SENDER_HANDLE message_sender;
Azure.IoT Build 0:6ae2f7bca550 41 MESSAGE_RECEIVER_HANDLE message_receiver;
Azure.IoT Build 0:6ae2f7bca550 42 OPERATION_MESSAGE_INSTANCE** operation_messages;
Azure.IoT Build 0:6ae2f7bca550 43 size_t operation_message_count;
Azure.IoT Build 0:6ae2f7bca550 44 unsigned long next_message_id;
Azure.IoT Build 0:6ae2f7bca550 45 ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed;
Azure.IoT Build 0:6ae2f7bca550 46 void* callback_context;
Azure.IoT Build 0:6ae2f7bca550 47 AMQP_MANAGEMENT_STATE amqp_management_state;
Azure.IoT Build 0:6ae2f7bca550 48 AMQP_MANAGEMENT_STATE previous_amqp_management_state;
Azure.IoT Build 0:6ae2f7bca550 49 unsigned char sender_connected : 1;
Azure.IoT Build 0:6ae2f7bca550 50 unsigned char receiver_connected : 1;
Azure.IoT Build 0:6ae2f7bca550 51 } AMQP_MANAGEMENT_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 52
Azure.IoT Build 0:6ae2f7bca550 53 static void amqpmanagement_set_state(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, AMQP_MANAGEMENT_STATE amqp_management_state)
Azure.IoT Build 0:6ae2f7bca550 54 {
Azure.IoT Build 0:6ae2f7bca550 55 amqp_management_instance->previous_amqp_management_state = amqp_management_instance->amqp_management_state;
Azure.IoT Build 0:6ae2f7bca550 56 amqp_management_instance->amqp_management_state = amqp_management_state;
Azure.IoT Build 0:6ae2f7bca550 57
Azure.IoT Build 0:6ae2f7bca550 58 if (amqp_management_instance->on_amqp_management_state_changed != NULL)
Azure.IoT Build 0:6ae2f7bca550 59 {
Azure.IoT Build 0:6ae2f7bca550 60 amqp_management_instance->on_amqp_management_state_changed(amqp_management_instance->callback_context, amqp_management_instance->amqp_management_state, amqp_management_instance->previous_amqp_management_state);
Azure.IoT Build 0:6ae2f7bca550 61 }
Azure.IoT Build 0:6ae2f7bca550 62 }
Azure.IoT Build 0:6ae2f7bca550 63
Azure.IoT Build 0:6ae2f7bca550 64 static void remove_operation_message_by_index(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, size_t index)
Azure.IoT Build 0:6ae2f7bca550 65 {
Azure.IoT Build 0:6ae2f7bca550 66 message_destroy(amqp_management_instance->operation_messages[index]->message);
Azure.IoT Build 0:6ae2f7bca550 67 amqpalloc_free(amqp_management_instance->operation_messages[index]);
Azure.IoT Build 0:6ae2f7bca550 68
Azure.IoT Build 0:6ae2f7bca550 69 if (amqp_management_instance->operation_message_count - index > 1)
Azure.IoT Build 0:6ae2f7bca550 70 {
AzureIoTClient 1:eab586236bfe 71 memmove(&amqp_management_instance->operation_messages[index], &amqp_management_instance->operation_messages[index + 1], sizeof(OPERATION_MESSAGE_INSTANCE*) * (amqp_management_instance->operation_message_count - index - 1));
Azure.IoT Build 0:6ae2f7bca550 72 }
Azure.IoT Build 0:6ae2f7bca550 73
Azure.IoT Build 0:6ae2f7bca550 74 if (amqp_management_instance->operation_message_count == 1)
Azure.IoT Build 0:6ae2f7bca550 75 {
Azure.IoT Build 0:6ae2f7bca550 76 amqpalloc_free(amqp_management_instance->operation_messages);
Azure.IoT Build 0:6ae2f7bca550 77 amqp_management_instance->operation_messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 78 }
Azure.IoT Build 0:6ae2f7bca550 79 else
Azure.IoT Build 0:6ae2f7bca550 80 {
Azure.IoT Build 0:6ae2f7bca550 81 OPERATION_MESSAGE_INSTANCE** new_operation_messages = (OPERATION_MESSAGE_INSTANCE**)amqpalloc_realloc(amqp_management_instance->operation_messages, sizeof(OPERATION_MESSAGE_INSTANCE*) * (amqp_management_instance->operation_message_count - 1));
Azure.IoT Build 0:6ae2f7bca550 82 if (new_operation_messages != NULL)
Azure.IoT Build 0:6ae2f7bca550 83 {
Azure.IoT Build 0:6ae2f7bca550 84 amqp_management_instance->operation_messages = new_operation_messages;
Azure.IoT Build 0:6ae2f7bca550 85 }
Azure.IoT Build 0:6ae2f7bca550 86 }
Azure.IoT Build 0:6ae2f7bca550 87
Azure.IoT Build 0:6ae2f7bca550 88 amqp_management_instance->operation_message_count--;
Azure.IoT Build 0:6ae2f7bca550 89 }
Azure.IoT Build 0:6ae2f7bca550 90
Azure.IoT Build 0:6ae2f7bca550 91 static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
Azure.IoT Build 0:6ae2f7bca550 92 {
Azure.IoT Build 0:6ae2f7bca550 93 AMQP_MANAGEMENT_INSTANCE* amqp_management_instance = (AMQP_MANAGEMENT_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 94
Azure.IoT Build 0:6ae2f7bca550 95 AMQP_VALUE application_properties;
Azure.IoT Build 0:6ae2f7bca550 96 if (message_get_application_properties(message, &application_properties) != 0)
Azure.IoT Build 0:6ae2f7bca550 97 {
Azure.IoT Build 0:6ae2f7bca550 98 /* error */
Azure.IoT Build 0:6ae2f7bca550 99 }
Azure.IoT Build 0:6ae2f7bca550 100 else
Azure.IoT Build 0:6ae2f7bca550 101 {
Azure.IoT Build 0:6ae2f7bca550 102 PROPERTIES_HANDLE response_properties;
Azure.IoT Build 0:6ae2f7bca550 103
Azure.IoT Build 0:6ae2f7bca550 104 if (message_get_properties(message, &response_properties) != 0)
Azure.IoT Build 0:6ae2f7bca550 105 {
Azure.IoT Build 0:6ae2f7bca550 106 /* error */
Azure.IoT Build 0:6ae2f7bca550 107 }
Azure.IoT Build 0:6ae2f7bca550 108 else
Azure.IoT Build 0:6ae2f7bca550 109 {
AzureIoTClient 1:eab586236bfe 110 AMQP_VALUE key;
AzureIoTClient 1:eab586236bfe 111 AMQP_VALUE value;
AzureIoTClient 1:eab586236bfe 112 AMQP_VALUE desc_key;
AzureIoTClient 1:eab586236bfe 113 AMQP_VALUE desc_value;
Azure.IoT Build 0:6ae2f7bca550 114 AMQP_VALUE map;
Azure.IoT Build 0:6ae2f7bca550 115 AMQP_VALUE correlation_id_value;
Azure.IoT Build 0:6ae2f7bca550 116
Azure.IoT Build 0:6ae2f7bca550 117 if (properties_get_correlation_id(response_properties, &correlation_id_value) != 0)
Azure.IoT Build 0:6ae2f7bca550 118 {
Azure.IoT Build 0:6ae2f7bca550 119 /* error */
Azure.IoT Build 0:6ae2f7bca550 120 }
Azure.IoT Build 0:6ae2f7bca550 121 else
Azure.IoT Build 0:6ae2f7bca550 122 {
Azure.IoT Build 0:6ae2f7bca550 123 map = amqpvalue_get_inplace_described_value(application_properties);
Azure.IoT Build 0:6ae2f7bca550 124 if (map == NULL)
Azure.IoT Build 0:6ae2f7bca550 125 {
Azure.IoT Build 0:6ae2f7bca550 126 /* error */
Azure.IoT Build 0:6ae2f7bca550 127 }
Azure.IoT Build 0:6ae2f7bca550 128 else
Azure.IoT Build 0:6ae2f7bca550 129 {
Azure.IoT Build 0:6ae2f7bca550 130 key = amqpvalue_create_string("status-code");
Azure.IoT Build 0:6ae2f7bca550 131 if (key == NULL)
Azure.IoT Build 0:6ae2f7bca550 132 {
Azure.IoT Build 0:6ae2f7bca550 133 /* error */
Azure.IoT Build 0:6ae2f7bca550 134 }
Azure.IoT Build 0:6ae2f7bca550 135 else
Azure.IoT Build 0:6ae2f7bca550 136 {
Azure.IoT Build 0:6ae2f7bca550 137 value = amqpvalue_get_map_value(map, key);
Azure.IoT Build 0:6ae2f7bca550 138 if (value == NULL)
Azure.IoT Build 0:6ae2f7bca550 139 {
Azure.IoT Build 0:6ae2f7bca550 140 /* error */
Azure.IoT Build 0:6ae2f7bca550 141 }
Azure.IoT Build 0:6ae2f7bca550 142 else
Azure.IoT Build 0:6ae2f7bca550 143 {
Azure.IoT Build 0:6ae2f7bca550 144 int32_t status_code;
Azure.IoT Build 0:6ae2f7bca550 145 if (amqpvalue_get_int(value, &status_code) != 0)
Azure.IoT Build 0:6ae2f7bca550 146 {
Azure.IoT Build 0:6ae2f7bca550 147 /* error */
Azure.IoT Build 0:6ae2f7bca550 148 }
AzureIoTClient 1:eab586236bfe 149 else
AzureIoTClient 1:eab586236bfe 150 {
AzureIoTClient 1:eab586236bfe 151 desc_key = amqpvalue_create_string("status-description");
AzureIoTClient 1:eab586236bfe 152 if (desc_key == NULL)
AzureIoTClient 1:eab586236bfe 153 {
AzureIoTClient 1:eab586236bfe 154 /* error */
AzureIoTClient 1:eab586236bfe 155 }
AzureIoTClient 1:eab586236bfe 156 else
AzureIoTClient 1:eab586236bfe 157 {
AzureIoTClient 1:eab586236bfe 158 const char* status_description = NULL;
AzureIoTClient 1:eab586236bfe 159
AzureIoTClient 1:eab586236bfe 160 desc_value = amqpvalue_get_map_value(map, desc_key);
AzureIoTClient 1:eab586236bfe 161 if (desc_value != NULL)
AzureIoTClient 1:eab586236bfe 162 {
AzureIoTClient 1:eab586236bfe 163 amqpvalue_get_string(desc_value, &status_description);
AzureIoTClient 1:eab586236bfe 164 }
AzureIoTClient 1:eab586236bfe 165
AzureIoTClient 1:eab586236bfe 166 size_t i = 0;
AzureIoTClient 1:eab586236bfe 167 while (i < amqp_management_instance->operation_message_count)
AzureIoTClient 1:eab586236bfe 168 {
AzureIoTClient 1:eab586236bfe 169 if (amqp_management_instance->operation_messages[i]->operation_state == OPERATION_STATE_AWAIT_REPLY)
AzureIoTClient 1:eab586236bfe 170 {
AzureIoTClient 1:eab586236bfe 171 AMQP_VALUE expected_message_id = amqpvalue_create_ulong(amqp_management_instance->operation_messages[i]->message_id);
AzureIoTClient 1:eab586236bfe 172 OPERATION_RESULT operation_result;
Azure.IoT Build 0:6ae2f7bca550 173
AzureIoTClient 1:eab586236bfe 174 if (expected_message_id == NULL)
AzureIoTClient 1:eab586236bfe 175 {
AzureIoTClient 1:eab586236bfe 176 break;
AzureIoTClient 1:eab586236bfe 177 }
AzureIoTClient 1:eab586236bfe 178 else
AzureIoTClient 1:eab586236bfe 179 {
AzureIoTClient 1:eab586236bfe 180 if (amqpvalue_are_equal(correlation_id_value, expected_message_id))
AzureIoTClient 1:eab586236bfe 181 {
AzureIoTClient 1:eab586236bfe 182 /* 202 is not mentioned in the draft in any way, this is a workaround for an EH bug for now */
AzureIoTClient 1:eab586236bfe 183 if ((status_code != 200) && (status_code != 202))
AzureIoTClient 1:eab586236bfe 184 {
AzureIoTClient 1:eab586236bfe 185 operation_result = OPERATION_RESULT_OPERATION_FAILED;
AzureIoTClient 1:eab586236bfe 186 }
AzureIoTClient 1:eab586236bfe 187 else
AzureIoTClient 1:eab586236bfe 188 {
AzureIoTClient 1:eab586236bfe 189 operation_result = OPERATION_RESULT_OK;
AzureIoTClient 1:eab586236bfe 190 }
AzureIoTClient 1:eab586236bfe 191
AzureIoTClient 1:eab586236bfe 192 amqp_management_instance->operation_messages[i]->on_operation_complete(amqp_management_instance->operation_messages[i]->callback_context, operation_result, status_code, status_description);
AzureIoTClient 1:eab586236bfe 193
AzureIoTClient 1:eab586236bfe 194 remove_operation_message_by_index(amqp_management_instance, i);
Azure.IoT Build 0:6ae2f7bca550 195
AzureIoTClient 1:eab586236bfe 196 amqpvalue_destroy(expected_message_id);
Azure.IoT Build 0:6ae2f7bca550 197
AzureIoTClient 1:eab586236bfe 198 break;
AzureIoTClient 1:eab586236bfe 199 }
AzureIoTClient 1:eab586236bfe 200 else
AzureIoTClient 1:eab586236bfe 201 {
AzureIoTClient 1:eab586236bfe 202 i++;
AzureIoTClient 1:eab586236bfe 203 }
Azure.IoT Build 0:6ae2f7bca550 204
AzureIoTClient 1:eab586236bfe 205 amqpvalue_destroy(expected_message_id);
AzureIoTClient 1:eab586236bfe 206 }
AzureIoTClient 1:eab586236bfe 207 }
AzureIoTClient 1:eab586236bfe 208 else
AzureIoTClient 1:eab586236bfe 209 {
AzureIoTClient 1:eab586236bfe 210 i++;
AzureIoTClient 1:eab586236bfe 211 }
AzureIoTClient 1:eab586236bfe 212 }
Azure.IoT Build 0:6ae2f7bca550 213
AzureIoTClient 1:eab586236bfe 214 if (desc_value != NULL)
AzureIoTClient 1:eab586236bfe 215 {
AzureIoTClient 1:eab586236bfe 216 amqpvalue_destroy(desc_value);
AzureIoTClient 1:eab586236bfe 217 }
AzureIoTClient 1:eab586236bfe 218 amqpvalue_destroy(desc_key);
Azure.IoT Build 0:6ae2f7bca550 219 }
Azure.IoT Build 0:6ae2f7bca550 220 }
Azure.IoT Build 0:6ae2f7bca550 221 amqpvalue_destroy(value);
Azure.IoT Build 0:6ae2f7bca550 222 }
Azure.IoT Build 0:6ae2f7bca550 223 amqpvalue_destroy(key);
Azure.IoT Build 0:6ae2f7bca550 224 }
Azure.IoT Build 0:6ae2f7bca550 225 }
Azure.IoT Build 0:6ae2f7bca550 226 }
Azure.IoT Build 0:6ae2f7bca550 227
Azure.IoT Build 0:6ae2f7bca550 228 properties_destroy(response_properties);
Azure.IoT Build 0:6ae2f7bca550 229 }
Azure.IoT Build 0:6ae2f7bca550 230
Azure.IoT Build 0:6ae2f7bca550 231 application_properties_destroy(application_properties);
Azure.IoT Build 0:6ae2f7bca550 232 }
Azure.IoT Build 0:6ae2f7bca550 233
Azure.IoT Build 0:6ae2f7bca550 234 return messaging_delivery_accepted();
Azure.IoT Build 0:6ae2f7bca550 235 }
Azure.IoT Build 0:6ae2f7bca550 236
Azure.IoT Build 0:6ae2f7bca550 237 static int send_operation_messages(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance)
Azure.IoT Build 0:6ae2f7bca550 238 {
Azure.IoT Build 0:6ae2f7bca550 239 int result;
Azure.IoT Build 0:6ae2f7bca550 240
Azure.IoT Build 0:6ae2f7bca550 241 if ((amqp_management_instance->sender_connected != 0) &&
Azure.IoT Build 0:6ae2f7bca550 242 (amqp_management_instance->receiver_connected != 0))
Azure.IoT Build 0:6ae2f7bca550 243 {
Azure.IoT Build 0:6ae2f7bca550 244 size_t i;
Azure.IoT Build 0:6ae2f7bca550 245
Azure.IoT Build 0:6ae2f7bca550 246 for (i = 0; i < amqp_management_instance->operation_message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 247 {
Azure.IoT Build 0:6ae2f7bca550 248 if (amqp_management_instance->operation_messages[i]->operation_state == OPERATION_STATE_NOT_SENT)
Azure.IoT Build 0:6ae2f7bca550 249 {
Azure.IoT Build 0:6ae2f7bca550 250 if (messagesender_send(amqp_management_instance->message_sender, amqp_management_instance->operation_messages[i]->message, NULL, NULL) != 0)
Azure.IoT Build 0:6ae2f7bca550 251 {
Azure.IoT Build 0:6ae2f7bca550 252 /* error */
Azure.IoT Build 0:6ae2f7bca550 253 break;
Azure.IoT Build 0:6ae2f7bca550 254 }
Azure.IoT Build 0:6ae2f7bca550 255
Azure.IoT Build 0:6ae2f7bca550 256 amqp_management_instance->operation_messages[i]->operation_state = OPERATION_STATE_AWAIT_REPLY;
Azure.IoT Build 0:6ae2f7bca550 257 }
Azure.IoT Build 0:6ae2f7bca550 258 }
Azure.IoT Build 0:6ae2f7bca550 259
Azure.IoT Build 0:6ae2f7bca550 260 if (i < amqp_management_instance->operation_message_count)
Azure.IoT Build 0:6ae2f7bca550 261 {
Azure.IoT Build 0:6ae2f7bca550 262 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 263 }
Azure.IoT Build 0:6ae2f7bca550 264 else
Azure.IoT Build 0:6ae2f7bca550 265 {
Azure.IoT Build 0:6ae2f7bca550 266 result = 0;
Azure.IoT Build 0:6ae2f7bca550 267 }
Azure.IoT Build 0:6ae2f7bca550 268 }
Azure.IoT Build 0:6ae2f7bca550 269 else
Azure.IoT Build 0:6ae2f7bca550 270 {
Azure.IoT Build 0:6ae2f7bca550 271 result = 0;
Azure.IoT Build 0:6ae2f7bca550 272 }
Azure.IoT Build 0:6ae2f7bca550 273
Azure.IoT Build 0:6ae2f7bca550 274 return result;
Azure.IoT Build 0:6ae2f7bca550 275 }
Azure.IoT Build 0:6ae2f7bca550 276
Azure.IoT Build 0:6ae2f7bca550 277 static void on_message_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
Azure.IoT Build 0:6ae2f7bca550 278 {
Azure.IoT Build 0:6ae2f7bca550 279 AMQP_MANAGEMENT_INSTANCE* amqp_management_instance = (AMQP_MANAGEMENT_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 280 switch (new_state)
Azure.IoT Build 0:6ae2f7bca550 281 {
Azure.IoT Build 0:6ae2f7bca550 282 default:
Azure.IoT Build 0:6ae2f7bca550 283 break;
Azure.IoT Build 0:6ae2f7bca550 284
Azure.IoT Build 0:6ae2f7bca550 285 case MESSAGE_SENDER_STATE_OPEN:
Azure.IoT Build 0:6ae2f7bca550 286 amqp_management_instance->sender_connected = 1;
Azure.IoT Build 0:6ae2f7bca550 287 (void)send_operation_messages(amqp_management_instance);
Azure.IoT Build 0:6ae2f7bca550 288 break;
Azure.IoT Build 0:6ae2f7bca550 289
Azure.IoT Build 0:6ae2f7bca550 290 case MESSAGE_SENDER_STATE_CLOSING:
Azure.IoT Build 0:6ae2f7bca550 291 case MESSAGE_SENDER_STATE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 292 amqp_management_instance->sender_connected = 0;
Azure.IoT Build 0:6ae2f7bca550 293 amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 294 break;
Azure.IoT Build 0:6ae2f7bca550 295 }
Azure.IoT Build 0:6ae2f7bca550 296 }
Azure.IoT Build 0:6ae2f7bca550 297
Azure.IoT Build 0:6ae2f7bca550 298 static void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
Azure.IoT Build 0:6ae2f7bca550 299 {
Azure.IoT Build 0:6ae2f7bca550 300 AMQP_MANAGEMENT_INSTANCE* amqp_management_instance = (AMQP_MANAGEMENT_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 301 switch (new_state)
Azure.IoT Build 0:6ae2f7bca550 302 {
Azure.IoT Build 0:6ae2f7bca550 303 default:
Azure.IoT Build 0:6ae2f7bca550 304 break;
Azure.IoT Build 0:6ae2f7bca550 305
Azure.IoT Build 0:6ae2f7bca550 306 case MESSAGE_RECEIVER_STATE_OPEN:
Azure.IoT Build 0:6ae2f7bca550 307 amqp_management_instance->receiver_connected = 1;
Azure.IoT Build 0:6ae2f7bca550 308 (void)send_operation_messages(amqp_management_instance);
Azure.IoT Build 0:6ae2f7bca550 309 break;
Azure.IoT Build 0:6ae2f7bca550 310
Azure.IoT Build 0:6ae2f7bca550 311 case MESSAGE_RECEIVER_STATE_CLOSING:
Azure.IoT Build 0:6ae2f7bca550 312 case MESSAGE_RECEIVER_STATE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 313 amqp_management_instance->receiver_connected = 0;
Azure.IoT Build 0:6ae2f7bca550 314 amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 315 break;
Azure.IoT Build 0:6ae2f7bca550 316 }
Azure.IoT Build 0:6ae2f7bca550 317 }
Azure.IoT Build 0:6ae2f7bca550 318
Azure.IoT Build 0:6ae2f7bca550 319 static int set_message_id(MESSAGE_HANDLE message, unsigned long next_message_id)
Azure.IoT Build 0:6ae2f7bca550 320 {
Azure.IoT Build 0:6ae2f7bca550 321 int result = 0;
Azure.IoT Build 0:6ae2f7bca550 322
Azure.IoT Build 0:6ae2f7bca550 323 PROPERTIES_HANDLE properties;
Azure.IoT Build 0:6ae2f7bca550 324 if (message_get_properties(message, &properties) != 0)
Azure.IoT Build 0:6ae2f7bca550 325 {
Azure.IoT Build 0:6ae2f7bca550 326 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 327 }
Azure.IoT Build 0:6ae2f7bca550 328 else
Azure.IoT Build 0:6ae2f7bca550 329 {
Azure.IoT Build 0:6ae2f7bca550 330 AMQP_VALUE message_id = amqpvalue_create_message_id_ulong(next_message_id);
Azure.IoT Build 0:6ae2f7bca550 331 if (message_id == NULL)
Azure.IoT Build 0:6ae2f7bca550 332 {
Azure.IoT Build 0:6ae2f7bca550 333 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 334 }
Azure.IoT Build 0:6ae2f7bca550 335 else
Azure.IoT Build 0:6ae2f7bca550 336 {
Azure.IoT Build 0:6ae2f7bca550 337 if (properties_set_message_id(properties, message_id) != 0)
Azure.IoT Build 0:6ae2f7bca550 338 {
Azure.IoT Build 0:6ae2f7bca550 339 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 340 }
Azure.IoT Build 0:6ae2f7bca550 341
Azure.IoT Build 0:6ae2f7bca550 342 amqpvalue_destroy(message_id);
Azure.IoT Build 0:6ae2f7bca550 343 }
Azure.IoT Build 0:6ae2f7bca550 344
Azure.IoT Build 0:6ae2f7bca550 345 if (message_set_properties(message, properties) != 0)
Azure.IoT Build 0:6ae2f7bca550 346 {
Azure.IoT Build 0:6ae2f7bca550 347 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 348 }
Azure.IoT Build 0:6ae2f7bca550 349
Azure.IoT Build 0:6ae2f7bca550 350 properties_destroy(properties);
Azure.IoT Build 0:6ae2f7bca550 351 }
Azure.IoT Build 0:6ae2f7bca550 352
Azure.IoT Build 0:6ae2f7bca550 353 return result;
Azure.IoT Build 0:6ae2f7bca550 354 }
Azure.IoT Build 0:6ae2f7bca550 355
Azure.IoT Build 0:6ae2f7bca550 356 static int add_string_key_value_pair_to_map(AMQP_VALUE map, const char* key, const char* value)
Azure.IoT Build 0:6ae2f7bca550 357 {
Azure.IoT Build 0:6ae2f7bca550 358 int result;
Azure.IoT Build 0:6ae2f7bca550 359
Azure.IoT Build 0:6ae2f7bca550 360 AMQP_VALUE key_value = amqpvalue_create_string(key);
Azure.IoT Build 0:6ae2f7bca550 361 if (key == NULL)
Azure.IoT Build 0:6ae2f7bca550 362 {
Azure.IoT Build 0:6ae2f7bca550 363 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 364 }
Azure.IoT Build 0:6ae2f7bca550 365 else
Azure.IoT Build 0:6ae2f7bca550 366 {
Azure.IoT Build 0:6ae2f7bca550 367 AMQP_VALUE value_value = amqpvalue_create_string(value);
Azure.IoT Build 0:6ae2f7bca550 368 if (value_value == NULL)
Azure.IoT Build 0:6ae2f7bca550 369 {
Azure.IoT Build 0:6ae2f7bca550 370 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 371 }
Azure.IoT Build 0:6ae2f7bca550 372 else
Azure.IoT Build 0:6ae2f7bca550 373 {
Azure.IoT Build 0:6ae2f7bca550 374 if (amqpvalue_set_map_value(map, key_value, value_value) != 0)
Azure.IoT Build 0:6ae2f7bca550 375 {
Azure.IoT Build 0:6ae2f7bca550 376 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 377 }
Azure.IoT Build 0:6ae2f7bca550 378 else
Azure.IoT Build 0:6ae2f7bca550 379 {
Azure.IoT Build 0:6ae2f7bca550 380 result = 0;
Azure.IoT Build 0:6ae2f7bca550 381 }
Azure.IoT Build 0:6ae2f7bca550 382
Azure.IoT Build 0:6ae2f7bca550 383 amqpvalue_destroy(key_value);
Azure.IoT Build 0:6ae2f7bca550 384 }
Azure.IoT Build 0:6ae2f7bca550 385
Azure.IoT Build 0:6ae2f7bca550 386 amqpvalue_destroy(value_value);
Azure.IoT Build 0:6ae2f7bca550 387 }
Azure.IoT Build 0:6ae2f7bca550 388
Azure.IoT Build 0:6ae2f7bca550 389 return result;
Azure.IoT Build 0:6ae2f7bca550 390 }
Azure.IoT Build 0:6ae2f7bca550 391
Azure.IoT Build 0:6ae2f7bca550 392 AMQP_MANAGEMENT_HANDLE amqpmanagement_create(SESSION_HANDLE session, const char* management_node, ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed, void* callback_context)
Azure.IoT Build 0:6ae2f7bca550 393 {
Azure.IoT Build 0:6ae2f7bca550 394 AMQP_MANAGEMENT_INSTANCE* result;
Azure.IoT Build 0:6ae2f7bca550 395
Azure.IoT Build 0:6ae2f7bca550 396 if (session == NULL)
Azure.IoT Build 0:6ae2f7bca550 397 {
Azure.IoT Build 0:6ae2f7bca550 398 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 399 }
Azure.IoT Build 0:6ae2f7bca550 400 else
Azure.IoT Build 0:6ae2f7bca550 401 {
Azure.IoT Build 0:6ae2f7bca550 402 result = (AMQP_MANAGEMENT_INSTANCE*)amqpalloc_malloc(sizeof(AMQP_MANAGEMENT_INSTANCE));
Azure.IoT Build 0:6ae2f7bca550 403 if (result != NULL)
Azure.IoT Build 0:6ae2f7bca550 404 {
Azure.IoT Build 0:6ae2f7bca550 405 result->session = session;
Azure.IoT Build 0:6ae2f7bca550 406 result->sender_connected = 0;
Azure.IoT Build 0:6ae2f7bca550 407 result->receiver_connected = 0;
Azure.IoT Build 0:6ae2f7bca550 408 result->operation_message_count = 0;
Azure.IoT Build 0:6ae2f7bca550 409 result->operation_messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 410 result->on_amqp_management_state_changed = on_amqp_management_state_changed;
Azure.IoT Build 0:6ae2f7bca550 411 result->callback_context = callback_context;
Azure.IoT Build 0:6ae2f7bca550 412
Azure.IoT Build 0:6ae2f7bca550 413 AMQP_VALUE source = messaging_create_source(management_node);
Azure.IoT Build 0:6ae2f7bca550 414 if (source == NULL)
Azure.IoT Build 0:6ae2f7bca550 415 {
Azure.IoT Build 0:6ae2f7bca550 416 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 417 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 418 }
Azure.IoT Build 0:6ae2f7bca550 419 else
Azure.IoT Build 0:6ae2f7bca550 420 {
Azure.IoT Build 0:6ae2f7bca550 421 AMQP_VALUE target = messaging_create_target(management_node);
Azure.IoT Build 0:6ae2f7bca550 422 if (target == NULL)
Azure.IoT Build 0:6ae2f7bca550 423 {
Azure.IoT Build 0:6ae2f7bca550 424 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 425 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 426 }
Azure.IoT Build 0:6ae2f7bca550 427 else
Azure.IoT Build 0:6ae2f7bca550 428 {
Azure.IoT Build 0:6ae2f7bca550 429 static const char* sender_suffix = "-sender";
Azure.IoT Build 0:6ae2f7bca550 430
Azure.IoT Build 0:6ae2f7bca550 431 char* sender_link_name = (char*)amqpalloc_malloc(strlen(management_node) + strlen(sender_suffix) + 1);
Azure.IoT Build 0:6ae2f7bca550 432 if (sender_link_name == NULL)
Azure.IoT Build 0:6ae2f7bca550 433 {
Azure.IoT Build 0:6ae2f7bca550 434 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 435 }
Azure.IoT Build 0:6ae2f7bca550 436 else
Azure.IoT Build 0:6ae2f7bca550 437 {
Azure.IoT Build 0:6ae2f7bca550 438 static const char* receiver_suffix = "-receiver";
Azure.IoT Build 0:6ae2f7bca550 439
Azure.IoT Build 0:6ae2f7bca550 440 (void)strcpy(sender_link_name, management_node);
Azure.IoT Build 0:6ae2f7bca550 441 (void)strcat(sender_link_name, sender_suffix);
Azure.IoT Build 0:6ae2f7bca550 442
Azure.IoT Build 0:6ae2f7bca550 443 char* receiver_link_name = (char*)amqpalloc_malloc(strlen(management_node) + strlen(receiver_suffix) + 1);
Azure.IoT Build 0:6ae2f7bca550 444 if (receiver_link_name == NULL)
Azure.IoT Build 0:6ae2f7bca550 445 {
Azure.IoT Build 0:6ae2f7bca550 446 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 447 }
Azure.IoT Build 0:6ae2f7bca550 448 else
Azure.IoT Build 0:6ae2f7bca550 449 {
Azure.IoT Build 0:6ae2f7bca550 450 (void)strcpy(receiver_link_name, management_node);
Azure.IoT Build 0:6ae2f7bca550 451 (void)strcat(receiver_link_name, receiver_suffix);
Azure.IoT Build 0:6ae2f7bca550 452
Azure.IoT Build 0:6ae2f7bca550 453 result->sender_link = link_create(session, "cbs-sender", role_sender, source, target);
Azure.IoT Build 0:6ae2f7bca550 454 if (result->sender_link == NULL)
Azure.IoT Build 0:6ae2f7bca550 455 {
Azure.IoT Build 0:6ae2f7bca550 456 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 457 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 458 }
Azure.IoT Build 0:6ae2f7bca550 459 else
Azure.IoT Build 0:6ae2f7bca550 460 {
Azure.IoT Build 0:6ae2f7bca550 461 result->receiver_link = link_create(session, "cbs-receiver", role_receiver, source, target);
Azure.IoT Build 0:6ae2f7bca550 462 if (result->receiver_link == NULL)
Azure.IoT Build 0:6ae2f7bca550 463 {
Azure.IoT Build 0:6ae2f7bca550 464 link_destroy(result->sender_link);
Azure.IoT Build 0:6ae2f7bca550 465 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 466 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 467 }
Azure.IoT Build 0:6ae2f7bca550 468 else
Azure.IoT Build 0:6ae2f7bca550 469 {
Azure.IoT Build 0:6ae2f7bca550 470 if ((link_set_max_message_size(result->sender_link, 65535) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 471 (link_set_max_message_size(result->receiver_link, 65535) != 0))
Azure.IoT Build 0:6ae2f7bca550 472 {
Azure.IoT Build 0:6ae2f7bca550 473 link_destroy(result->sender_link);
Azure.IoT Build 0:6ae2f7bca550 474 link_destroy(result->receiver_link);
Azure.IoT Build 0:6ae2f7bca550 475 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 476 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 477 }
Azure.IoT Build 0:6ae2f7bca550 478 else
Azure.IoT Build 0:6ae2f7bca550 479 {
Azure.IoT Build 0:6ae2f7bca550 480 result->message_sender = messagesender_create(result->sender_link, on_message_sender_state_changed, result, NULL);
Azure.IoT Build 0:6ae2f7bca550 481 if (result->message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 482 {
Azure.IoT Build 0:6ae2f7bca550 483 link_destroy(result->sender_link);
Azure.IoT Build 0:6ae2f7bca550 484 link_destroy(result->receiver_link);
Azure.IoT Build 0:6ae2f7bca550 485 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 486 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 487 }
Azure.IoT Build 0:6ae2f7bca550 488 else
Azure.IoT Build 0:6ae2f7bca550 489 {
Azure.IoT Build 0:6ae2f7bca550 490 result->message_receiver = messagereceiver_create(result->receiver_link, on_message_receiver_state_changed, result);
Azure.IoT Build 0:6ae2f7bca550 491 if (result->message_receiver == NULL)
Azure.IoT Build 0:6ae2f7bca550 492 {
Azure.IoT Build 0:6ae2f7bca550 493 messagesender_destroy(result->message_sender);
Azure.IoT Build 0:6ae2f7bca550 494 link_destroy(result->sender_link);
Azure.IoT Build 0:6ae2f7bca550 495 link_destroy(result->receiver_link);
Azure.IoT Build 0:6ae2f7bca550 496 amqpalloc_free(result);
Azure.IoT Build 0:6ae2f7bca550 497 result = NULL;
Azure.IoT Build 0:6ae2f7bca550 498 }
Azure.IoT Build 0:6ae2f7bca550 499 else
Azure.IoT Build 0:6ae2f7bca550 500 {
Azure.IoT Build 0:6ae2f7bca550 501 result->next_message_id = 0;
Azure.IoT Build 0:6ae2f7bca550 502 }
Azure.IoT Build 0:6ae2f7bca550 503 }
Azure.IoT Build 0:6ae2f7bca550 504 }
Azure.IoT Build 0:6ae2f7bca550 505 }
Azure.IoT Build 0:6ae2f7bca550 506 }
Azure.IoT Build 0:6ae2f7bca550 507
Azure.IoT Build 0:6ae2f7bca550 508 amqpalloc_free(receiver_link_name);
Azure.IoT Build 0:6ae2f7bca550 509 }
Azure.IoT Build 0:6ae2f7bca550 510
Azure.IoT Build 0:6ae2f7bca550 511 amqpalloc_free(sender_link_name);
Azure.IoT Build 0:6ae2f7bca550 512 }
Azure.IoT Build 0:6ae2f7bca550 513
Azure.IoT Build 0:6ae2f7bca550 514 amqpvalue_destroy(target);
Azure.IoT Build 0:6ae2f7bca550 515 }
Azure.IoT Build 0:6ae2f7bca550 516
Azure.IoT Build 0:6ae2f7bca550 517 amqpvalue_destroy(source);
Azure.IoT Build 0:6ae2f7bca550 518 }
Azure.IoT Build 0:6ae2f7bca550 519 }
Azure.IoT Build 0:6ae2f7bca550 520 }
Azure.IoT Build 0:6ae2f7bca550 521
Azure.IoT Build 0:6ae2f7bca550 522 return result;
Azure.IoT Build 0:6ae2f7bca550 523 }
Azure.IoT Build 0:6ae2f7bca550 524
Azure.IoT Build 0:6ae2f7bca550 525 void amqpmanagement_destroy(AMQP_MANAGEMENT_HANDLE amqp_management)
Azure.IoT Build 0:6ae2f7bca550 526 {
Azure.IoT Build 0:6ae2f7bca550 527 if (amqp_management != NULL)
Azure.IoT Build 0:6ae2f7bca550 528 {
Azure.IoT Build 0:6ae2f7bca550 529 (void)amqpmanagement_close(amqp_management);
Azure.IoT Build 0:6ae2f7bca550 530
Azure.IoT Build 0:6ae2f7bca550 531 if (amqp_management->operation_message_count > 0)
Azure.IoT Build 0:6ae2f7bca550 532 {
Azure.IoT Build 0:6ae2f7bca550 533 size_t i;
Azure.IoT Build 0:6ae2f7bca550 534 for (i = 0; i < amqp_management->operation_message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 535 {
Azure.IoT Build 0:6ae2f7bca550 536 message_destroy(amqp_management->operation_messages[i]->message);
Azure.IoT Build 0:6ae2f7bca550 537 amqpalloc_free(amqp_management->operation_messages[i]);
Azure.IoT Build 0:6ae2f7bca550 538 }
Azure.IoT Build 0:6ae2f7bca550 539
Azure.IoT Build 0:6ae2f7bca550 540 amqpalloc_free(amqp_management->operation_messages);
Azure.IoT Build 0:6ae2f7bca550 541 }
Azure.IoT Build 0:6ae2f7bca550 542
Azure.IoT Build 0:6ae2f7bca550 543 link_destroy(amqp_management->sender_link);
Azure.IoT Build 0:6ae2f7bca550 544 link_destroy(amqp_management->receiver_link);
Azure.IoT Build 0:6ae2f7bca550 545 messagesender_destroy(amqp_management->message_sender);
Azure.IoT Build 0:6ae2f7bca550 546 messagereceiver_destroy(amqp_management->message_receiver);
Azure.IoT Build 0:6ae2f7bca550 547 amqpalloc_free(amqp_management);
Azure.IoT Build 0:6ae2f7bca550 548 }
Azure.IoT Build 0:6ae2f7bca550 549 }
Azure.IoT Build 0:6ae2f7bca550 550
Azure.IoT Build 0:6ae2f7bca550 551 int amqpmanagement_open(AMQP_MANAGEMENT_HANDLE amqp_management)
Azure.IoT Build 0:6ae2f7bca550 552 {
Azure.IoT Build 0:6ae2f7bca550 553 int result;
Azure.IoT Build 0:6ae2f7bca550 554
Azure.IoT Build 0:6ae2f7bca550 555 if (amqp_management == NULL)
Azure.IoT Build 0:6ae2f7bca550 556 {
Azure.IoT Build 0:6ae2f7bca550 557 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 558 }
Azure.IoT Build 0:6ae2f7bca550 559 else
Azure.IoT Build 0:6ae2f7bca550 560 {
Azure.IoT Build 0:6ae2f7bca550 561 if (messagereceiver_open(amqp_management->message_receiver, on_message_received, amqp_management) != 0)
Azure.IoT Build 0:6ae2f7bca550 562 {
Azure.IoT Build 0:6ae2f7bca550 563 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 564 }
Azure.IoT Build 0:6ae2f7bca550 565 else
Azure.IoT Build 0:6ae2f7bca550 566 {
Azure.IoT Build 0:6ae2f7bca550 567 if (messagesender_open(amqp_management->message_sender) != 0)
Azure.IoT Build 0:6ae2f7bca550 568 {
Azure.IoT Build 0:6ae2f7bca550 569 messagereceiver_close(amqp_management->message_receiver);
Azure.IoT Build 0:6ae2f7bca550 570 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 571 }
Azure.IoT Build 0:6ae2f7bca550 572 else
Azure.IoT Build 0:6ae2f7bca550 573 {
Azure.IoT Build 0:6ae2f7bca550 574 result = 0;
Azure.IoT Build 0:6ae2f7bca550 575 }
Azure.IoT Build 0:6ae2f7bca550 576 }
Azure.IoT Build 0:6ae2f7bca550 577 }
Azure.IoT Build 0:6ae2f7bca550 578
Azure.IoT Build 0:6ae2f7bca550 579 return result;
Azure.IoT Build 0:6ae2f7bca550 580 }
Azure.IoT Build 0:6ae2f7bca550 581
Azure.IoT Build 0:6ae2f7bca550 582 int amqpmanagement_close(AMQP_MANAGEMENT_HANDLE amqp_management)
Azure.IoT Build 0:6ae2f7bca550 583 {
Azure.IoT Build 0:6ae2f7bca550 584 int result;
Azure.IoT Build 0:6ae2f7bca550 585
Azure.IoT Build 0:6ae2f7bca550 586 if (amqp_management == NULL)
Azure.IoT Build 0:6ae2f7bca550 587 {
Azure.IoT Build 0:6ae2f7bca550 588 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 589 }
Azure.IoT Build 0:6ae2f7bca550 590 else
Azure.IoT Build 0:6ae2f7bca550 591 {
Azure.IoT Build 0:6ae2f7bca550 592 if ((messagesender_close(amqp_management->message_sender) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 593 (messagereceiver_close(amqp_management->message_receiver) != 0))
Azure.IoT Build 0:6ae2f7bca550 594 {
Azure.IoT Build 0:6ae2f7bca550 595 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 596 }
Azure.IoT Build 0:6ae2f7bca550 597 else
Azure.IoT Build 0:6ae2f7bca550 598 {
Azure.IoT Build 0:6ae2f7bca550 599 result = 0;
Azure.IoT Build 0:6ae2f7bca550 600 }
Azure.IoT Build 0:6ae2f7bca550 601 }
Azure.IoT Build 0:6ae2f7bca550 602
Azure.IoT Build 0:6ae2f7bca550 603 return result;
Azure.IoT Build 0:6ae2f7bca550 604 }
Azure.IoT Build 0:6ae2f7bca550 605
Azure.IoT Build 0:6ae2f7bca550 606 int amqpmanagement_start_operation(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_OPERATION_COMPLETE on_operation_complete, void* context)
Azure.IoT Build 0:6ae2f7bca550 607 {
Azure.IoT Build 0:6ae2f7bca550 608 int result;
Azure.IoT Build 0:6ae2f7bca550 609
Azure.IoT Build 0:6ae2f7bca550 610 if ((amqp_management == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 611 (operation == NULL))
Azure.IoT Build 0:6ae2f7bca550 612 {
Azure.IoT Build 0:6ae2f7bca550 613 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 614 }
Azure.IoT Build 0:6ae2f7bca550 615 else
Azure.IoT Build 0:6ae2f7bca550 616 {
Azure.IoT Build 0:6ae2f7bca550 617 AMQP_VALUE application_properties;
Azure.IoT Build 0:6ae2f7bca550 618 if (message_get_application_properties(message, &application_properties) != 0)
Azure.IoT Build 0:6ae2f7bca550 619 {
Azure.IoT Build 0:6ae2f7bca550 620 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 621 }
Azure.IoT Build 0:6ae2f7bca550 622 else
Azure.IoT Build 0:6ae2f7bca550 623 {
Azure.IoT Build 0:6ae2f7bca550 624 if ((add_string_key_value_pair_to_map(application_properties, "operation", operation) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 625 (add_string_key_value_pair_to_map(application_properties, "type", type) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 626 ((locales != NULL) && (add_string_key_value_pair_to_map(application_properties, "locales", locales) != 0)))
Azure.IoT Build 0:6ae2f7bca550 627 {
Azure.IoT Build 0:6ae2f7bca550 628 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 629 }
Azure.IoT Build 0:6ae2f7bca550 630 else
Azure.IoT Build 0:6ae2f7bca550 631 {
Azure.IoT Build 0:6ae2f7bca550 632 if ((message_set_application_properties(message, application_properties) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 633 (set_message_id(message, amqp_management->next_message_id) != 0))
Azure.IoT Build 0:6ae2f7bca550 634 {
Azure.IoT Build 0:6ae2f7bca550 635 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 636 }
Azure.IoT Build 0:6ae2f7bca550 637 else
Azure.IoT Build 0:6ae2f7bca550 638 {
Azure.IoT Build 0:6ae2f7bca550 639 OPERATION_MESSAGE_INSTANCE* pending_operation_message = amqpalloc_malloc(sizeof(OPERATION_MESSAGE_INSTANCE));
Azure.IoT Build 0:6ae2f7bca550 640 if (pending_operation_message == NULL)
Azure.IoT Build 0:6ae2f7bca550 641 {
Azure.IoT Build 0:6ae2f7bca550 642 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 643 }
Azure.IoT Build 0:6ae2f7bca550 644 else
Azure.IoT Build 0:6ae2f7bca550 645 {
Azure.IoT Build 0:6ae2f7bca550 646 pending_operation_message->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 647 pending_operation_message->callback_context = context;
Azure.IoT Build 0:6ae2f7bca550 648 pending_operation_message->on_operation_complete = on_operation_complete;
Azure.IoT Build 0:6ae2f7bca550 649 pending_operation_message->operation_state = OPERATION_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 650 pending_operation_message->message_id = amqp_management->next_message_id;
Azure.IoT Build 0:6ae2f7bca550 651
Azure.IoT Build 0:6ae2f7bca550 652 amqp_management->next_message_id++;
Azure.IoT Build 0:6ae2f7bca550 653
Azure.IoT Build 0:6ae2f7bca550 654 OPERATION_MESSAGE_INSTANCE** new_operation_messages = amqpalloc_realloc(amqp_management->operation_messages, (amqp_management->operation_message_count + 1) * sizeof(OPERATION_MESSAGE_INSTANCE*));
Azure.IoT Build 0:6ae2f7bca550 655 if (new_operation_messages == NULL)
Azure.IoT Build 0:6ae2f7bca550 656 {
Azure.IoT Build 0:6ae2f7bca550 657 message_destroy(message);
Azure.IoT Build 0:6ae2f7bca550 658 amqpalloc_free(pending_operation_message);
Azure.IoT Build 0:6ae2f7bca550 659 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 660 }
Azure.IoT Build 0:6ae2f7bca550 661 else
Azure.IoT Build 0:6ae2f7bca550 662 {
Azure.IoT Build 0:6ae2f7bca550 663 amqp_management->operation_messages = new_operation_messages;
Azure.IoT Build 0:6ae2f7bca550 664 amqp_management->operation_messages[amqp_management->operation_message_count] = pending_operation_message;
Azure.IoT Build 0:6ae2f7bca550 665 amqp_management->operation_message_count++;
Azure.IoT Build 0:6ae2f7bca550 666
Azure.IoT Build 0:6ae2f7bca550 667 if (send_operation_messages(amqp_management) != 0)
Azure.IoT Build 0:6ae2f7bca550 668 {
Azure.IoT Build 0:6ae2f7bca550 669 if (on_operation_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 670 {
Azure.IoT Build 0:6ae2f7bca550 671 on_operation_complete(context, OPERATION_RESULT_CBS_ERROR, 0, NULL);
Azure.IoT Build 0:6ae2f7bca550 672 }
Azure.IoT Build 0:6ae2f7bca550 673
Azure.IoT Build 0:6ae2f7bca550 674 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 675 }
Azure.IoT Build 0:6ae2f7bca550 676 else
Azure.IoT Build 0:6ae2f7bca550 677 {
Azure.IoT Build 0:6ae2f7bca550 678 result = 0;
Azure.IoT Build 0:6ae2f7bca550 679 }
Azure.IoT Build 0:6ae2f7bca550 680 }
Azure.IoT Build 0:6ae2f7bca550 681 }
Azure.IoT Build 0:6ae2f7bca550 682 }
Azure.IoT Build 0:6ae2f7bca550 683 }
Azure.IoT Build 0:6ae2f7bca550 684
Azure.IoT Build 0:6ae2f7bca550 685 amqpvalue_destroy(application_properties);
Azure.IoT Build 0:6ae2f7bca550 686 }
Azure.IoT Build 0:6ae2f7bca550 687 }
Azure.IoT Build 0:6ae2f7bca550 688
Azure.IoT Build 0:6ae2f7bca550 689 return result;
Azure.IoT Build 0:6ae2f7bca550 690 }