Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
11:62d7b956e76e
Parent:
10:75c5e0d8537d
Child:
12:841a4c36bd36
--- a/iothubtransportamqp.c	Fri Mar 11 16:59:34 2016 -0800
+++ b/iothubtransportamqp.c	Fri Mar 25 15:59:44 2016 -0700
@@ -53,9 +53,9 @@
 #define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536
 #define MESSAGE_SENDER_LINK_NAME "sender-link"
 #define MESSAGE_SENDER_SOURCE_ADDRESS "ingress"
-#define MESSAGE_SENDER_MAX_LINK_SIZE 65536
+#define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX
 
-typedef XIO_HANDLE(*TLS_IO_TRANSPORT_PROVIDER)(const char* fqdn, int port, const char* certificates);
+typedef XIO_HANDLE(*TLS_IO_TRANSPORT_PROVIDER)(const char* fqdn, int port);
 
 typedef enum CBS_STATE_TAG
 {
@@ -70,8 +70,6 @@
     STRING_HANDLE iotHubHostFqdn;
     // AMQP port of the IoT Hub.
     int iotHubPort;
-    // Certificates to be used by the TLS I/O.
-    char* trusted_certificates;
     // Key associated to the device to be used.
     STRING_HANDLE deviceKey;
     // Address to which the transport will connect to and send events.
@@ -94,7 +92,7 @@
     size_t connection_timeout;
     // Saved reference to the IoTHub LL Client.
     IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
-    
+
     // TSL I/O transport.
     XIO_HANDLE tls_io;
     // Pointer to the function that creates the TLS I/O (internal use only).
@@ -298,12 +296,12 @@
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] 
     IOTHUB_MESSAGE_HANDLE iothub_message = NULL;
     MESSAGE_BODY_TYPE body_type;
-    
+
     if (message_get_body_type(message, &body_type) != 0)
     {
         LogError("Failed to get the type of the message received by the transport.\r\n");
     }
-    else 
+    else
     {
         if (body_type == MESSAGE_BODY_TYPE_DATA)
         {
@@ -318,7 +316,7 @@
             }
         }
     }
-    
+
     if (iothub_message == NULL)
     {
         disposition_result = IOTHUBMESSAGE_REJECTED;
@@ -343,12 +341,13 @@
         {
             result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
         }
+        IoTHubMessage_Destroy(iothub_message);
     }
 
     return result;
 }
 
-static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates)
+static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port)
 {
     TLSIO_CONFIG tls_io_config = { fqdn, port };
     const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
@@ -399,7 +398,7 @@
 {
     (void)previous_amqp_management_state;
     AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
-    
+
     if (transport_state != NULL)
     {
         transport_state->connection_state = new_amqp_management_state;
@@ -412,7 +411,7 @@
 
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] 
     if (transport_state->tls_io == NULL &&
-        (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL)
+        (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL)
     {
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
         result = RESULT_FAILURE;
@@ -499,8 +498,8 @@
     int result;
 
     size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch.
-    
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation]
+
+                                                           // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation]
     size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000);
 
     STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time);
@@ -681,7 +680,7 @@
         if (target != NULL)
             amqpvalue_destroy(target);
     }
-    
+
     return result;
 }
 
@@ -894,7 +893,7 @@
     while ((message = getNextEventToSend(transport_state)) != NULL)
     {
         result = RESULT_FAILURE;
-        
+
         IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle);
         const unsigned char* messageContent;
         size_t messageContentSize;
@@ -908,7 +907,7 @@
             LogError("Failed tracking the event to be sent.\r\n");
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] 
-        else if (contentType == IOTHUBMESSAGE_BYTEARRAY && 
+        else if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
             IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK)
         {
             LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n");
@@ -944,7 +943,7 @@
 
             binary_data.bytes = messageContent;
             binary_data.length = messageContentSize;
-            
+
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] 
             if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK)
             {
@@ -1021,7 +1020,7 @@
 
 // API functions
 
-static TRANSPORT_HANDLE IoTHubTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config)
+static TRANSPORT_LL_HANDLE IoTHubTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config)
 {
     AMQP_TRANSPORT_INSTANCE* transport_state = NULL;
     bool cleanup_required = false;
@@ -1088,7 +1087,6 @@
         {
             transport_state->iotHubHostFqdn = NULL;
             transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT;
-            transport_state->trusted_certificates = NULL;
             transport_state->deviceKey = NULL;
             transport_state->devicesPath = NULL;
             transport_state->messageReceiveAddress = NULL;
@@ -1154,13 +1152,13 @@
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] 
             else if ((transport_state->deviceKey = STRING_new()) == NULL ||
-                    STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0)
+                STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_019: [If IoTHubTransportAMQP_Create fails to copy config->deviceKey, the function shall fail and return NULL.]
                 LogError("Failed to allocate transport_state->deviceKey.\r\n");
                 cleanup_required = true;
             }
