Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 10:75c5e0d8537d
- Parent:
- 9:8abcd13782f4
- Child:
- 11:62d7b956e76e
--- a/iothubtransportamqp.c Tue Feb 16 14:24:48 2016 -0800 +++ b/iothubtransportamqp.c Fri Mar 11 16:59:34 2016 -0800 @@ -5,11 +5,15 @@ #ifdef _CRTDBG_MAP_ALLOC #include <crtdbg.h> #endif +#include <stdint.h> +#include <time.h> +#include <limits.h> #include "gballoc.h" #include "cbs.h" #include "link.h" #include "message.h" +#include "amqpvalue.h" #include "message_receiver.h" #include "message_sender.h" #include "messaging.h" @@ -35,7 +39,6 @@ #define RESULT_FAILURE 1 #define RESULT_TIMEOUT 2 -#define TIME_MAX SIZE_MAX #define RFC1035_MAX_FQDN_LENGTH 255 #define DEFAULT_IOTHUB_AMQP_PORT 5671 #define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000 @@ -43,7 +46,7 @@ #define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000 #define CBS_AUDIENCE "servicebus.windows.net:sastoken" #define DEFAULT_CONTAINER_ID "default_container_id" -#define DEFAULT_INCOMING_WINDOW_SIZE SIZE_MAX +#define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX #define DEFAULT_OUTGOING_WINDOW_SIZE 100 #define MESSAGE_RECEIVER_LINK_NAME "receiver-link" #define MESSAGE_RECEIVER_TARGET_ADDRESS "ingress-rx" @@ -63,71 +66,73 @@ typedef struct AMQP_TRANSPORT_STATE_TAG { - // FQDN of the IoT Hub. - STRING_HANDLE iotHubHostFqdn; - // AMQP port of the IoT Hub. - int iotHubPort; - // Certificates to be used by the TLS I/O. - char* trusted_certificates; - // Key associated to the device to be used. - STRING_HANDLE deviceKey; - // Address to which the transport will connect to and send events. - STRING_HANDLE targetAddress; - // Address to which the transport will connect to and receive messages from. - STRING_HANDLE messageReceiveAddress; - // A component of the SAS token. Currently this must be an empty string. - STRING_HANDLE sasTokenKeyName; - // Internal parameter that identifies the current logical device within the service. - STRING_HANDLE devicesPath; - // How long a SAS token created by the transport is valid, in milliseconds. - size_t sas_token_lifetime; - // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds. - size_t sas_token_refresh_time; - // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. - size_t cbs_request_timeout; - // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds. - size_t message_send_timeout; - // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds. - size_t connection_timeout; - // Saved reference to the IoTHub LL Client. - IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; - - // TSL I/O transport. - XIO_HANDLE tls_io; - // Pointer to the function that creates the TLS I/O (internal use only). - TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider; - // AMQP SASL I/O transport created on top of the TLS I/O layer. - XIO_HANDLE sasl_io; - // AMQP SASL I/O mechanism to be used. - SASL_MECHANISM_HANDLE sasl_mechanism; - // AMQP connection. - CONNECTION_HANDLE connection; - // Current AMQP connection state; - AMQP_MANAGEMENT_STATE connection_state; - // Last time the AMQP connection establishment was initiated. - size_t connection_establish_time; - // AMQP session. - SESSION_HANDLE session; - // AMQP link used by the event sender. - LINK_HANDLE sender_link; - // uAMQP event sender. - MESSAGE_SENDER_HANDLE message_sender; - // Internal flag that controls if messages should be received or not. - bool receive_messages; - // AMQP link used by the message receiver. - LINK_HANDLE receiver_link; - // uAMQP message receiver. - MESSAGE_RECEIVER_HANDLE message_receiver; - // List with events still pending to be sent. It is provided by the upper layer. - PDLIST_ENTRY waitingToSend; - // Internal list with the items currently being processed/sent through uAMQP. - DLIST_ENTRY inProgress; - // Connection instance with the Azure IoT CBS. - CBS_HANDLE cbs; - // Current state of the CBS connection. - CBS_STATE cbs_state; - // Time when the current SAS token was created, in seconds since epoch. - size_t current_sas_token_create_time; + // FQDN of the IoT Hub. + STRING_HANDLE iotHubHostFqdn; + // AMQP port of the IoT Hub. + int iotHubPort; + // Certificates to be used by the TLS I/O. + char* trusted_certificates; + // Key associated to the device to be used. + STRING_HANDLE deviceKey; + // Address to which the transport will connect to and send events. + STRING_HANDLE targetAddress; + // Address to which the transport will connect to and receive messages from. + STRING_HANDLE messageReceiveAddress; + // A component of the SAS token. Currently this must be an empty string. + STRING_HANDLE sasTokenKeyName; + // Internal parameter that identifies the current logical device within the service. + STRING_HANDLE devicesPath; + // How long a SAS token created by the transport is valid, in milliseconds. + size_t sas_token_lifetime; + // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds. + size_t sas_token_refresh_time; + // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. + size_t cbs_request_timeout; + // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds. + size_t message_send_timeout; + // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds. + size_t connection_timeout; + // Saved reference to the IoTHub LL Client. + IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; + + // TSL I/O transport. + XIO_HANDLE tls_io; + // Pointer to the function that creates the TLS I/O (internal use only). + TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider; + // AMQP SASL I/O transport created on top of the TLS I/O layer. + XIO_HANDLE sasl_io; + // AMQP SASL I/O mechanism to be used. + SASL_MECHANISM_HANDLE sasl_mechanism; + // AMQP connection. + CONNECTION_HANDLE connection; + // Current AMQP connection state; + AMQP_MANAGEMENT_STATE connection_state; + // Last time the AMQP connection establishment was initiated. + size_t connection_establish_time; + // AMQP session. + SESSION_HANDLE session; + // AMQP link used by the event sender. + LINK_HANDLE sender_link; + // uAMQP event sender. + MESSAGE_SENDER_HANDLE message_sender; + // Internal flag that controls if messages should be received or not. + bool receive_messages; + // AMQP link used by the message receiver. + LINK_HANDLE receiver_link; + // uAMQP message receiver. + MESSAGE_RECEIVER_HANDLE message_receiver; + // List with events still pending to be sent. It is provided by the upper layer. + PDLIST_ENTRY waitingToSend; + // Internal list with the items currently being processed/sent through uAMQP. + DLIST_ENTRY inProgress; + // Connection instance with the Azure IoT CBS. + CBS_HANDLE cbs; + // Current state of the CBS connection. + CBS_STATE cbs_state; + // Time when the current SAS token was created, in seconds since epoch. + size_t current_sas_token_create_time; + // Mark if device is registered in transport (only one device per transport). + bool isRegistered; } AMQP_TRANSPORT_INSTANCE; // This structure is used to track an event being sent and the time it was sent. @@ -145,29 +150,33 @@ static STRING_HANDLE concat3Params(const char* prefix, const char* infix, const char* suffix) { STRING_HANDLE result = NULL; - char* concat = NULL; + char* concat; size_t totalLength = strlen(prefix) + strlen(infix) + strlen(suffix) + 1; // One extra for \0. - if ((concat = (char*)malloc(sizeof(char) * totalLength)) != NULL) + if ((concat = (char*)malloc(totalLength)) != NULL) { - if (snprintf(concat, totalLength, "%s%s%s", prefix, infix, suffix) > 0) - { - result = STRING_construct(concat); - } + (void)strcpy(concat, prefix); + (void)strcat(concat, infix); + (void)strcat(concat, suffix); + result = STRING_construct(concat); free(concat); } + else + { + result = NULL; + } return result; } -static size_t getSecondsSinceEpoch() +static size_t getSecondsSinceEpoch(void) { - return (size_t)(difftime(get_time(NULL), (time_t)0) + 0); + return (size_t)(difftime(get_time(NULL), (time_t)0)); } static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state) { - EVENT_TRACKER* event_tracker = NULL; + EVENT_TRACKER* event_tracker; if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL) { @@ -190,13 +199,17 @@ static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state) { - IOTHUB_MESSAGE_LIST* message = NULL; + IOTHUB_MESSAGE_LIST* message; if (!DList_IsListEmpty(transport_state->waitingToSend)) { PDLIST_ENTRY list_entry = transport_state->waitingToSend->Flink; message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry); } + else + { + message = NULL; + } return message; } @@ -231,46 +244,43 @@ } } -static void on_message_send_complete(const void* context, MESSAGE_SEND_RESULT send_result) +static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result) { - if (context != NULL) + EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context; + + IOTHUB_CLIENT_RESULT iot_hub_send_result; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] + if (send_result == MESSAGE_SEND_OK) { - EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context; - - IOTHUB_CLIENT_RESULT iot_hub_send_result; + iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR] + else if (send_result == MESSAGE_SEND_ERROR) + { + iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] - if (send_result == MESSAGE_SEND_OK) - { - iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR] - else if (send_result == MESSAGE_SEND_ERROR) - { - iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided] + if (event_tracker->message->callback != NULL) + { + event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context); + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback on_message_send_complete shall invoke the upper layer callback for message received if provided] - if (event_tracker->message->callback != NULL) - { - event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context); - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()] + IoTHubMessage_Destroy(event_tracker->message->messageHandle); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance] + free(event_tracker->message); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback on_message_send_complete shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()] - IoTHubMessage_Destroy(event_tracker->message->messageHandle); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback on_message_send_complete shall destroy the IOTHUB_MESSAGE_LIST instance] - free(event_tracker->message); - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback on_message_send_complete shall remove the target message from the in-progress list after the upper layer callback] - if (isEventInInProgressList(event_tracker)) - { - removeEventFromInProgressList(event_tracker); - destroyEventTracker(event_tracker); - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] + if (isEventInInProgressList(event_tracker)) + { + removeEventFromInProgressList(event_tracker); + destroyEventTracker(event_tracker); } } -static void on_put_token_complete(const void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description) +static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description) { AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context; @@ -280,16 +290,14 @@ } } -AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message) +static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message) { AMQP_VALUE result = NULL; - IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result = IOTHUBMESSAGE_REJECTED; + IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback on_message_received shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] - IOTHUB_CLIENT_LL_HANDLE iothub_client_handle = (IOTHUB_CLIENT_LL_HANDLE)context; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] IOTHUB_MESSAGE_HANDLE iothub_message = NULL; MESSAGE_BODY_TYPE body_type; - if (message_get_body_type(message, &body_type) != 0) { @@ -313,23 +321,24 @@ if (iothub_message == NULL) { + disposition_result = IOTHUBMESSAGE_REJECTED; LogError("Transport failed processing the message received.\r\n"); } else { disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback on_message_received shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] if (disposition_result == IOTHUBMESSAGE_ACCEPTED) { result = messaging_delivery_accepted(); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback on_message_received shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] else if (disposition_result == IOTHUBMESSAGE_ABANDONED) { result = messaging_delivery_released(); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback on_message_received shall return the result of messaging_delivery_rejected(Rejected by application, Rejected by application) if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] else if (disposition_result == IOTHUBMESSAGE_REJECTED) { result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); @@ -339,61 +348,60 @@ return result; } -XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates) +static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates) { - TLSIO_CONFIG tls_io_config = { fqdn, port }; - const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); - return xio_create(io_interface_description, &tls_io_config, NULL); + TLSIO_CONFIG tls_io_config = { fqdn, port }; + const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); + return xio_create(io_interface_description, &tls_io_config, NULL); } static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state) { - if (transport_state != NULL) + if (transport_state->cbs != NULL) { - if (transport_state->cbs != NULL) - { - cbs_destroy(transport_state->cbs); - transport_state->cbs = NULL; - } + cbs_destroy(transport_state->cbs); + transport_state->cbs = NULL; + } - if (transport_state->session != NULL) - { - session_destroy(transport_state->session); - transport_state->session = NULL; - } + if (transport_state->session != NULL) + { + session_destroy(transport_state->session); + transport_state->session = NULL; + } - if (transport_state->connection != NULL) - { - connection_destroy(transport_state->connection); - transport_state->connection = NULL; - } + if (transport_state->connection != NULL) + { + connection_destroy(transport_state->connection); + transport_state->connection = NULL; + } - if (transport_state->sasl_io != NULL) - { - xio_destroy(transport_state->sasl_io); - transport_state->sasl_io = NULL; - } + if (transport_state->sasl_io != NULL) + { + xio_destroy(transport_state->sasl_io); + transport_state->sasl_io = NULL; + } - if (transport_state->sasl_mechanism != NULL) - { - saslmechanism_destroy(transport_state->sasl_mechanism); - transport_state->sasl_mechanism = NULL; - } + if (transport_state->sasl_mechanism != NULL) + { + saslmechanism_destroy(transport_state->sasl_mechanism); + transport_state->sasl_mechanism = NULL; + } - if (transport_state->tls_io != NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.] - xio_destroy(transport_state->tls_io); - transport_state->tls_io = NULL; - } - } + if (transport_state->tls_io != NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.] + xio_destroy(transport_state->tls_io); + transport_state->tls_io = NULL; + } } static void on_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state) { - if (context != NULL) + (void)previous_amqp_management_state; + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + + if (transport_state != NULL) { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; transport_state->connection_state = new_amqp_management_state; } } @@ -404,13 +412,13 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] if (transport_state->tls_io == NULL && - (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL) + (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; - LogError("Failed to obtain a TLS I/O transport layer.\r\n"); + LogError("Failed to obtain a TLS I/O transport layer.\r\n"); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQPs saslmechanism_create() API] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] @@ -443,13 +451,13 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter AMQP incoming window] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) { LogError("Failed to set the AMQP incoming window size.\r\n"); } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter AMQP outgoing window] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) { LogError("Failed to set the AMQP outgoing window size.\r\n"); @@ -463,7 +471,7 @@ LogError("Failed to create the CBS connection.\r\n"); } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] - else if ((cbs_open(transport_state->cbs)) != 0) + else if (cbs_open(transport_state->cbs) != 0) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] result = RESULT_FAILURE; @@ -488,11 +496,11 @@ static int startAuthentication(AMQP_TRANSPORT_INSTANCE* transport_state) { - int result = RESULT_FAILURE; + int result; size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch. - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to sas_token_lifetime milliseconds from the time of creation] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation] size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000); STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time); @@ -500,10 +508,12 @@ if (newSASToken == NULL) { LogError("Could not generate a new SAS token for the CBS\r\n"); + result = RESULT_FAILURE; } else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK) { LogError("Failed applying new SAS token to CBS\r\n"); + result = RESULT_FAILURE; } else { @@ -514,7 +524,9 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] if (newSASToken != NULL) + { STRING_delete(newSASToken); + } return result; } @@ -533,10 +545,10 @@ { EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to message_send_timeout milliseconds] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds] if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout) { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a message_send_timeout occurs the timed out events removed from the inProgress and the upper layer notified of the send error] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error] on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR); } @@ -546,9 +558,9 @@ static void attachDeviceClientTypeToLink(LINK_HANDLE link) { - fields attach_properties = NULL; - AMQP_VALUE deviceClientTypeKeyName = NULL; - AMQP_VALUE deviceClientTypeValue = NULL; + fields attach_properties; + AMQP_VALUE deviceClientTypeKeyName; + AMQP_VALUE deviceClientTypeValue; int result; // @@ -565,46 +577,49 @@ { LogError("Failed to create the map for device client type.\r\n"); } - else if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) - { - LogError("Failed to create the key name for the device client type.\r\n"); - } - else if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL) - { - LogError("Failed to create the key value for the device client type.\r\n"); - } - else if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0) + else { - LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result); + if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) + { + LogError("Failed to create the key name for the device client type.\r\n"); + } + else + { + if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL) + { + LogError("Failed to create the key value for the device client type.\r\n"); + } + else + { + if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0) + { + LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result); + } + else if ((result = link_set_attach_properties(link, attach_properties)) != 0) + { + LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result); + } + + amqpvalue_destroy(deviceClientTypeValue); + } + + amqpvalue_destroy(deviceClientTypeKeyName); + } + + amqpvalue_destroy(attach_properties); } - else if ((result = link_set_attach_properties(link, attach_properties)) != 0) - { - LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result); -} - amqpvalue_destroy(attach_properties); - amqpvalue_destroy(deviceClientTypeKeyName); - amqpvalue_destroy(deviceClientTypeValue); - } -static int destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) +static void destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) { - int result = RESULT_FAILURE; - if (transport_state->message_sender != NULL) { messagesender_destroy(transport_state->message_sender); - transport_state->message_sender = NULL; link_destroy(transport_state->sender_link); - transport_state->sender_link = NULL; - - result = RESULT_OK; } - - return result; } static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -616,7 +631,7 @@ AMQP_VALUE source = NULL; AMQP_VALUE target = NULL; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using source as ingress, target as the IoT hub FQDN, link name as sender-link and role as role_sender] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using 'source' as "ingress", target as the IoT hub FQDN, link name as "sender-link" and role as 'role_sender'] if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL) { LogError("Failed creating AMQP messaging source attribute.\r\n"); @@ -632,7 +647,7 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK) { LogError("Failed setting AMQP link max message size.\r\n"); @@ -704,7 +719,7 @@ AMQP_VALUE source = NULL; AMQP_VALUE target = NULL; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using source as messageReceiveAddress, target as the ingress-rx, link name as receiver-link and role as role_receiver] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using 'source' as messageReceiveAddress, target as the "ingress-rx", link name as "receiver-link" and role as 'role_receiver'] if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL) { LogError("Failed creating AMQP message receiver source attribute.\r\n"); @@ -726,7 +741,7 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK) { LogError("Failed setting AMQP link max message size for message receiver.\r\n"); @@ -743,7 +758,7 @@ else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_079: [IoTHubTransportAMQP_DoWork shall open the AMQP message receiver using messagereceiver_open() AMQP API, passing a callback function for handling C2D incoming messages] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the on_message_received as the callback function] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the 'on_message_received' as the callback function] if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] @@ -765,23 +780,129 @@ return result; } +static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) +{ + int result; + MAP_HANDLE properties_map; + const char* const* propertyKeys; + const char* const* propertyValues; + size_t propertyCount; + + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_007: [The IoTHub message properties shall be obtained by calling IoTHubMessage_Properties.] */ + properties_map = IoTHubMessage_Properties(iothub_message_handle); + if (properties_map == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to get property map from IoTHub message.\r\n"); + result = __LINE__; + } + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */ + else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to get the internals of the property map.\r\n"); + result = __LINE__; + } + else + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_016: [If the number of properties is 0, no uAMQP map shall be created and no application properties shall be set on the uAMQP message.] */ + if (propertyCount != 0) + { + size_t i; + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_009: [The uAMQP map shall be created by calling amqpvalue_create_map.] */ + AMQP_VALUE uamqp_map = amqpvalue_create_map(); + if (uamqp_map == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to create uAMQP map for the properties.\r\n"); + result = __LINE__; + } + else + { + for (i = 0; i < propertyCount; i++) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_010: [A key uAMQP value shall be created by using amqpvalue_create_string.] */ + AMQP_VALUE map_key_value = amqpvalue_create_string(propertyKeys[i]); + if (map_key_value == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to create uAMQP property key value.\r\n"); + break; + } + + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_011: [A value uAMQP value shall be created by using amqpvalue_create_string.] */ + AMQP_VALUE map_value_value = amqpvalue_create_string(propertyValues[i]); + if (map_value_value == NULL) + { + amqpvalue_destroy(map_key_value); + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to create uAMQP property key value.\r\n"); + break; + } + + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_008: [All properties shall be transferred to a uAMQP map.] */ + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_012: [The key/value pair for the property shall be set into the uAMQP property map by calling amqpvalue_map_set_value.] */ + if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0) + { + amqpvalue_destroy(map_key_value); + amqpvalue_destroy(map_value_value); + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to create uAMQP property key value.\r\n"); + break; + } + + amqpvalue_destroy(map_key_value); + amqpvalue_destroy(map_value_value); + } + + if (i < propertyCount) + { + result = __LINE__; + } + else + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_013: [After all properties have been filled in the uAMQP map, the uAMQP properties map shall be set on the uAMQP message by calling message_set_application_properties.] */ + if (message_set_application_properties(uamqp_message, uamqp_map) != 0) + { + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + LogError("Failed to transfer the message properties to the uAMQP message.\r\n"); + result = __LINE__; + } + else + { + result = 0; + } + } + + amqpvalue_destroy(uamqp_map); + } + } + else + { + result = 0; + } + } + + return result; +} + static int sendPendingEvents(AMQP_TRANSPORT_INSTANCE* transport_state) { int result = RESULT_OK; - IOTHUB_MESSAGE_LIST* message = NULL; + IOTHUB_MESSAGE_LIST* message; while ((message = getNextEventToSend(transport_state)) != NULL) { result = RESULT_FAILURE; IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle); - const unsigned char* messageContent = NULL; - size_t messageContentSize = 0; + const unsigned char* messageContent; + size_t messageContentSize; MESSAGE_HANDLE amqp_message; bool is_message_error = false; EVENT_TRACKER* event_tracker; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an in-progress list right before processing them for sending] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending] if ((event_tracker = trackEvent(message, transport_state)) == NULL) { LogError("Failed tracking the event to be sent.\r\n"); @@ -796,7 +917,7 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] // Codes_SRS_IOTHUBTRANSPORTAMQP_09_090: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain the size of its char* representation using strlen()] else if (contentType == IOTHUBMESSAGE_STRING && - ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL || (messageContentSize = strlen(messageContent)) <= 0)) + ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL)) { LogError("Failed getting the STRING representation of the event content to be sent.\r\n"); is_message_error = true; @@ -807,30 +928,46 @@ LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n"); is_message_error = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() AMQP API] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API] else if ((amqp_message = message_create()) == NULL) { LogError("Failed allocating the AMQP message for sending the event.\r\n"); } else { - BINARY_DATA binary_data = { messageContent, messageContentSize }; + BINARY_DATA binary_data; + + if (contentType == IOTHUBMESSAGE_STRING) + { + messageContentSize = strlen(messageContent); + } + + binary_data.bytes = messageContent; + binary_data.length = messageContentSize; - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() AMQP API] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK) { LogError("Failed setting the body of the AMQP message.\r\n"); } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] - if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK) + if (addPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0) { - LogError("Failed sending the AMQP message.\r\n"); + /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ + is_message_error = true; } else { - result = RESULT_OK; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] + if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK) + { + LogError("Failed sending the AMQP message.\r\n"); + } + else + { + result = RESULT_OK; + } } } } @@ -888,7 +1025,7 @@ { AMQP_TRANSPORT_INSTANCE* transport_state = NULL; bool cleanup_required = false; - int deviceIdLength = -1; + size_t deviceIdLength; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.] if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL) @@ -940,7 +1077,7 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transports internal state structure fails.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transport's internal state structure fails.] transport_state = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE)); if (transport_state == NULL) @@ -951,7 +1088,7 @@ { transport_state->iotHubHostFqdn = NULL; transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT; - transport_state->trusted_certificates = NULL; + transport_state->trusted_certificates = NULL; transport_state->deviceKey = NULL; transport_state->devicesPath = NULL; transport_state->messageReceiveAddress = NULL; @@ -975,7 +1112,8 @@ transport_state->sender_link = NULL; transport_state->session = NULL; transport_state->tls_io = NULL; - transport_state->tls_io_transport_provider = getTLSIOTransport; + transport_state->tls_io_transport_provider = getTLSIOTransport; + transport_state->isRegistered = false; transport_state->waitingToSend = config->waitingToSend; DList_InitializeListHead(&transport_state->inProgress); @@ -986,7 +1124,7 @@ LogError("Failed to set transport_state->iotHubHostFqdn.\r\n"); cleanup_required = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + /devices/ + deviceId.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.] else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] @@ -1014,7 +1152,7 @@ LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n"); cleanup_required = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transports own deviceKey field] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] else if ((transport_state->deviceKey = STRING_new()) == NULL || STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0) { @@ -1127,7 +1265,7 @@ trigger_connection_retry = true; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than sas_token_refresh_time milliseconds] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds] else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) && startAuthentication(transport_state) != RESULT_OK) { @@ -1135,7 +1273,7 @@ LogError("Failed authenticating AMQP connection within CBS.\r\n"); trigger_connection_retry = true; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for cbs_request_timeout milliseconds for the cbs_put_token() to complete before failing due to timeout] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout] else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) { @@ -1169,7 +1307,6 @@ else if (sendPendingEvents(transport_state) != RESULT_OK) { LogError("AMQP transport failed sending events.\r\n"); - trigger_connection_retry = true; } } @@ -1187,7 +1324,7 @@ } } -static int IoTHubTransportAMQP_Subscribe(TRANSPORT_HANDLE handle) +static int IoTHubTransportAMQP_Subscribe(IOTHUB_DEVICE_HANDLE handle) { int result; @@ -1208,7 +1345,7 @@ return result; } -static void IoTHubTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle) +static void IoTHubTransportAMQP_Unsubscribe(IOTHUB_DEVICE_HANDLE handle) { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.] if (handle == NULL) @@ -1223,7 +1360,7 @@ } } -static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) +static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) { IOTHUB_CLIENT_RESULT result; @@ -1279,50 +1416,116 @@ { AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - if (strcmp("trusted_certificates", option) == 0) - { - transport_state->trusted_certificates = (char*)value; - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] - else if (strcmp("sas_token_lifetime", option) == 0) - { - transport_state->sas_token_lifetime = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] - else if (strcmp("sas_token_refresh_time", option) == 0) - { - transport_state->sas_token_refresh_time = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] - else if (strcmp("cbs_request_timeout", option) == 0) - { - transport_state->cbs_request_timeout = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK] - else if (strcmp("message_send_timeout", option) == 0) - { - transport_state->message_send_timeout = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] - else - { - result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); - } - } + if (strcmp("trusted_certificates", option) == 0) + { + transport_state->trusted_certificates = (char*)value; + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] + else if (strcmp("sas_token_lifetime", option) == 0) + { + transport_state->sas_token_lifetime = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] + else if (strcmp("sas_token_refresh_time", option) == 0) + { + transport_state->sas_token_refresh_time = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] + else if (strcmp("cbs_request_timeout", option) == 0) + { + transport_state->cbs_request_timeout = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK] + else if (strcmp("message_send_timeout", option) == 0) + { + transport_state->message_send_timeout = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + else + { + result = IOTHUB_CLIENT_INVALID_ARG; + LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); + } + } return result; } +static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) +{ + IOTHUB_DEVICE_HANDLE result; + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_HANDLE is NULL.] + if ((handle ==NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL)) + { + result = NULL; + } + else + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + + STRING_HANDLE devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", deviceId); + if (devicesPath == NULL) + { + LogError("Could not create a comparison string"); + result = NULL; + } + else + { + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] + if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0) + { + LogError("Attemping to add new device to AMQP transport, not allowed."); + result = NULL; + } + else if (strcmp(STRING_c_str(transport_state->deviceKey), deviceKey) != 0) + { + LogError("Attemping to add new device to AMQP transport, not allowed."); + result = NULL; + } + else + { + if (transport_state->isRegistered == true) + { + LogError("Transport already has device registered by id: [%s]", deviceId); + result = NULL; + } + else + { + transport_state->isRegistered = true; + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_HANDLE as the IOTHUB_DEVICE_HANDLE.] + result = (IOTHUB_DEVICE_HANDLE)handle; + } + } + STRING_delete(devicesPath); + } + } + + return result; +} + +// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.] +static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle) +{ + if (deviceHandle != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)deviceHandle; + + transport_state->isRegistered = false; + } +} + static TRANSPORT_PROVIDER thisTransportProvider = { IoTHubTransportAMQP_SetOption, IoTHubTransportAMQP_Create, IoTHubTransportAMQP_Destroy, + IoTHubTransportAMQ_Register, + IoTHubTransportAMQ_Unregister, IoTHubTransportAMQP_Subscribe, IoTHubTransportAMQP_Unsubscribe, IoTHubTransportAMQP_DoWork,