Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed
Fork of iothub_client_sample_amqp by
message_receiver.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include <stdlib.h> 00005 #ifdef _CRTDBG_MAP_ALLOC 00006 #include <crtdbg.h> 00007 #endif 00008 #include "azure_uamqp_c/message_receiver.h" 00009 #include "azure_uamqp_c/amqpalloc.h" 00010 #include "azure_uamqp_c/amqpvalue.h" 00011 #include "azure_uamqp_c/amqp_definitions.h" 00012 00013 typedef struct MESSAGE_RECEIVER_INSTANCE_TAG 00014 { 00015 LINK_HANDLE link; 00016 ON_MESSAGE_RECEIVED on_message_received; 00017 ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed; 00018 MESSAGE_RECEIVER_STATE message_receiver_state; 00019 const void* on_message_receiver_state_changed_context; 00020 const void* callback_context; 00021 MESSAGE_HANDLE decoded_message; 00022 bool decode_error; 00023 } MESSAGE_RECEIVER_INSTANCE; 00024 00025 static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state) 00026 { 00027 MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state; 00028 message_receiver_instance->message_receiver_state = new_state; 00029 if (message_receiver_instance->on_message_receiver_state_changed != NULL) 00030 { 00031 message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state); 00032 } 00033 } 00034 00035 static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value) 00036 { 00037 MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; 00038 MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message; 00039 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); 00040 00041 if (is_application_properties_type_by_descriptor(descriptor)) 00042 { 00043 if (message_set_application_properties(decoded_message, decoded_value) != 0) 00044 { 00045 message_receiver_instance->decode_error = true; 00046 } 00047 } 00048 else if (is_properties_type_by_descriptor(descriptor)) 00049 { 00050 PROPERTIES_HANDLE properties; 00051 if (amqpvalue_get_properties(decoded_value, &properties) != 0) 00052 { 00053 message_receiver_instance->decode_error = true; 00054 } 00055 else 00056 { 00057 if (message_set_properties(decoded_message, properties) != 0) 00058 { 00059 message_receiver_instance->decode_error = true; 00060 } 00061 00062 properties_destroy(properties); 00063 } 00064 } 00065 else if (is_delivery_annotations_type_by_descriptor(descriptor)) 00066 { 00067 annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value); 00068 if ((delivery_annotations == NULL) || 00069 (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)) 00070 { 00071 message_receiver_instance->decode_error = true; 00072 } 00073 } 00074 else if (is_message_annotations_type_by_descriptor(descriptor)) 00075 { 00076 annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value); 00077 if ((message_annotations == NULL) || 00078 (message_set_message_annotations(decoded_message, message_annotations) != 0)) 00079 { 00080 message_receiver_instance->decode_error = true; 00081 } 00082 } 00083 else if (is_header_type_by_descriptor(descriptor)) 00084 { 00085 HEADER_HANDLE header; 00086 if (amqpvalue_get_header(decoded_value, &header) != 0) 00087 { 00088 message_receiver_instance->decode_error = true; 00089 } 00090 else 00091 { 00092 if (message_set_header(decoded_message, header) != 0) 00093 { 00094 message_receiver_instance->decode_error = true; 00095 } 00096 00097 header_destroy(header); 00098 } 00099 } 00100 else if (is_footer_type_by_descriptor(descriptor)) 00101 { 00102 annotations footer = amqpvalue_get_inplace_described_value(decoded_value); 00103 if ((footer == NULL) || 00104 (message_set_footer(decoded_message, footer) != 0)) 00105 { 00106 message_receiver_instance->decode_error = true; 00107 } 00108 } 00109 else if (is_amqp_value_type_by_descriptor(descriptor)) 00110 { 00111 MESSAGE_BODY_TYPE body_type; 00112 message_get_body_type(decoded_message, &body_type); 00113 if (body_type != MESSAGE_BODY_TYPE_NONE) 00114 { 00115 message_receiver_instance->decode_error = true; 00116 } 00117 else 00118 { 00119 AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); 00120 if ((body_amqp_value == NULL) || 00121 (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)) 00122 { 00123 message_receiver_instance->decode_error = true; 00124 } 00125 } 00126 } 00127 else if (is_data_type_by_descriptor(descriptor)) 00128 { 00129 MESSAGE_BODY_TYPE body_type; 00130 message_get_body_type(decoded_message, &body_type); 00131 if ((body_type != MESSAGE_BODY_TYPE_NONE) && 00132 (body_type != MESSAGE_BODY_TYPE_DATA)) 00133 { 00134 message_receiver_instance->decode_error = true; 00135 } 00136 else 00137 { 00138 AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); 00139 data data_value; 00140 00141 if ((body_data_value == NULL) || 00142 (amqpvalue_get_data(body_data_value, &data_value) != 0)) 00143 { 00144 message_receiver_instance->decode_error = true; 00145 } 00146 else 00147 { 00148 BINARY_DATA binary_data; 00149 binary_data.bytes = data_value.bytes; 00150 binary_data.length = data_value.length; 00151 if (message_add_body_amqp_data(decoded_message, binary_data) != 0) 00152 { 00153 message_receiver_instance->decode_error = true; 00154 } 00155 } 00156 } 00157 } 00158 } 00159 00160 static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes) 00161 { 00162 AMQP_VALUE result = NULL; 00163 00164 MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; 00165 (void)transfer; 00166 if (message_receiver_instance->on_message_received != NULL) 00167 { 00168 MESSAGE_HANDLE message = message_create(); 00169 if (message == NULL) 00170 { 00171 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00172 } 00173 else 00174 { 00175 message_receiver_instance->decoded_message; 00176 AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance); 00177 if (amqpvalue_decoder == NULL) 00178 { 00179 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00180 } 00181 else 00182 { 00183 message_receiver_instance->decoded_message = message; 00184 message_receiver_instance->decode_error = false; 00185 if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) 00186 { 00187 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00188 } 00189 else 00190 { 00191 if (message_receiver_instance->decode_error) 00192 { 00193 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00194 } 00195 else 00196 { 00197 result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message); 00198 } 00199 } 00200 00201 amqpvalue_decoder_destroy(amqpvalue_decoder); 00202 } 00203 00204 message_destroy(message); 00205 } 00206 } 00207 00208 return result; 00209 } 00210 00211 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state) 00212 { 00213 MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; 00214 (void)previous_link_state; 00215 00216 switch (new_link_state) 00217 { 00218 case LINK_STATE_ATTACHED: 00219 if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) 00220 { 00221 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN); 00222 } 00223 break; 00224 case LINK_STATE_DETACHED: 00225 if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) || 00226 (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING)) 00227 { 00228 /* User initiated transition, we should be good */ 00229 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE); 00230 } 00231 else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE) 00232 { 00233 /* Any other transition must be an error */ 00234 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00235 } 00236 break; 00237 case LINK_STATE_ERROR: 00238 if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR) 00239 { 00240 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00241 } 00242 break; 00243 } 00244 } 00245 00246 MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context) 00247 { 00248 MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)amqpalloc_malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); 00249 if (result != NULL) 00250 { 00251 result->link = link; 00252 result->on_message_receiver_state_changed = on_message_receiver_state_changed; 00253 result->on_message_receiver_state_changed_context = context; 00254 result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; 00255 } 00256 00257 return result; 00258 } 00259 00260 void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver) 00261 { 00262 if (message_receiver != NULL) 00263 { 00264 (void)messagereceiver_close(message_receiver); 00265 amqpalloc_free(message_receiver); 00266 } 00267 } 00268 00269 int messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, ON_MESSAGE_RECEIVED on_message_received, const void* callback_context) 00270 { 00271 int result; 00272 00273 if (message_receiver == NULL) 00274 { 00275 result = __LINE__; 00276 } 00277 else 00278 { 00279 MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; 00280 00281 if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) 00282 { 00283 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING); 00284 if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0) 00285 { 00286 result = __LINE__; 00287 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00288 } 00289 else 00290 { 00291 message_receiver_instance->on_message_received = on_message_received; 00292 message_receiver_instance->callback_context = callback_context; 00293 00294 result = 0; 00295 } 00296 } 00297 else 00298 { 00299 result = 0; 00300 } 00301 } 00302 00303 return result; 00304 } 00305 00306 int messagereceiver_close(MESSAGE_RECEIVER_HANDLE message_receiver) 00307 { 00308 int result; 00309 00310 if (message_receiver == NULL) 00311 { 00312 result = __LINE__; 00313 } 00314 else 00315 { 00316 MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; 00317 00318 if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || 00319 (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) 00320 { 00321 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING); 00322 00323 if (link_detach(message_receiver_instance->link, true) != 0) 00324 { 00325 result = __LINE__; 00326 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); 00327 } 00328 else 00329 { 00330 result = 0; 00331 } 00332 } 00333 else 00334 { 00335 result = 0; 00336 } 00337 } 00338 00339 return result; 00340 }
Generated on Tue Jul 12 2022 12:43:21 by