-            else 
+            else
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_020: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_lifetime with the default value of 3600000 (milliseconds).]
                 transport_state->sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS;
@@ -1200,12 +1198,12 @@
     return transport_state;
 }
 
-static void IoTHubTransportAMQP_Destroy(TRANSPORT_HANDLE handle)
+static void IoTHubTransportAMQP_Destroy(TRANSPORT_LL_HANDLE handle)
 {
     if (handle != NULL)
     {
         AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
-        
+
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_024: [IoTHubTransportAMQP_Destroy shall destroy the AMQP message_sender.]
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_029 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP link.]
         destroyEventSender(transport_state);
@@ -1237,7 +1235,7 @@
     }
 }
 
-static void IoTHubTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle)
+static void IoTHubTransportAMQP_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle)
 {
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_051: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the transport handle parameter is NULL] 
     if (handle == NULL)
@@ -1267,15 +1265,15 @@
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().]
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds]
         else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) &&
-                startAuthentication(transport_state) != RESULT_OK)
+            startAuthentication(transport_state) != RESULT_OK)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately]
             LogError("Failed authenticating AMQP connection within CBS.\r\n");
             trigger_connection_retry = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout]
-        else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS &&  
-                verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT)
+        else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS &&
+            verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT)
         {
             LogError("AMQP transport authentication timed out.\r\n");
             trigger_connection_retry = true;
@@ -1327,7 +1325,7 @@
 static int IoTHubTransportAMQP_Subscribe(IOTHUB_DEVICE_HANDLE handle)
 {
     int result;
-    
+
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_037: [IoTHubTransportAMQP_Subscribe shall fail if the transport handle parameter received is NULL.] 
     if (handle == NULL)
     {
@@ -1396,7 +1394,7 @@
     return result;
 }
 
-static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value)
+static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, const void* value)
 {
     IOTHUB_CLIENT_RESULT result;
 
@@ -1416,13 +1414,8 @@
     {
         AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
 
-        if (strcmp("trusted_certificates", option) == 0)
-        {
-            transport_state->trusted_certificates = (char*)value;
-            result = IOTHUB_CLIENT_OK;
-        }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] 
-        else if (strcmp("sas_token_lifetime", option) == 0)
+        if (strcmp("sas_token_lifetime", option) == 0)
         {
             transport_state->sas_token_lifetime = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
@@ -1445,23 +1438,40 @@
             transport_state->message_send_timeout = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get  the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.] 
         else
         {
-            result = IOTHUB_CLIENT_INVALID_ARG;
-            LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
+            if (transport_state->tls_io == NULL &&
+                (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL)
+            {
+                result = IOTHUB_CLIENT_ERROR;
+                LogError("Failed to obtain a TLS I/O transport layer.\r\n");
+            }
+            else
+            {
+                /* Codes_SRS_IOTHUBTRANSPORTUAMQP_03_001: [If xio_setoption fails, IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */
+                if (xio_setoption(transport_state->tls_io, option, value) == 0)
+                {
+                    result = IOTHUB_CLIENT_OK;
+                }
+                else
+                {
+                    result = IOTHUB_CLIENT_ERROR;
+                    LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
+                }
+            }
         }
     }
 
     return result;
 }
 
-static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
+static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
 {
     IOTHUB_DEVICE_HANDLE result;
     // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] 
-    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_HANDLE is NULL.]
-    if ((handle ==NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
+    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
+    if ((handle == NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
     {
         result = NULL;
     }
@@ -1498,7 +1508,7 @@
                 else
                 {
                     transport_state->isRegistered = true;
-                    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_HANDLE as the IOTHUB_DEVICE_HANDLE.] 
+                    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] 
                     result = (IOTHUB_DEVICE_HANDLE)handle;
                 }
             }
@@ -1521,14 +1531,14 @@
 }
 
 static TRANSPORT_PROVIDER thisTransportProvider = {
-    IoTHubTransportAMQP_SetOption, 
-    IoTHubTransportAMQP_Create, 
-    IoTHubTransportAMQP_Destroy, 
+    IoTHubTransportAMQP_SetOption,
+    IoTHubTransportAMQP_Create,
+    IoTHubTransportAMQP_Destroy,
     IoTHubTransportAMQ_Register,
     IoTHubTransportAMQ_Unregister,
-    IoTHubTransportAMQP_Subscribe, 
-    IoTHubTransportAMQP_Unsubscribe, 
-    IoTHubTransportAMQP_DoWork, 
+    IoTHubTransportAMQP_Subscribe,
+    IoTHubTransportAMQP_Unsubscribe,
+    IoTHubTransportAMQP_DoWork,
     IoTHubTransportAMQP_GetSendStatus
 };