Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Committer:
XinZhangMS
Date:
Thu Aug 23 06:52:14 2018 +0000
Revision:
0:f7f1f0d76dd6
azure-c-sdk for mbed os supporting NUCLEO_F767ZI

Who changed what in which revision?

User | Revision | Line number | New contents of line
XinZhangMS 0:f7f1f0d76dd6 1 // Copyright (c) Microsoft. All rights reserved.
XinZhangMS 0:f7f1f0d76dd6 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
XinZhangMS 0:f7f1f0d76dd6 3
XinZhangMS 0:f7f1f0d76dd6 4 #include <stdlib.h>
XinZhangMS 0:f7f1f0d76dd6 5 #include <stdint.h>
XinZhangMS 0:f7f1f0d76dd6 6 #include <stddef.h>
XinZhangMS 0:f7f1f0d76dd6 7 #include <string.h>
XinZhangMS 0:f7f1f0d76dd6 8 #include "azure_c_shared_utility/optimize_size.h"
XinZhangMS 0:f7f1f0d76dd6 9 #include "azure_c_shared_utility/gballoc.h"
XinZhangMS 0:f7f1f0d76dd6 10 #include "azure_c_shared_utility/xlogging.h"
XinZhangMS 0:f7f1f0d76dd6 11 #include "azure_uamqp_c/amqp_frame_codec.h"
XinZhangMS 0:f7f1f0d76dd6 12 #include "azure_uamqp_c/frame_codec.h"
XinZhangMS 0:f7f1f0d76dd6 13 #include "azure_uamqp_c/amqpvalue.h"
XinZhangMS 0:f7f1f0d76dd6 14
/* Decoding state of an AMQP frame codec instance. */
typedef enum AMQP_FRAME_DECODE_STATE_TAG
{
    /* Normal operation: incoming frames are decoded and indicated to the consumer. */
    AMQP_FRAME_DECODE_FRAME,

    /* A decode error has occurred: subsequent frames are ignored. */
    AMQP_FRAME_DECODE_ERROR
} AMQP_FRAME_DECODE_STATE;
XinZhangMS 0:f7f1f0d76dd6 20
XinZhangMS 0:f7f1f0d76dd6 21 typedef struct AMQP_FRAME_CODEC_TAG
XinZhangMS 0:f7f1f0d76dd6 22 {
XinZhangMS 0:f7f1f0d76dd6 23 FRAME_CODEC_HANDLE frame_codec;
XinZhangMS 0:f7f1f0d76dd6 24
XinZhangMS 0:f7f1f0d76dd6 25 /* decode */
XinZhangMS 0:f7f1f0d76dd6 26 AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback;
XinZhangMS 0:f7f1f0d76dd6 27 AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback;
XinZhangMS 0:f7f1f0d76dd6 28 AMQP_FRAME_CODEC_ERROR_CALLBACK error_callback;
XinZhangMS 0:f7f1f0d76dd6 29 void* callback_context;
XinZhangMS 0:f7f1f0d76dd6 30 AMQPVALUE_DECODER_HANDLE decoder;
XinZhangMS 0:f7f1f0d76dd6 31 AMQP_FRAME_DECODE_STATE decode_state;
XinZhangMS 0:f7f1f0d76dd6 32 AMQP_VALUE decoded_performative;
XinZhangMS 0:f7f1f0d76dd6 33 } AMQP_FRAME_CODEC;
XinZhangMS 0:f7f1f0d76dd6 34
XinZhangMS 0:f7f1f0d76dd6 35 static void amqp_value_decoded(void* context, AMQP_VALUE decoded_value)
XinZhangMS 0:f7f1f0d76dd6 36 {
XinZhangMS 0:f7f1f0d76dd6 37 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec = (AMQP_FRAME_CODEC_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 38 uint64_t performative_descriptor_ulong;
XinZhangMS 0:f7f1f0d76dd6 39 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
XinZhangMS 0:f7f1f0d76dd6 40
XinZhangMS 0:f7f1f0d76dd6 41 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
XinZhangMS 0:f7f1f0d76dd6 42 if ((descriptor == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 43 (amqpvalue_get_ulong(descriptor, &performative_descriptor_ulong) != 0) ||
XinZhangMS 0:f7f1f0d76dd6 44 /* Codes_SRS_AMQP_FRAME_CODEC_01_003: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */
XinZhangMS 0:f7f1f0d76dd6 45 (performative_descriptor_ulong < AMQP_OPEN) ||
XinZhangMS 0:f7f1f0d76dd6 46 (performative_descriptor_ulong > AMQP_CLOSE))
XinZhangMS 0:f7f1f0d76dd6 47 {
XinZhangMS 0:f7f1f0d76dd6 48 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
XinZhangMS 0:f7f1f0d76dd6 49 amqp_frame_codec->decode_state = AMQP_FRAME_DECODE_ERROR;
XinZhangMS 0:f7f1f0d76dd6 50 }
XinZhangMS 0:f7f1f0d76dd6 51 else
XinZhangMS 0:f7f1f0d76dd6 52 {
XinZhangMS 0:f7f1f0d76dd6 53 amqp_frame_codec->decoded_performative = decoded_value;
XinZhangMS 0:f7f1f0d76dd6 54 }
XinZhangMS 0:f7f1f0d76dd6 55 }
XinZhangMS 0:f7f1f0d76dd6 56
XinZhangMS 0:f7f1f0d76dd6 57 static void frame_received(void* context, const unsigned char* type_specific, uint32_t type_specific_size, const unsigned char* frame_body, uint32_t frame_body_size)
XinZhangMS 0:f7f1f0d76dd6 58 {
XinZhangMS 0:f7f1f0d76dd6 59 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec = (AMQP_FRAME_CODEC_HANDLE)context;
XinZhangMS 0:f7f1f0d76dd6 60 uint16_t channel;
XinZhangMS 0:f7f1f0d76dd6 61
XinZhangMS 0:f7f1f0d76dd6 62 switch (amqp_frame_codec->decode_state)
XinZhangMS 0:f7f1f0d76dd6 63 {
XinZhangMS 0:f7f1f0d76dd6 64 default:
XinZhangMS 0:f7f1f0d76dd6 65 /* Codes_SRS_AMQP_FRAME_CODEC_01_050: [All subsequent decoding shall fail and no AMQP frames shall be indicated from that point on to the consumers of amqp_frame_codec.] */
XinZhangMS 0:f7f1f0d76dd6 66 case AMQP_FRAME_DECODE_ERROR:
XinZhangMS 0:f7f1f0d76dd6 67 break;
XinZhangMS 0:f7f1f0d76dd6 68
XinZhangMS 0:f7f1f0d76dd6 69 case AMQP_FRAME_DECODE_FRAME:
XinZhangMS 0:f7f1f0d76dd6 70 /* Codes_SRS_AMQP_FRAME_CODEC_01_049: [If not enough type specific bytes are received to decode the channel number, the decoding shall stop with an error.] */
XinZhangMS 0:f7f1f0d76dd6 71 if (type_specific_size < 2)
XinZhangMS 0:f7f1f0d76dd6 72 {
XinZhangMS 0:f7f1f0d76dd6 73 amqp_frame_codec->decode_state = AMQP_FRAME_DECODE_ERROR;
XinZhangMS 0:f7f1f0d76dd6 74
XinZhangMS 0:f7f1f0d76dd6 75 /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback and passing to it the callback context argument that was given in amqp_frame_codec_create.] */
XinZhangMS 0:f7f1f0d76dd6 76 amqp_frame_codec->error_callback(amqp_frame_codec->callback_context);
XinZhangMS 0:f7f1f0d76dd6 77 }
XinZhangMS 0:f7f1f0d76dd6 78 else
XinZhangMS 0:f7f1f0d76dd6 79 {
XinZhangMS 0:f7f1f0d76dd6 80 /* Codes_SRS_AMQP_FRAME_CODEC_01_001: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */
XinZhangMS 0:f7f1f0d76dd6 81 channel = ((uint16_t)type_specific[0]) << 8;
XinZhangMS 0:f7f1f0d76dd6 82 channel += type_specific[1];
XinZhangMS 0:f7f1f0d76dd6 83
XinZhangMS 0:f7f1f0d76dd6 84 if (frame_body_size == 0)
XinZhangMS 0:f7f1f0d76dd6 85 {
XinZhangMS 0:f7f1f0d76dd6 86 /* Codes_SRS_AMQP_FRAME_CODEC_01_048: [When a frame header is received from frame_codec and the frame payload size is 0, empty_frame_received_callback shall be invoked, while passing the channel number as argument.] */
XinZhangMS 0:f7f1f0d76dd6 87 /* Codes_SRS_AMQP_FRAME_CODEC_01_007: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */
XinZhangMS 0:f7f1f0d76dd6 88 amqp_frame_codec->empty_frame_received_callback(amqp_frame_codec->callback_context, channel);
XinZhangMS 0:f7f1f0d76dd6 89 }
XinZhangMS 0:f7f1f0d76dd6 90 else
XinZhangMS 0:f7f1f0d76dd6 91 {
XinZhangMS 0:f7f1f0d76dd6 92 /* Codes_SRS_AMQP_FRAME_CODEC_01_051: [If the frame payload is greater than 0, amqp_frame_codec shall decode the performative as a described AMQP type.] */
XinZhangMS 0:f7f1f0d76dd6 93 /* Codes_SRS_AMQP_FRAME_CODEC_01_002: [The frame body is defined as a performative followed by an opaque payload.] */
XinZhangMS 0:f7f1f0d76dd6 94 amqp_frame_codec->decoded_performative = NULL;
XinZhangMS 0:f7f1f0d76dd6 95
XinZhangMS 0:f7f1f0d76dd6 96 while ((frame_body_size > 0) &&
XinZhangMS 0:f7f1f0d76dd6 97 (amqp_frame_codec->decoded_performative == NULL) &&
XinZhangMS 0:f7f1f0d76dd6 98 (amqp_frame_codec->decode_state != AMQP_FRAME_DECODE_ERROR))
XinZhangMS 0:f7f1f0d76dd6 99 {
XinZhangMS 0:f7f1f0d76dd6 100 /* Codes_SRS_AMQP_FRAME_CODEC_01_052: [Decoding the performative shall be done by feeding the bytes to the decoder create in amqp_frame_codec_create.] */
XinZhangMS 0:f7f1f0d76dd6 101 if (amqpvalue_decode_bytes(amqp_frame_codec->decoder, frame_body, 1) != 0)
XinZhangMS 0:f7f1f0d76dd6 102 {
XinZhangMS 0:f7f1f0d76dd6 103 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */
XinZhangMS 0:f7f1f0d76dd6 104 amqp_frame_codec->decode_state = AMQP_FRAME_DECODE_ERROR;
XinZhangMS 0:f7f1f0d76dd6 105 }
XinZhangMS 0:f7f1f0d76dd6 106 else
XinZhangMS 0:f7f1f0d76dd6 107 {
XinZhangMS 0:f7f1f0d76dd6 108 frame_body_size--;
XinZhangMS 0:f7f1f0d76dd6 109 frame_body++;
XinZhangMS 0:f7f1f0d76dd6 110 }
XinZhangMS 0:f7f1f0d76dd6 111 }
XinZhangMS 0:f7f1f0d76dd6 112
XinZhangMS 0:f7f1f0d76dd6 113 if (amqp_frame_codec->decode_state == AMQP_FRAME_DECODE_ERROR)
XinZhangMS 0:f7f1f0d76dd6 114 {
XinZhangMS 0:f7f1f0d76dd6 115 /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback and passing to it the callback context argument that was given in amqp_frame_codec_create.] */
XinZhangMS 0:f7f1f0d76dd6 116 amqp_frame_codec->error_callback(amqp_frame_codec->callback_context);
XinZhangMS 0:f7f1f0d76dd6 117 }
XinZhangMS 0:f7f1f0d76dd6 118 else
XinZhangMS 0:f7f1f0d76dd6 119 {
XinZhangMS 0:f7f1f0d76dd6 120 /* Codes_SRS_AMQP_FRAME_CODEC_01_004: [The remaining bytes in the frame body form the payload for that frame.] */
XinZhangMS 0:f7f1f0d76dd6 121 /* Codes_SRS_AMQP_FRAME_CODEC_01_067: [When the performative is decoded, the rest of the frame_bytes shall not be given to the AMQP decoder, but they shall be buffered so that later they are given to the frame_received callback.] */
XinZhangMS 0:f7f1f0d76dd6 122 /* Codes_SRS_AMQP_FRAME_CODEC_01_054: [Once the performative is decoded and all frame payload bytes are received, the callback frame_received_callback shall be called.] */
XinZhangMS 0:f7f1f0d76dd6 123 /* Codes_SRS_AMQP_FRAME_CODEC_01_068: [A pointer to all the payload bytes shall also be passed to frame_received_callback.] */
XinZhangMS 0:f7f1f0d76dd6 124 amqp_frame_codec->frame_received_callback(amqp_frame_codec->callback_context, channel, amqp_frame_codec->decoded_performative, frame_body, frame_body_size);
XinZhangMS 0:f7f1f0d76dd6 125 }
XinZhangMS 0:f7f1f0d76dd6 126 }
XinZhangMS 0:f7f1f0d76dd6 127 }
XinZhangMS 0:f7f1f0d76dd6 128 break;
XinZhangMS 0:f7f1f0d76dd6 129 }
XinZhangMS 0:f7f1f0d76dd6 130 }
XinZhangMS 0:f7f1f0d76dd6 131
XinZhangMS 0:f7f1f0d76dd6 132 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
XinZhangMS 0:f7f1f0d76dd6 133 {
XinZhangMS 0:f7f1f0d76dd6 134 PAYLOAD* payload = (PAYLOAD*)context;
XinZhangMS 0:f7f1f0d76dd6 135 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
XinZhangMS 0:f7f1f0d76dd6 136 payload->length += length;
XinZhangMS 0:f7f1f0d76dd6 137 return 0;
XinZhangMS 0:f7f1f0d76dd6 138 }
XinZhangMS 0:f7f1f0d76dd6 139
XinZhangMS 0:f7f1f0d76dd6 140 /* Codes_SRS_AMQP_FRAME_CODEC_01_011: [amqp_frame_codec_create shall create an instance of an amqp_frame_codec and return a non-NULL handle to it.] */
XinZhangMS 0:f7f1f0d76dd6 141 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec_create(FRAME_CODEC_HANDLE frame_codec, AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback,
XinZhangMS 0:f7f1f0d76dd6 142 AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback, AMQP_FRAME_CODEC_ERROR_CALLBACK amqp_frame_codec_error_callback, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 143 {
XinZhangMS 0:f7f1f0d76dd6 144 AMQP_FRAME_CODEC_HANDLE result;
XinZhangMS 0:f7f1f0d76dd6 145
XinZhangMS 0:f7f1f0d76dd6 146 /* Codes_SRS_AMQP_FRAME_CODEC_01_012: [If any of the arguments frame_codec, frame_received_callback, amqp_frame_codec_error_callback or empty_frame_received_callback is NULL, amqp_frame_codec_create shall return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 147 if ((frame_codec == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 148 (frame_received_callback == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 149 (empty_frame_received_callback == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 150 (amqp_frame_codec_error_callback == NULL))
XinZhangMS 0:f7f1f0d76dd6 151 {
XinZhangMS 0:f7f1f0d76dd6 152 LogError("Bad arguments: frame_codec = %p, frame_received_callback = %p, empty_frame_received_callback = %p, amqp_frame_codec_error_callback = %p",
XinZhangMS 0:f7f1f0d76dd6 153 frame_codec, frame_received_callback, empty_frame_received_callback, amqp_frame_codec_error_callback);
XinZhangMS 0:f7f1f0d76dd6 154 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 155 }
XinZhangMS 0:f7f1f0d76dd6 156 else
XinZhangMS 0:f7f1f0d76dd6 157 {
XinZhangMS 0:f7f1f0d76dd6 158 result = (AMQP_FRAME_CODEC_HANDLE)malloc(sizeof(AMQP_FRAME_CODEC));
XinZhangMS 0:f7f1f0d76dd6 159 /* Codes_SRS_AMQP_FRAME_CODEC_01_020: [If allocating memory for the new amqp_frame_codec fails, then amqp_frame_codec_create shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 160 if (result == NULL)
XinZhangMS 0:f7f1f0d76dd6 161 {
XinZhangMS 0:f7f1f0d76dd6 162 LogError("Could not allocate memory for AMQP frame codec");
XinZhangMS 0:f7f1f0d76dd6 163 }
XinZhangMS 0:f7f1f0d76dd6 164 else
XinZhangMS 0:f7f1f0d76dd6 165 {
XinZhangMS 0:f7f1f0d76dd6 166 result->frame_codec = frame_codec;
XinZhangMS 0:f7f1f0d76dd6 167 result->frame_received_callback = frame_received_callback;
XinZhangMS 0:f7f1f0d76dd6 168 result->empty_frame_received_callback = empty_frame_received_callback;
XinZhangMS 0:f7f1f0d76dd6 169 result->error_callback = amqp_frame_codec_error_callback;
XinZhangMS 0:f7f1f0d76dd6 170 result->callback_context = callback_context;
XinZhangMS 0:f7f1f0d76dd6 171 result->decode_state = AMQP_FRAME_DECODE_FRAME;
XinZhangMS 0:f7f1f0d76dd6 172
XinZhangMS 0:f7f1f0d76dd6 173 /* Codes_SRS_AMQP_FRAME_CODEC_01_018: [amqp_frame_codec_create shall create a decoder to be used for decoding AMQP values.] */
XinZhangMS 0:f7f1f0d76dd6 174 result->decoder = amqpvalue_decoder_create(amqp_value_decoded, result);
XinZhangMS 0:f7f1f0d76dd6 175 if (result->decoder == NULL)
XinZhangMS 0:f7f1f0d76dd6 176 {
XinZhangMS 0:f7f1f0d76dd6 177 /* Codes_SRS_AMQP_FRAME_CODEC_01_019: [If creating the decoder fails, amqp_frame_codec_create shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 178 LogError("Could not create AMQP decoder");
XinZhangMS 0:f7f1f0d76dd6 179 free(result);
XinZhangMS 0:f7f1f0d76dd6 180 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 181 }
XinZhangMS 0:f7f1f0d76dd6 182 else
XinZhangMS 0:f7f1f0d76dd6 183 {
XinZhangMS 0:f7f1f0d76dd6 184 /* Codes_SRS_AMQP_FRAME_CODEC_01_013: [amqp_frame_codec_create shall subscribe for AMQP frames with the given frame_codec.] */
XinZhangMS 0:f7f1f0d76dd6 185 if (frame_codec_subscribe(frame_codec, FRAME_TYPE_AMQP, frame_received, result) != 0)
XinZhangMS 0:f7f1f0d76dd6 186 {
XinZhangMS 0:f7f1f0d76dd6 187 /* Codes_SRS_AMQP_FRAME_CODEC_01_014: [If subscribing for AMQP frames fails, amqp_frame_codec_create shall fail and return NULL.] */
XinZhangMS 0:f7f1f0d76dd6 188 LogError("Could not subscribe for received AMQP frames");
XinZhangMS 0:f7f1f0d76dd6 189 amqpvalue_decoder_destroy(result->decoder);
XinZhangMS 0:f7f1f0d76dd6 190 free(result);
XinZhangMS 0:f7f1f0d76dd6 191 result = NULL;
XinZhangMS 0:f7f1f0d76dd6 192 }
XinZhangMS 0:f7f1f0d76dd6 193 }
XinZhangMS 0:f7f1f0d76dd6 194 }
XinZhangMS 0:f7f1f0d76dd6 195 }
XinZhangMS 0:f7f1f0d76dd6 196
XinZhangMS 0:f7f1f0d76dd6 197 return result;
XinZhangMS 0:f7f1f0d76dd6 198 }
XinZhangMS 0:f7f1f0d76dd6 199
XinZhangMS 0:f7f1f0d76dd6 200 void amqp_frame_codec_destroy(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec)
XinZhangMS 0:f7f1f0d76dd6 201 {
XinZhangMS 0:f7f1f0d76dd6 202 if (amqp_frame_codec == NULL)
XinZhangMS 0:f7f1f0d76dd6 203 {
XinZhangMS 0:f7f1f0d76dd6 204 /* Codes_SRS_AMQP_FRAME_CODEC_01_016: [If amqp_frame_codec is NULL, amqp_frame_codec_destroy shall do nothing.] */
XinZhangMS 0:f7f1f0d76dd6 205 LogError("NULL amqp_frame_codec");
XinZhangMS 0:f7f1f0d76dd6 206 }
XinZhangMS 0:f7f1f0d76dd6 207 else
XinZhangMS 0:f7f1f0d76dd6 208 {
XinZhangMS 0:f7f1f0d76dd6 209 /* Codes_SRS_AMQP_FRAME_CODEC_01_017: [amqp_frame_codec_destroy shall unsubscribe from receiving AMQP frames from the frame_codec that was passed to amqp_frame_codec_create.] */
XinZhangMS 0:f7f1f0d76dd6 210 (void)frame_codec_unsubscribe(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP);
XinZhangMS 0:f7f1f0d76dd6 211
XinZhangMS 0:f7f1f0d76dd6 212 /* Codes_SRS_AMQP_FRAME_CODEC_01_021: [The decoder created in amqp_frame_codec_create shall be destroyed by amqp_frame_codec_destroy.] */
XinZhangMS 0:f7f1f0d76dd6 213 amqpvalue_decoder_destroy(amqp_frame_codec->decoder);
XinZhangMS 0:f7f1f0d76dd6 214
XinZhangMS 0:f7f1f0d76dd6 215 /* Codes_SRS_AMQP_FRAME_CODEC_01_015: [amqp_frame_codec_destroy shall free all resources associated with the amqp_frame_codec instance.] */
XinZhangMS 0:f7f1f0d76dd6 216 free(amqp_frame_codec);
XinZhangMS 0:f7f1f0d76dd6 217 }
XinZhangMS 0:f7f1f0d76dd6 218 }
XinZhangMS 0:f7f1f0d76dd6 219
XinZhangMS 0:f7f1f0d76dd6 220 int amqp_frame_codec_encode_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, AMQP_VALUE performative, const PAYLOAD* payloads, size_t payload_count, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 221 {
XinZhangMS 0:f7f1f0d76dd6 222 int result;
XinZhangMS 0:f7f1f0d76dd6 223
XinZhangMS 0:f7f1f0d76dd6 224 /* Codes_SRS_AMQP_FRAME_CODEC_01_024: [If frame_codec, performative or on_bytes_encoded is NULL, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 225 if ((amqp_frame_codec == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 226 (performative == NULL) ||
XinZhangMS 0:f7f1f0d76dd6 227 (on_bytes_encoded == NULL))
XinZhangMS 0:f7f1f0d76dd6 228 {
XinZhangMS 0:f7f1f0d76dd6 229 LogError("Bad arguments: amqp_frame_codec = %p, performative = %p, on_bytes_encoded = %p",
XinZhangMS 0:f7f1f0d76dd6 230 amqp_frame_codec, performative, on_bytes_encoded);
XinZhangMS 0:f7f1f0d76dd6 231 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 232 }
XinZhangMS 0:f7f1f0d76dd6 233 else
XinZhangMS 0:f7f1f0d76dd6 234 {
XinZhangMS 0:f7f1f0d76dd6 235 AMQP_VALUE descriptor;
XinZhangMS 0:f7f1f0d76dd6 236 uint64_t performative_ulong;
XinZhangMS 0:f7f1f0d76dd6 237 size_t encoded_size;
XinZhangMS 0:f7f1f0d76dd6 238
XinZhangMS 0:f7f1f0d76dd6 239 if ((descriptor = amqpvalue_get_inplace_descriptor(performative)) == NULL)
XinZhangMS 0:f7f1f0d76dd6 240 {
XinZhangMS 0:f7f1f0d76dd6 241 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 242 LogError("Getting the descriptor failed");
XinZhangMS 0:f7f1f0d76dd6 243 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 244 }
XinZhangMS 0:f7f1f0d76dd6 245 else if (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0)
XinZhangMS 0:f7f1f0d76dd6 246 {
XinZhangMS 0:f7f1f0d76dd6 247 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 248 LogError("Getting the descriptor ulong failed");
XinZhangMS 0:f7f1f0d76dd6 249 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 250 }
XinZhangMS 0:f7f1f0d76dd6 251 /* Codes_SRS_AMQP_FRAME_CODEC_01_008: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */
XinZhangMS 0:f7f1f0d76dd6 252 else if ((performative_ulong < AMQP_OPEN) ||
XinZhangMS 0:f7f1f0d76dd6 253 (performative_ulong > AMQP_CLOSE))
XinZhangMS 0:f7f1f0d76dd6 254 {
XinZhangMS 0:f7f1f0d76dd6 255 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 256 LogError("Bad arguments: amqp_frame_codec = %p, performative = %p, on_bytes_encoded = %p",
XinZhangMS 0:f7f1f0d76dd6 257 amqp_frame_codec, performative, on_bytes_encoded);
XinZhangMS 0:f7f1f0d76dd6 258 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 259 }
XinZhangMS 0:f7f1f0d76dd6 260 /* Codes_SRS_AMQP_FRAME_CODEC_01_027: [The encoded size of the performative and its fields shall be obtained by calling amqpvalue_get_encoded_size.] */
XinZhangMS 0:f7f1f0d76dd6 261 else if (amqpvalue_get_encoded_size(performative, &encoded_size) != 0)
XinZhangMS 0:f7f1f0d76dd6 262 {
XinZhangMS 0:f7f1f0d76dd6 263 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 264 LogError("Getting the encoded size failed");
XinZhangMS 0:f7f1f0d76dd6 265 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 266 }
XinZhangMS 0:f7f1f0d76dd6 267 else
XinZhangMS 0:f7f1f0d76dd6 268 {
XinZhangMS 0:f7f1f0d76dd6 269 unsigned char* amqp_performative_bytes = (unsigned char*)malloc(encoded_size);
XinZhangMS 0:f7f1f0d76dd6 270 if (amqp_performative_bytes == NULL)
XinZhangMS 0:f7f1f0d76dd6 271 {
XinZhangMS 0:f7f1f0d76dd6 272 LogError("Could not allocate performative bytes");
XinZhangMS 0:f7f1f0d76dd6 273 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 274 }
XinZhangMS 0:f7f1f0d76dd6 275 else
XinZhangMS 0:f7f1f0d76dd6 276 {
XinZhangMS 0:f7f1f0d76dd6 277 PAYLOAD* new_payloads = (PAYLOAD*)malloc(sizeof(PAYLOAD) * (payload_count + 1));
XinZhangMS 0:f7f1f0d76dd6 278 if (new_payloads == NULL)
XinZhangMS 0:f7f1f0d76dd6 279 {
XinZhangMS 0:f7f1f0d76dd6 280 LogError("Could not allocate frame payloads");
XinZhangMS 0:f7f1f0d76dd6 281 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 282 }
XinZhangMS 0:f7f1f0d76dd6 283 else
XinZhangMS 0:f7f1f0d76dd6 284 {
XinZhangMS 0:f7f1f0d76dd6 285 /* Codes_SRS_AMQP_FRAME_CODEC_01_070: [The payloads argument for frame_codec_encode_frame shall be made of the payload for the encoded performative and the payloads passed to amqp_frame_codec_encode_frame.] */
XinZhangMS 0:f7f1f0d76dd6 286 /* Codes_SRS_AMQP_FRAME_CODEC_01_028: [The encode result for the performative shall be placed in a PAYLOAD structure.] */
XinZhangMS 0:f7f1f0d76dd6 287 new_payloads[0].bytes = amqp_performative_bytes;
XinZhangMS 0:f7f1f0d76dd6 288 new_payloads[0].length = 0;
XinZhangMS 0:f7f1f0d76dd6 289
XinZhangMS 0:f7f1f0d76dd6 290 if (payload_count > 0)
XinZhangMS 0:f7f1f0d76dd6 291 {
XinZhangMS 0:f7f1f0d76dd6 292 (void)memcpy(new_payloads + 1, payloads, sizeof(PAYLOAD) * payload_count);
XinZhangMS 0:f7f1f0d76dd6 293 }
XinZhangMS 0:f7f1f0d76dd6 294
XinZhangMS 0:f7f1f0d76dd6 295 if (amqpvalue_encode(performative, encode_bytes, &new_payloads[0]) != 0)
XinZhangMS 0:f7f1f0d76dd6 296 {
XinZhangMS 0:f7f1f0d76dd6 297 LogError("amqpvalue_encode failed");
XinZhangMS 0:f7f1f0d76dd6 298 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 299 }
XinZhangMS 0:f7f1f0d76dd6 300 else
XinZhangMS 0:f7f1f0d76dd6 301 {
XinZhangMS 0:f7f1f0d76dd6 302 unsigned char channel_bytes[2];
XinZhangMS 0:f7f1f0d76dd6 303
XinZhangMS 0:f7f1f0d76dd6 304 channel_bytes[0] = channel >> 8;
XinZhangMS 0:f7f1f0d76dd6 305 channel_bytes[1] = channel & 0xFF;
XinZhangMS 0:f7f1f0d76dd6 306
XinZhangMS 0:f7f1f0d76dd6 307 /* Codes_SRS_AMQP_FRAME_CODEC_01_005: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */
XinZhangMS 0:f7f1f0d76dd6 308 /* Codes_SRS_AMQP_FRAME_CODEC_01_025: [amqp_frame_codec_encode_frame shall encode the frame header by using frame_codec_encode_frame.] */
XinZhangMS 0:f7f1f0d76dd6 309 /* Codes_SRS_AMQP_FRAME_CODEC_01_006: [The frame body is defined as a performative followed by an opaque payload.] */
XinZhangMS 0:f7f1f0d76dd6 310 if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, new_payloads, payload_count + 1, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0)
XinZhangMS 0:f7f1f0d76dd6 311 {
XinZhangMS 0:f7f1f0d76dd6 312 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 313 LogError("frame_codec_encode_frame failed");
XinZhangMS 0:f7f1f0d76dd6 314 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 315 }
XinZhangMS 0:f7f1f0d76dd6 316 else
XinZhangMS 0:f7f1f0d76dd6 317 {
XinZhangMS 0:f7f1f0d76dd6 318 /* Codes_SRS_AMQP_FRAME_CODEC_01_022: [amqp_frame_codec_begin_encode_frame shall encode the frame header and AMQP performative in an AMQP frame and on success it shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 319 result = 0;
XinZhangMS 0:f7f1f0d76dd6 320 }
XinZhangMS 0:f7f1f0d76dd6 321 }
XinZhangMS 0:f7f1f0d76dd6 322
XinZhangMS 0:f7f1f0d76dd6 323 free(new_payloads);
XinZhangMS 0:f7f1f0d76dd6 324 }
XinZhangMS 0:f7f1f0d76dd6 325
XinZhangMS 0:f7f1f0d76dd6 326 free(amqp_performative_bytes);
XinZhangMS 0:f7f1f0d76dd6 327 }
XinZhangMS 0:f7f1f0d76dd6 328 }
XinZhangMS 0:f7f1f0d76dd6 329 }
XinZhangMS 0:f7f1f0d76dd6 330
XinZhangMS 0:f7f1f0d76dd6 331 return result;
XinZhangMS 0:f7f1f0d76dd6 332 }
XinZhangMS 0:f7f1f0d76dd6 333
XinZhangMS 0:f7f1f0d76dd6 334 /* Codes_SRS_AMQP_FRAME_CODEC_01_042: [amqp_frame_codec_encode_empty_frame shall encode a frame with no payload.] */
XinZhangMS 0:f7f1f0d76dd6 335 /* Codes_SRS_AMQP_FRAME_CODEC_01_010: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */
XinZhangMS 0:f7f1f0d76dd6 336 int amqp_frame_codec_encode_empty_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context)
XinZhangMS 0:f7f1f0d76dd6 337 {
XinZhangMS 0:f7f1f0d76dd6 338 int result;
XinZhangMS 0:f7f1f0d76dd6 339
XinZhangMS 0:f7f1f0d76dd6 340 /* Codes_SRS_AMQP_FRAME_CODEC_01_045: [If amqp_frame_codec is NULL, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 341 if (amqp_frame_codec == NULL)
XinZhangMS 0:f7f1f0d76dd6 342 {
XinZhangMS 0:f7f1f0d76dd6 343 LogError("NULL amqp_frame_codec");
XinZhangMS 0:f7f1f0d76dd6 344 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 345 }
XinZhangMS 0:f7f1f0d76dd6 346 else
XinZhangMS 0:f7f1f0d76dd6 347 {
XinZhangMS 0:f7f1f0d76dd6 348 unsigned char channel_bytes[2];
XinZhangMS 0:f7f1f0d76dd6 349
XinZhangMS 0:f7f1f0d76dd6 350 channel_bytes[0] = channel >> 8;
XinZhangMS 0:f7f1f0d76dd6 351 channel_bytes[1] = channel & 0xFF;
XinZhangMS 0:f7f1f0d76dd6 352
XinZhangMS 0:f7f1f0d76dd6 353 /* Codes_SRS_AMQP_FRAME_CODEC_01_044: [amqp_frame_codec_encode_empty_frame shall use frame_codec_encode_frame to encode the frame.] */
XinZhangMS 0:f7f1f0d76dd6 354 if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, NULL, 0, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0)
XinZhangMS 0:f7f1f0d76dd6 355 {
XinZhangMS 0:f7f1f0d76dd6 356 /* Codes_SRS_AMQP_FRAME_CODEC_01_046: [If encoding fails in any way, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.] */
XinZhangMS 0:f7f1f0d76dd6 357 LogError("frame_codec_encode_frame failed when encoding empty frame");
XinZhangMS 0:f7f1f0d76dd6 358 result = __FAILURE__;
XinZhangMS 0:f7f1f0d76dd6 359 }
XinZhangMS 0:f7f1f0d76dd6 360 else
XinZhangMS 0:f7f1f0d76dd6 361 {
XinZhangMS 0:f7f1f0d76dd6 362 /* Codes_SRS_AMQP_FRAME_CODEC_01_043: [On success, amqp_frame_codec_encode_empty_frame shall return 0.] */
XinZhangMS 0:f7f1f0d76dd6 363 result = 0;
XinZhangMS 0:f7f1f0d76dd6 364 }
XinZhangMS 0:f7f1f0d76dd6 365 }
XinZhangMS 0:f7f1f0d76dd6 366
XinZhangMS 0:f7f1f0d76dd6 367 return result;
XinZhangMS 0:f7f1f0d76dd6 368 }