Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
Revision 10:75c5e0d8537d, committed 2016-03-11
- Comitter:
- Azure.IoT Build
- Date:
- Fri Mar 11 16:59:34 2016 -0800
- Parent:
- 9:8abcd13782f4
- Child:
- 11:62d7b956e76e
- Commit message:
- 1.0.2
Changed in this revision
| iothubtransportamqp.c | Show annotated file Show diff for this revision Revisions of this file |
--- a/iothubtransportamqp.c Tue Feb 16 14:24:48 2016 -0800
+++ b/iothubtransportamqp.c Fri Mar 11 16:59:34 2016 -0800
@@ -5,11 +5,15 @@
#ifdef _CRTDBG_MAP_ALLOC
#include <crtdbg.h>
#endif
+#include <stdint.h>
+#include <time.h>
+#include <limits.h>
#include "gballoc.h"
#include "cbs.h"
#include "link.h"
#include "message.h"
+#include "amqpvalue.h"
#include "message_receiver.h"
#include "message_sender.h"
#include "messaging.h"
@@ -35,7 +39,6 @@
#define RESULT_FAILURE 1
#define RESULT_TIMEOUT 2
-#define TIME_MAX SIZE_MAX
#define RFC1035_MAX_FQDN_LENGTH 255
#define DEFAULT_IOTHUB_AMQP_PORT 5671
#define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000
@@ -43,7 +46,7 @@
#define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000
#define CBS_AUDIENCE "servicebus.windows.net:sastoken"
#define DEFAULT_CONTAINER_ID "default_container_id"
-#define DEFAULT_INCOMING_WINDOW_SIZE SIZE_MAX
+#define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX
#define DEFAULT_OUTGOING_WINDOW_SIZE 100
#define MESSAGE_RECEIVER_LINK_NAME "receiver-link"
#define MESSAGE_RECEIVER_TARGET_ADDRESS "ingress-rx"
@@ -63,71 +66,73 @@
typedef struct AMQP_TRANSPORT_STATE_TAG
{
- // FQDN of the IoT Hub.
- STRING_HANDLE iotHubHostFqdn;
- // AMQP port of the IoT Hub.
- int iotHubPort;
- // Certificates to be used by the TLS I/O.
- char* trusted_certificates;
- // Key associated to the device to be used.
- STRING_HANDLE deviceKey;
- // Address to which the transport will connect to and send events.
- STRING_HANDLE targetAddress;
- // Address to which the transport will connect to and receive messages from.
- STRING_HANDLE messageReceiveAddress;
- // A component of the SAS token. Currently this must be an empty string.
- STRING_HANDLE sasTokenKeyName;
- // Internal parameter that identifies the current logical device within the service.
- STRING_HANDLE devicesPath;
- // How long a SAS token created by the transport is valid, in milliseconds.
- size_t sas_token_lifetime;
- // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds.
- size_t sas_token_refresh_time;
- // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
- size_t cbs_request_timeout;
- // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
- size_t message_send_timeout;
- // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
- size_t connection_timeout;
- // Saved reference to the IoTHub LL Client.
- IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
-
- // TSL I/O transport.
- XIO_HANDLE tls_io;
- // Pointer to the function that creates the TLS I/O (internal use only).
- TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider;
- // AMQP SASL I/O transport created on top of the TLS I/O layer.
- XIO_HANDLE sasl_io;
- // AMQP SASL I/O mechanism to be used.
- SASL_MECHANISM_HANDLE sasl_mechanism;
- // AMQP connection.
- CONNECTION_HANDLE connection;
- // Current AMQP connection state;
- AMQP_MANAGEMENT_STATE connection_state;
- // Last time the AMQP connection establishment was initiated.
- size_t connection_establish_time;
- // AMQP session.
- SESSION_HANDLE session;
- // AMQP link used by the event sender.
- LINK_HANDLE sender_link;
- // uAMQP event sender.
- MESSAGE_SENDER_HANDLE message_sender;
- // Internal flag that controls if messages should be received or not.
- bool receive_messages;
- // AMQP link used by the message receiver.
- LINK_HANDLE receiver_link;
- // uAMQP message receiver.
- MESSAGE_RECEIVER_HANDLE message_receiver;
- // List with events still pending to be sent. It is provided by the upper layer.
- PDLIST_ENTRY waitingToSend;
- // Internal list with the items currently being processed/sent through uAMQP.
- DLIST_ENTRY inProgress;
- // Connection instance with the Azure IoT CBS.
- CBS_HANDLE cbs;
- // Current state of the CBS connection.
- CBS_STATE cbs_state;
- // Time when the current SAS token was created, in seconds since epoch.
- size_t current_sas_token_create_time;
+ // FQDN of the IoT Hub.
+ STRING_HANDLE iotHubHostFqdn;
+ // AMQP port of the IoT Hub.
+ int iotHubPort;
+ // Certificates to be used by the TLS I/O.
+ char* trusted_certificates;
+ // Key associated to the device to be used.
+ STRING_HANDLE deviceKey;
+ // Address to which the transport will connect to and send events.
+ STRING_HANDLE targetAddress;
+ // Address to which the transport will connect to and receive messages from.
+ STRING_HANDLE messageReceiveAddress;
+ // A component of the SAS token. Currently this must be an empty string.
+ STRING_HANDLE sasTokenKeyName;
+ // Internal parameter that identifies the current logical device within the service.
+ STRING_HANDLE devicesPath;
+ // How long a SAS token created by the transport is valid, in milliseconds.
+ size_t sas_token_lifetime;
+ // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds.
+ size_t sas_token_refresh_time;
+ // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
+ size_t cbs_request_timeout;
+ // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
+ size_t message_send_timeout;
+ // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
+ size_t connection_timeout;
+ // Saved reference to the IoTHub LL Client.
+ IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
+
+ // TSL I/O transport.
+ XIO_HANDLE tls_io;
+ // Pointer to the function that creates the TLS I/O (internal use only).
+ TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider;
+ // AMQP SASL I/O transport created on top of the TLS I/O layer.
+ XIO_HANDLE sasl_io;
+ // AMQP SASL I/O mechanism to be used.
+ SASL_MECHANISM_HANDLE sasl_mechanism;
+ // AMQP connection.
+ CONNECTION_HANDLE connection;
+ // Current AMQP connection state;
+ AMQP_MANAGEMENT_STATE connection_state;
+ // Last time the AMQP connection establishment was initiated.
+ size_t connection_establish_time;
+ // AMQP session.
+ SESSION_HANDLE session;
+ // AMQP link used by the event sender.
+ LINK_HANDLE sender_link;
+ // uAMQP event sender.
+ MESSAGE_SENDER_HANDLE message_sender;
+ // Internal flag that controls if messages should be received or not.
+ bool receive_messages;
+ // AMQP link used by the message receiver.
+ LINK_HANDLE receiver_link;
+ // uAMQP message receiver.
+ MESSAGE_RECEIVER_HANDLE message_receiver;
+ // List with events still pending to be sent. It is provided by the upper layer.
+ PDLIST_ENTRY waitingToSend;
+ // Internal list with the items currently being processed/sent through uAMQP.
+ DLIST_ENTRY inProgress;
+ // Connection instance with the Azure IoT CBS.
+ CBS_HANDLE cbs;
+ // Current state of the CBS connection.
+ CBS_STATE cbs_state;
+ // Time when the current SAS token was created, in seconds since epoch.
+ size_t current_sas_token_create_time;
+ // Mark if device is registered in transport (only one device per transport).
+ bool isRegistered;
} AMQP_TRANSPORT_INSTANCE;
// This structure is used to track an event being sent and the time it was sent.
@@ -145,29 +150,33 @@
static STRING_HANDLE concat3Params(const char* prefix, const char* infix, const char* suffix)
{
STRING_HANDLE result = NULL;
- char* concat = NULL;
+ char* concat;
size_t totalLength = strlen(prefix) + strlen(infix) + strlen(suffix) + 1; // One extra for \0.
- if ((concat = (char*)malloc(sizeof(char) * totalLength)) != NULL)
+ if ((concat = (char*)malloc(totalLength)) != NULL)
{
- if (snprintf(concat, totalLength, "%s%s%s", prefix, infix, suffix) > 0)
- {
- result = STRING_construct(concat);
- }
+ (void)strcpy(concat, prefix);
+ (void)strcat(concat, infix);
+ (void)strcat(concat, suffix);
+ result = STRING_construct(concat);
free(concat);
}
+ else
+ {
+ result = NULL;
+ }
return result;
}
-static size_t getSecondsSinceEpoch()
+static size_t getSecondsSinceEpoch(void)
{
- return (size_t)(difftime(get_time(NULL), (time_t)0) + 0);
+ return (size_t)(difftime(get_time(NULL), (time_t)0));
}
static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
{
- EVENT_TRACKER* event_tracker = NULL;
+ EVENT_TRACKER* event_tracker;
if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL)
{
@@ -190,13 +199,17 @@
static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state)
{
- IOTHUB_MESSAGE_LIST* message = NULL;
+ IOTHUB_MESSAGE_LIST* message;
if (!DList_IsListEmpty(transport_state->waitingToSend))
{
PDLIST_ENTRY list_entry = transport_state->waitingToSend->Flink;
message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry);
}
+ else
+ {
+ message = NULL;
+ }
return message;
}
@@ -231,46 +244,43 @@
}
}
-static void on_message_send_complete(const void* context, MESSAGE_SEND_RESULT send_result)
+static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result)
{
- if (context != NULL)
+ EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
+
+ IOTHUB_CLIENT_RESULT iot_hub_send_result;
+
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK]
+ if (send_result == MESSAGE_SEND_OK)
{
- EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
-
- IOTHUB_CLIENT_RESULT iot_hub_send_result;
+ iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
+ else if (send_result == MESSAGE_SEND_ERROR)
+ {
+ iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
+ }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK]
- if (send_result == MESSAGE_SEND_OK)
- {
- iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
- else if (send_result == MESSAGE_SEND_ERROR)
- {
- iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided]
+ if (event_tracker->message->callback != NULL)
+ {
+ event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
+ }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback on_message_send_complete shall invoke the upper layer callback for message received if provided]
- if (event_tracker->message->callback != NULL)
- {
- event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
+ IoTHubMessage_Destroy(event_tracker->message->messageHandle);
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
+ free(event_tracker->message);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback on_message_send_complete shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
- IoTHubMessage_Destroy(event_tracker->message->messageHandle);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback on_message_send_complete shall destroy the IOTHUB_MESSAGE_LIST instance]
- free(event_tracker->message);
-
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback on_message_send_complete shall remove the target message from the in-progress list after the upper layer callback]
- if (isEventInInProgressList(event_tracker))
- {
- removeEventFromInProgressList(event_tracker);
- destroyEventTracker(event_tracker);
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback]
+ if (isEventInInProgressList(event_tracker))
+ {
+ removeEventFromInProgressList(event_tracker);
+ destroyEventTracker(event_tracker);
}
}
-static void on_put_token_complete(const void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
+static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
{
AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context;
@@ -280,16 +290,14 @@
}
}
-AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
+static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
{
AMQP_VALUE result = NULL;
- IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result = IOTHUBMESSAGE_REJECTED;
+ IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result;
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback on_message_received shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters]
- IOTHUB_CLIENT_LL_HANDLE iothub_client_handle = (IOTHUB_CLIENT_LL_HANDLE)context;
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters]
IOTHUB_MESSAGE_HANDLE iothub_message = NULL;
MESSAGE_BODY_TYPE body_type;
-
if (message_get_body_type(message, &body_type) != 0)
{
@@ -313,23 +321,24 @@
if (iothub_message == NULL)
{
+ disposition_result = IOTHUBMESSAGE_REJECTED;
LogError("Transport failed processing the message received.\r\n");
}
else
{
disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback on_message_received shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED]
if (disposition_result == IOTHUBMESSAGE_ACCEPTED)
{
result = messaging_delivery_accepted();
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback on_message_received shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED]
else if (disposition_result == IOTHUBMESSAGE_ABANDONED)
{
result = messaging_delivery_released();
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback on_message_received shall return the result of messaging_delivery_rejected(Rejected by application, Rejected by application) if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED]
else if (disposition_result == IOTHUBMESSAGE_REJECTED)
{
result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
@@ -339,61 +348,60 @@
return result;
}
-XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates)
+static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates)
{
- TLSIO_CONFIG tls_io_config = { fqdn, port };
- const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
- return xio_create(io_interface_description, &tls_io_config, NULL);
+ TLSIO_CONFIG tls_io_config = { fqdn, port };
+ const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
+ return xio_create(io_interface_description, &tls_io_config, NULL);
}
static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state)
{
- if (transport_state != NULL)
+ if (transport_state->cbs != NULL)
{
- if (transport_state->cbs != NULL)
- {
- cbs_destroy(transport_state->cbs);
- transport_state->cbs = NULL;
- }
+ cbs_destroy(transport_state->cbs);
+ transport_state->cbs = NULL;
+ }
- if (transport_state->session != NULL)
- {
- session_destroy(transport_state->session);
- transport_state->session = NULL;
- }
+ if (transport_state->session != NULL)
+ {
+ session_destroy(transport_state->session);
+ transport_state->session = NULL;
+ }
- if (transport_state->connection != NULL)
- {
- connection_destroy(transport_state->connection);
- transport_state->connection = NULL;
- }
+ if (transport_state->connection != NULL)
+ {
+ connection_destroy(transport_state->connection);
+ transport_state->connection = NULL;
+ }
- if (transport_state->sasl_io != NULL)
- {
- xio_destroy(transport_state->sasl_io);
- transport_state->sasl_io = NULL;
- }
+ if (transport_state->sasl_io != NULL)
+ {
+ xio_destroy(transport_state->sasl_io);
+ transport_state->sasl_io = NULL;
+ }
- if (transport_state->sasl_mechanism != NULL)
- {
- saslmechanism_destroy(transport_state->sasl_mechanism);
- transport_state->sasl_mechanism = NULL;
- }
+ if (transport_state->sasl_mechanism != NULL)
+ {
+ saslmechanism_destroy(transport_state->sasl_mechanism);
+ transport_state->sasl_mechanism = NULL;
+ }
- if (transport_state->tls_io != NULL)
- {
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.]
- xio_destroy(transport_state->tls_io);
- transport_state->tls_io = NULL;
- }
- }
+ if (transport_state->tls_io != NULL)
+ {
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.]
+ xio_destroy(transport_state->tls_io);
+ transport_state->tls_io = NULL;
+ }
}
static void on_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state)
{
- if (context != NULL)
+ (void)previous_amqp_management_state;
+ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+
+ if (transport_state != NULL)
{
- AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
transport_state->connection_state = new_amqp_management_state;
}
}
@@ -404,13 +412,13 @@
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function]
if (transport_state->tls_io == NULL &&
- (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL)
+ (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
result = RESULT_FAILURE;
- LogError("Failed to obtain a TLS I/O transport layer.\r\n");
+ LogError("Failed to obtain a TLS I/O transport layer.\r\n");
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQPs saslmechanism_create() API]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API]
else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
@@ -443,13 +451,13 @@
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter AMQP incoming window]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window']
if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0)
{
LogError("Failed to set the AMQP incoming window size.\r\n");
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter AMQP outgoing window]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window']
if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0)
{
LogError("Failed to set the AMQP outgoing window size.\r\n");
@@ -463,7 +471,7 @@
LogError("Failed to create the CBS connection.\r\n");
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API]
- else if ((cbs_open(transport_state->cbs)) != 0)
+ else if (cbs_open(transport_state->cbs) != 0)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
result = RESULT_FAILURE;
@@ -488,11 +496,11 @@
static int startAuthentication(AMQP_TRANSPORT_INSTANCE* transport_state)
{
- int result = RESULT_FAILURE;
+ int result;
size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch.
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to sas_token_lifetime milliseconds from the time of creation]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation]
size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000);
STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time);
@@ -500,10 +508,12 @@
if (newSASToken == NULL)
{
LogError("Could not generate a new SAS token for the CBS\r\n");
+ result = RESULT_FAILURE;
}
else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK)
{
LogError("Failed applying new SAS token to CBS\r\n");
+ result = RESULT_FAILURE;
}
else
{
@@ -514,7 +524,9 @@
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS]
if (newSASToken != NULL)
+ {
STRING_delete(newSASToken);
+ }
return result;
}
@@ -533,10 +545,10 @@
{
EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to message_send_timeout milliseconds]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds]
if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout)
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a message_send_timeout occurs the timed out events removed from the inProgress and the upper layer notified of the send error]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error]
on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
}
@@ -546,9 +558,9 @@
static void attachDeviceClientTypeToLink(LINK_HANDLE link)
{
- fields attach_properties = NULL;
- AMQP_VALUE deviceClientTypeKeyName = NULL;
- AMQP_VALUE deviceClientTypeValue = NULL;
+ fields attach_properties;
+ AMQP_VALUE deviceClientTypeKeyName;
+ AMQP_VALUE deviceClientTypeValue;
int result;
//
@@ -565,46 +577,49 @@
{
LogError("Failed to create the map for device client type.\r\n");
}
- else if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
- {
- LogError("Failed to create the key name for the device client type.\r\n");
- }
- else if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL)
- {
- LogError("Failed to create the key value for the device client type.\r\n");
- }
- else if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0)
+ else
{
- LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result);
+ if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
+ {
+ LogError("Failed to create the key name for the device client type.\r\n");
+ }
+ else
+ {
+ if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL)
+ {
+ LogError("Failed to create the key value for the device client type.\r\n");
+ }
+ else
+ {
+ if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0)
+ {
+ LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result);
+ }
+ else if ((result = link_set_attach_properties(link, attach_properties)) != 0)
+ {
+ LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result);
+ }
+
+ amqpvalue_destroy(deviceClientTypeValue);
+ }
+
+ amqpvalue_destroy(deviceClientTypeKeyName);
+ }
+
+ amqpvalue_destroy(attach_properties);
}
- else if ((result = link_set_attach_properties(link, attach_properties)) != 0)
- {
- LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result);
-}
- amqpvalue_destroy(attach_properties);
- amqpvalue_destroy(deviceClientTypeKeyName);
- amqpvalue_destroy(deviceClientTypeValue);
-
}
-static int destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
+static void destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
{
- int result = RESULT_FAILURE;
-
if (transport_state->message_sender != NULL)
{
messagesender_destroy(transport_state->message_sender);
-
transport_state->message_sender = NULL;
link_destroy(transport_state->sender_link);
-
transport_state->sender_link = NULL;
-
- result = RESULT_OK;
}
-
- return result;
}
static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -616,7 +631,7 @@
AMQP_VALUE source = NULL;
AMQP_VALUE target = NULL;
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using source as ingress, target as the IoT hub FQDN, link name as sender-link and role as role_sender]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using 'source' as "ingress", target as the IoT hub FQDN, link name as "sender-link" and role as 'role_sender']
if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL)
{
LogError("Failed creating AMQP messaging source attribute.\r\n");
@@ -632,7 +647,7 @@
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
{
LogError("Failed setting AMQP link max message size.\r\n");
@@ -704,7 +719,7 @@
AMQP_VALUE source = NULL;
AMQP_VALUE target = NULL;
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using source as messageReceiveAddress, target as the ingress-rx, link name as receiver-link and role as role_receiver]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using 'source' as messageReceiveAddress, target as the "ingress-rx", link name as "receiver-link" and role as 'role_receiver']
if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL)
{
LogError("Failed creating AMQP message receiver source attribute.\r\n");
@@ -726,7 +741,7 @@
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK)
{
LogError("Failed setting AMQP link max message size for message receiver.\r\n");
@@ -743,7 +758,7 @@
else
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_079: [IoTHubTransportAMQP_DoWork shall open the AMQP message receiver using messagereceiver_open() AMQP API, passing a callback function for handling C2D incoming messages]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the on_message_received as the callback function]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the 'on_message_received' as the callback function]
if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established]
@@ -765,23 +780,129 @@
return result;
}
+static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+{
+ int result;
+ MAP_HANDLE properties_map;
+ const char* const* propertyKeys;
+ const char* const* propertyValues;
+ size_t propertyCount;
+
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_007: [The IoTHub message properties shall be obtained by calling IoTHubMessage_Properties.] */
+ properties_map = IoTHubMessage_Properties(iothub_message_handle);
+ if (properties_map == NULL)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to get property map from IoTHub message.\r\n");
+ result = __LINE__;
+ }
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */
+ else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to get the internals of the property map.\r\n");
+ result = __LINE__;
+ }
+ else
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_016: [If the number of properties is 0, no uAMQP map shall be created and no application properties shall be set on the uAMQP message.] */
+ if (propertyCount != 0)
+ {
+ size_t i;
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_009: [The uAMQP map shall be created by calling amqpvalue_create_map.] */
+ AMQP_VALUE uamqp_map = amqpvalue_create_map();
+ if (uamqp_map == NULL)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to create uAMQP map for the properties.\r\n");
+ result = __LINE__;
+ }
+ else
+ {
+ for (i = 0; i < propertyCount; i++)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_010: [A key uAMQP value shall be created by using amqpvalue_create_string.] */
+ AMQP_VALUE map_key_value = amqpvalue_create_string(propertyKeys[i]);
+ if (map_key_value == NULL)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to create uAMQP property key value.\r\n");
+ break;
+ }
+
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_011: [A value uAMQP value shall be created by using amqpvalue_create_string.] */
+ AMQP_VALUE map_value_value = amqpvalue_create_string(propertyValues[i]);
+ if (map_value_value == NULL)
+ {
+ amqpvalue_destroy(map_key_value);
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to create uAMQP property key value.\r\n");
+ break;
+ }
+
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_008: [All properties shall be transferred to a uAMQP map.] */
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_012: [The key/value pair for the property shall be set into the uAMQP property map by calling amqpvalue_map_set_value.] */
+ if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0)
+ {
+ amqpvalue_destroy(map_key_value);
+ amqpvalue_destroy(map_value_value);
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to create uAMQP property key value.\r\n");
+ break;
+ }
+
+ amqpvalue_destroy(map_key_value);
+ amqpvalue_destroy(map_value_value);
+ }
+
+ if (i < propertyCount)
+ {
+ result = __LINE__;
+ }
+ else
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_013: [After all properties have been filled in the uAMQP map, the uAMQP properties map shall be set on the uAMQP message by calling message_set_application_properties.] */
+ if (message_set_application_properties(uamqp_message, uamqp_map) != 0)
+ {
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ LogError("Failed to transfer the message properties to the uAMQP message.\r\n");
+ result = __LINE__;
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+
+ amqpvalue_destroy(uamqp_map);
+ }
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+
+ return result;
+}
+
static int sendPendingEvents(AMQP_TRANSPORT_INSTANCE* transport_state)
{
int result = RESULT_OK;
- IOTHUB_MESSAGE_LIST* message = NULL;
+ IOTHUB_MESSAGE_LIST* message;
while ((message = getNextEventToSend(transport_state)) != NULL)
{
result = RESULT_FAILURE;
IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle);
- const unsigned char* messageContent = NULL;
- size_t messageContentSize = 0;
+ const unsigned char* messageContent;
+ size_t messageContentSize;
MESSAGE_HANDLE amqp_message;
bool is_message_error = false;
EVENT_TRACKER* event_tracker;
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an in-progress list right before processing them for sending]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending]
if ((event_tracker = trackEvent(message, transport_state)) == NULL)
{
LogError("Failed tracking the event to be sent.\r\n");
@@ -796,7 +917,7 @@
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()]
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_090: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain the size of its char* representation using strlen()]
else if (contentType == IOTHUBMESSAGE_STRING &&
- ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL || (messageContentSize = strlen(messageContent)) <= 0))
+ ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL))
{
LogError("Failed getting the STRING representation of the event content to be sent.\r\n");
is_message_error = true;
@@ -807,30 +928,46 @@
LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n");
is_message_error = true;
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() AMQP API]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API]
else if ((amqp_message = message_create()) == NULL)
{
LogError("Failed allocating the AMQP message for sending the event.\r\n");
}
else
{
- BINARY_DATA binary_data = { messageContent, messageContentSize };
+ BINARY_DATA binary_data;
+
+ if (contentType == IOTHUBMESSAGE_STRING)
+ {
+ messageContentSize = strlen(messageContent);
+ }
+
+ binary_data.bytes = messageContent;
+ binary_data.length = messageContentSize;
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() AMQP API]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API]
if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK)
{
LogError("Failed setting the body of the AMQP message.\r\n");
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()]
- if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+ if (addPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0)
{
- LogError("Failed sending the AMQP message.\r\n");
+ /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+ is_message_error = true;
}
else
{
- result = RESULT_OK;
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()]
+ if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+ {
+ LogError("Failed sending the AMQP message.\r\n");
+ }
+ else
+ {
+ result = RESULT_OK;
+ }
}
}
}
@@ -888,7 +1025,7 @@
{
AMQP_TRANSPORT_INSTANCE* transport_state = NULL;
bool cleanup_required = false;
- int deviceIdLength = -1;
+ size_t deviceIdLength;
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.]
if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL)
@@ -940,7 +1077,7 @@
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transports internal state structure fails.]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transport's internal state structure fails.]
transport_state = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE));
if (transport_state == NULL)
@@ -951,7 +1088,7 @@
{
transport_state->iotHubHostFqdn = NULL;
transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT;
- transport_state->trusted_certificates = NULL;
+ transport_state->trusted_certificates = NULL;
transport_state->deviceKey = NULL;
transport_state->devicesPath = NULL;
transport_state->messageReceiveAddress = NULL;
@@ -975,7 +1112,8 @@
transport_state->sender_link = NULL;
transport_state->session = NULL;
transport_state->tls_io = NULL;
- transport_state->tls_io_transport_provider = getTLSIOTransport;
+ transport_state->tls_io_transport_provider = getTLSIOTransport;
+ transport_state->isRegistered = false;
transport_state->waitingToSend = config->waitingToSend;
DList_InitializeListHead(&transport_state->inProgress);
@@ -986,7 +1124,7 @@
LogError("Failed to set transport_state->iotHubHostFqdn.\r\n");
cleanup_required = true;
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + /devices/ + deviceId.]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.]
else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.]
@@ -1014,7 +1152,7 @@
LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n");
cleanup_required = true;
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transports own deviceKey field]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field]
else if ((transport_state->deviceKey = STRING_new()) == NULL ||
STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0)
{
@@ -1127,7 +1265,7 @@
trigger_connection_retry = true;
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than sas_token_refresh_time milliseconds]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds]
else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) &&
startAuthentication(transport_state) != RESULT_OK)
{
@@ -1135,7 +1273,7 @@
LogError("Failed authenticating AMQP connection within CBS.\r\n");
trigger_connection_retry = true;
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for cbs_request_timeout milliseconds for the cbs_put_token() to complete before failing due to timeout]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout]
else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS &&
verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT)
{
@@ -1169,7 +1307,6 @@
else if (sendPendingEvents(transport_state) != RESULT_OK)
{
LogError("AMQP transport failed sending events.\r\n");
- trigger_connection_retry = true;
}
}
@@ -1187,7 +1324,7 @@
}
}
-static int IoTHubTransportAMQP_Subscribe(TRANSPORT_HANDLE handle)
+static int IoTHubTransportAMQP_Subscribe(IOTHUB_DEVICE_HANDLE handle)
{
int result;
@@ -1208,7 +1345,7 @@
return result;
}
-static void IoTHubTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle)
+static void IoTHubTransportAMQP_Unsubscribe(IOTHUB_DEVICE_HANDLE handle)
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.]
if (handle == NULL)
@@ -1223,7 +1360,7 @@
}
}
-static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
+static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
{
IOTHUB_CLIENT_RESULT result;
@@ -1279,50 +1416,116 @@
{
AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
- if (strcmp("trusted_certificates", option) == 0)
- {
- transport_state->trusted_certificates = (char*)value;
- result = IOTHUB_CLIENT_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK]
- else if (strcmp("sas_token_lifetime", option) == 0)
- {
- transport_state->sas_token_lifetime = *((size_t*)value);
- result = IOTHUB_CLIENT_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK]
- else if (strcmp("sas_token_refresh_time", option) == 0)
- {
- transport_state->sas_token_refresh_time = *((size_t*)value);
- result = IOTHUB_CLIENT_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK]
- else if (strcmp("cbs_request_timeout", option) == 0)
- {
- transport_state->cbs_request_timeout = *((size_t*)value);
- result = IOTHUB_CLIENT_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
- else if (strcmp("message_send_timeout", option) == 0)
- {
- transport_state->message_send_timeout = *((size_t*)value);
- result = IOTHUB_CLIENT_OK;
- }
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]
- else
- {
- result = IOTHUB_CLIENT_INVALID_ARG;
- LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
- }
- }
+ if (strcmp("trusted_certificates", option) == 0)
+ {
+ transport_state->trusted_certificates = (char*)value;
+ result = IOTHUB_CLIENT_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK]
+ else if (strcmp("sas_token_lifetime", option) == 0)
+ {
+ transport_state->sas_token_lifetime = *((size_t*)value);
+ result = IOTHUB_CLIENT_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK]
+ else if (strcmp("sas_token_refresh_time", option) == 0)
+ {
+ transport_state->sas_token_refresh_time = *((size_t*)value);
+ result = IOTHUB_CLIENT_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK]
+ else if (strcmp("cbs_request_timeout", option) == 0)
+ {
+ transport_state->cbs_request_timeout = *((size_t*)value);
+ result = IOTHUB_CLIENT_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
+ else if (strcmp("message_send_timeout", option) == 0)
+ {
+ transport_state->message_send_timeout = *((size_t*)value);
+ result = IOTHUB_CLIENT_OK;
+ }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]
+ else
+ {
+ result = IOTHUB_CLIENT_INVALID_ARG;
+ LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
+ }
+ }
return result;
}
+static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
+{
+ IOTHUB_DEVICE_HANDLE result;
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.]
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_HANDLE is NULL.]
+ if ((handle ==NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
+ {
+ result = NULL;
+ }
+ else
+ {
+ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
+
+ STRING_HANDLE devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", deviceId);
+ if (devicesPath == NULL)
+ {
+ LogError("Could not create a comparison string");
+ result = NULL;
+ }
+ else
+ {
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.]
+ if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0)
+ {
+ LogError("Attemping to add new device to AMQP transport, not allowed.");
+ result = NULL;
+ }
+ else if (strcmp(STRING_c_str(transport_state->deviceKey), deviceKey) != 0)
+ {
+ LogError("Attemping to add new device to AMQP transport, not allowed.");
+ result = NULL;
+ }
+ else
+ {
+ if (transport_state->isRegistered == true)
+ {
+ LogError("Transport already has device registered by id: [%s]", deviceId);
+ result = NULL;
+ }
+ else
+ {
+ transport_state->isRegistered = true;
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_HANDLE as the IOTHUB_DEVICE_HANDLE.]
+ result = (IOTHUB_DEVICE_HANDLE)handle;
+ }
+ }
+ STRING_delete(devicesPath);
+ }
+ }
+
+ return result;
+}
+
+// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.]
+static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
+{
+ if (deviceHandle != NULL)
+ {
+ AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)deviceHandle;
+
+ transport_state->isRegistered = false;
+ }
+}
+
static TRANSPORT_PROVIDER thisTransportProvider = {
IoTHubTransportAMQP_SetOption,
IoTHubTransportAMQP_Create,
IoTHubTransportAMQP_Destroy,
+ IoTHubTransportAMQ_Register,
+ IoTHubTransportAMQ_Unregister,
IoTHubTransportAMQP_Subscribe,
IoTHubTransportAMQP_Unsubscribe,
IoTHubTransportAMQP_DoWork,
