Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 20:8dec76e7ba34
- Parent:
- 19:ea016664011a
- Child:
- 21:32a1746384ba
--- a/iothubtransportamqp.c Mon Jul 18 16:44:19 2016 -0700 +++ b/iothubtransportamqp.c Fri Jul 29 15:52:42 2016 -0700 @@ -79,7 +79,7 @@ { // Key associated to the device to be used. STRING_HANDLE deviceKey; - + // SAS associated to the device to be used. STRING_HANDLE deviceSasToken; @@ -93,6 +93,31 @@ AMQP_TRANSPORT_CREDENTIAL_UNION credential; }AMQP_TRANSPORT_CREDENTIAL; +/*the below structure contains fields that are only used with CBS (when authentication mechanisn is based on deviceKey or deviceSasToken)*/ +/*these fields are mutually exclusive with the fields of the AMQP_TRANSPORT_STATE_X509 authentication*/ +typedef struct AMQP_TRANSPORT_STATE_CBS_TAG +{ + // A component of the SAS token. Currently this must be an empty string. + STRING_HANDLE sasTokenKeyName; + size_t sas_token_lifetime; + // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds. + size_t sas_token_refresh_time; + // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. + size_t cbs_request_timeout; + + // AMQP SASL I/O transport created on top of the TLS I/O layer. + XIO_HANDLE sasl_io; + // AMQP SASL I/O mechanism to be used. + SASL_MECHANISM_HANDLE sasl_mechanism; + + // Connection instance with the Azure IoT CBS. + CBS_HANDLE cbs; + // Current state of the CBS connection. + CBS_STATE cbs_state; + // Time when the current SAS token was created, in seconds since epoch. + size_t current_sas_token_create_time; +}AMQP_TRANSPORT_STATE_CBS; + typedef struct AMQP_TRANSPORT_STATE_TAG { // FQDN of the IoT Hub. @@ -105,16 +130,11 @@ STRING_HANDLE targetAddress; // Address to which the transport will connect to and receive messages from. STRING_HANDLE messageReceiveAddress; - // A component of the SAS token. Currently this must be an empty string. - STRING_HANDLE sasTokenKeyName; + // Internal parameter that identifies the current logical device within the service. STRING_HANDLE devicesPath; // How long a SAS token created by the transport is valid, in milliseconds. - size_t sas_token_lifetime; - // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds. - size_t sas_token_refresh_time; - // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. - size_t cbs_request_timeout; + // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds. size_t connection_timeout; // Saved reference to the IoTHub LL Client. @@ -124,10 +144,7 @@ XIO_HANDLE tls_io; // Pointer to the function that creates the TLS I/O (internal use only). TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider; - // AMQP SASL I/O transport created on top of the TLS I/O layer. - XIO_HANDLE sasl_io; - // AMQP SASL I/O mechanism to be used. - SASL_MECHANISM_HANDLE sasl_mechanism; + // AMQP connection. CONNECTION_HANDLE connection; // Current AMQP connection state; @@ -150,16 +167,17 @@ PDLIST_ENTRY waitingToSend; // Internal list with the items currently being processed/sent through uAMQP. DLIST_ENTRY inProgress; - // Connection instance with the Azure IoT CBS. - CBS_HANDLE cbs; - // Current state of the CBS connection. - CBS_STATE cbs_state; - // Time when the current SAS token was created, in seconds since epoch. - size_t current_sas_token_create_time; + + // all things CBS (and only CBS) + AMQP_TRANSPORT_STATE_CBS cbs; + // Mark if device is registered in transport (only one device per transport). bool isRegistered; // Turns logging on and off bool is_trace_on; + + /*here are the options from the xio layer if any is saved*/ + OPTIONHANDLER_HANDLE xioOptions; } AMQP_TRANSPORT_INSTANCE; @@ -363,18 +381,18 @@ /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_155: [uAMQP message properties shall be retrieved using message_get_properties.] */ if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_156: [If message_get_properties fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_156: [If message_get_properties fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to get property properties map from uAMQP message (error code %d).", api_call_result); return_value = __LINE__; } else { return_value = 0; // Properties 'message-id' and 'correlation-id' are optional according to the AMQP 1.0 spec. - + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_157: [The message-id property shall be read from the uAMQP message by calling properties_get_message_id.] */ if ((api_call_result = properties_get_message_id(uamqp_message_properties, &uamqp_message_property)) != 0) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_158: [If properties_get_message_id fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_158: [If properties_get_message_id fails, the error shall be notified and 'on_message_received' shall continue.] */ LogInfo("Failed to get value of uAMQP message 'message-id' property (%d).", api_call_result); return_value = __LINE__; } @@ -383,14 +401,14 @@ /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_159: [The message-id value shall be retrieved from the AMQP_VALUE as char* by calling amqpvalue_get_string.] */ if ((api_call_result = amqpvalue_get_string(uamqp_message_property, &uamqp_message_property_value)) != 0) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_160: [If amqpvalue_get_string fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_160: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to get value of uAMQP message 'message-id' property (%d).", api_call_result); return_value = __LINE__; } /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_161: [The message-id property shall be set on the IOTHUB_MESSAGE_HANDLE by calling IoTHubMessage_SetMessageId, passing the value read from the uAMQP message.] */ else if (IoTHubMessage_SetMessageId(iothub_message_handle, uamqp_message_property_value) != IOTHUB_MESSAGE_OK) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_162: [If IoTHubMessage_SetMessageId fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_162: [If IoTHubMessage_SetMessageId fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'message-id' property."); return_value = __LINE__; } @@ -399,7 +417,7 @@ /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_163: [The correlation-id property shall be read from the uAMQP message by calling properties_get_correlation_id.] */ if ((api_call_result = properties_get_correlation_id(uamqp_message_properties, &uamqp_message_property)) != 0) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_164: [If properties_get_correlation_id fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_164: [If properties_get_correlation_id fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to get value of uAMQP message 'correlation-id' property (%d).", api_call_result); return_value = __LINE__; } @@ -408,14 +426,14 @@ /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_165: [The correlation-id value shall be retrieved from the AMQP_VALUE as char* by calling amqpvalue_get_string.] */ if ((api_call_result = amqpvalue_get_string(uamqp_message_property, &uamqp_message_property_value)) != 0) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_166: [If amqpvalue_get_string fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_166: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to get value of uAMQP message 'correlation-id' property (%d).", api_call_result); return_value = __LINE__; } /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_167: [The correlation-id property shall be set on the IOTHUB_MESSAGE_HANDLE by calling IoTHubMessage_SetCorrelationId, passing the value read from the uAMQP message.] */ else if (IoTHubMessage_SetCorrelationId(iothub_message_handle, uamqp_message_property_value) != IOTHUB_MESSAGE_OK) { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_168: [If IoTHubMessage_SetCorrelationId fails, the error shall be notified and on_message_received' shall continue.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_168: [If IoTHubMessage_SetCorrelationId fails, the error shall be notified and 'on_message_received' shall continue.] */ LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlation-id' property."); return_value = __LINE__; } @@ -429,87 +447,94 @@ static int readApplicationPropertiesFromuAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) { int result; - AMQP_VALUE uamqp_app_properties; + AMQP_VALUE uamqp_app_properties = NULL; uint32_t property_count; MAP_HANDLE iothub_message_properties_map; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_170: [The IOTHUB_MESSAGE_HANDLE properties shall be retrieved using IoTHubMessage_Properties.] if ((iothub_message_properties_map = IoTHubMessage_Properties(iothub_message_handle)) == NULL) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_186: [If IoTHubMessage_Properties fails, the error shall be notified and on_message_received' shall continue.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_186: [If IoTHubMessage_Properties fails, the error shall be notified and 'on_message_received' shall continue.] LogError("Failed to get property map from IoTHub message."); result = __LINE__; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_171: [uAMQP message application properties shall be retrieved using message_get_application_properties.] else if ((result = message_get_application_properties(uamqp_message, &uamqp_app_properties)) != 0) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_172: [If message_get_application_properties fails, the error shall be notified and on_message_received' shall continue.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_172: [If message_get_application_properties fails, the error shall be notified and 'on_message_received' shall continue.] LogError("Failed reading the incoming uAMQP message properties (return code %d).", result); result = __LINE__; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_187: [If message_get_application_properties succeeds but returns a NULL application properties map (there are no properties), on_message_received' shall continue normally.] - else if (uamqp_app_properties == NULL) - { - result = 0; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_173: [The actual uAMQP message application properties should be extracted from the result of message_get_application_properties using amqpvalue_get_inplace_described_value.] - else if ((uamqp_app_properties = amqpvalue_get_inplace_described_value(uamqp_app_properties)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_174: [If amqpvalue_get_inplace_described_value fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed getting the map of uAMQP message application properties (return code %d).", result); - result = __LINE__; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_175: [The number of items in the uAMQP message application properties shall be obtained using amqpvalue_get_map_pair_count.] - else if ((result = amqpvalue_get_map_pair_count(uamqp_app_properties, &property_count)) != 0) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_187: [If message_get_application_properties succeeds but returns a NULL application properties map (there are no properties), 'on_message_received' shall continue normally.] + else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_176: [If amqpvalue_get_map_pair_count fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed reading the number of values in the uAMQP property map (return code %d).", result); - result = __LINE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_177: [on_message_received' shall iterate through each uAMQP application property and add it on IOTHUB_MESSAGE_HANDLE properties.] - uint32_t i; - for (i = 0; i < property_count; i++) + if (uamqp_app_properties == NULL) + { + result = 0; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_173: [The actual uAMQP message application properties should be extracted from the result of message_get_application_properties using amqpvalue_get_inplace_described_value.] + else if ((uamqp_app_properties = amqpvalue_get_inplace_described_value(uamqp_app_properties)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_174: [If amqpvalue_get_inplace_described_value fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed getting the map of uAMQP message application properties (return code %d).", result); + result = __LINE__; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_175: [The number of items in the uAMQP message application properties shall be obtained using amqpvalue_get_map_pair_count.] + else if ((result = amqpvalue_get_map_pair_count(uamqp_app_properties, &property_count)) != 0) { - AMQP_VALUE map_key_name; - AMQP_VALUE map_key_value; - const char *key_name; - const char* key_value; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_176: [If amqpvalue_get_map_pair_count fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed reading the number of values in the uAMQP property map (return code %d).", result); + result = __LINE__; + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_177: ['on_message_received' shall iterate through each uAMQP application property and add it on IOTHUB_MESSAGE_HANDLE properties.] + uint32_t i; + for (i = 0; i < property_count; i++) + { + AMQP_VALUE map_key_name = NULL; + AMQP_VALUE map_key_value = NULL; + const char *key_name; + const char* key_value; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_178: [The uAMQP application property name and value shall be obtained using amqpvalue_get_map_key_value_pair.] - if ((result = amqpvalue_get_map_key_value_pair(uamqp_app_properties, i, &map_key_name, &map_key_value)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_179: [If amqpvalue_get_map_key_value_pair fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed reading the key/value pair from the uAMQP property map (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_180: [The uAMQP application property name shall be extracted as string using amqpvalue_get_string.] - else if ((result = amqpvalue_get_string(map_key_name, &key_name)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_181: [If amqpvalue_get_string fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed parsing the uAMQP property name (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_182: [The uAMQP application property value shall be extracted as string using amqpvalue_get_string.] - else if ((result = amqpvalue_get_string(map_key_value, &key_value)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_183: [If amqpvalue_get_string fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed parsing the uAMQP property value (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_184: [The application property name and value shall be added to IOTHUB_MESSAGE_HANDLE properties using Map_AddOrUpdate.] - else if (Map_AddOrUpdate(iothub_message_properties_map, key_name, key_value) != MAP_OK) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_185: [If Map_AddOrUpdate fails, the error shall be notified and on_message_received' shall continue.] - LogError("Failed to add/update IoTHub message property map."); - result = __LINE__; - break; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_178: [The uAMQP application property name and value shall be obtained using amqpvalue_get_map_key_value_pair.] + if ((result = amqpvalue_get_map_key_value_pair(uamqp_app_properties, i, &map_key_name, &map_key_value)) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_179: [If amqpvalue_get_map_key_value_pair fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed reading the key/value pair from the uAMQP property map (return code %d).", result); + result = __LINE__; + break; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_180: [The uAMQP application property name shall be extracted as string using amqpvalue_get_string.] + else if ((result = amqpvalue_get_string(map_key_name, &key_name)) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_181: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed parsing the uAMQP property name (return code %d).", result); + result = __LINE__; + break; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_182: [The uAMQP application property value shall be extracted as string using amqpvalue_get_string.] + else if ((result = amqpvalue_get_string(map_key_value, &key_value)) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_183: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed parsing the uAMQP property value (return code %d).", result); + result = __LINE__; + break; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_184: [The application property name and value shall be added to IOTHUB_MESSAGE_HANDLE properties using Map_AddOrUpdate.] + else if (Map_AddOrUpdate(iothub_message_properties_map, key_name, key_value) != MAP_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_185: [If Map_AddOrUpdate fails, the error shall be notified and 'on_message_received' shall continue.] + LogError("Failed to add/update IoTHub message property map."); + result = __LINE__; + break; + } + amqpvalue_destroy(map_key_name); + amqpvalue_destroy(map_key_value); + } } + amqpvalue_destroy(uamqp_app_properties); } return result; @@ -557,7 +582,7 @@ if (operation_result == CBS_OPERATION_RESULT_OK) { - transportState->cbs_state = CBS_STATE_AUTHENTICATED; + transportState->cbs.cbs_state = CBS_STATE_AUTHENTICATED; } else { @@ -601,42 +626,42 @@ } else { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_153: [The callback on_message_received' shall read the message-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_154: [The callback on_message_received' shall read the correlation-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_153: [The callback 'on_message_received' shall read the message-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ + /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_154: [The callback 'on_message_received' shall read the correlation-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ if (readPropertiesFromuAMQPMessage(iothub_message, message) != 0) { LogError("Transport failed reading properties of the message received."); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_169: [The callback on_message_received' shall read the application properties from the uAMQP message and set it on the IoT Hub Message if any are provided.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_169: [The callback 'on_message_received' shall read the application properties from the uAMQP message and set it on the IoT Hub Message if any are provided.] if (readApplicationPropertiesFromuAMQPMessage(iothub_message, message) != 0) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_188: [If on_message_received' fails reading the application properties from the uAMQP message, it shall NOT call IoTHubClient_LL_MessageCallback and shall reject the message.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_188: [If 'on_message_received' fails reading the application properties from the uAMQP message, it shall NOT call IoTHubClient_LL_MessageCallback and shall reject the message.] LogError("Transport failed reading application properties of the message received."); - + result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading application properties"); } else { IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result; - disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); + disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] - if (disposition_result == IOTHUBMESSAGE_ACCEPTED) - { - result = messaging_delivery_accepted(); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] - else if (disposition_result == IOTHUBMESSAGE_ABANDONED) - { - result = messaging_delivery_released(); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] - else if (disposition_result == IOTHUBMESSAGE_REJECTED) - { - result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] + if (disposition_result == IOTHUBMESSAGE_ACCEPTED) + { + result = messaging_delivery_accepted(); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] + else if (disposition_result == IOTHUBMESSAGE_ABANDONED) + { + result = messaging_delivery_released(); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] + else if (disposition_result == IOTHUBMESSAGE_REJECTED) + { + result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); + } } IoTHubMessage_Destroy(iothub_message); @@ -647,19 +672,30 @@ static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port) { + XIO_HANDLE result; TLSIO_CONFIG tls_io_config; tls_io_config.hostname = fqdn; tls_io_config.port = port; const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); - return xio_create(io_interface_description, &tls_io_config); + result = xio_create(io_interface_description, &tls_io_config); + if (result == NULL) + { + LogError("unable to xio_create"); + /*return as is*/ + } + else + { + /*also return as is*/ + } + return result; } static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state) { - if (transport_state->cbs != NULL) + if (transport_state->cbs.cbs != NULL) { - cbs_destroy(transport_state->cbs); - transport_state->cbs = NULL; + cbs_destroy(transport_state->cbs.cbs); + transport_state->cbs.cbs = NULL; } if (transport_state->session != NULL) @@ -674,20 +710,26 @@ transport_state->connection = NULL; } - if (transport_state->sasl_io != NULL) + if (transport_state->cbs.sasl_io != NULL) { - xio_destroy(transport_state->sasl_io); - transport_state->sasl_io = NULL; + xio_destroy(transport_state->cbs.sasl_io); + transport_state->cbs.sasl_io = NULL; } - if (transport_state->sasl_mechanism != NULL) + if (transport_state->cbs.sasl_mechanism != NULL) { - saslmechanism_destroy(transport_state->sasl_mechanism); - transport_state->sasl_mechanism = NULL; + saslmechanism_destroy(transport_state->cbs.sasl_mechanism); + transport_state->cbs.sasl_mechanism = NULL; } if (transport_state->tls_io != NULL) { + /*before destroying, we shall save its options for later use*/ + transport_state->xioOptions = xio_retrieveoptions(transport_state->tls_io); + if (transport_state->xioOptions == NULL) + { + LogError("unable to retrieve xio_retrieveoptions"); + } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.] xio_destroy(transport_state->tls_io); transport_state->tls_io = NULL; @@ -727,76 +769,149 @@ result = RESULT_FAILURE; LogError("Failed to obtain a TLS I/O transport layer."); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] - else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] - result = RESULT_FAILURE; - LogError("Failed to create a SASL mechanism."); - } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_060: [IoTHubTransportAMQP_DoWork shall create the SASL I / O layer using the xio_create() C Shared Utility API] - SASLCLIENTIO_CONFIG sasl_client_config; - sasl_client_config.sasl_mechanism = transport_state->sasl_mechanism; - sasl_client_config.underlying_io = transport_state->tls_io; - if ((transport_state->sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_client_config)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransportAMQP_DoWork shall fail and return immediately] - result = RESULT_FAILURE; - LogError("Failed to create a SASL I/O layer."); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] - else if ((transport_state->connection = connection_create2(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create2() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] - result = RESULT_FAILURE; - LogError("Failed to create the AMQP connection."); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] - else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] - result = RESULT_FAILURE; - LogError("Failed to create the AMQP session."); - } - else + /*in the case when a tls_io_transport has been created, replay its options*/ + if (transport_state->xioOptions != NULL) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] - if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) - { - LogError("Failed to set the AMQP incoming window size."); - } - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] - if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) + if (OptionHandler_FeedOptions(transport_state->xioOptions, transport_state->tls_io) != 0) { - LogError("Failed to set the AMQP outgoing window size."); - } - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_066: [IoTHubTransportAMQP_DoWork shall establish the CBS connection using the cbs_create() AMQP API] - if ((transport_state->cbs = cbs_create(transport_state->session, on_amqp_management_state_changed, NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_067: [If cbs_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] - result = RESULT_FAILURE; - LogError("Failed to create the CBS connection."); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] - else if (cbs_open(transport_state->cbs) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] - result = RESULT_FAILURE; - LogError("Failed to open the connection with CBS."); + LogError("unable to replay options to TLS"); /*pessimistically hope TLS will fail, be recreated and options re-given*/ } else { - transport_state->connection_establish_time = getSecondsSinceEpoch(); - transport_state->cbs_state = CBS_STATE_IDLE; - connection_set_trace(transport_state->connection, transport_state->is_trace_on); - (void)xio_setoption(transport_state->sasl_io, "logtrace", &transport_state->is_trace_on); - result = RESULT_OK; + /*everything is fine, forget the saved options...*/ + OptionHandler_Destroy(transport_state->xioOptions); + transport_state->xioOptions = NULL; } } + + switch (transport_state->credential.credentialType) + { + case (DEVICE_KEY): + case (DEVICE_SAS_TOKEN): + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] + if ((transport_state->cbs.sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create a SASL mechanism."); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_060: [IoTHubTransportAMQP_DoWork shall create the SASL I / O layer using the xio_create() C Shared Utility API] + SASLCLIENTIO_CONFIG sasl_client_config; + sasl_client_config.sasl_mechanism = transport_state->cbs.sasl_mechanism; + sasl_client_config.underlying_io = transport_state->tls_io; + if ((transport_state->cbs.sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_client_config)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create a SASL I/O layer."); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] + else if ((transport_state->connection = connection_create2(transport_state->cbs.sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create2() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP connection."); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] + else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP session."); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] + if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP incoming window size."); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] + if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP outgoing window size."); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_066: [IoTHubTransportAMQP_DoWork shall establish the CBS connection using the cbs_create() AMQP API] + if ((transport_state->cbs.cbs = cbs_create(transport_state->session, on_amqp_management_state_changed, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_067: [If cbs_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create the CBS connection."); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] + else if (cbs_open(transport_state->cbs.cbs) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to open the connection with CBS."); + } + else + { + transport_state->connection_establish_time = getSecondsSinceEpoch(); + transport_state->cbs.cbs_state = CBS_STATE_IDLE; + connection_set_trace(transport_state->connection, transport_state->is_trace_on); + (void)xio_setoption(transport_state->cbs.sasl_io, "logtrace", &transport_state->is_trace_on); + result = RESULT_OK; + } + } + } + break; + } + case(X509): + { + /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_006: [ IoTHubTransportAMQP_DoWork shall not establish a CBS connection. ]*/ + /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_005: [ IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the TLS I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks) ]*/ + if ((transport_state->connection = connection_create2(transport_state->tls_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create2() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP connection."); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] + if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP session."); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] + if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP incoming window size."); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] + if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP outgoing window size."); + } + + transport_state->connection_establish_time = getSecondsSinceEpoch(); + connection_set_trace(transport_state->connection, transport_state->is_trace_on); + (void)xio_setoption(transport_state->tls_io, "logtrace", &transport_state->is_trace_on); + result = RESULT_OK; + } + } + break; + } + default: + { + LogError("internal error: unexpected enum value for transport_state->credential.credentialType = %d", transport_state->credential.credentialType); + result = RESULT_FAILURE; + break; + } + }/*switch*/ } if (result == RESULT_FAILURE) @@ -810,15 +925,15 @@ static int handSASTokenToCbs(AMQP_TRANSPORT_INSTANCE* transport_state, STRING_HANDLE sasToken, size_t sas_token_create_time) { int result; - if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(sasToken), on_put_token_complete, transport_state) != RESULT_OK) + if (cbs_put_token(transport_state->cbs.cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(sasToken), on_put_token_complete, transport_state) != RESULT_OK) { LogError("Failed applying new SAS token to CBS."); result = __LINE__; } else { - transport_state->cbs_state = CBS_STATE_AUTH_IN_PROGRESS; - transport_state->current_sas_token_create_time = sas_token_create_time; + transport_state->cbs.cbs_state = CBS_STATE_AUTH_IN_PROGRESS; + transport_state->cbs.current_sas_token_create_time = sas_token_create_time; result = RESULT_OK; } return result; @@ -831,75 +946,75 @@ size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch. // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation] - size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000); + size_t new_expiry_time = sas_token_create_time + (transport_state->cbs.sas_token_lifetime / 1000); STRING_HANDLE newSASToken; switch (transport_state->credential.credentialType) { - default: + default: + { + result = __LINE__; + LogError("internal error, unexpected enum value transport_state->credential.credentialType=%d", transport_state->credential.credentialType); + break; + } + case DEVICE_KEY: + { + newSASToken = SASToken_Create(transport_state->credential.credential.deviceKey, transport_state->devicesPath, transport_state->cbs.sasTokenKeyName, new_expiry_time); + if (newSASToken == NULL) { - result = __LINE__; - LogError("internal error, unexpected enum value transport_state->credential.credentialType=%d", transport_state->credential.credentialType); - break; + LogError("Could not generate a new SAS token for the CBS."); + result = RESULT_FAILURE; } - case DEVICE_KEY: + else { - newSASToken = SASToken_Create(transport_state->credential.credential.deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time); - if (newSASToken == NULL) + if (handSASTokenToCbs(transport_state, newSASToken, sas_token_create_time) != 0) { - LogError("Could not generate a new SAS token for the CBS."); + LogError("unable to handSASTokenToCbs"); result = RESULT_FAILURE; } else { - if (handSASTokenToCbs(transport_state, newSASToken, sas_token_create_time) != 0) - { - LogError("unable to handSASTokenToCbs"); - result = RESULT_FAILURE; - } - else - { - result = RESULT_OK; - } + result = RESULT_OK; + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] - STRING_delete(newSASToken); - } - break; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] + STRING_delete(newSASToken); } - case DEVICE_SAS_TOKEN: + break; + } + case DEVICE_SAS_TOKEN: + { + newSASToken = STRING_clone(transport_state->credential.credential.deviceSasToken); + if (newSASToken == NULL) { - newSASToken = STRING_clone(transport_state->credential.credential.deviceSasToken); - if (newSASToken == NULL) + LogError("Could not generate a new SAS token for the CBS."); + result = RESULT_FAILURE; + } + else + { + if (handSASTokenToCbs(transport_state, newSASToken, sas_token_create_time) != 0) { - LogError("Could not generate a new SAS token for the CBS."); + LogError("unable to handSASTokenToCbs"); result = RESULT_FAILURE; } else { - if (handSASTokenToCbs(transport_state, newSASToken, sas_token_create_time) != 0) - { - LogError("unable to handSASTokenToCbs"); - result = RESULT_FAILURE; - } - else - { - result = RESULT_OK; - } + result = RESULT_OK; + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] - STRING_delete(newSASToken); - } - break; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] + STRING_delete(newSASToken); } + break; + } } return result; } static int verifyAuthenticationTimeout(AMQP_TRANSPORT_INSTANCE* transport_state) { - return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) * 1000 >= transport_state->cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK; + return ((getSecondsSinceEpoch() - transport_state->cbs.current_sas_token_create_time) * 1000 >= transport_state->cbs.cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK; } static void attachDeviceClientTypeToLink(LINK_HANDLE link) @@ -970,9 +1085,21 @@ void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) { - (void)context; - (void)new_state; - (void)previous_state; + if (context != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + + if (transport_state->is_trace_on) + { + LogInfo("Event sender state changed [%d->%d]", previous_state, new_state); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] + if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR) + { + transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; + } + } } static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -1009,6 +1136,7 @@ attachDeviceClientTypeToLink(transport_state->sender_link); // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function] if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] @@ -1063,6 +1191,25 @@ return result; } +void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state) +{ + if (context != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + + if (transport_state->is_trace_on) + { + LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] + if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR) + { + transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; + } + } +} + static int createMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state, IOTHUB_CLIENT_LL_HANDLE iothub_client_handle) { int result = RESULT_FAILURE; @@ -1103,7 +1250,8 @@ attachDeviceClientTypeToLink(transport_state->receiver_link); // Codes_SRS_IOTHUBTRANSPORTAMQP_09_077: [IoTHubTransportAMQP_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API] - if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, NULL, NULL)) == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function] + if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, on_message_receiver_state_changed, (void*)transport_state)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] LogError("Could not allocate AMQP message receiver."); @@ -1252,8 +1400,8 @@ } else { - return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) >= (transport_state->sas_token_refresh_time / 1000)) ? true : false; -} + return ((getSecondsSinceEpoch() - transport_state->cbs.current_sas_token_create_time) >= (transport_state->cbs.sas_token_refresh_time / 1000)) ? true : false; + } } static void prepareForConnectionRetry(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -1270,31 +1418,31 @@ { switch (transport_state->credential.credentialType) { - default: - { - LogError("internal error: unexpected enum value transport_state->credential.credentialType=%d", transport_state->credential.credentialType); - break; - } - case (CREDENTIAL_NOT_BUILD): - { - /*nothing to do*/ - break; - } - case(X509): - { - /*nothing to do here, x509certificate and x509privatekey are both NULL*/ - break; - } - case(DEVICE_KEY): - { - STRING_delete(transport_state->credential.credential.deviceKey); - break; - } - case(DEVICE_SAS_TOKEN): - { - STRING_delete(transport_state->credential.credential.deviceSasToken); - break; - } + default: + { + LogError("internal error: unexpected enum value transport_state->credential.credentialType=%d", transport_state->credential.credentialType); + break; + } + case (CREDENTIAL_NOT_BUILD): + { + /*nothing to do*/ + break; + } + case(X509): + { + /*nothing to do here, x509certificate and x509privatekey are both NULL*/ + break; + } + case(DEVICE_KEY): + { + STRING_delete(transport_state->credential.credential.deviceKey); + break; + } + case(DEVICE_SAS_TOKEN): + { + STRING_delete(transport_state->credential.credential.deviceSasToken); + break; + } } } @@ -1378,13 +1526,8 @@ transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT; transport_state->devicesPath = NULL; transport_state->messageReceiveAddress = NULL; - transport_state->sasTokenKeyName = NULL; transport_state->targetAddress = NULL; transport_state->waitingToSend = config->waitingToSend; - - transport_state->cbs = NULL; - transport_state->cbs_state = CBS_STATE_IDLE; - transport_state->current_sas_token_create_time = 0; transport_state->connection = NULL; transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE; transport_state->connection_establish_time = 0; @@ -1393,8 +1536,6 @@ transport_state->message_receiver = NULL; transport_state->message_sender = NULL; transport_state->receiver_link = NULL; - transport_state->sasl_io = NULL; - transport_state->sasl_mechanism = NULL; transport_state->sender_link = NULL; transport_state->session = NULL; transport_state->tls_io = NULL; @@ -1402,6 +1543,15 @@ transport_state->isRegistered = false; transport_state->is_trace_on = false; + transport_state->cbs.cbs = NULL; + transport_state->cbs.sasTokenKeyName = NULL; + transport_state->cbs.cbs_state = CBS_STATE_IDLE; + transport_state->cbs.current_sas_token_create_time = 0; + transport_state->cbs.sasl_io = NULL; + transport_state->cbs.sasl_mechanism = NULL; + + transport_state->xioOptions = NULL; + transport_state->waitingToSend = config->waitingToSend; DList_InitializeListHead(&transport_state->inProgress); @@ -1434,26 +1584,20 @@ LogError("Failed to allocate transport_state->messageReceiveAddress."); cleanup_required = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] - else if ((transport_state->sasTokenKeyName = STRING_new()) == NULL) + else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] - LogError("Failed to allocate transport_state->sasTokenKeyName."); - cleanup_required = true; - } - else - { - if (config->upperConfig->deviceSasToken != NULL) { - if (config->upperConfig->deviceKey != NULL) + /*only SAS token specified*/ + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] + if ((transport_state->cbs.sasTokenKeyName = STRING_new()) == NULL) { - LogError("invalid configuration, both deviceSasToken AND deviceKey specified"); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] + LogError("Failed to allocate transport_state->sasTokenKeyName."); cleanup_required = true; } else { - /*only SAS token specified*/ transport_state->credential.credential.deviceSasToken = STRING_construct(config->upperConfig->deviceSasToken); if (transport_state->credential.credential.deviceSasToken == NULL) { @@ -1465,6 +1609,7 @@ transport_state->credential.credentialType = DEVICE_SAS_TOKEN; } } + } else { @@ -1472,20 +1617,30 @@ if (config->upperConfig->deviceKey != NULL) { /*it is device key*/ - - transport_state->credential.credential.deviceKey = STRING_construct(config->upperConfig->deviceKey); - if (transport_state->credential.credential.deviceKey == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] + if ((transport_state->cbs.sasTokenKeyName = STRING_new()) == NULL) { - LogError("unable to STRING_construct for a deviceKey"); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] + LogError("Failed to allocate transport_state->sasTokenKeyName."); cleanup_required = true; } else { - transport_state->credential.credentialType = DEVICE_KEY; + transport_state->credential.credential.deviceKey = STRING_construct(config->upperConfig->deviceKey); + if (transport_state->credential.credential.deviceKey == NULL) + { + LogError("unable to STRING_construct for a deviceKey"); + cleanup_required = true; + } + else + { + transport_state->credential.credentialType = DEVICE_KEY; + } } } else { + /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_004: [ If both deviceKey and deviceSasToken fields are NULL then IoTHubTransportAMQP_Create shall assume a x509 authentication. ]*/ /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_003: [ IoTHubTransportAMQP_Register shall assume a x509 authentication mechanism when both deviceKey and deviceSasToken are NULL. ]*/ /*when both SAS token AND devicekey are NULL*/ transport_state->credential.credentialType = X509; @@ -1494,24 +1649,25 @@ } } - if(transport_state->credential.credentialType != CREDENTIAL_NOT_BUILD ) + if (transport_state->credential.credentialType != CREDENTIAL_NOT_BUILD) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_020: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_lifetime with the default value of 3600000 (milliseconds).] - transport_state->sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS; + transport_state->cbs.sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_128: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_refresh_time with the default value of sas_token_lifetime/2 (milliseconds).] - transport_state->sas_token_refresh_time = transport_state->sas_token_lifetime / 2; + transport_state->cbs.sas_token_refresh_time = transport_state->cbs.sas_token_lifetime / 2; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_129 : [IoTHubTransportAMQP_Create shall set parameter transport_state->cbs_request_timeout with the default value of 30000 (milliseconds).] - transport_state->cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS; + transport_state->cbs.cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS; } + } if (cleanup_required) { credential_destroy(transport_state); - if (transport_state->sasTokenKeyName != NULL) - STRING_delete(transport_state->sasTokenKeyName); + if (transport_state->cbs.sasTokenKeyName != NULL) + STRING_delete(transport_state->cbs.sasTokenKeyName); if (transport_state->targetAddress != NULL) STRING_delete(transport_state->targetAddress); if (transport_state->messageReceiveAddress != NULL) @@ -1555,7 +1711,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_035 : [IoTHubTransportAMQP_Destroy shall delete its internally - set parameters(deviceKey, targetAddress, devicesPath, sasTokenKeyName).] STRING_delete(transport_state->targetAddress); STRING_delete(transport_state->messageReceiveAddress); - STRING_delete(transport_state->sasTokenKeyName); + STRING_delete(transport_state->cbs.sasTokenKeyName); credential_destroy(transport_state); STRING_delete(transport_state->devicesPath); STRING_delete(transport_state->iotHubHostFqdn); @@ -1563,6 +1719,11 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_036 : [IoTHubTransportAMQP_Destroy shall return the remaining items in inProgress to waitingToSend list.] rollEventsBackToWaitList(transport_state); + if (transport_state->xioOptions != NULL) + { + OptionHandler_Destroy(transport_state->xioOptions); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_150: [IoTHubTransportAMQP_Destroy shall destroy the transport instance] free(transport_state); } @@ -1601,50 +1762,96 @@ LogError("AMQP transport failed to establish connection with service."); trigger_connection_retry = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds] - else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) && - startAuthentication(transport_state) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately] - LogError("Failed authenticating AMQP connection within CBS."); - trigger_connection_retry = true; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout] - else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && - verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) - { - LogError("AMQP transport authentication timed out."); - trigger_connection_retry = true; - } - else if (transport_state->cbs_state == CBS_STATE_AUTHENTICATED) + else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_121: [IoTHubTransportAMQP_DoWork shall create an AMQP message_receiver if transport_state->message_receive is NULL and transport_state->receive_messages is true] - if (transport_state->receive_messages == true && - transport_state->message_receiver == NULL && - createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK) - { - LogError("Failed creating AMQP transport message receiver."); - trigger_connection_retry = true; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] - else if (transport_state->receive_messages == false && - transport_state->message_receiver != NULL && - destroyMessageReceiver(transport_state) != RESULT_OK) + switch(transport_state->credential.credentialType) { - LogError("Failed destroying AMQP transport message receiver."); - } + case(DEVICE_KEY): + case(DEVICE_SAS_TOKEN): + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds] + if ((transport_state->cbs.cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) && + startAuthentication(transport_state) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately] + LogError("Failed authenticating AMQP connection within CBS."); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout] + else if (transport_state->cbs.cbs_state == CBS_STATE_AUTH_IN_PROGRESS && + verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) + { + LogError("AMQP transport authentication timed out."); + trigger_connection_retry = true; + } + else if (transport_state->cbs.cbs_state == CBS_STATE_AUTHENTICATED) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_121: [IoTHubTransportAMQP_DoWork shall create an AMQP message_receiver if transport_state->message_receive is NULL and transport_state->receive_messages is true] + if (transport_state->receive_messages == true && + transport_state->message_receiver == NULL && + createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK) + { + LogError("Failed creating AMQP transport message receiver."); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] + else if (transport_state->receive_messages == false && + transport_state->message_receiver != NULL && + destroyMessageReceiver(transport_state) != RESULT_OK) + { + LogError("Failed destroying AMQP transport message receiver."); + } - if (transport_state->message_sender == NULL && - createEventSender(transport_state) != RESULT_OK) - { - LogError("Failed creating AMQP transport event sender."); - trigger_connection_retry = true; - } - else if (sendPendingEvents(transport_state) != RESULT_OK) - { - LogError("AMQP transport failed sending events."); - } + if (transport_state->message_sender == NULL && + createEventSender(transport_state) != RESULT_OK) + { + LogError("Failed creating AMQP transport event sender."); + trigger_connection_retry = true; + } + else if (sendPendingEvents(transport_state) != RESULT_OK) + { + LogError("AMQP transport failed sending events."); + } + } + break; + } + case (X509): + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_121: [IoTHubTransportAMQP_DoWork shall create an AMQP message_receiver if transport_state->message_receive is NULL and transport_state->receive_messages is true] + if (transport_state->receive_messages == true && + transport_state->message_receiver == NULL && + createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK) + { + LogError("Failed creating AMQP transport message receiver."); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] + else if (transport_state->receive_messages == false && + transport_state->message_receiver != NULL && + destroyMessageReceiver(transport_state) != RESULT_OK) + { + LogError("Failed destroying AMQP transport message receiver."); + } + + if (transport_state->message_sender == NULL && + createEventSender(transport_state) != RESULT_OK) + { + LogError("Failed creating AMQP transport event sender."); + trigger_connection_retry = true; + } + else if (sendPendingEvents(transport_state) != RESULT_OK) + { + LogError("AMQP transport failed sending events."); + } + break; + } + default: + { + LogError("internal error: unexpected enum value : transport_state->credential.credentialType = %d", transport_state->credential.credentialType); + trigger_connection_retry = true; + } + }/*switch*/ } if (trigger_connection_retry) @@ -1754,19 +1961,19 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] if (strcmp("sas_token_lifetime", option) == 0) { - transport_state->sas_token_lifetime = *((size_t*)value); + transport_state->cbs.sas_token_lifetime = *((size_t*)value); result = IOTHUB_CLIENT_OK; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] else if (strcmp("sas_token_refresh_time", option) == 0) { - transport_state->sas_token_refresh_time = *((size_t*)value); + transport_state->cbs.sas_token_refresh_time = *((size_t*)value); result = IOTHUB_CLIENT_OK; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] else if (strcmp("cbs_request_timeout", option) == 0) { - transport_state->cbs_request_timeout = *((size_t*)value); + transport_state->cbs.cbs_request_timeout = *((size_t*)value); result = IOTHUB_CLIENT_OK; } else if (strcmp("logtrace", option) == 0) @@ -1778,6 +1985,18 @@ } result = IOTHUB_CLIENT_OK; } + /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_007: [ If optionName is x509certificate and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ + else if ((strcmp("x509certificate", option) == 0) && (transport_state->credential.credentialType != X509)) + { + LogError("x509certificate specified, but authentication method is not x509"); + result = IOTHUB_CLIENT_INVALID_ARG; + } + /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_008: [ If optionName is x509privatekey and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ + else if ((strcmp("x509privatekey", option) == 0) && (transport_state->credential.credentialType != X509)) + { + LogError("x509privatekey specified, but authentication method is not x509"); + result = IOTHUB_CLIENT_INVALID_ARG; + } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.] else { @@ -1789,6 +2008,21 @@ } else { + /*in the case when a tls_io_transport has been created, replay its options*/ + if (transport_state->xioOptions != NULL) + { + if (OptionHandler_FeedOptions(transport_state->xioOptions, transport_state->tls_io) != 0) + { + LogError("unable to replay options to TLS"); /*pessimistically hope TLS will fail, be recreated and options re-given*/ + } + else + { + /*everything is fine, forget the saved options...*/ + OptionHandler_Destroy(transport_state->xioOptions); + transport_state->xioOptions = NULL; + } + } + /* Codes_SRS_IOTHUBTRANSPORTAMQP_03_001: [If xio_setoption fails, IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ if (xio_setoption(transport_state->tls_io, option, value) == 0) { @@ -1797,7 +2031,7 @@ else { result = IOTHUB_CLIENT_ERROR; - LogError("Invalid option (%s) passed to uAMQP transport SetOption()", option); + LogError("Invalid option (%s) passed to IoTHubTransportAMQP_SetOption", option); } } } @@ -1828,7 +2062,7 @@ result = NULL; } // Codes_SRS_IOTHUBTRANSPORTAMQP_03_003: [IoTHubTransportAMQP_Register shall return NULL if both deviceKey and deviceSasToken are not NULL.] - else if ( (device->deviceSasToken != NULL) && (device->deviceKey != NULL) ) + else if ((device->deviceSasToken != NULL) && (device->deviceKey != NULL)) { LogError("invalid parameter both device->deviceSasToken and device->deviceKey were NULL"); result = NULL;