Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 11:62d7b956e76e
- Parent:
- 10:75c5e0d8537d
- Child:
- 12:841a4c36bd36
--- a/iothubtransportamqp.c Fri Mar 11 16:59:34 2016 -0800 +++ b/iothubtransportamqp.c Fri Mar 25 15:59:44 2016 -0700 @@ -53,9 +53,9 @@ #define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536 #define MESSAGE_SENDER_LINK_NAME "sender-link" #define MESSAGE_SENDER_SOURCE_ADDRESS "ingress" -#define MESSAGE_SENDER_MAX_LINK_SIZE 65536 +#define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX -typedef XIO_HANDLE(*TLS_IO_TRANSPORT_PROVIDER)(const char* fqdn, int port, const char* certificates); +typedef XIO_HANDLE(*TLS_IO_TRANSPORT_PROVIDER)(const char* fqdn, int port); typedef enum CBS_STATE_TAG { @@ -70,8 +70,6 @@ STRING_HANDLE iotHubHostFqdn; // AMQP port of the IoT Hub. int iotHubPort; - // Certificates to be used by the TLS I/O. - char* trusted_certificates; // Key associated to the device to be used. STRING_HANDLE deviceKey; // Address to which the transport will connect to and send events. @@ -94,7 +92,7 @@ size_t connection_timeout; // Saved reference to the IoTHub LL Client. IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; - + // TSL I/O transport. XIO_HANDLE tls_io; // Pointer to the function that creates the TLS I/O (internal use only). @@ -298,12 +296,12 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] IOTHUB_MESSAGE_HANDLE iothub_message = NULL; MESSAGE_BODY_TYPE body_type; - + if (message_get_body_type(message, &body_type) != 0) { LogError("Failed to get the type of the message received by the transport.\r\n"); } - else + else { if (body_type == MESSAGE_BODY_TYPE_DATA) { @@ -318,7 +316,7 @@ } } } - + if (iothub_message == NULL) { disposition_result = IOTHUBMESSAGE_REJECTED; @@ -343,12 +341,13 @@ { result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); } + IoTHubMessage_Destroy(iothub_message); } return result; } -static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates) +static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port) { TLSIO_CONFIG tls_io_config = { fqdn, port }; const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); @@ -399,7 +398,7 @@ { (void)previous_amqp_management_state; AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; - + if (transport_state != NULL) { transport_state->connection_state = new_amqp_management_state; @@ -412,7 +411,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] if (transport_state->tls_io == NULL && - (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL) + (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; @@ -499,8 +498,8 @@ int result; size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch. - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation] + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation] size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000); STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time); @@ -681,7 +680,7 @@ if (target != NULL) amqpvalue_destroy(target); } - + return result; } @@ -894,7 +893,7 @@ while ((message = getNextEventToSend(transport_state)) != NULL) { result = RESULT_FAILURE; - + IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle); const unsigned char* messageContent; size_t messageContentSize; @@ -908,7 +907,7 @@ LogError("Failed tracking the event to be sent.\r\n"); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] - else if (contentType == IOTHUBMESSAGE_BYTEARRAY && + else if (contentType == IOTHUBMESSAGE_BYTEARRAY && IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK) { LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n"); @@ -944,7 +943,7 @@ binary_data.bytes = messageContent; binary_data.length = messageContentSize; - + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK) { @@ -1021,7 +1020,7 @@ // API functions -static TRANSPORT_HANDLE IoTHubTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config) +static TRANSPORT_LL_HANDLE IoTHubTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config) { AMQP_TRANSPORT_INSTANCE* transport_state = NULL; bool cleanup_required = false; @@ -1088,7 +1087,6 @@ { transport_state->iotHubHostFqdn = NULL; transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT; - transport_state->trusted_certificates = NULL; transport_state->deviceKey = NULL; transport_state->devicesPath = NULL; transport_state->messageReceiveAddress = NULL; @@ -1154,13 +1152,13 @@ } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] else if ((transport_state->deviceKey = STRING_new()) == NULL || - STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0) + STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_019: [If IoTHubTransportAMQP_Create fails to copy config->deviceKey, the function shall fail and return NULL.] LogError("Failed to allocate transport_state->deviceKey.\r\n"); cleanup_required = true; } - else + else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_020: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_lifetime with the default value of 3600000 (milliseconds).] transport_state->sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS; @@ -1200,12 +1198,12 @@ return transport_state; } -static void IoTHubTransportAMQP_Destroy(TRANSPORT_HANDLE handle) +static void IoTHubTransportAMQP_Destroy(TRANSPORT_LL_HANDLE handle) { if (handle != NULL) { AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_024: [IoTHubTransportAMQP_Destroy shall destroy the AMQP message_sender.] // Codes_SRS_IOTHUBTRANSPORTAMQP_09_029 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP link.] destroyEventSender(transport_state); @@ -1237,7 +1235,7 @@ } } -static void IoTHubTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) +static void IoTHubTransportAMQP_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_051: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the transport handle parameter is NULL] if (handle == NULL) @@ -1267,15 +1265,15 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds] else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) && - startAuthentication(transport_state) != RESULT_OK) + startAuthentication(transport_state) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately] LogError("Failed authenticating AMQP connection within CBS.\r\n"); trigger_connection_retry = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout] - else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && - verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) + else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && + verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) { LogError("AMQP transport authentication timed out.\r\n"); trigger_connection_retry = true; @@ -1327,7 +1325,7 @@ static int IoTHubTransportAMQP_Subscribe(IOTHUB_DEVICE_HANDLE handle) { int result; - + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_037: [IoTHubTransportAMQP_Subscribe shall fail if the transport handle parameter received is NULL.] if (handle == NULL) { @@ -1396,7 +1394,7 @@ return result; } -static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value) +static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, const void* value) { IOTHUB_CLIENT_RESULT result; @@ -1416,13 +1414,8 @@ { AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - if (strcmp("trusted_certificates", option) == 0) - { - transport_state->trusted_certificates = (char*)value; - result = IOTHUB_CLIENT_OK; - } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] - else if (strcmp("sas_token_lifetime", option) == 0) + if (strcmp("sas_token_lifetime", option) == 0) { transport_state->sas_token_lifetime = *((size_t*)value); result = IOTHUB_CLIENT_OK; @@ -1445,23 +1438,40 @@ transport_state->message_send_timeout = *((size_t*)value); result = IOTHUB_CLIENT_OK; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.] else { - result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); + if (transport_state->tls_io == NULL && + (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL) + { + result = IOTHUB_CLIENT_ERROR; + LogError("Failed to obtain a TLS I/O transport layer.\r\n"); + } + else + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_03_001: [If xio_setoption fails, IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ + if (xio_setoption(transport_state->tls_io, option, value) == 0) + { + result = IOTHUB_CLIENT_OK; + } + else + { + result = IOTHUB_CLIENT_ERROR; + LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); + } + } } } return result; } -static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) +static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) { IOTHUB_DEVICE_HANDLE result; // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_HANDLE is NULL.] - if ((handle ==NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL)) + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.] + if ((handle == NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL)) { result = NULL; } @@ -1498,7 +1508,7 @@ else { transport_state->isRegistered = true; - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_HANDLE as the IOTHUB_DEVICE_HANDLE.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] result = (IOTHUB_DEVICE_HANDLE)handle; } } @@ -1521,14 +1531,14 @@ } static TRANSPORT_PROVIDER thisTransportProvider = { - IoTHubTransportAMQP_SetOption, - IoTHubTransportAMQP_Create, - IoTHubTransportAMQP_Destroy, + IoTHubTransportAMQP_SetOption, + IoTHubTransportAMQP_Create, + IoTHubTransportAMQP_Destroy, IoTHubTransportAMQ_Register, IoTHubTransportAMQ_Unregister, - IoTHubTransportAMQP_Subscribe, - IoTHubTransportAMQP_Unsubscribe, - IoTHubTransportAMQP_DoWork, + IoTHubTransportAMQP_Subscribe, + IoTHubTransportAMQP_Unsubscribe, + IoTHubTransportAMQP_DoWork, IoTHubTransportAMQP_GetSendStatus };