Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
Revision 12:841a4c36bd36, committed 2016-04-08
- Comitter:
- Azure.IoT Build
- Date:
- Fri Apr 08 13:23:54 2016 -0700
- Parent:
- 11:62d7b956e76e
- Child:
- 13:a4af7c301e02
- Commit message:
- 1.0.4
Changed in this revision
| iothubtransportamqp.c | Show annotated file Show diff for this revision Revisions of this file |
| iothubtransportamqp.h | Show annotated file Show diff for this revision Revisions of this file |
--- a/iothubtransportamqp.c Fri Mar 25 15:59:44 2016 -0700
+++ b/iothubtransportamqp.c Fri Apr 08 13:23:54 2016 -0700
@@ -8,27 +8,25 @@
#include <stdint.h>
#include <time.h>
#include <limits.h>
-#include "gballoc.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/crt_abstractions.h"
+#include "azure_c_shared_utility/doublylinkedlist.h"
+#include "azure_c_shared_utility/iot_logging.h"
+#include "azure_c_shared_utility/platform.h"
+#include "azure_c_shared_utility/sastoken.h"
+#include "azure_c_shared_utility/strings.h"
+#include "azure_c_shared_utility/urlencode.h"
+#include "azure_c_shared_utility/tlsio.h"
-#include "cbs.h"
-#include "link.h"
-#include "message.h"
-#include "amqpvalue.h"
-#include "message_receiver.h"
-#include "message_sender.h"
-#include "messaging.h"
-#include "sasl_mssbcbs.h"
-#include "saslclientio.h"
-
-#include "crt_abstractions.h"
-#include "doublylinkedlist.h"
-#include "iot_logging.h"
-#include "platform.h"
-#include "sastoken.h"
-#include "strings.h"
-#include "urlencode.h"
-
-#include "tlsio.h"
+#include "azure_uamqp_c/cbs.h"
+#include "azure_uamqp_c/link.h"
+#include "azure_uamqp_c/message.h"
+#include "azure_uamqp_c/amqpvalue.h"
+#include "azure_uamqp_c/message_receiver.h"
+#include "azure_uamqp_c/message_sender.h"
+#include "azure_uamqp_c/messaging.h"
+#include "azure_uamqp_c/sasl_mssbcbs.h"
+#include "azure_uamqp_c/saslclientio.h"
#include "iothub_client_ll.h"
#include "iothub_client_private.h"
@@ -43,7 +41,6 @@
#define DEFAULT_IOTHUB_AMQP_PORT 5671
#define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000
#define DEFAULT_CBS_REQUEST_TIMEOUT_MS 30000
-#define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000
#define CBS_AUDIENCE "servicebus.windows.net:sastoken"
#define DEFAULT_CONTAINER_ID "default_container_id"
#define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX
@@ -86,8 +83,6 @@
size_t sas_token_refresh_time;
// Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
size_t cbs_request_timeout;
- // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
- size_t message_send_timeout;
// Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
size_t connection_timeout;
// Saved reference to the IoTHub LL Client.
@@ -133,14 +128,6 @@
bool isRegistered;
} AMQP_TRANSPORT_INSTANCE;
-// This structure is used to track an event being sent and the time it was sent.
-typedef struct EVENT_TRACKER_TAG
-{
- IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
- IOTHUB_MESSAGE_LIST* message;
- time_t time_sent;
- DLIST_ENTRY entry;
-} EVENT_TRACKER;
// Auxiliary functions
@@ -172,27 +159,10 @@
return (size_t)(difftime(get_time(NULL), (time_t)0));
}
-static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
+static void trackEventInProgress(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
{
- EVENT_TRACKER* event_tracker;
-
- if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL)
- {
- event_tracker->iothub_client_handle = transport_state->iothub_client_handle;
- event_tracker->message = message;
- event_tracker->time_sent = get_time(NULL);
- DList_InitializeListHead(&event_tracker->entry);
-
- DList_RemoveEntryList(&message->entry);
- DList_InsertTailList(&transport_state->inProgress, &event_tracker->entry);
- }
-
- return event_tracker;
-}
-
-static void destroyEventTracker(EVENT_TRACKER* event_tracker)
-{
- free(event_tracker);
+ DList_RemoveEntryList(&message->entry);
+ DList_InsertTailList(&transport_state->inProgress, &message->entry);
}
static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -212,22 +182,21 @@
return message;
}
-static int isEventInInProgressList(EVENT_TRACKER* event_tracker)
+static int isEventInInProgressList(IOTHUB_MESSAGE_LIST* message)
{
- return !DList_IsListEmpty(&event_tracker->entry);
+ return !DList_IsListEmpty(&message->entry);
}
-static void removeEventFromInProgressList(EVENT_TRACKER* event_tracker)
+static void removeEventFromInProgressList(IOTHUB_MESSAGE_LIST* message)
{
- DList_RemoveEntryList(&event_tracker->entry);
- DList_InitializeListHead(&event_tracker->entry);
+ DList_RemoveEntryList(&message->entry);
+ DList_InitializeListHead(&message->entry);
}
-static void rollEventBackToWaitList(EVENT_TRACKER* event_tracker, AMQP_TRANSPORT_INSTANCE* transport_state)
+static void rollEventBackToWaitList(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
{
- removeEventFromInProgressList(event_tracker);
- DList_InsertTailList(transport_state->waitingToSend, &event_tracker->message->entry);
- destroyEventTracker(event_tracker);
+ removeEventFromInProgressList(message);
+ DList_InsertTailList(transport_state->waitingToSend, &message->entry);
}
static void rollEventsBackToWaitList(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -236,15 +205,15 @@
while (entry != &transport_state->inProgress)
{
- EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
+ IOTHUB_MESSAGE_LIST* message = containingRecord(entry, IOTHUB_MESSAGE_LIST, entry);
entry = entry->Blink;
- rollEventBackToWaitList(event_tracker, transport_state);
+ rollEventBackToWaitList(message, transport_state);
}
}
static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result)
{
- EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
+ IOTHUB_MESSAGE_LIST* message = (IOTHUB_MESSAGE_LIST*)context;
IOTHUB_CLIENT_RESULT iot_hub_send_result;
@@ -254,35 +223,35 @@
iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
- else if (send_result == MESSAGE_SEND_ERROR)
+ else
{
iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
}
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided]
- if (event_tracker->message->callback != NULL)
+ if (message->callback != NULL)
{
- event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
+ message->callback(iot_hub_send_result, message->context);
}
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback]
+ if (isEventInInProgressList(message))
+ {
+ removeEventFromInProgressList(message);
+ }
+
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
- IoTHubMessage_Destroy(event_tracker->message->messageHandle);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
- free(event_tracker->message);
+ IoTHubMessage_Destroy(message->messageHandle);
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback]
- if (isEventInInProgressList(event_tracker))
- {
- removeEventFromInProgressList(event_tracker);
- destroyEventTracker(event_tracker);
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
+ free(message);
}
static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
{
AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context;
- if (operation_result == OPERATION_RESULT_OK)
+ if (operation_result == CBS_OPERATION_RESULT_OK)
{
transportState->cbs_state = CBS_STATE_AUTHENTICATED;
}
@@ -535,26 +504,6 @@
return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) * 1000 >= transport_state->cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK;
}
-static void handleEventSendTimeouts(AMQP_TRANSPORT_INSTANCE* transport_state)
-{
- time_t current_time = get_time(NULL);
- PDLIST_ENTRY entry = transport_state->inProgress.Flink;
-
- while (entry != &transport_state->inProgress)
- {
- EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
-
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds]
- if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout)
- {
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error]
- on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
- }
-
- entry = entry->Flink;
- }
-}
-
static void attachDeviceClientTypeToLink(LINK_HANDLE link)
{
fields attach_properties;
@@ -897,17 +846,14 @@
IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle);
const unsigned char* messageContent;
size_t messageContentSize;
- MESSAGE_HANDLE amqp_message;
+ MESSAGE_HANDLE amqp_message = NULL;
bool is_message_error = false;
- EVENT_TRACKER* event_tracker;
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending]
- if ((event_tracker = trackEvent(message, transport_state)) == NULL)
- {
- LogError("Failed tracking the event to be sent.\r\n");
- }
+ trackEventInProgress(message, transport_state);
+
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()]
- else if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
+ if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK)
{
LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n");
@@ -959,7 +905,7 @@
else
{
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()]
- if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+ if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK)
{
LogError("Failed sending the AMQP message.\r\n");
}
@@ -979,26 +925,19 @@
if (result != RESULT_OK)
{
- if (event_tracker == NULL)
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error]
+ if (is_message_error)
{
- break;
+ on_message_send_complete(message, MESSAGE_SEND_ERROR);
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error]
- if (is_message_error)
- {
- on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
- }
- else
- {
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
- rollEventBackToWaitList(event_tracker, transport_state);
- break;
- }
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+ rollEventBackToWaitList(message, transport_state);
+ break;
}
}
}
@@ -1168,9 +1107,6 @@
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_129 : [IoTHubTransportAMQP_Create shall set parameter transport_state->cbs_request_timeout with the default value of 30000 (milliseconds).]
transport_state->cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS;
-
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_130 : [IoTHubTransportAMQP_Create shall set parameter transport_state->message_send_timeout with the default value of 300000 (milliseconds).]
- transport_state->message_send_timeout = DEFAULT_MESSAGE_SEND_TIMEOUT_MS;
}
}
}
@@ -1317,8 +1253,6 @@
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_103: [IoTHubTransportAMQP_DoWork shall invoke connection_dowork() on AMQP for triggering sending and receiving messages]
connection_dowork(transport_state->connection);
}
-
- handleEventSendTimeouts(transport_state);
}
}
@@ -1432,12 +1366,6 @@
transport_state->cbs_request_timeout = *((size_t*)value);
result = IOTHUB_CLIENT_OK;
}
- // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
- else if (strcmp("message_send_timeout", option) == 0)
- {
- transport_state->message_send_timeout = *((size_t*)value);
- result = IOTHUB_CLIENT_OK;
- }
// Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.]
else
{
@@ -1466,11 +1394,11 @@
return result;
}
-static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
+static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQP_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
{
IOTHUB_DEVICE_HANDLE result;
- // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.]
- // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQP_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.]
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQP_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
if ((handle == NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
{
result = NULL;
@@ -1487,7 +1415,7 @@
}
else
{
- // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.]
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQP_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.]
if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0)
{
LogError("Attemping to add new device to AMQP transport, not allowed.");
@@ -1508,7 +1436,7 @@
else
{
transport_state->isRegistered = true;
- // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.]
+ // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQP_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.]
result = (IOTHUB_DEVICE_HANDLE)handle;
}
}
@@ -1519,8 +1447,8 @@
return result;
}
-// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.]
-static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
+// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQP_Unregister shall return.]
+static void IoTHubTransportAMQP_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
{
if (deviceHandle != NULL)
{
@@ -1534,8 +1462,8 @@
IoTHubTransportAMQP_SetOption,
IoTHubTransportAMQP_Create,
IoTHubTransportAMQP_Destroy,
- IoTHubTransportAMQ_Register,
- IoTHubTransportAMQ_Unregister,
+ IoTHubTransportAMQP_Register,
+ IoTHubTransportAMQP_Unregister,
IoTHubTransportAMQP_Subscribe,
IoTHubTransportAMQP_Unsubscribe,
IoTHubTransportAMQP_DoWork,
--- a/iothubtransportamqp.h Fri Mar 25 15:59:44 2016 -0700 +++ b/iothubtransportamqp.h Fri Apr 08 13:23:54 2016 -0700 @@ -4,7 +4,8 @@ #ifndef IOTHUBTRANSPORTAMQP_H #define IOTHUBTRANSPORTAMQP_H -#include "iothub_client_private.h" +#include "iothub_client_ll.h" +#include "iothub_transport_ll.h" #ifdef __cplusplus extern "C"
