Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 13:a4af7c301e02
- Parent:
- 12:841a4c36bd36
- Child:
- 14:8e8e42008807
--- a/iothubtransportamqp.c Fri Apr 08 13:23:54 2016 -0700 +++ b/iothubtransportamqp.c Sun Apr 24 16:37:38 2016 -0700 @@ -268,7 +268,7 @@ if (message_get_body_type(message, &body_type) != 0) { - LogError("Failed to get the type of the message received by the transport.\r\n"); + LogError("Failed to get the type of the message received by the transport."); } else { @@ -277,7 +277,7 @@ BINARY_DATA binary_data; if (message_get_body_amqp_data(message, 0, &binary_data) != 0) { - LogError("Failed to get the body of the message received by the transport.\r\n"); + LogError("Failed to get the body of the message received by the transport."); } else { @@ -289,7 +289,7 @@ if (iothub_message == NULL) { disposition_result = IOTHUBMESSAGE_REJECTED; - LogError("Transport failed processing the message received.\r\n"); + LogError("Transport failed processing the message received."); } else { @@ -374,6 +374,16 @@ } } +static void on_connection_io_error(void* context) +{ + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + + if (transport_state != NULL) + { + transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; + } +} + static int establishConnection(AMQP_TRANSPORT_INSTANCE* transport_state) { int result; @@ -384,14 +394,14 @@ { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to obtain a TLS I/O transport layer.\r\n"); + LogError("Failed to obtain a TLS I/O transport layer."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to create a SASL mechanism.\r\n"); + LogError("Failed to create a SASL mechanism."); } else { @@ -401,34 +411,34 @@ { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to create a SASL I/O layer.\r\n"); + LogError("Failed to create a SASL I/O layer."); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] - else if ((transport_state->connection = connection_create(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL)) == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] + else if ((transport_state->connection = connection_create2(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state, NULL)) == NULL) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create2() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] result = RESULT_FAILURE; - LogError("Failed to create the AMQP connection.\r\n"); + LogError("Failed to create the AMQP connection."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to create the AMQP session.\r\n"); + LogError("Failed to create the AMQP session."); } else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) { - LogError("Failed to set the AMQP incoming window size.\r\n"); + LogError("Failed to set the AMQP incoming window size."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) { - LogError("Failed to set the AMQP outgoing window size.\r\n"); + LogError("Failed to set the AMQP outgoing window size."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_066: [IoTHubTransportAMQP_DoWork shall establish the CBS connection using the cbs_create() AMQP API] @@ -436,14 +446,14 @@ { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_067: [If cbs_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to create the CBS connection.\r\n"); + LogError("Failed to create the CBS connection."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] else if (cbs_open(transport_state->cbs) != 0) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to open the connection with CBS.\r\n"); + LogError("Failed to open the connection with CBS."); } else { @@ -475,12 +485,12 @@ if (newSASToken == NULL) { - LogError("Could not generate a new SAS token for the CBS\r\n"); + LogError("Could not generate a new SAS token for the CBS"); result = RESULT_FAILURE; } else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK) { - LogError("Failed applying new SAS token to CBS\r\n"); + LogError("Failed applying new SAS token to CBS"); result = RESULT_FAILURE; } else @@ -523,29 +533,29 @@ if ((attach_properties = amqpvalue_create_map()) == NULL) { - LogError("Failed to create the map for device client type.\r\n"); + LogError("Failed to create the map for device client type."); } else { if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) { - LogError("Failed to create the key name for the device client type.\r\n"); + LogError("Failed to create the key name for the device client type."); } else { if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL) { - LogError("Failed to create the key value for the device client type.\r\n"); + LogError("Failed to create the key value for the device client type."); } else { if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0) { - LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result); + LogError("Failed to set the property map for the device client type. Error code is: %d", result); } else if ((result = link_set_attach_properties(link, attach_properties)) != 0) { - LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result); + LogError("Unable to attach the device client type to the link properties. Error code is: %d", result); } amqpvalue_destroy(deviceClientTypeValue); @@ -570,6 +580,11 @@ } } +void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) +{ + LogInfo("Event sender state changed [%d->%d]", previous_state, new_state); +} + static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) { int result = RESULT_FAILURE; @@ -582,32 +597,32 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using 'source' as "ingress", target as the IoT hub FQDN, link name as "sender-link" and role as 'role_sender'] if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL) { - LogError("Failed creating AMQP messaging source attribute.\r\n"); + LogError("Failed creating AMQP messaging source attribute."); } else if ((target = messaging_create_target(STRING_c_str(transport_state->targetAddress))) == NULL) { - LogError("Failed creating AMQP messaging target attribute.\r\n"); + LogError("Failed creating AMQP messaging target attribute."); } else if ((transport_state->sender_link = link_create(transport_state->session, MESSAGE_SENDER_LINK_NAME, role_sender, source, target)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_069: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for sending messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed creating AMQP link for message sender.\r\n"); + LogError("Failed creating AMQP link for message sender."); } else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK) { - LogError("Failed setting AMQP link max message size.\r\n"); + LogError("Failed setting AMQP link max message size."); } attachDeviceClientTypeToLink(transport_state->sender_link); // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] - if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, NULL, NULL, NULL)) == NULL) + if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state, NULL)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] - LogError("Could not allocate AMQP message sender\r\n"); + LogError("Could not allocate AMQP message sender"); } else { @@ -615,7 +630,7 @@ if (messagesender_open(transport_state->message_sender) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_073: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be opened, flagging the connection to be re-established] - LogError("Failed opening the AMQP message sender.\r\n"); + LogError("Failed opening the AMQP message sender."); } else { @@ -641,7 +656,7 @@ { if (messagereceiver_close(transport_state->message_receiver) != RESULT_OK) { - LogError("Failed closing the AMQP message receiver.\r\n"); + LogError("Failed closing the AMQP message receiver."); } messagereceiver_destroy(transport_state->message_receiver); @@ -670,29 +685,29 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using 'source' as messageReceiveAddress, target as the "ingress-rx", link name as "receiver-link" and role as 'role_receiver'] if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL) { - LogError("Failed creating AMQP message receiver source attribute.\r\n"); + LogError("Failed creating AMQP message receiver source attribute."); } else if ((target = messaging_create_target(MESSAGE_RECEIVER_TARGET_ADDRESS)) == NULL) { - LogError("Failed creating AMQP message receiver target attribute.\r\n"); + LogError("Failed creating AMQP message receiver target attribute."); } else if ((transport_state->receiver_link = link_create(transport_state->session, MESSAGE_RECEIVER_LINK_NAME, role_receiver, source, target)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_075: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed creating AMQP link for message receiver.\r\n"); + LogError("Failed creating AMQP link for message receiver."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_076: [IoTHubTransportAMQP_DoWork shall set the receiver link settle mode as receiver_settle_mode_first] else if (link_set_rcv_settle_mode(transport_state->receiver_link, receiver_settle_mode_first) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_141: [If IoTHubTransportAMQP_DoWork fails to set the settle mode on the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed setting AMQP link settle mode for message receiver.\r\n"); + LogError("Failed setting AMQP link settle mode for message receiver."); } else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK) { - LogError("Failed setting AMQP link max message size for message receiver.\r\n"); + LogError("Failed setting AMQP link max message size for message receiver."); } attachDeviceClientTypeToLink(transport_state->receiver_link); @@ -701,7 +716,7 @@ if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, NULL, NULL)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] - LogError("Could not allocate AMQP message receiver.\r\n"); + LogError("Could not allocate AMQP message receiver."); } else { @@ -710,7 +725,7 @@ if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] - LogError("Failed opening the AMQP message receiver.\r\n"); + LogError("Failed opening the AMQP message receiver."); } else { @@ -741,14 +756,14 @@ if (properties_map == NULL) { /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to get property map from IoTHub message.\r\n"); + LogError("Failed to get property map from IoTHub message."); result = __LINE__; } /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */ else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK) { /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to get the internals of the property map.\r\n"); + LogError("Failed to get the internals of the property map."); result = __LINE__; } else @@ -762,7 +777,7 @@ if (uamqp_map == NULL) { /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP map for the properties.\r\n"); + LogError("Failed to create uAMQP map for the properties."); result = __LINE__; } else @@ -774,7 +789,7 @@ if (map_key_value == NULL) { /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value.\r\n"); + LogError("Failed to create uAMQP property key value."); break; } @@ -784,7 +799,7 @@ { amqpvalue_destroy(map_key_value); /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value.\r\n"); + LogError("Failed to create uAMQP property key value."); break; } @@ -795,7 +810,7 @@ amqpvalue_destroy(map_key_value); amqpvalue_destroy(map_value_value); /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value.\r\n"); + LogError("Failed to create uAMQP property key value."); break; } @@ -813,7 +828,7 @@ if (message_set_application_properties(uamqp_message, uamqp_map) != 0) { /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to transfer the message properties to the uAMQP message.\r\n"); + LogError("Failed to transfer the message properties to the uAMQP message."); result = __LINE__; } else @@ -856,7 +871,7 @@ if (contentType == IOTHUBMESSAGE_BYTEARRAY && IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK) { - LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n"); + LogError("Failed getting the BYTE array representation of the event content to be sent."); is_message_error = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] @@ -864,19 +879,19 @@ else if (contentType == IOTHUBMESSAGE_STRING && ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL)) { - LogError("Failed getting the STRING representation of the event content to be sent.\r\n"); + LogError("Failed getting the STRING representation of the event content to be sent."); is_message_error = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_092: [If the event contains a message of type IOTHUBMESSAGE_UNKNOWN, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] else if (contentType == IOTHUBMESSAGE_UNKNOWN) { - LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n"); + LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN."); is_message_error = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API] else if ((amqp_message = message_create()) == NULL) { - LogError("Failed allocating the AMQP message for sending the event.\r\n"); + LogError("Failed allocating the AMQP message for sending the event."); } else { @@ -893,7 +908,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK) { - LogError("Failed setting the body of the AMQP message.\r\n"); + LogError("Failed setting the body of the AMQP message."); } else { @@ -907,7 +922,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK) { - LogError("Failed sending the AMQP message.\r\n"); + LogError("Failed sending the AMQP message."); } else { @@ -952,7 +967,10 @@ static void prepareForConnectionRetry(AMQP_TRANSPORT_INSTANCE* transport_state) { + destroyMessageReceiver(transport_state); + destroyEventSender(transport_state); destroyConnection(transport_state); + transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE; rollEventsBackToWaitList(transport_state); } @@ -968,32 +986,32 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.] if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL) { - LogError("IoTHub AMQP client transport null configuration parameter.\r\n"); + LogError("IoTHub AMQP client transport null configuration parameter."); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_006: [IoTHubTransportAMQP_Create shall fail and return NULL if any fields of the config structure are NULL.] else if (config->upperConfig->protocol == NULL) { - LogError("Invalid configuration (NULL protocol detected)\r\n"); + LogError("Invalid configuration (NULL protocol detected)"); } else if (config->upperConfig->deviceId == NULL) { - LogError("Invalid configuration (NULL deviceId detected)\r\n"); + LogError("Invalid configuration (NULL deviceId detected)"); } else if (config->upperConfig->deviceKey == NULL) { - LogError("Invalid configuration (NULL deviceKey detected)\r\n"); + LogError("Invalid configuration (NULL deviceKey detected)"); } else if (config->upperConfig->iotHubName == NULL) { - LogError("Invalid configuration (NULL iotHubName detected)\r\n"); + LogError("Invalid configuration (NULL iotHubName detected)"); } else if (config->upperConfig->iotHubSuffix == NULL) { - LogError("Invalid configuration (NULL iotHubSuffix detected)\r\n"); + LogError("Invalid configuration (NULL iotHubSuffix detected)"); } else if (!config->waitingToSend) { - LogError("Invalid configuration (NULL waitingToSend list detected)\r\n"); + LogError("Invalid configuration (NULL waitingToSend list detected)"); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_008: [IoTHubTransportAMQP_Create shall fail and return NULL if any config field of type string is zero length.] else if ((deviceIdLength = strlen(config->upperConfig->deviceId)) == 0 || @@ -1001,17 +1019,17 @@ (strlen(config->upperConfig->iotHubName) == 0) || (strlen(config->upperConfig->iotHubSuffix) == 0)) { - LogError("Zero-length config parameter (deviceId, deviceKey, iotHubName or iotHubSuffix)\r\n"); + LogError("Zero-length config parameter (deviceId, deviceKey, iotHubName or iotHubSuffix)"); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_007: [IoTHubTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.] else if (deviceIdLength > 128U) { - LogError("deviceId is too long\r\n"); + LogError("deviceId is too long"); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_134: [IoTHubTransportAMQP_Create shall fail and return NULL if the combined length of config->iotHubName and config->iotHubSuffix exceeds 254 bytes (RFC1035)] else if ((strlen(config->upperConfig->iotHubName) + strlen(config->upperConfig->iotHubSuffix)) > (RFC1035_MAX_FQDN_LENGTH - 1)) { - LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)\r\n"); + LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)"); } else { @@ -1020,7 +1038,7 @@ if (transport_state == NULL) { - LogError("Could not allocate AMQP transport state\r\n"); + LogError("Could not allocate AMQP transport state"); } else { @@ -1058,35 +1076,35 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_010: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as iotHubHostFqdn, from the following pieces: config->iotHubName + "." + config->iotHubSuffix.] if ((transport_state->iotHubHostFqdn = concat3Params(config->upperConfig->iotHubName, ".", config->upperConfig->iotHubSuffix)) == NULL) { - LogError("Failed to set transport_state->iotHubHostFqdn.\r\n"); + LogError("Failed to set transport_state->iotHubHostFqdn."); cleanup_required = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.] else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] - LogError("Failed to allocate transport_state->devicesPath.\r\n"); + LogError("Failed to allocate transport_state->devicesPath."); cleanup_required = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_014: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as targetAddress, from the following parts: "amqps://" + devicesPath + "/messages/events".] else if ((transport_state->targetAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/events")) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_015: [If creating the targetAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] - LogError("Failed to allocate transport_state->targetAddress.\r\n"); + LogError("Failed to allocate transport_state->targetAddress."); cleanup_required = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_053: [IoTHubTransportAMQP_Create shall define the source address for receiving messages as "amqps://" + devicesPath + "/messages/devicebound", stored in the transport handle as messageReceiveAddress] else if ((transport_state->messageReceiveAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/devicebound")) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_054: [If creating the messageReceiveAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] - LogError("Failed to allocate transport_state->messageReceiveAddress.\r\n"); + LogError("Failed to allocate transport_state->messageReceiveAddress."); cleanup_required = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] else if ((transport_state->sasTokenKeyName = STRING_new()) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] - LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n"); + LogError("Failed to allocate transport_state->sasTokenKeyName."); cleanup_required = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] @@ -1094,7 +1112,7 @@ STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_019: [If IoTHubTransportAMQP_Create fails to copy config->deviceKey, the function shall fail and return NULL.] - LogError("Failed to allocate transport_state->deviceKey.\r\n"); + LogError("Failed to allocate transport_state->deviceKey."); cleanup_required = true; } else @@ -1176,12 +1194,12 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_051: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the transport handle parameter is NULL] if (handle == NULL) { - LogError("IoTHubClient DoWork failed: transport handle parameter is NULL.\r\n"); + LogError("IoTHubClient DoWork failed: transport handle parameter is NULL."); } // Codes_[IoTHubTransportAMQP_DoWork shall fail and return immediately if the client handle parameter is NULL] else if (iotHubClientHandle == NULL) { - LogError("IoTHubClient DoWork failed: client handle parameter is NULL.\r\n"); + LogError("IoTHubClient DoWork failed: client handle parameter is NULL."); } else { @@ -1191,11 +1209,17 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_147: [IoTHubTransportAMQP_DoWork shall save a reference to the client handle in transport_state->iothub_client_handle] transport_state->iothub_client_handle = iotHubClientHandle; + if (transport_state->connection != NULL && + transport_state->connection_state == AMQP_MANAGEMENT_STATE_ERROR) + { + LogError("An error occured on AMQP connection. The connection will be restablished."); + trigger_connection_retry = true; + } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_055: [If the transport handle has a NULL connection, IoTHubTransportAMQP_DoWork shall instantiate and initialize the AMQP components and establish the connection] - if (transport_state->connection == NULL && + else if (transport_state->connection == NULL && establishConnection(transport_state) != RESULT_OK) { - LogError("AMQP transport failed to establish connection with service.\r\n"); + LogError("AMQP transport failed to establish connection with service."); trigger_connection_retry = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] @@ -1204,14 +1228,14 @@ startAuthentication(transport_state) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately] - LogError("Failed authenticating AMQP connection within CBS.\r\n"); + LogError("Failed authenticating AMQP connection within CBS."); trigger_connection_retry = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout] else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) { - LogError("AMQP transport authentication timed out.\r\n"); + LogError("AMQP transport authentication timed out."); trigger_connection_retry = true; } else if (transport_state->cbs_state == CBS_STATE_AUTHENTICATED) @@ -1221,7 +1245,7 @@ transport_state->message_receiver == NULL && createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK) { - LogError("Failed creating AMQP transport message receiver.\r\n"); + LogError("Failed creating AMQP transport message receiver."); trigger_connection_retry = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] @@ -1229,18 +1253,18 @@ transport_state->message_receiver != NULL && destroyMessageReceiver(transport_state) != RESULT_OK) { - LogError("Failed destroying AMQP transport message receiver.\r\n"); + LogError("Failed destroying AMQP transport message receiver."); } if (transport_state->message_sender == NULL && createEventSender(transport_state) != RESULT_OK) { - LogError("Failed creating AMQP transport event sender.\r\n"); + LogError("Failed creating AMQP transport event sender."); trigger_connection_retry = true; } else if (sendPendingEvents(transport_state) != RESULT_OK) { - LogError("AMQP transport failed sending events.\r\n"); + LogError("AMQP transport failed sending events."); } } @@ -1263,7 +1287,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_037: [IoTHubTransportAMQP_Subscribe shall fail if the transport handle parameter received is NULL.] if (handle == NULL) { - LogError("Invalid handle to IoTHubClient AMQP transport.\r\n"); + LogError("Invalid handle to IoTHubClient AMQP transport."); result = __LINE__; } else @@ -1282,7 +1306,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.] if (handle == NULL) { - LogError("Invalid handle to IoTHubClient AMQP transport.\r\n"); + LogError("Invalid handle to IoTHubClient AMQP transport."); } else { @@ -1300,12 +1324,12 @@ if (handle == NULL) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid handle to IoTHubClient AMQP transport instance.\r\n"); + LogError("Invalid handle to IoTHubClient AMQP transport instance."); } else if (iotHubClientStatus == NULL) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.\r\n"); + LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS."); } else { @@ -1342,7 +1366,7 @@ ) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()\r\n"); + LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()"); } else { @@ -1373,7 +1397,7 @@ (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL) { result = IOTHUB_CLIENT_ERROR; - LogError("Failed to obtain a TLS I/O transport layer.\r\n"); + LogError("Failed to obtain a TLS I/O transport layer."); } else { @@ -1385,7 +1409,7 @@ else { result = IOTHUB_CLIENT_ERROR; - LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); + LogError("Invalid option (%s) passed to uAMQP transport SetOption()", option); } } }