Mark Radbourne / Mbed 2 deprecated iothub_client_sample_amqp

Dependencies:   EthernetInterface NTPClient iothub_amqp_transport iothub_client mbed-rtos mbed

Fork of iothub_client_sample_amqp by Azure IoT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers header_detect_io.c Source File

header_detect_io.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include "azure_uamqp_c/header_detect_io.h"
00005 #include "azure_uamqp_c/amqpalloc.h"
00006 
00007 typedef enum IO_STATE_TAG
00008 {
00009     IO_STATE_NOT_OPEN,
00010     IO_STATE_OPENING_UNDERLYING_IO,
00011     IO_STATE_WAIT_FOR_HEADER,
00012     IO_STATE_OPEN,
00013     IO_STATE_CLOSING,
00014     IO_STATE_ERROR
00015 } IO_STATE;
00016 
00017 typedef struct HEADER_DETECT_IO_INSTANCE_TAG
00018 {
00019     XIO_HANDLE underlying_io;
00020     size_t header_pos;
00021     IO_STATE io_state;
00022     ON_IO_OPEN_COMPLETE on_io_open_complete;
00023     ON_IO_CLOSE_COMPLETE on_io_close_complete;
00024     ON_IO_ERROR on_io_error;
00025     ON_BYTES_RECEIVED on_bytes_received;
00026     void* on_io_open_complete_context;
00027     void* on_io_close_complete_context;
00028     void* on_io_error_context;
00029     void* on_bytes_received_context;
00030 } HEADER_DETECT_IO_INSTANCE;
00031 
00032 static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 };
00033 
00034 static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance)
00035 {
00036     if (header_detect_io_instance->on_io_error != NULL)
00037     {
00038         header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context);
00039     }
00040 }
00041 
00042 static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result)
00043 {
00044     if (header_detect_io_instance->on_io_open_complete != NULL)
00045     {
00046         header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result);
00047     }
00048 }
00049 
00050 static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance)
00051 {
00052     if (header_detect_io_instance->on_io_close_complete != NULL)
00053     {
00054         header_detect_io_instance->on_io_close_complete(header_detect_io_instance->on_io_close_complete_context);
00055     }
00056 }
00057 
00058 static void on_underlying_io_error(void* context);
00059 static void on_send_complete_close(void* context, IO_SEND_RESULT send_result)
00060 {
00061     (void)send_result;
00062     on_underlying_io_error(context);
00063 }
00064 
00065 static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size)
00066 {
00067     HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
00068 
00069     while (size > 0)
00070     {
00071         switch (header_detect_io_instance->io_state)
00072         {
00073         default:
00074             break;
00075 
00076         case IO_STATE_WAIT_FOR_HEADER:
00077             if (amqp_header[header_detect_io_instance->header_pos] != buffer[0])
00078             {
00079                 /* Send expected header, then close as per spec.  We do not care if we fail */
00080                 (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context);
00081 
00082                 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00083                 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
00084                 size = 0;
00085             }
00086             else
00087             {
00088                 header_detect_io_instance->header_pos++;
00089                 size--;
00090                 buffer++;
00091                 if (header_detect_io_instance->header_pos == sizeof(amqp_header))
00092                 {
00093                     if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0)
00094                     {
00095                         header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00096                         indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
00097                     }
00098                     else
00099                     {
00100                         header_detect_io_instance->io_state = IO_STATE_OPEN;
00101                         indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
00102                     }
00103                 }
00104             }
00105             break;
00106 
00107         case IO_STATE_OPEN:
00108             header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size);
00109             size = 0;
00110             break;
00111         }
00112     }
00113 }
00114 
00115 static void on_underlying_io_close_complete(void* context)
00116 {
00117     HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
00118 
00119     switch (header_detect_io_instance->io_state)
00120     {
00121     default:
00122         break;
00123 
00124     case IO_STATE_CLOSING:
00125         header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00126         indicate_close_complete(header_detect_io_instance);
00127         break;
00128 
00129     case IO_STATE_WAIT_FOR_HEADER:
00130     case IO_STATE_OPENING_UNDERLYING_IO:
00131         header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00132         indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
00133         break;
00134     }
00135 }
00136 
00137 static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result)
00138 {
00139     HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
00140 
00141     switch (header_detect_io_instance->io_state)
00142     {
00143     default:
00144         break;
00145 
00146     case IO_STATE_OPENING_UNDERLYING_IO:
00147         if (open_result == IO_OPEN_OK)
00148         {
00149             header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER;
00150         }
00151         else
00152         {
00153             if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0)
00154             {
00155                 header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00156                 indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
00157             }
00158         }
00159 
00160         break;
00161     }
00162 }
00163 
00164 static void on_underlying_io_error(void* context)
00165 {
00166     HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
00167 
00168     switch (header_detect_io_instance->io_state)
00169     {
00170     default:
00171         break;
00172 
00173     case IO_STATE_WAIT_FOR_HEADER:
00174     case IO_STATE_OPENING_UNDERLYING_IO:
00175         header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
00176         indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
00177         break;
00178 
00179     case IO_STATE_OPEN:
00180         header_detect_io_instance->io_state = IO_STATE_ERROR;
00181         indicate_error(header_detect_io_instance);
00182         break;
00183     }
00184 }
00185 
00186 CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters)
00187 {
00188     HEADER_DETECT_IO_INSTANCE* result;
00189 
00190     if (io_create_parameters == NULL)
00191     {
00192         result = NULL;
00193     }
00194     else
00195     {
00196         HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters;
00197         result = (HEADER_DETECT_IO_INSTANCE*)amqpalloc_malloc(sizeof(HEADER_DETECT_IO_INSTANCE));
00198         if (result != NULL)
00199         {
00200             result->underlying_io = header_detect_io_config->underlying_io;
00201             result->on_io_open_complete = NULL;
00202             result->on_io_close_complete = NULL;
00203             result->on_io_error = NULL;
00204             result->on_bytes_received = NULL;
00205             result->on_io_open_complete_context = NULL;
00206             result->on_io_close_complete_context = NULL;
00207             result->on_io_error_context = NULL;
00208             result->on_bytes_received_context = NULL;
00209 
00210             result->io_state = IO_STATE_NOT_OPEN;
00211         }
00212     }
00213 
00214     return result;
00215 }
00216 
00217 void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io)
00218 {
00219     if (header_detect_io != NULL)
00220     {
00221         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00222         (void)headerdetectio_close(header_detect_io, NULL, NULL);
00223         xio_destroy(header_detect_io_instance->underlying_io);
00224         amqpalloc_free(header_detect_io);
00225     }
00226 }
00227 
00228 int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
00229 {
00230     int result;
00231 
00232     if (header_detect_io == NULL)
00233     {
00234         result = __LINE__;
00235     }
00236     else
00237     {
00238         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00239 
00240         if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN &&
00241             header_detect_io_instance->io_state != IO_STATE_OPEN)
00242         {
00243             result = __LINE__;
00244         }
00245         else
00246         {
00247             header_detect_io_instance->on_bytes_received = on_bytes_received;
00248             header_detect_io_instance->on_io_open_complete = on_io_open_complete;
00249             header_detect_io_instance->on_io_error = on_io_error;
00250             header_detect_io_instance->on_bytes_received_context = on_bytes_received_context;
00251             header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context;
00252             header_detect_io_instance->on_io_error_context = on_io_error_context;
00253 
00254             if (header_detect_io_instance->io_state == IO_STATE_OPEN)
00255             {
00256                 indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
00257                 result = 0;
00258             }
00259             else
00260             {
00261                 header_detect_io_instance->header_pos = 0;
00262                 header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;
00263 
00264                 if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
00265                 {
00266                     result = __LINE__;
00267                 }
00268                 else
00269                 {
00270                     result = 0;
00271                 }
00272             }
00273         }
00274     }
00275 
00276     return result;
00277 }
00278 
00279 int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
00280 {
00281     int result;
00282 
00283     if (header_detect_io == NULL)
00284     {
00285         result = __LINE__;
00286     }
00287     else
00288     {
00289         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00290 
00291         if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) ||
00292             (header_detect_io_instance->io_state == IO_STATE_CLOSING))
00293         {
00294             result = __LINE__;
00295         }
00296         else
00297         {
00298             header_detect_io_instance->io_state = IO_STATE_CLOSING;
00299             header_detect_io_instance->on_io_close_complete = on_io_close_complete;
00300             header_detect_io_instance->on_io_close_complete_context = callback_context;
00301 
00302             if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0)
00303             {
00304                 result = __LINE__;
00305             }
00306             else
00307             {
00308                 result = 0;
00309             }
00310         }
00311     }
00312 
00313     return result;
00314 }
00315 
00316 int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context)
00317 {
00318     int result;
00319 
00320     if (header_detect_io == NULL)
00321     {
00322         result = __LINE__;
00323     }
00324     else
00325     {
00326         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00327 
00328         if (header_detect_io_instance->io_state != IO_STATE_OPEN)
00329         {
00330             result = __LINE__;
00331         }
00332         else
00333         {
00334             if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0)
00335             {
00336                 result = __LINE__;
00337             }
00338             else
00339             {
00340                 result = 0;
00341             }
00342         }
00343     }
00344 
00345     return result;
00346 }
00347 
00348 void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io)
00349 {
00350     if (header_detect_io != NULL)
00351     {
00352         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00353 
00354         if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) &&
00355             (header_detect_io_instance->io_state != IO_STATE_ERROR))
00356         {
00357             xio_dowork(header_detect_io_instance->underlying_io);
00358         }
00359     }
00360 }
00361 
00362 int headerdetectio_setoption(CONCRETE_IO_HANDLE header_detect_io, const char* optionName, const void* value)
00363 {
00364     int result;
00365 
00366     if (header_detect_io == NULL)
00367     {
00368         result = __LINE__;
00369     }
00370     else
00371     {
00372         HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
00373 
00374         if (header_detect_io_instance->underlying_io == NULL)
00375         {
00376             result = __LINE__;
00377         }
00378         else
00379         {
00380             result = xio_setoption(header_detect_io_instance->underlying_io, optionName, value);
00381         }
00382     }
00383 
00384     return result;
00385 }
00386 
00387 /*this function will clone an option given by name and value*/
00388 static void* headerdetectio_CloneOption(const char* name, const void* value)
00389 {
00390     (void)(name, value);
00391     return NULL;
00392 }
00393 
00394 /*this function destroys an option previously created*/
00395 static void headerdetectio_DestroyOption(const char* name, const void* value)
00396 {
00397     (void)(name, value);
00398 }
00399 
00400 static OPTIONHANDLER_HANDLE headerdetectio_retrieveoptions(CONCRETE_IO_HANDLE handle)
00401 {
00402     OPTIONHANDLER_HANDLE result;
00403     (void)handle;
00404     result = OptionHandler_Create(headerdetectio_CloneOption, headerdetectio_DestroyOption, headerdetectio_setoption);
00405     if (result == NULL)
00406     {
00407         LogError("unable to OptionHandler_Create");
00408         /*return as is*/
00409     }
00410     else
00411     {
00412         /*insert here work to add the options to "result" handle*/
00413     }
00414     return result;
00415 }
00416 
00417 static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description =
00418 {
00419     headerdetectio_retrieveoptions,
00420     headerdetectio_create,
00421     headerdetectio_destroy,
00422     headerdetectio_open,
00423     headerdetectio_close,
00424     headerdetectio_send,
00425     headerdetectio_dowork,
00426     headerdetectio_setoption
00427 };
00428 
00429 const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void)
00430 {
00431     return &header_detect_io_interface_description;
00432 }