Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed
Fork of iothub_client_sample_amqp by
amqp_frame_codec.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include <stdlib.h> 00005 #ifdef _CRTDBG_MAP_ALLOC 00006 #include <crtdbg.h> 00007 #endif 00008 #include <stdint.h> 00009 #include <stddef.h> 00010 #include <string.h> 00011 #include "azure_uamqp_c/amqp_frame_codec.h" 00012 #include "azure_uamqp_c/frame_codec.h" 00013 #include "azure_uamqp_c/amqpalloc.h" 00014 #include "azure_uamqp_c/amqpvalue.h" 00015 00016 typedef enum AMQP_FRAME_DECODE_STATE_TAG 00017 { 00018 AMQP_FRAME_DECODE_FRAME, 00019 AMQP_FRAME_DECODE_ERROR 00020 } AMQP_FRAME_DECODE_STATE; 00021 00022 typedef struct AMQP_FRAME_CODEC_INSTANCE_TAG 00023 { 00024 FRAME_CODEC_HANDLE frame_codec; 00025 00026 /* decode */ 00027 AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback; 00028 AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback; 00029 AMQP_FRAME_CODEC_ERROR_CALLBACK error_callback; 00030 void* callback_context; 00031 AMQPVALUE_DECODER_HANDLE decoder; 00032 AMQP_FRAME_DECODE_STATE decode_state; 00033 AMQP_VALUE decoded_performative; 00034 } AMQP_FRAME_CODEC_INSTANCE; 00035 00036 static void amqp_value_decoded(void* context, AMQP_VALUE decoded_value) 00037 { 00038 AMQP_FRAME_CODEC_INSTANCE* amqp_frame_codec_instance = (AMQP_FRAME_CODEC_INSTANCE*)context; 00039 uint64_t performative_descriptor_ulong; 00040 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); 00041 00042 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */ 00043 if ((descriptor == NULL) || 00044 (amqpvalue_get_ulong(descriptor, &performative_descriptor_ulong) != 0) || 00045 /* Codes_SRS_AMQP_FRAME_CODEC_01_003: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */ 00046 (performative_descriptor_ulong < AMQP_OPEN) || 00047 (performative_descriptor_ulong > AMQP_CLOSE)) 00048 { 00049 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */ 00050 amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR; 00051 } 00052 else 00053 { 00054 amqp_frame_codec_instance->decoded_performative = decoded_value; 00055 } 00056 } 00057 00058 static void frame_received(void* context, const unsigned char* type_specific, uint32_t type_specific_size, const unsigned char* frame_body, uint32_t frame_body_size) 00059 { 00060 AMQP_FRAME_CODEC_INSTANCE* amqp_frame_codec_instance = (AMQP_FRAME_CODEC_INSTANCE*)context; 00061 uint16_t channel; 00062 00063 switch (amqp_frame_codec_instance->decode_state) 00064 { 00065 default: 00066 /* Codes_SRS_AMQP_FRAME_CODEC_01_050: [All subsequent decoding shall fail and no AMQP frames shall be indicated from that point on to the consumers of amqp_frame_codec.] */ 00067 case AMQP_FRAME_DECODE_ERROR: 00068 break; 00069 00070 case AMQP_FRAME_DECODE_FRAME: 00071 /* Codes_SRS_AMQP_FRAME_CODEC_01_049: [If not enough type specific bytes are received to decode the channel number, the decoding shall stop with an error.] */ 00072 if (type_specific_size < 2) 00073 { 00074 amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR; 00075 00076 /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback and passing to it the callback context argument that was given in amqp_frame_codec_create.] */ 00077 amqp_frame_codec_instance->error_callback(amqp_frame_codec_instance->callback_context); 00078 } 00079 else 00080 { 00081 /* Codes_SRS_AMQP_FRAME_CODEC_01_001: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */ 00082 channel = ((uint16_t)type_specific[0]) << 8; 00083 channel += type_specific[1]; 00084 00085 if (frame_body_size == 0) 00086 { 00087 /* Codes_SRS_AMQP_FRAME_CODEC_01_048: [When a frame header is received from frame_codec and the frame payload size is 0, empty_frame_received_callback shall be invoked, while passing the channel number as argument.] */ 00088 /* Codes_SRS_AMQP_FRAME_CODEC_01_007: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */ 00089 amqp_frame_codec_instance->empty_frame_received_callback(amqp_frame_codec_instance->callback_context, channel); 00090 } 00091 else 00092 { 00093 /* Codes_SRS_AMQP_FRAME_CODEC_01_051: [If the frame payload is greater than 0, amqp_frame_codec shall decode the performative as a described AMQP type.] */ 00094 /* Codes_SRS_AMQP_FRAME_CODEC_01_002: [The frame body is defined as a performative followed by an opaque payload.] */ 00095 amqp_frame_codec_instance->decoded_performative = NULL; 00096 00097 while ((frame_body_size > 0) && 00098 (amqp_frame_codec_instance->decoded_performative == NULL) && 00099 (amqp_frame_codec_instance->decode_state != AMQP_FRAME_DECODE_ERROR)) 00100 { 00101 /* Codes_SRS_AMQP_FRAME_CODEC_01_052: [Decoding the performative shall be done by feeding the bytes to the decoder create in amqp_frame_codec_create.] */ 00102 if (amqpvalue_decode_bytes(amqp_frame_codec_instance->decoder, frame_body, 1) != 0) 00103 { 00104 /* Codes_SRS_AMQP_FRAME_CODEC_01_060: [If any error occurs while decoding a frame, the decoder shall switch to an error state where decoding shall not be possible anymore.] */ 00105 amqp_frame_codec_instance->decode_state = AMQP_FRAME_DECODE_ERROR; 00106 } 00107 else 00108 { 00109 frame_body_size--; 00110 frame_body++; 00111 } 00112 } 00113 00114 if (amqp_frame_codec_instance->decode_state == AMQP_FRAME_DECODE_ERROR) 00115 { 00116 /* Codes_SRS_AMQP_FRAME_CODEC_01_069: [If any error occurs while decoding a frame, the decoder shall indicate the error by calling the amqp_frame_codec_error_callback and passing to it the callback context argument that was given in amqp_frame_codec_create.] */ 00117 amqp_frame_codec_instance->error_callback(amqp_frame_codec_instance->callback_context); 00118 } 00119 else 00120 { 00121 /* Codes_SRS_AMQP_FRAME_CODEC_01_004: [The remaining bytes in the frame body form the payload for that frame.] */ 00122 /* Codes_SRS_AMQP_FRAME_CODEC_01_067: [When the performative is decoded, the rest of the frame_bytes shall not be given to the AMQP decoder, but they shall be buffered so that later they are given to the frame_received callback.] */ 00123 /* Codes_SRS_AMQP_FRAME_CODEC_01_054: [Once the performative is decoded and all frame payload bytes are received, the callback frame_received_callback shall be called.] */ 00124 /* Codes_SRS_AMQP_FRAME_CODEC_01_068: [A pointer to all the payload bytes shall also be passed to frame_received_callback.] */ 00125 amqp_frame_codec_instance->frame_received_callback(amqp_frame_codec_instance->callback_context, channel, amqp_frame_codec_instance->decoded_performative, frame_body, frame_body_size); 00126 } 00127 } 00128 } 00129 break; 00130 } 00131 } 00132 00133 static int encode_bytes(void* context, const unsigned char* bytes, size_t length) 00134 { 00135 PAYLOAD* payload = (PAYLOAD*)context; 00136 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length); 00137 payload->length += length; 00138 return 0; 00139 } 00140 00141 /* Codes_SRS_AMQP_FRAME_CODEC_01_011: [amqp_frame_codec_create shall create an instance of an amqp_frame_codec and return a non-NULL handle to it.] */ 00142 AMQP_FRAME_CODEC_HANDLE amqp_frame_codec_create(FRAME_CODEC_HANDLE frame_codec, AMQP_FRAME_RECEIVED_CALLBACK frame_received_callback, 00143 AMQP_EMPTY_FRAME_RECEIVED_CALLBACK empty_frame_received_callback, AMQP_FRAME_CODEC_ERROR_CALLBACK amqp_frame_codec_error_callback, void* callback_context) 00144 { 00145 AMQP_FRAME_CODEC_INSTANCE* result; 00146 00147 /* Codes_SRS_AMQP_FRAME_CODEC_01_012: [If any of the arguments frame_codec, frame_received_callback, amqp_frame_codec_error_callback or empty_frame_received_callback is NULL, amqp_frame_codec_create shall return NULL.] */ 00148 if ((frame_codec == NULL) || 00149 (frame_received_callback == NULL) || 00150 (empty_frame_received_callback == NULL) || 00151 (amqp_frame_codec_error_callback == NULL)) 00152 { 00153 result = NULL; 00154 } 00155 else 00156 { 00157 result = (AMQP_FRAME_CODEC_INSTANCE*)amqpalloc_malloc(sizeof(AMQP_FRAME_CODEC_INSTANCE)); 00158 /* Codes_SRS_AMQP_FRAME_CODEC_01_020: [If allocating memory for the new amqp_frame_codec fails, then amqp_frame_codec_create shall fail and return NULL.] */ 00159 if (result != NULL) 00160 { 00161 result->frame_codec = frame_codec; 00162 result->frame_received_callback = frame_received_callback; 00163 result->empty_frame_received_callback = empty_frame_received_callback; 00164 result->error_callback = amqp_frame_codec_error_callback; 00165 result->callback_context = callback_context; 00166 result->decode_state = AMQP_FRAME_DECODE_FRAME; 00167 00168 /* Codes_SRS_AMQP_FRAME_CODEC_01_018: [amqp_frame_codec_create shall create a decoder to be used for decoding AMQP values.] */ 00169 result->decoder = amqpvalue_decoder_create(amqp_value_decoded, result); 00170 if (result->decoder == NULL) 00171 { 00172 /* Codes_SRS_AMQP_FRAME_CODEC_01_019: [If creating the decoder fails, amqp_frame_codec_create shall fail and return NULL.] */ 00173 amqpalloc_free(result); 00174 result = NULL; 00175 } 00176 else 00177 { 00178 /* Codes_SRS_AMQP_FRAME_CODEC_01_013: [amqp_frame_codec_create shall subscribe for AMQP frames with the given frame_codec.] */ 00179 if (frame_codec_subscribe(frame_codec, FRAME_TYPE_AMQP, frame_received, result) != 0) 00180 { 00181 /* Codes_SRS_AMQP_FRAME_CODEC_01_014: [If subscribing for AMQP frames fails, amqp_frame_codec_create shall fail and return NULL.] */ 00182 amqpvalue_decoder_destroy(result->decoder); 00183 amqpalloc_free(result); 00184 result = NULL; 00185 } 00186 } 00187 } 00188 } 00189 00190 return result; 00191 } 00192 00193 void amqp_frame_codec_destroy(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec) 00194 { 00195 if (amqp_frame_codec != NULL) 00196 { 00197 /* Codes_SRS_AMQP_FRAME_CODEC_01_017: [amqp_frame_codec_destroy shall unsubscribe from receiving AMQP frames from the frame_codec that was passed to amqp_frame_codec_create.] */ 00198 (void)frame_codec_unsubscribe(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP); 00199 00200 /* Codes_SRS_AMQP_FRAME_CODEC_01_021: [The decoder created in amqp_frame_codec_create shall be destroyed by amqp_frame_codec_destroy.] */ 00201 amqpvalue_decoder_destroy(amqp_frame_codec->decoder); 00202 00203 /* Codes_SRS_AMQP_FRAME_CODEC_01_015: [amqp_frame_codec_destroy shall free all resources associated with the amqp_frame_codec instance.] */ 00204 amqpalloc_free(amqp_frame_codec); 00205 } 00206 } 00207 00208 int amqp_frame_codec_encode_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, const AMQP_VALUE performative, const PAYLOAD* payloads, size_t payload_count, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context) 00209 { 00210 int result; 00211 00212 /* Codes_SRS_AMQP_FRAME_CODEC_01_024: [If frame_codec, performative or on_bytes_encoded is NULL, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */ 00213 if ((amqp_frame_codec == NULL) || 00214 (performative == NULL) || 00215 (on_bytes_encoded == NULL)) 00216 { 00217 result = __LINE__; 00218 } 00219 else 00220 { 00221 AMQP_VALUE descriptor; 00222 uint64_t performative_ulong; 00223 size_t encoded_size; 00224 00225 if (((descriptor = amqpvalue_get_inplace_descriptor(performative)) == NULL) || 00226 (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0) || 00227 /* Codes_SRS_AMQP_FRAME_CODEC_01_008: [The performative MUST be one of those defined in section 2.7 and is encoded as a described type in the AMQP type system.] */ 00228 (performative_ulong < AMQP_OPEN) || 00229 (performative_ulong > AMQP_CLOSE)) 00230 { 00231 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */ 00232 result = __LINE__; 00233 } 00234 /* Codes_SRS_AMQP_FRAME_CODEC_01_027: [The encoded size of the performative and its fields shall be obtained by calling amqpvalue_get_encoded_size.] */ 00235 else if (amqpvalue_get_encoded_size(performative, &encoded_size) != 0) 00236 { 00237 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */ 00238 result = __LINE__; 00239 } 00240 else 00241 { 00242 unsigned char* amqp_performative_bytes = (unsigned char*)amqpalloc_malloc(encoded_size); 00243 if (amqp_performative_bytes == NULL) 00244 { 00245 result = __LINE__; 00246 } 00247 else 00248 { 00249 PAYLOAD* new_payloads = (PAYLOAD*)amqpalloc_malloc(sizeof(PAYLOAD) * (payload_count + 1)); 00250 if (new_payloads == NULL) 00251 { 00252 result = __LINE__; 00253 } 00254 else 00255 { 00256 /* Codes_SRS_AMQP_FRAME_CODEC_01_070: [The payloads argument for frame_codec_encode_frame shall be made of the payload for the encoded performative and the payloads passed to amqp_frame_codec_encode_frame.] */ 00257 /* Codes_SRS_AMQP_FRAME_CODEC_01_028: [The encode result for the performative shall be placed in a PAYLOAD structure.] */ 00258 new_payloads[0].bytes = amqp_performative_bytes; 00259 new_payloads[0].length = 0; 00260 00261 if (payload_count > 0) 00262 { 00263 (void)memcpy(new_payloads + 1, payloads, sizeof(PAYLOAD) * payload_count); 00264 } 00265 00266 if (amqpvalue_encode(performative, encode_bytes, &new_payloads[0]) != 0) 00267 { 00268 result = __LINE__; 00269 } 00270 else 00271 { 00272 unsigned char channel_bytes[2]; 00273 00274 channel_bytes[0] = channel >> 8; 00275 channel_bytes[1] = channel & 0xFF; 00276 00277 /* Codes_SRS_AMQP_FRAME_CODEC_01_005: [Bytes 6 and 7 of an AMQP frame contain the channel number ] */ 00278 /* Codes_SRS_AMQP_FRAME_CODEC_01_025: [amqp_frame_codec_encode_frame shall encode the frame header by using frame_codec_encode_frame.] */ 00279 /* Codes_SRS_AMQP_FRAME_CODEC_01_006: [The frame body is defined as a performative followed by an opaque payload.] */ 00280 if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, new_payloads, payload_count + 1, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0) 00281 { 00282 /* Codes_SRS_AMQP_FRAME_CODEC_01_029: [If any error occurs during encoding, amqp_frame_codec_encode_frame shall fail and return a non-zero value.] */ 00283 result = __LINE__; 00284 } 00285 else 00286 { 00287 /* Codes_SRS_AMQP_FRAME_CODEC_01_022: [amqp_frame_codec_begin_encode_frame shall encode the frame header and AMQP performative in an AMQP frame and on success it shall return 0.] */ 00288 result = 0; 00289 } 00290 } 00291 00292 amqpalloc_free(new_payloads); 00293 } 00294 00295 amqpalloc_free(amqp_performative_bytes); 00296 } 00297 } 00298 } 00299 00300 return result; 00301 } 00302 00303 /* Codes_SRS_AMQP_FRAME_CODEC_01_042: [amqp_frame_codec_encode_empty_frame shall encode a frame with no payload.] */ 00304 /* Codes_SRS_AMQP_FRAME_CODEC_01_010: [An AMQP frame with no body MAY be used to generate artificial traffic as needed to satisfy any negotiated idle timeout interval ] */ 00305 int amqp_frame_codec_encode_empty_frame(AMQP_FRAME_CODEC_HANDLE amqp_frame_codec, uint16_t channel, ON_BYTES_ENCODED on_bytes_encoded, void* callback_context) 00306 { 00307 int result; 00308 00309 /* Codes_SRS_AMQP_FRAME_CODEC_01_045: [If amqp_frame_codec is NULL, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.] */ 00310 if (amqp_frame_codec == NULL) 00311 { 00312 result = __LINE__; 00313 } 00314 else 00315 { 00316 unsigned char channel_bytes[2]; 00317 00318 channel_bytes[0] = channel >> 8; 00319 channel_bytes[1] = channel & 0xFF; 00320 00321 /* Codes_SRS_AMQP_FRAME_CODEC_01_044: [amqp_frame_codec_encode_empty_frame shall use frame_codec_encode_frame to encode the frame.] */ 00322 if (frame_codec_encode_frame(amqp_frame_codec->frame_codec, FRAME_TYPE_AMQP, NULL, 0, channel_bytes, sizeof(channel_bytes), on_bytes_encoded, callback_context) != 0) 00323 { 00324 /* Codes_SRS_AMQP_FRAME_CODEC_01_046: [If encoding fails in any way, amqp_frame_codec_encode_empty_frame shall fail and return a non-zero value.] */ 00325 result = __LINE__; 00326 } 00327 else 00328 { 00329 /* Codes_SRS_AMQP_FRAME_CODEC_01_043: [On success, amqp_frame_codec_encode_empty_frame shall return 0.] */ 00330 result = 0; 00331 } 00332 } 00333 00334 return result; 00335 }
Generated on Tue Jul 12 2022 12:43:16 by
