Mark Radbourne / Mbed 2 deprecated iothub_client_sample_amqp

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers message_receiver.c Source File

message_receiver.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include <stdlib.h>
00005 #ifdef _CRTDBG_MAP_ALLOC
00006 #include <crtdbg.h>
00007 #endif
00008 #include "azure_uamqp_c/message_receiver.h"
00009 #include "azure_uamqp_c/amqpalloc.h"
00010 #include "azure_uamqp_c/amqpvalue.h"
00011 #include "azure_uamqp_c/amqp_definitions.h"
00012 
00013 typedef struct MESSAGE_RECEIVER_INSTANCE_TAG
00014 {
00015     LINK_HANDLE link;
00016     ON_MESSAGE_RECEIVED on_message_received;
00017     ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed;
00018     MESSAGE_RECEIVER_STATE message_receiver_state;
00019     const void* on_message_receiver_state_changed_context;
00020     const void* callback_context;
00021     MESSAGE_HANDLE decoded_message;
00022     bool decode_error;
00023 } MESSAGE_RECEIVER_INSTANCE;
00024 
00025 static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state)
00026 {
00027     MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state;
00028     message_receiver_instance->message_receiver_state = new_state;
00029     if (message_receiver_instance->on_message_receiver_state_changed != NULL)
00030     {
00031         message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state);
00032     }
00033 }
00034 
00035 static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
00036 {
00037     MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
00038     MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message;
00039     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
00040 
00041     if (is_application_properties_type_by_descriptor(descriptor))
00042     {
00043         if (message_set_application_properties(decoded_message, decoded_value) != 0)
00044         {
00045             message_receiver_instance->decode_error = true;
00046         }
00047     }
00048     else if (is_properties_type_by_descriptor(descriptor))
00049     {
00050         PROPERTIES_HANDLE properties;
00051         if (amqpvalue_get_properties(decoded_value, &properties) != 0)
00052         {
00053             message_receiver_instance->decode_error = true;
00054         }
00055         else
00056         {
00057             if (message_set_properties(decoded_message, properties) != 0)
00058             {
00059                 message_receiver_instance->decode_error = true;
00060             }
00061 
00062             properties_destroy(properties);
00063         }
00064     }
00065     else if (is_delivery_annotations_type_by_descriptor(descriptor))
00066     {
00067         annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value);
00068         if ((delivery_annotations == NULL) ||
00069             (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0))
00070         {
00071             message_receiver_instance->decode_error = true;
00072         }
00073     }
00074     else if (is_message_annotations_type_by_descriptor(descriptor))
00075     {
00076         annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value);
00077         if ((message_annotations == NULL) ||
00078             (message_set_message_annotations(decoded_message, message_annotations) != 0))
00079         {
00080             message_receiver_instance->decode_error = true;
00081         }
00082     }
00083     else if (is_header_type_by_descriptor(descriptor))
00084     {
00085         HEADER_HANDLE header;
00086         if (amqpvalue_get_header(decoded_value, &header) != 0)
00087         {
00088             message_receiver_instance->decode_error = true;
00089         }
00090         else
00091         {
00092             if (message_set_header(decoded_message, header) != 0)
00093             {
00094                 message_receiver_instance->decode_error = true;
00095             }
00096 
00097             header_destroy(header);
00098         }
00099     }
00100     else if (is_footer_type_by_descriptor(descriptor))
00101     {
00102         annotations footer = amqpvalue_get_inplace_described_value(decoded_value);
00103         if ((footer == NULL) ||
00104             (message_set_footer(decoded_message, footer) != 0))
00105         {
00106             message_receiver_instance->decode_error = true;
00107         }
00108     }
00109     else if (is_amqp_value_type_by_descriptor(descriptor))
00110     {
00111         MESSAGE_BODY_TYPE body_type;
00112         message_get_body_type(decoded_message, &body_type);
00113         if (body_type != MESSAGE_BODY_TYPE_NONE)
00114         {
00115             message_receiver_instance->decode_error = true;
00116         }
00117         else
00118         {
00119             AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
00120             if ((body_amqp_value == NULL) ||
00121                 (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0))
00122             {
00123                 message_receiver_instance->decode_error = true;
00124             }
00125         }
00126     }
00127     else if (is_data_type_by_descriptor(descriptor))
00128     {
00129         MESSAGE_BODY_TYPE body_type;
00130         message_get_body_type(decoded_message, &body_type);
00131         if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
00132             (body_type != MESSAGE_BODY_TYPE_DATA))
00133         {
00134             message_receiver_instance->decode_error = true;
00135         }
00136         else
00137         {
00138             AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
00139             data data_value;
00140 
00141             if ((body_data_value == NULL) ||
00142                 (amqpvalue_get_data(body_data_value, &data_value) != 0))
00143             {
00144                 message_receiver_instance->decode_error = true;
00145             }
00146             else
00147             {
00148                 BINARY_DATA binary_data;
00149                 binary_data.bytes = data_value.bytes;
00150                 binary_data.length = data_value.length;
00151                 if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
00152                 {
00153                     message_receiver_instance->decode_error = true;
00154                 }
00155             }
00156         }
00157     }
00158 }
00159 
00160 static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes)
00161 {
00162     AMQP_VALUE result = NULL;
00163 
00164     MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
00165     (void)transfer;
00166     if (message_receiver_instance->on_message_received != NULL)
00167     {
00168         MESSAGE_HANDLE message = message_create();
00169         if (message == NULL)
00170         {
00171             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00172         }
00173         else
00174         {
00175             message_receiver_instance->decoded_message;
00176             AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance);
00177             if (amqpvalue_decoder == NULL)
00178             {
00179                 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00180             }
00181             else
00182             {
00183                 message_receiver_instance->decoded_message = message;
00184                 message_receiver_instance->decode_error = false;
00185                 if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
00186                 {
00187                     set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00188                 }
00189                 else
00190                 {
00191                     if (message_receiver_instance->decode_error)
00192                     {
00193                         set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00194                     }
00195                     else
00196                     {
00197                         result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message);
00198                     }
00199                 }
00200 
00201                 amqpvalue_decoder_destroy(amqpvalue_decoder);
00202             }
00203 
00204             message_destroy(message);
00205         }
00206     }
00207 
00208     return result;
00209 }
00210 
00211 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
00212 {
00213     MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
00214     (void)previous_link_state;
00215 
00216     switch (new_link_state)
00217     {
00218     case LINK_STATE_ATTACHED:
00219         if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
00220         {
00221             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN);
00222         }
00223         break;
00224     case LINK_STATE_DETACHED:
00225         if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
00226             (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
00227         {
00228             /* User initiated transition, we should be good */
00229             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE);
00230         }
00231         else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
00232         {
00233             /* Any other transition must be an error */
00234             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00235         }
00236         break;
00237     case LINK_STATE_ERROR:
00238         if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
00239         {
00240             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00241         }
00242         break;
00243     }
00244 }
00245 
00246 MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context)
00247 {
00248     MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)amqpalloc_malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
00249     if (result != NULL)
00250     {
00251         result->link = link;
00252         result->on_message_receiver_state_changed = on_message_receiver_state_changed;
00253         result->on_message_receiver_state_changed_context = context;
00254         result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
00255     }
00256 
00257     return result;
00258 }
00259 
00260 void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver)
00261 {
00262     if (message_receiver != NULL)
00263     {
00264         (void)messagereceiver_close(message_receiver);
00265         amqpalloc_free(message_receiver);
00266     }
00267 }
00268 
00269 int messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, ON_MESSAGE_RECEIVED on_message_received, const void* callback_context)
00270 {
00271     int result;
00272 
00273     if (message_receiver == NULL)
00274     {
00275         result = __LINE__;
00276     }
00277     else
00278     {
00279         MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
00280 
00281         if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
00282         {
00283             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING);
00284             if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0)
00285             {
00286                 result = __LINE__;
00287                 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00288             }
00289             else
00290             {
00291                 message_receiver_instance->on_message_received = on_message_received;
00292                 message_receiver_instance->callback_context = callback_context;
00293 
00294                 result = 0;
00295             }
00296         }
00297         else
00298         {
00299             result = 0;
00300         }
00301     }
00302 
00303     return result;
00304 }
00305 
00306 int messagereceiver_close(MESSAGE_RECEIVER_HANDLE message_receiver)
00307 {
00308     int result;
00309 
00310     if (message_receiver == NULL)
00311     {
00312         result = __LINE__;
00313     }
00314     else
00315     {
00316         MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
00317 
00318         if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
00319             (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
00320         {
00321             set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING);
00322 
00323             if (link_detach(message_receiver_instance->link, true) != 0)
00324             {
00325                 result = __LINE__;
00326                 set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
00327             }
00328             else
00329             {
00330                 result = 0;
00331             }
00332         }
00333         else
00334         {
00335             result = 0;
00336         }
00337     }
00338 
00339     return result;
00340 }