Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed
Fork of iothub_client_sample_amqp by
header_detect_io.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include "azure_uamqp_c/header_detect_io.h" 00005 #include "azure_uamqp_c/amqpalloc.h" 00006 00007 typedef enum IO_STATE_TAG 00008 { 00009 IO_STATE_NOT_OPEN, 00010 IO_STATE_OPENING_UNDERLYING_IO, 00011 IO_STATE_WAIT_FOR_HEADER, 00012 IO_STATE_OPEN, 00013 IO_STATE_CLOSING, 00014 IO_STATE_ERROR 00015 } IO_STATE; 00016 00017 typedef struct HEADER_DETECT_IO_INSTANCE_TAG 00018 { 00019 XIO_HANDLE underlying_io; 00020 size_t header_pos; 00021 IO_STATE io_state; 00022 ON_IO_OPEN_COMPLETE on_io_open_complete; 00023 ON_IO_CLOSE_COMPLETE on_io_close_complete; 00024 ON_IO_ERROR on_io_error; 00025 ON_BYTES_RECEIVED on_bytes_received; 00026 void* on_io_open_complete_context; 00027 void* on_io_close_complete_context; 00028 void* on_io_error_context; 00029 void* on_bytes_received_context; 00030 } HEADER_DETECT_IO_INSTANCE; 00031 00032 static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }; 00033 00034 static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) 00035 { 00036 if (header_detect_io_instance->on_io_error != NULL) 00037 { 00038 header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context); 00039 } 00040 } 00041 00042 static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result) 00043 { 00044 if (header_detect_io_instance->on_io_open_complete != NULL) 00045 { 00046 header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result); 00047 } 00048 } 00049 00050 static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) 00051 { 00052 if (header_detect_io_instance->on_io_close_complete != NULL) 00053 { 00054 
header_detect_io_instance->on_io_close_complete(header_detect_io_instance->on_io_close_complete_context); 00055 } 00056 } 00057 00058 static void on_underlying_io_error(void* context); 00059 static void on_send_complete_close(void* context, IO_SEND_RESULT send_result) 00060 { 00061 (void)send_result; 00062 on_underlying_io_error(context); 00063 } 00064 00065 static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size) 00066 { 00067 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; 00068 00069 while (size > 0) 00070 { 00071 switch (header_detect_io_instance->io_state) 00072 { 00073 default: 00074 break; 00075 00076 case IO_STATE_WAIT_FOR_HEADER: 00077 if (amqp_header[header_detect_io_instance->header_pos] != buffer[0]) 00078 { 00079 /* Send expected header, then close as per spec. We do not care if we fail */ 00080 (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context); 00081 00082 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00083 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); 00084 size = 0; 00085 } 00086 else 00087 { 00088 header_detect_io_instance->header_pos++; 00089 size--; 00090 buffer++; 00091 if (header_detect_io_instance->header_pos == sizeof(amqp_header)) 00092 { 00093 if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) 00094 { 00095 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00096 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); 00097 } 00098 else 00099 { 00100 header_detect_io_instance->io_state = IO_STATE_OPEN; 00101 indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); 00102 } 00103 } 00104 } 00105 break; 00106 00107 case IO_STATE_OPEN: 00108 header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size); 00109 size = 0; 00110 
break; 00111 } 00112 } 00113 } 00114 00115 static void on_underlying_io_close_complete(void* context) 00116 { 00117 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; 00118 00119 switch (header_detect_io_instance->io_state) 00120 { 00121 default: 00122 break; 00123 00124 case IO_STATE_CLOSING: 00125 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00126 indicate_close_complete(header_detect_io_instance); 00127 break; 00128 00129 case IO_STATE_WAIT_FOR_HEADER: 00130 case IO_STATE_OPENING_UNDERLYING_IO: 00131 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00132 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); 00133 break; 00134 } 00135 } 00136 00137 static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result) 00138 { 00139 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; 00140 00141 switch (header_detect_io_instance->io_state) 00142 { 00143 default: 00144 break; 00145 00146 case IO_STATE_OPENING_UNDERLYING_IO: 00147 if (open_result == IO_OPEN_OK) 00148 { 00149 header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER; 00150 } 00151 else 00152 { 00153 if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) 00154 { 00155 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00156 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); 00157 } 00158 } 00159 00160 break; 00161 } 00162 } 00163 00164 static void on_underlying_io_error(void* context) 00165 { 00166 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; 00167 00168 switch (header_detect_io_instance->io_state) 00169 { 00170 default: 00171 break; 00172 00173 case IO_STATE_WAIT_FOR_HEADER: 00174 case IO_STATE_OPENING_UNDERLYING_IO: 00175 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; 00176 indicate_open_complete(header_detect_io_instance, 
IO_OPEN_ERROR); 00177 break; 00178 00179 case IO_STATE_OPEN: 00180 header_detect_io_instance->io_state = IO_STATE_ERROR; 00181 indicate_error(header_detect_io_instance); 00182 break; 00183 } 00184 } 00185 00186 CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters) 00187 { 00188 HEADER_DETECT_IO_INSTANCE* result; 00189 00190 if (io_create_parameters == NULL) 00191 { 00192 result = NULL; 00193 } 00194 else 00195 { 00196 HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters; 00197 result = (HEADER_DETECT_IO_INSTANCE*)amqpalloc_malloc(sizeof(HEADER_DETECT_IO_INSTANCE)); 00198 if (result != NULL) 00199 { 00200 result->underlying_io = header_detect_io_config->underlying_io; 00201 result->on_io_open_complete = NULL; 00202 result->on_io_close_complete = NULL; 00203 result->on_io_error = NULL; 00204 result->on_bytes_received = NULL; 00205 result->on_io_open_complete_context = NULL; 00206 result->on_io_close_complete_context = NULL; 00207 result->on_io_error_context = NULL; 00208 result->on_bytes_received_context = NULL; 00209 00210 result->io_state = IO_STATE_NOT_OPEN; 00211 } 00212 } 00213 00214 return result; 00215 } 00216 00217 void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io) 00218 { 00219 if (header_detect_io != NULL) 00220 { 00221 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00222 (void)headerdetectio_close(header_detect_io, NULL, NULL); 00223 xio_destroy(header_detect_io_instance->underlying_io); 00224 amqpalloc_free(header_detect_io); 00225 } 00226 } 00227 00228 int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context) 00229 { 00230 int result; 00231 00232 if (header_detect_io == NULL) 00233 { 00234 result = __LINE__; 00235 } 00236 else 00237 { 
00238 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00239 00240 if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN && 00241 header_detect_io_instance->io_state != IO_STATE_OPEN) 00242 { 00243 result = __LINE__; 00244 } 00245 else 00246 { 00247 header_detect_io_instance->on_bytes_received = on_bytes_received; 00248 header_detect_io_instance->on_io_open_complete = on_io_open_complete; 00249 header_detect_io_instance->on_io_error = on_io_error; 00250 header_detect_io_instance->on_bytes_received_context = on_bytes_received_context; 00251 header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context; 00252 header_detect_io_instance->on_io_error_context = on_io_error_context; 00253 00254 if (header_detect_io_instance->io_state == IO_STATE_OPEN) 00255 { 00256 indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); 00257 result = 0; 00258 } 00259 else 00260 { 00261 header_detect_io_instance->header_pos = 0; 00262 header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO; 00263 00264 if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0) 00265 { 00266 result = __LINE__; 00267 } 00268 else 00269 { 00270 result = 0; 00271 } 00272 } 00273 } 00274 } 00275 00276 return result; 00277 } 00278 00279 int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context) 00280 { 00281 int result; 00282 00283 if (header_detect_io == NULL) 00284 { 00285 result = __LINE__; 00286 } 00287 else 00288 { 00289 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00290 00291 if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) || 00292 (header_detect_io_instance->io_state == IO_STATE_CLOSING)) 00293 { 
00294 result = __LINE__; 00295 } 00296 else 00297 { 00298 header_detect_io_instance->io_state = IO_STATE_CLOSING; 00299 header_detect_io_instance->on_io_close_complete = on_io_close_complete; 00300 header_detect_io_instance->on_io_close_complete_context = callback_context; 00301 00302 if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) 00303 { 00304 result = __LINE__; 00305 } 00306 else 00307 { 00308 result = 0; 00309 } 00310 } 00311 } 00312 00313 return result; 00314 } 00315 00316 int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context) 00317 { 00318 int result; 00319 00320 if (header_detect_io == NULL) 00321 { 00322 result = __LINE__; 00323 } 00324 else 00325 { 00326 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00327 00328 if (header_detect_io_instance->io_state != IO_STATE_OPEN) 00329 { 00330 result = __LINE__; 00331 } 00332 else 00333 { 00334 if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0) 00335 { 00336 result = __LINE__; 00337 } 00338 else 00339 { 00340 result = 0; 00341 } 00342 } 00343 } 00344 00345 return result; 00346 } 00347 00348 void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io) 00349 { 00350 if (header_detect_io != NULL) 00351 { 00352 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00353 00354 if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) && 00355 (header_detect_io_instance->io_state != IO_STATE_ERROR)) 00356 { 00357 xio_dowork(header_detect_io_instance->underlying_io); 00358 } 00359 } 00360 } 00361 00362 int headerdetectio_setoption(CONCRETE_IO_HANDLE header_detect_io, const char* optionName, const void* value) 00363 { 00364 int result; 00365 00366 if (header_detect_io == NULL) 00367 { 00368 
result = __LINE__; 00369 } 00370 else 00371 { 00372 HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; 00373 00374 if (header_detect_io_instance->underlying_io == NULL) 00375 { 00376 result = __LINE__; 00377 } 00378 else 00379 { 00380 result = xio_setoption(header_detect_io_instance->underlying_io, optionName, value); 00381 } 00382 } 00383 00384 return result; 00385 } 00386 00387 /*this function will clone an option given by name and value*/ 00388 static void* headerdetectio_CloneOption(const char* name, const void* value) 00389 { 00390 (void)(name, value); 00391 return NULL; 00392 } 00393 00394 /*this function destroys an option previously created*/ 00395 static void headerdetectio_DestroyOption(const char* name, const void* value) 00396 { 00397 (void)(name, value); 00398 } 00399 00400 static OPTIONHANDLER_HANDLE headerdetectio_retrieveoptions(CONCRETE_IO_HANDLE handle) 00401 { 00402 OPTIONHANDLER_HANDLE result; 00403 (void)handle; 00404 result = OptionHandler_Create(headerdetectio_CloneOption, headerdetectio_DestroyOption, headerdetectio_setoption); 00405 if (result == NULL) 00406 { 00407 LogError("unable to OptionHandler_Create"); 00408 /*return as is*/ 00409 } 00410 else 00411 { 00412 /*insert here work to add the options to "result" handle*/ 00413 } 00414 return result; 00415 } 00416 00417 static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description = 00418 { 00419 headerdetectio_retrieveoptions, 00420 headerdetectio_create, 00421 headerdetectio_destroy, 00422 headerdetectio_open, 00423 headerdetectio_close, 00424 headerdetectio_send, 00425 headerdetectio_dowork, 00426 headerdetectio_setoption 00427 }; 00428 00429 const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void) 00430 { 00431 return &header_detect_io_interface_description; 00432 }
Generated on Tue Jul 12 2022 12:43:19 by
