Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 21:32a1746384ba
- Parent:
- 20:8dec76e7ba34
- Child:
- 22:8b70cf813f25
diff -r 8dec76e7ba34 -r 32a1746384ba iothubtransportamqp.c --- a/iothubtransportamqp.c Fri Jul 29 15:52:42 2016 -0700 +++ b/iothubtransportamqp.c Fri Aug 12 10:03:28 2016 -0700 @@ -29,6 +29,7 @@ #include "azure_uamqp_c/saslclientio.h" #include "iothub_client_ll.h" +#include "iothub_client_options.h" #include "iothub_client_private.h" #include "iothubtransportamqp.h" #include "iothub_client_version.h" @@ -263,8 +264,90 @@ } } +/*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_193: [**IoTHubTransportAMQP_DoWork shall set the AMQP the message-id and correlation-id if found to the UAMQP message before passing down**]***/ +static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) +{ + int result = RESULT_OK; + const char* messageId; + const char* correlationId; + PROPERTIES_HANDLE uamqp_message_properties; + int api_call_result; -static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_194: [**Uamqp message properties shall be retrieved using message_get_properties to update message-id/Correlation-Id **]***/ + if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0) + { + LogError("Failed to get properties map from uAMQP message (error code %d).", api_call_result); + result = __LINE__; + } + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_195: [**If UAMQP message properties were not present then new properties shall be created using properties_create()**]***/ + else if (uamqp_message_properties == NULL && + (uamqp_message_properties = properties_create()) == NULL) + { + LogError("Failed to create properties map for uAMQP message (error code %d).", api_call_result); + result = __LINE__; + } + else + { + /***Codes_SRS_IOTHUBTRANSPORTAMQP_25_200: [**As message - id is optional field, if it is not set by the client, processing shall ignore and continue normally**] * */ + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_196: [**Message-id from the IotHub Client shall be read using IoTHubMessage_GetMessageId()**]***/ + if ((messageId = IoTHubMessage_GetMessageId(iothub_message_handle)) != NULL) + { + AMQP_VALUE uamqp_message_id; + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_197: [**Uamqp message id shall be created using amqpvalue_create_string()**]***/ + if ((uamqp_message_id = amqpvalue_create_string(messageId)) == NULL) + { + LogError("Failed to create an AMQP_VALUE for the messageId property value."); + result = __LINE__; + } + else + { + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_198: [**Message id would be set to Uamqp using properties_set_message_id()**]***/ + if ((api_call_result = properties_set_message_id(uamqp_message_properties, uamqp_message_id)) != 0) + { + LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result); + result = __LINE__; + } + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_199: [**Uamqp value used for message id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/ + amqpvalue_destroy(uamqp_message_id); + } + } + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_205: [**As Correlation-id is optional field, if it is not set by the client, processing shall ignore and continue normally**]***/ + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_201: [**Correlation-id from the IotHub Client shall be read using IoTHubMessage_GetCorrelationId()**]***/ + if ((correlationId = IoTHubMessage_GetCorrelationId(iothub_message_handle)) != NULL) + { + AMQP_VALUE uamqp_correlation_id; + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_202: [**Uamqp value for Correlation id shall be created using amqpvalue_create_string()**]***/ + if ((uamqp_correlation_id = amqpvalue_create_string(correlationId)) == NULL) + { + LogError("Failed to create an AMQP_VALUE for the messageId property value."); + result = __LINE__; + } + else + { + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_203: [**Correlation id would be set to Uamqp using properties_set_correlation_id()**]***/ + if ((api_call_result = properties_set_correlation_id(uamqp_message_properties, uamqp_correlation_id)) != 0) + { + LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result); + result = __LINE__; + } + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_204: [**Uamqp value used for Correlation id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/ + amqpvalue_destroy(uamqp_correlation_id); + } + } + /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_206: [**Modified Uamqp properties shall be set using message_set_properties()**]***/ + if ((api_call_result = message_set_properties(uamqp_message, uamqp_message_properties)) != 0) + { + LogError("Failed to set properties map on uAMQP message (error code %d).", api_call_result); + result = __LINE__; + } + } + + properties_destroy(uamqp_message_properties); + + return result; +} + +static int addApplicationPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) { int result; MAP_HANDLE properties_map; @@ -857,7 +940,7 @@ transport_state->connection_establish_time = getSecondsSinceEpoch(); transport_state->cbs.cbs_state = CBS_STATE_IDLE; connection_set_trace(transport_state->connection, transport_state->is_trace_on); - (void)xio_setoption(transport_state->cbs.sasl_io, "logtrace", &transport_state->is_trace_on); + (void)xio_setoption(transport_state->cbs.sasl_io, OPTION_LOG_TRACE, &transport_state->is_trace_on); result = RESULT_OK; } } @@ -899,7 +982,7 @@ transport_state->connection_establish_time = getSecondsSinceEpoch(); connection_set_trace(transport_state->connection, transport_state->is_trace_on); - (void)xio_setoption(transport_state->tls_io, "logtrace", &transport_state->is_trace_on); + (void)xio_setoption(transport_state->tls_io, OPTION_LOG_TRACE, &transport_state->is_trace_on); result = RESULT_OK; } } @@ -1085,21 +1168,21 @@ void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) { - if (context != NULL) - { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + if (context != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; - if (transport_state->is_trace_on) - { - LogInfo("Event sender state changed [%d->%d]", previous_state, new_state); - } + if (transport_state->is_trace_on) + { + LogInfo("Event sender state changed [%d->%d]", previous_state, new_state); + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] - if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR) - { - transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; - } - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] + if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR) + { + transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; + } + } } static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -1136,7 +1219,7 @@ attachDeviceClientTypeToLink(transport_state->sender_link); // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function] if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] @@ -1193,21 +1276,21 @@ void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state) { - if (context != NULL) - { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + if (context != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; - if (transport_state->is_trace_on) - { - LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state); - } + if (transport_state->is_trace_on) + { + LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state); + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] - if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR) - { - transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; - } - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] + if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR) + { + transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; + } + } } static int createMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state, IOTHUB_CLIENT_LL_HANDLE iothub_client_handle) @@ -1250,7 +1333,7 @@ attachDeviceClientTypeToLink(transport_state->receiver_link); // Codes_SRS_IOTHUBTRANSPORTAMQP_09_077: [IoTHubTransportAMQP_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function] if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, on_message_receiver_state_changed, (void*)transport_state)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] @@ -1349,6 +1432,11 @@ /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ is_message_error = true; } + else if (addApplicationPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0) + { + /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + is_message_error = true; + } else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] @@ -1959,24 +2047,24 @@ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] - if (strcmp("sas_token_lifetime", option) == 0) + if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0) { transport_state->cbs.sas_token_lifetime = *((size_t*)value); result = IOTHUB_CLIENT_OK; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] - else if (strcmp("sas_token_refresh_time", option) == 0) + else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, option) == 0) { transport_state->cbs.sas_token_refresh_time = *((size_t*)value); result = IOTHUB_CLIENT_OK; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] - else if (strcmp("cbs_request_timeout", option) == 0) + else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, option) == 0) { transport_state->cbs.cbs_request_timeout = *((size_t*)value); result = IOTHUB_CLIENT_OK; } - else if (strcmp("logtrace", option) == 0) + else if (strcmp(OPTION_LOG_TRACE, option) == 0) { transport_state->is_trace_on = *((bool*)value); if (transport_state->connection != NULL) @@ -1986,13 +2074,13 @@ result = IOTHUB_CLIENT_OK; } /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_007: [ If optionName is x509certificate and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ - else if ((strcmp("x509certificate", option) == 0) && (transport_state->credential.credentialType != X509)) + else if ((strcmp(OPTION_X509_CERT, option) == 0) && (transport_state->credential.credentialType != X509)) { LogError("x509certificate specified, but authentication method is not x509"); result = IOTHUB_CLIENT_INVALID_ARG; } /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_008: [ If optionName is x509privatekey and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ - else if ((strcmp("x509privatekey", option) == 0) && (transport_state->credential.credentialType != X509)) + else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (transport_state->credential.credentialType != X509)) { LogError("x509privatekey specified, but authentication method is not x509"); result = IOTHUB_CLIENT_INVALID_ARG;