Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
Revision 21:32a1746384ba, committed 2016-08-12
- Comitter:
- AzureIoTClient
- Date:
- Fri Aug 12 10:03:28 2016 -0700
- Parent:
- 20:8dec76e7ba34
- Child:
- 22:8b70cf813f25
- Commit message:
- 1.0.10
Changed in this revision
| iothubtransportamqp.c | Show annotated file Show diff for this revision Revisions of this file |
--- a/iothubtransportamqp.c Fri Jul 29 15:52:42 2016 -0700
+++ b/iothubtransportamqp.c Fri Aug 12 10:03:28 2016 -0700
@@ -29,6 +29,7 @@
#include "azure_uamqp_c/saslclientio.h"
#include "iothub_client_ll.h"
+#include "iothub_client_options.h"
#include "iothub_client_private.h"
#include "iothubtransportamqp.h"
#include "iothub_client_version.h"
@@ -263,8 +264,90 @@
}
}
+/*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_193: [**IoTHubTransportAMQP_DoWork shall set the AMQP the message-id and correlation-id if found to the UAMQP message before passing down**]***/
+static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+{
+ int result = RESULT_OK;
+ const char* messageId;
+ const char* correlationId;
+ PROPERTIES_HANDLE uamqp_message_properties;
+ int api_call_result;
-static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_194: [**Uamqp message properties shall be retrieved using message_get_properties to update message-id/Correlation-Id **]***/
+ if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0)
+ {
+ LogError("Failed to get properties map from uAMQP message (error code %d).", api_call_result);
+ result = __LINE__;
+ }
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_195: [**If UAMQP message properties were not present then new properties shall be created using properties_create()**]***/
+ else if (uamqp_message_properties == NULL &&
+ (uamqp_message_properties = properties_create()) == NULL)
+ {
+ LogError("Failed to create properties map for uAMQP message (error code %d).", api_call_result);
+ result = __LINE__;
+ }
+ else
+ {
+ /***Codes_SRS_IOTHUBTRANSPORTAMQP_25_200: [**As message - id is optional field, if it is not set by the client, processing shall ignore and continue normally**] * */
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_196: [**Message-id from the IotHub Client shall be read using IoTHubMessage_GetMessageId()**]***/
+ if ((messageId = IoTHubMessage_GetMessageId(iothub_message_handle)) != NULL)
+ {
+ AMQP_VALUE uamqp_message_id;
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_197: [**Uamqp message id shall be created using amqpvalue_create_string()**]***/
+ if ((uamqp_message_id = amqpvalue_create_string(messageId)) == NULL)
+ {
+ LogError("Failed to create an AMQP_VALUE for the messageId property value.");
+ result = __LINE__;
+ }
+ else
+ {
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_198: [**Message id would be set to Uamqp using properties_set_message_id()**]***/
+ if ((api_call_result = properties_set_message_id(uamqp_message_properties, uamqp_message_id)) != 0)
+ {
+ LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result);
+ result = __LINE__;
+ }
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_199: [**Uamqp value used for message id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/
+ amqpvalue_destroy(uamqp_message_id);
+ }
+ }
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_205: [**As Correlation-id is optional field, if it is not set by the client, processing shall ignore and continue normally**]***/
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_201: [**Correlation-id from the IotHub Client shall be read using IoTHubMessage_GetCorrelationId()**]***/
+ if ((correlationId = IoTHubMessage_GetCorrelationId(iothub_message_handle)) != NULL)
+ {
+ AMQP_VALUE uamqp_correlation_id;
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_202: [**Uamqp value for Correlation id shall be created using amqpvalue_create_string()**]***/
+ if ((uamqp_correlation_id = amqpvalue_create_string(correlationId)) == NULL)
+ {
+ LogError("Failed to create an AMQP_VALUE for the messageId property value.");
+ result = __LINE__;
+ }
+ else
+ {
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_203: [**Correlation id would be set to Uamqp using properties_set_correlation_id()**]***/
+ if ((api_call_result = properties_set_correlation_id(uamqp_message_properties, uamqp_correlation_id)) != 0)
+ {
+ LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result);
+ result = __LINE__;
+ }
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_204: [**Uamqp value used for Correlation id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/
+ amqpvalue_destroy(uamqp_correlation_id);
+ }
+ }
+ /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_206: [**Modified Uamqp properties shall be set using message_set_properties()**]***/
+ if ((api_call_result = message_set_properties(uamqp_message, uamqp_message_properties)) != 0)
+ {
+ LogError("Failed to set properties map on uAMQP message (error code %d).", api_call_result);
+ result = __LINE__;
+ }
+ }
+
+ properties_destroy(uamqp_message_properties);
+
+ return result;
+}
+
+static int addApplicationPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
{
int result;
MAP_HANDLE properties_map;
@@ -857,7 +940,7 @@
transport_state->connection_establish_time = getSecondsSinceEpoch();
transport_state->cbs.cbs_state = CBS_STATE_IDLE;
connection_set_trace(transport_state->connection, transport_state->is_trace_on);
- (void)xio_setoption(transport_state->cbs.sasl_io, "logtrace", &transport_state->is_trace_on);
+ (void)xio_setoption(transport_state->cbs.sasl_io, OPTION_LOG_TRACE, &transport_state->is_trace_on);
result = RESULT_OK;
}
}
@@ -899,7 +982,7 @@
transport_state->connection_establish_time = getSecondsSinceEpoch();
connection_set_trace(transport_state->connection, transport_state->is_trace_on);
- (void)xio_setoption(transport_state->tls_io, "logtrace", &transport_state->is_trace_on);
+ (void)xio_setoption(transport_state->tls_io, OPTION_LOG_TRACE, &transport_state->is_trace_on);
result = RESULT_OK;
}
}
@@ -1085,21 +1168,21 @@
void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
{
- if (context != NULL)
- {
- AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+ if (context != NULL)
+ {
+ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
- if (transport_state->is_trace_on)
- {
- LogInfo("Event sender state changed [%d->%d]", previous_state, new_state);
- }
+ if (transport_state->is_trace_on)
+ {
+ LogInfo("Event sender state changed [%d->%d]", previous_state, new_state);
+ }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
- if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR)
- {
- transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
- }
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
+ if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR)
+ {
+ transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
+ }
+ }
}
static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -1136,7 +1219,7 @@
attachDeviceClientTypeToLink(transport_state->sender_link);
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function]
if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state)) == NULL)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established]
@@ -1193,21 +1276,21 @@
void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
{
- if (context != NULL)
- {
- AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+ if (context != NULL)
+ {
+ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
- if (transport_state->is_trace_on)
- {
- LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state);
- }
+ if (transport_state->is_trace_on)
+ {
+ LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state);
+ }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
- if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR)
- {
- transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
- }
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
+ if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR)
+ {
+ transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
+ }
+ }
}
static int createMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state, IOTHUB_CLIENT_LL_HANDLE iothub_client_handle)
@@ -1250,7 +1333,7 @@
attachDeviceClientTypeToLink(transport_state->receiver_link);
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_077: [IoTHubTransportAMQP_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function]
if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, on_message_receiver_state_changed, (void*)transport_state)) == NULL)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established]
@@ -1349,6 +1432,11 @@
/* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
is_message_error = true;
}
+ else if (addApplicationPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ is_message_error = true;
+ }
else
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()]
@@ -1959,24 +2047,24 @@
AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK]
- if (strcmp("sas_token_lifetime", option) == 0)
+ if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0)
{
transport_state->cbs.sas_token_lifetime = *((size_t*)value);
result = IOTHUB_CLIENT_OK;
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK]
- else if (strcmp("sas_token_refresh_time", option) == 0)
+ else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, option) == 0)
{
transport_state->cbs.sas_token_refresh_time = *((size_t*)value);
result = IOTHUB_CLIENT_OK;
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK]
- else if (strcmp("cbs_request_timeout", option) == 0)
+ else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, option) == 0)
{
transport_state->cbs.cbs_request_timeout = *((size_t*)value);
result = IOTHUB_CLIENT_OK;
}
- else if (strcmp("logtrace", option) == 0)
+ else if (strcmp(OPTION_LOG_TRACE, option) == 0)
{
transport_state->is_trace_on = *((bool*)value);
if (transport_state->connection != NULL)
@@ -1986,13 +2074,13 @@
result = IOTHUB_CLIENT_OK;
}
/*Codes_SRS_IOTHUBTRANSPORTAMQP_02_007: [ If optionName is x509certificate and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
- else if ((strcmp("x509certificate", option) == 0) && (transport_state->credential.credentialType != X509))
+ else if ((strcmp(OPTION_X509_CERT, option) == 0) && (transport_state->credential.credentialType != X509))
{
LogError("x509certificate specified, but authentication method is not x509");
result = IOTHUB_CLIENT_INVALID_ARG;
}
/*Codes_SRS_IOTHUBTRANSPORTAMQP_02_008: [ If optionName is x509privatekey and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
- else if ((strcmp("x509privatekey", option) == 0) && (transport_state->credential.credentialType != X509))
+ else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (transport_state->credential.credentialType != X509))
{
LogError("x509privatekey specified, but authentication method is not x509");
result = IOTHUB_CLIENT_INVALID_ARG;
