Mark Radbourne / Mbed 2 deprecated iothub_client_sample_amqp

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers amqp_frame_codec.c Source File

amqp_frame_codec.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include <stdlib.h>
00005 #ifdef _CRTDBG_MAP_ALLOC
00006 #include <crtdbg.h>
00007 #endif
00008 #include <stdint.h>
00009 #include <stddef.h>
00010 #include <string.h>
00011 #include "azure_uamqp_c/amqp_frame_codec.h"
00012 #include "azure_uamqp_c/frame_codec.h"
00013 #include "azure_uamqp_c/amqpalloc.h"
00014 #include "azure_uamqp_c/amqpvalue.h"
00015 
00016 typedef enum AMQP_FRAME_DECODE_STATE_TAG
00017 {
00018     AMQP_FRAME_DECODE_FRAME,
00019     AMQP_FRAME_DECODE_ERROR
00020 } AMQP_FRAME_DECODE_STATE;
00021 
00022 typedef struct AMQP_FRAME_CODEC_INSTANCE_TAG
00023 {
00024     FRAME_CODEC_HANDLE frame_codec;
00025 
00026     /* decode */
00027     AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback;
00028     AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback;
00029     AMQP_FRAME_CODEC_ERROR_CALLBACK error_callback;
00030     void* callback_context;
00031     AMQPVALUE_DECODER_HANDLE decoder;
00032     AMQP_FRAME_DECODE_STATE decode_state;
00033     AMQP_VALUE decoded_performative;
00034 } AMQP_FRAME_CODEC_INSTANCE;
00035 
00036 static void amqp_value_decoded(void* context, AMQP_VALUE decoded_value)
00037 {
00038     AMQP_FRAME_CODEC_INSTANCE* amqp_frame_codec_instance = (AMQP_FRAME_CODEC_INSTANCE*)context;
00039     uint64_t performative_descriptor_ulong;
00040     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
00041 
00042     /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
00043     if ((descriptor == NULL) ||
00044         (amqpvalue_get_ulong(descriptor, &performative_descriptor_ulong) != 0) ||
00045         /* Codes_SRS_AMQP_FRAME_CODEC_01_003: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */
00046         (performative_descriptor_ulong < AMQP_OPEN) ||
00047         (performative_descriptor_ulong > AMQP_CLOSE))
00048     {
00049         /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
00050         amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR;
00051     }
00052     else
00053     {
00054         amqp_frame_codec_instance->decoded_performative = decoded_value;
00055     }
00056 }
00057 
00058 static void frame_received(void* context, const unsigned char* type_specific, uint32_t type_specific_size, const unsigned char* frame_body, uint32_t frame_body_size)
00059 {
00060     AMQP_FRAME_CODEC_INSTANCE* amqp_frame_codec_instance = (AMQP_FRAME_CODEC_INSTANCE*)context;
00061     uint16_t channel;
00062 
00063     switch (amqp_frame_codec_instance->decode_state)
00064     {
00065     default:
00066     /* Codes_SRS_AMQP_FRAME_CODEC_01_050: [All subsequent decoding shall fail and no AMQP frames shall be indicated from that point on to the consumers of amqp_frame_codec.] */
00067     case AMQP_FRAME_DECODE_ERROR:
00068         break;
00069 
00070     case AMQP_FRAME_DECODE_FRAME:
00071         /* Codes_SRS_AMQP_FRAME_CODEC_01_049: [If not enough type specific bytes are received to decode the channel number, the decoding shall stop with an error.] */
00072         if (type_specific_size < 2)
00073         {
00074             amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR;
00075 
00076             /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback  and passing to it the callback context argument that was given in amqp_frame_codec_create.] */
00077             amqp_frame_codec_instance->error_callback(amqp_frame_codec_instance->callback_context);
00078         }
00079         else
00080         {
00081             /* Codes_SRS_AMQP_FRAME_CODEC_01_001: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */
00082             channel = ((uint16_t)type_specific[0]) << 8;
00083             channel += type_specific[1];
00084 
00085             if (frame_body_size == 0)
00086             {
00087                 /* Codes_SRS_AMQP_FRAME_CODEC_01_048: [When a frame header is received from frame_codec and the frame payload size is 0, empty_frame_received_callback shall be invoked, while passing the channel number as argument.] */
00088                 /* Codes_SRS_AMQP_FRAME_CODEC_01_007: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */
00089                 amqp_frame_codec_instance->empty_frame_received_callback(amqp_frame_codec_instance->callback_context, channel);
00090             }
00091             else
00092             {
00093                 /* Codes_SRS_AMQP_FRAME_CODEC_01_051: [If the frame payload is greater than 0, amqp_frame_codec shall decode the performative as a described AMQP type.] */
00094                 /* Codes_SRS_AMQP_FRAME_CODEC_01_002: [The frame body is defined as a performative followed by an opaque payload.] */
00095                 amqp_frame_codec_instance->decoded_performative = NULL;
00096 
00097                 while ((frame_body_size > 0) &&
00098                        (amqp_frame_codec_instance->decoded_performative == NULL) &&
00099                        (amqp_frame_codec_instance->decode_state != AMQP_FRAME_DECODE_ERROR))
00100                 {
00101                     /* Codes_SRS_AMQP_FRAME_CODEC_01_052: [Decoding the performative shall be done by feeding the bytes to the decoder create in amqp_frame_codec_create.] */
00102                     if (amqpvalue_decode_bytes(amqp_frame_codec_instance->decoder, frame_body, 1) != 0)
00103                     {
00104                         /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
00105                         amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR;
00106                     }
00107                     else
00108                     {
00109                         frame_body_size--;
00110                         frame_body++;
00111                     }
00112                 }
00113 
00114                 if (amqp_frame_codec_instance->decode_state == AMQP_FRAME_DECODE_ERROR)
00115                 {
00116                     /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback  and passing to it the callback context argument that was given in amqp_frame_codec_create.] */
00117                     amqp_frame_codec_instance->error_callback(amqp_frame_codec_instance->callback_context);
00118                 }
00119                 else
00120                 {
00121                     /* Codes_SRS_AMQP_FRAME_CODEC_01_004: [The remaining bytes in the frame body form the payload for that frame.] */
00122                     /* Codes_SRS_AMQP_FRAME_CODEC_01_067: [When the performative is decoded, the rest of the frame_bytes shall not be given to the AMQP decoder, but they shall be buffered so that later they are given to the frame_received callback.] */
00123                     /* Codes_SRS_AMQP_FRAME_CODEC_01_054: [Once the performative is decoded and all frame payload bytes are received, the callback frame_received_callback shall be called.] */
00124                     /* Codes_SRS_AMQP_FRAME_CODEC_01_068: [A pointer to all the payload bytes shall also be passed to frame_received_callback.] */
00125                     amqp_frame_codec_instance->frame_received_callback(amqp_frame_codec_instance->callback_context, channel, amqp_frame_codec_instance->decoded_performative, frame_body, frame_body_size);
00126                 }
00127             }
00128         }
00129         break;
00130     }
00131 }
00132 
00133 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
00134 {
00135     PAYLOAD* payload = (PAYLOAD*)context;
00136     (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
00137     payload->length += length;
00138     return 0;
00139 }
00140 
00141 /* Codes_SRS_AMQP_FRAME_CODEC_01_011: [amqp_frame_codec_create shall create an instance of an amqp_frame_codec and return a non-NULL handle to it.] */
00142 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec_create(FRAME_CODEC_HANDLE frame_codec, AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback,
00143     AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback, AMQP_FRAME_CODEC_ERROR_CALLBACK amqp_frame_codec_error_callback, void* callback_context)
00144 {
00145     AMQP_FRAME_CODEC_INSTANCE* result;
00146 
00147     /* Codes_SRS_AMQP_FRAME_CODEC_01_012: [If any of the arguments frame_codec, frame_received_callback, amqp_frame_codec_error_callback or empty_frame_received_callback is NULL, amqp_frame_codec_create shall return NULL.] */
00148     if ((frame_codec == NULL) ||
00149         (frame_received_callback == NULL) ||
00150         (empty_frame_received_callback == NULL) ||
00151         (amqp_frame_codec_error_callback == NULL))
00152     {
00153         result = NULL;
00154     }
00155     else
00156     {
00157         result = (AMQP_FRAME_CODEC_INSTANCE*)amqpalloc_malloc(sizeof(AMQP_FRAME_CODEC_INSTANCE));
00158         /* Codes_SRS_AMQP_FRAME_CODEC_01_020: [If allocating memory for the new amqp_frame_codec fails, then amqp_frame_codec_create shall fail and return NULL.] */
00159         if (result != NULL)
00160         {
00161             result->frame_codec = frame_codec;
00162             result->frame_received_callback = frame_received_callback;
00163             result->empty_frame_received_callback = empty_frame_received_callback;
00164             result->error_callback = amqp_frame_codec_error_callback;
00165             result->callback_context = callback_context;
00166             result->decode_state = AMQP_FRAME_DECODE_FRAME;
00167 
00168             /* Codes_SRS_AMQP_FRAME_CODEC_01_018: [amqp_frame_codec_create shall create a decoder to be used for decoding AMQP values.] */
00169             result->decoder = amqpvalue_decoder_create(amqp_value_decoded, result);
00170             if (result->decoder == NULL)
00171             {
00172                 /* Codes_SRS_AMQP_FRAME_CODEC_01_019: [If creating the decoder fails, amqp_frame_codec_create shall fail and return NULL.] */
00173                 amqpalloc_free(result);
00174                 result = NULL;
00175             }
00176             else
00177             {
00178                 /* Codes_SRS_AMQP_FRAME_CODEC_01_013: [amqp_frame_codec_create shall subscribe for AMQP frames with the given frame_codec.] */
00179                 if (frame_codec_subscribe(frame_codec, FRAME_TYPE_AMQP, frame_received, result) != 0)
00180                 {
00181                     /* Codes_SRS_AMQP_FRAME_CODEC_01_014: [If subscribing for AMQP frames fails, amqp_frame_codec_create shall fail and return NULL.] */
00182                     amqpvalue_decoder_destroy(result->decoder);
00183                     amqpalloc_free(result);
00184                     result = NULL;
00185                 }
00186             }
00187         }
00188     }
00189 
00190     return result;
00191 }
00192 
00193 void amqp_frame_codec_destroy(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec)
00194 {
00195     if (amqp_frame_codec != NULL)
00196     {
00197         /* Codes_SRS_AMQP_FRAME_CODEC_01_017: [amqp_frame_codec_destroy shall unsubscribe from receiving AMQP frames from the frame_codec that was passed to amqp_frame_codec_create.] */
00198         (void)frame_codec_unsubscribe(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP);
00199 
00200         /* Codes_SRS_AMQP_FRAME_CODEC_01_021: [The decoder created in amqp_frame_codec_create shall be destroyed by amqp_frame_codec_destroy.] */
00201         amqpvalue_decoder_destroy(amqp_frame_codec->decoder);
00202 
00203         /* Codes_SRS_AMQP_FRAME_CODEC_01_015: [amqp_frame_codec_destroy shall free all resources associated with the amqp_frame_codec instance.] */
00204         amqpalloc_free(amqp_frame_codec);
00205     }
00206 }
00207 
00208 int amqp_frame_codec_encode_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, const AMQP_VALUE performative, const PAYLOAD* payloads, size_t payload_count, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context)
00209 {
00210     int result;
00211 
00212     /* Codes_SRS_AMQP_FRAME_CODEC_01_024: [If frame_codec, performative or on_bytes_encoded is NULL, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
00213     if ((amqp_frame_codec == NULL) ||
00214         (performative == NULL) ||
00215         (on_bytes_encoded == NULL))
00216     {
00217         result = __LINE__;
00218     }
00219     else
00220     {
00221         AMQP_VALUE descriptor;
00222         uint64_t performative_ulong;
00223         size_t encoded_size;
00224 
00225         if (((descriptor = amqpvalue_get_inplace_descriptor(performative)) == NULL) ||
00226             (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0) ||
00227             /* Codes_SRS_AMQP_FRAME_CODEC_01_008: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */
00228             (performative_ulong < AMQP_OPEN) ||
00229             (performative_ulong > AMQP_CLOSE))
00230         {
00231             /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
00232             result = __LINE__;
00233         }
00234         /* Codes_SRS_AMQP_FRAME_CODEC_01_027: [The encoded size of the performative and its fields shall be obtained by calling amqpvalue_get_encoded_size.] */
00235         else if (amqpvalue_get_encoded_size(performative, &encoded_size) != 0)
00236         {
00237             /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
00238             result = __LINE__;
00239         }
00240         else
00241         {
00242             unsigned char* amqp_performative_bytes = (unsigned char*)amqpalloc_malloc(encoded_size);
00243             if (amqp_performative_bytes == NULL)
00244             {
00245                 result = __LINE__;
00246             }
00247             else
00248             {
00249                 PAYLOAD* new_payloads = (PAYLOAD*)amqpalloc_malloc(sizeof(PAYLOAD) * (payload_count + 1));
00250                 if (new_payloads == NULL)
00251                 {
00252                     result = __LINE__;
00253                 }
00254                 else
00255                 {
00256                     /* Codes_SRS_AMQP_FRAME_CODEC_01_070: [The payloads argument for frame_codec_encode_frame shall be made of the payload for the encoded performative and the payloads passed to amqp_frame_codec_encode_frame.] */
00257                     /* Codes_SRS_AMQP_FRAME_CODEC_01_028: [The encode result for the performative shall be placed in a PAYLOAD structure.] */
00258                     new_payloads[0].bytes = amqp_performative_bytes;
00259                     new_payloads[0].length = 0;
00260 
00261                     if (payload_count > 0)
00262                     {
00263                         (void)memcpy(new_payloads + 1, payloads, sizeof(PAYLOAD) * payload_count);
00264                     }
00265 
00266                     if (amqpvalue_encode(performative, encode_bytes, &new_payloads[0]) != 0)
00267                     {
00268                         result = __LINE__;
00269                     }
00270                     else
00271                     {
00272                         unsigned char channel_bytes[2];
00273 
00274                         channel_bytes[0] = channel >> 8;
00275                         channel_bytes[1] = channel & 0xFF;
00276 
00277                         /* Codes_SRS_AMQP_FRAME_CODEC_01_005: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */
00278                         /* Codes_SRS_AMQP_FRAME_CODEC_01_025: [amqp_frame_codec_encode_frame shall encode the frame header by using frame_codec_encode_frame.] */
00279                         /* Codes_SRS_AMQP_FRAME_CODEC_01_006: [The frame body is defined as a performative followed by an opaque payload.] */
00280                         if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, new_payloads, payload_count + 1, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0)
00281                         {
00282                             /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
00283                             result = __LINE__;
00284                         }
00285                         else
00286                         {
00287                             /* Codes_SRS_AMQP_FRAME_CODEC_01_022: [amqp_frame_codec_begin_encode_frame shall encode the frame header and AMQP performative in an AMQP frame and on success it shall return 0.] */
00288                             result = 0;
00289                         }
00290                     }
00291 
00292                     amqpalloc_free(new_payloads);
00293                 }
00294 
00295                 amqpalloc_free(amqp_performative_bytes);
00296             }
00297         }
00298     }
00299 
00300     return result;
00301 }
00302 
00303 /* Codes_SRS_AMQP_FRAME_CODEC_01_042: [amqp_frame_codec_encode_empty_frame shall encode a frame with no payload.] */
00304 /* Codes_SRS_AMQP_FRAME_CODEC_01_010: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */
00305 int amqp_frame_codec_encode_empty_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context)
00306 {
00307     int result;
00308 
00309     /* Codes_SRS_AMQP_FRAME_CODEC_01_045: [If amqp_frame_codec is NULL, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.] */
00310     if (amqp_frame_codec == NULL)
00311     {
00312         result = __LINE__;
00313     }
00314     else
00315     {
00316         unsigned char channel_bytes[2];
00317         
00318         channel_bytes[0] = channel >> 8;
00319         channel_bytes[1] = channel & 0xFF;
00320 
00321         /* Codes_SRS_AMQP_FRAME_CODEC_01_044: [amqp_frame_codec_encode_empty_frame shall use frame_codec_encode_frame to encode the frame.] */
00322         if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, NULL, 0, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0)
00323         {
00324             /* Codes_SRS_AMQP_FRAME_CODEC_01_046: [If encoding fails in any way, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.]  */
00325             result = __LINE__;
00326         }
00327         else
00328         {
00329             /* Codes_SRS_AMQP_FRAME_CODEC_01_043: [On success, amqp_frame_codec_encode_empty_frame shall return 0.] */
00330             result = 0;
00331         }
00332     }
00333 
00334     return result;
00335 }