Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Committer:
AzureIoTClient
Date:
Mon Nov 02 19:21:13 2015 -0800
Revision:
5:8d58d20699dd
Parent:
4:57e049bce51e
Child:
7:07bc440836b3
v1.0.0-preview.4

Who changed what in which revision?

UserRevisionLine numberNew contents of line
AzureIoTClient 4:57e049bce51e 1 // Copyright (c) Microsoft. All rights reserved.
AzureIoTClient 4:57e049bce51e 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
AzureIoTClient 4:57e049bce51e 3
AzureIoTClient 4:57e049bce51e 4 #include <stdlib.h>
AzureIoTClient 4:57e049bce51e 5 #ifdef _CRTDBG_MAP_ALLOC
AzureIoTClient 4:57e049bce51e 6 #include <crtdbg.h>
AzureIoTClient 4:57e049bce51e 7 #endif
AzureIoTClient 4:57e049bce51e 8 #include "gballoc.h"
AzureIoTClient 4:57e049bce51e 9
AzureIoTClient 4:57e049bce51e 10 #include "proton/message.h"
AzureIoTClient 4:57e049bce51e 11 #include "proton/messenger.h"
AzureIoTClient 4:57e049bce51e 12
AzureIoTClient 4:57e049bce51e 13 #include "iot_logging.h"
AzureIoTClient 4:57e049bce51e 14 #include "strings.h"
AzureIoTClient 4:57e049bce51e 15 #include "urlencode.h"
AzureIoTClient 4:57e049bce51e 16 #include "doublylinkedlist.h"
AzureIoTClient 4:57e049bce51e 17 #include "crt_abstractions.h"
AzureIoTClient 4:57e049bce51e 18 #include "sastoken.h"
AzureIoTClient 4:57e049bce51e 19
AzureIoTClient 4:57e049bce51e 20 #include "iothub_client_ll.h"
AzureIoTClient 4:57e049bce51e 21 #include "iothub_client_private.h"
AzureIoTClient 4:57e049bce51e 22 #include "iothubtransportamqp.h"
AzureIoTClient 4:57e049bce51e 23 #include "iothub_client_amqp_internal.h"
AzureIoTClient 4:57e049bce51e 24
AzureIoTClient 4:57e049bce51e 25 static const size_t NUMBER_OF_MESSENGER_STOP_TRIES = PROTON_MESSENGER_STOP_TRIES;
AzureIoTClient 4:57e049bce51e 26 static const size_t MAXIMUM_EVENT_LENGTH = PROTON_MAXIMUM_EVENT_LENGTH;
AzureIoTClient 4:57e049bce51e 27
AzureIoTClient 4:57e049bce51e 28
AzureIoTClient 4:57e049bce51e 29 #define amqp_batch_result_values \
AzureIoTClient 4:57e049bce51e 30 amqp_batch_nowork, \
AzureIoTClient 4:57e049bce51e 31 amqp_batch_batch, \
AzureIoTClient 4:57e049bce51e 32 amqp_batch_nobatch, \
AzureIoTClient 4:57e049bce51e 33 amqp_batch_error \
AzureIoTClient 4:57e049bce51e 34
AzureIoTClient 4:57e049bce51e 35 DEFINE_ENUM(amqp_batch_result, amqp_batch_result_values);
AzureIoTClient 4:57e049bce51e 36
AzureIoTClient 4:57e049bce51e 37 static const int IO_PROCESSING_YIELD_IN_MILLISECONDS = PROTON_PROCESSING_YIELD_IN_MILLISECONDS;
AzureIoTClient 4:57e049bce51e 38
AzureIoTClient 4:57e049bce51e 39 static void processReceives(TRANSPORT_HANDLE handle);
AzureIoTClient 4:57e049bce51e 40
AzureIoTClient 4:57e049bce51e 41 typedef struct MESSENGER_CONTAINER_TAG
AzureIoTClient 4:57e049bce51e 42 {
AzureIoTClient 4:57e049bce51e 43 //
AzureIoTClient 4:57e049bce51e 44 // This links all of these records onto the messageCorral in the transport state.
AzureIoTClient 4:57e049bce51e 45 DLIST_ENTRY entry;
AzureIoTClient 4:57e049bce51e 46
AzureIoTClient 4:57e049bce51e 47 //
AzureIoTClient 4:57e049bce51e 48 // This is a messenger that couldn't be stopped. (You have to admire its initiative.) We'll keep trying to stop it when
AzureIoTClient 4:57e049bce51e 49 // we have a spare moment.
AzureIoTClient 4:57e049bce51e 50 pn_messenger_t* messengerThatDidNotStop;
AzureIoTClient 4:57e049bce51e 51 } MESSENGER_CONTAINER, *PMESSENGER_CONTAINER;
AzureIoTClient 4:57e049bce51e 52
AzureIoTClient 4:57e049bce51e 53 //
AzureIoTClient 4:57e049bce51e 54 // Transport specific structure used to contain everything needed to send an event item.
AzureIoTClient 4:57e049bce51e 55 //
AzureIoTClient 4:57e049bce51e 56 typedef struct AMQP_WORK_ITEM_TAG
AzureIoTClient 4:57e049bce51e 57 {
AzureIoTClient 4:57e049bce51e 58 //
AzureIoTClient 4:57e049bce51e 59 // At what time should we give up on sending this message.
AzureIoTClient 4:57e049bce51e 60 size_t expiry;
AzureIoTClient 4:57e049bce51e 61
AzureIoTClient 4:57e049bce51e 62 //
AzureIoTClient 4:57e049bce51e 63 // Used by the transport to determine if the IO operation is complete.
AzureIoTClient 4:57e049bce51e 64 pn_tracker_t tracker;
AzureIoTClient 4:57e049bce51e 65
AzureIoTClient 4:57e049bce51e 66 //
AzureIoTClient 4:57e049bce51e 67 // This acts as a listhead for all of the messages that make up one transfer frame.
AzureIoTClient 4:57e049bce51e 68 // If no batching is being done then this list will have at most one item. If batching
AzureIoTClient 4:57e049bce51e 69 // is being done then this could be lengthy!
AzureIoTClient 4:57e049bce51e 70 DLIST_ENTRY eventMessages;
AzureIoTClient 4:57e049bce51e 71
AzureIoTClient 4:57e049bce51e 72 //
AzureIoTClient 4:57e049bce51e 73 // This is used to hold this work item either on the work in progress list or on the available work item list.
AzureIoTClient 4:57e049bce51e 74 DLIST_ENTRY link;
AzureIoTClient 4:57e049bce51e 75 } AMQP_WORK_ITEM, *PAMQP_WORK_ITEM;
AzureIoTClient 4:57e049bce51e 76
AzureIoTClient 4:57e049bce51e 77 static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState);
AzureIoTClient 4:57e049bce51e 78
AzureIoTClient 4:57e049bce51e 79 static bool checkForErrorsThenWork(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 4:57e049bce51e 80 {
AzureIoTClient 4:57e049bce51e 81 pn_error_t* errorStruct;
AzureIoTClient 4:57e049bce51e 82 bool result;
AzureIoTClient 4:57e049bce51e 83 errorStruct = pn_messenger_error(transportState->messenger);
AzureIoTClient 4:57e049bce51e 84 if (pn_error_code(errorStruct) != 0)
AzureIoTClient 4:57e049bce51e 85 {
AzureIoTClient 4:57e049bce51e 86 LogError("An error was detected in the networking manager: %d\r\n", pn_error_code(errorStruct));
AzureIoTClient 4:57e049bce51e 87 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 88 result = false;
AzureIoTClient 4:57e049bce51e 89 }
AzureIoTClient 4:57e049bce51e 90 else
AzureIoTClient 4:57e049bce51e 91 {
AzureIoTClient 4:57e049bce51e 92 int protonResult;
AzureIoTClient 4:57e049bce51e 93 protonResult = pn_messenger_work(transportState->messenger, IO_PROCESSING_YIELD_IN_MILLISECONDS);
AzureIoTClient 4:57e049bce51e 94 if ((protonResult < 0) && (protonResult != PN_TIMEOUT))
AzureIoTClient 4:57e049bce51e 95 {
AzureIoTClient 4:57e049bce51e 96 LogError("A networking error occured. Proton error code: %d\r\n", protonResult);
AzureIoTClient 4:57e049bce51e 97 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 98 result = false;
AzureIoTClient 4:57e049bce51e 99 }
AzureIoTClient 4:57e049bce51e 100 else
AzureIoTClient 4:57e049bce51e 101 {
AzureIoTClient 4:57e049bce51e 102 result = true;
AzureIoTClient 4:57e049bce51e 103 }
AzureIoTClient 4:57e049bce51e 104 }
AzureIoTClient 4:57e049bce51e 105 return result;
AzureIoTClient 4:57e049bce51e 106 }
AzureIoTClient 5:8d58d20699dd 107
AzureIoTClient 5:8d58d20699dd 108 static int setRecvMessageIds(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg)
AzureIoTClient 5:8d58d20699dd 109 {
AzureIoTClient 5:8d58d20699dd 110 int result;
AzureIoTClient 5:8d58d20699dd 111
AzureIoTClient 5:8d58d20699dd 112 // Function can not fail
AzureIoTClient 5:8d58d20699dd 113 pn_atom_t msgIdInfo = pn_message_get_id(msg);
AzureIoTClient 5:8d58d20699dd 114 if (msgIdInfo.type != PN_NULL)
AzureIoTClient 5:8d58d20699dd 115 {
AzureIoTClient 5:8d58d20699dd 116 if (msgIdInfo.type != PN_STRING)
AzureIoTClient 5:8d58d20699dd 117 {
AzureIoTClient 5:8d58d20699dd 118 LogError("Message Id Failure: Invalid message id Type %d\r\n", msgIdInfo.type);
AzureIoTClient 5:8d58d20699dd 119 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 120 }
AzureIoTClient 5:8d58d20699dd 121 else
AzureIoTClient 5:8d58d20699dd 122 {
AzureIoTClient 5:8d58d20699dd 123 if (IoTHubMessage_SetMessageId(ioTMessage, msgIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK)
AzureIoTClient 5:8d58d20699dd 124 {
AzureIoTClient 5:8d58d20699dd 125 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 126 }
AzureIoTClient 5:8d58d20699dd 127 else
AzureIoTClient 5:8d58d20699dd 128 {
AzureIoTClient 5:8d58d20699dd 129 result = 0;
AzureIoTClient 5:8d58d20699dd 130 }
AzureIoTClient 5:8d58d20699dd 131 }
AzureIoTClient 5:8d58d20699dd 132 }
AzureIoTClient 5:8d58d20699dd 133 else
AzureIoTClient 5:8d58d20699dd 134 {
AzureIoTClient 5:8d58d20699dd 135 // NULL values mean it's not set
AzureIoTClient 5:8d58d20699dd 136 result = 0;
AzureIoTClient 5:8d58d20699dd 137 }
AzureIoTClient 5:8d58d20699dd 138
AzureIoTClient 5:8d58d20699dd 139 if (result == 0)
AzureIoTClient 5:8d58d20699dd 140 {
AzureIoTClient 5:8d58d20699dd 141 pn_atom_t corrIdInfo = pn_message_get_correlation_id(msg);
AzureIoTClient 5:8d58d20699dd 142 if (corrIdInfo.type != PN_NULL)
AzureIoTClient 5:8d58d20699dd 143 {
AzureIoTClient 5:8d58d20699dd 144 if (corrIdInfo.type != PN_STRING)
AzureIoTClient 5:8d58d20699dd 145 {
AzureIoTClient 5:8d58d20699dd 146 LogError("Message Id Failure: Invalid correlation id Type %d\r\n", corrIdInfo.type);
AzureIoTClient 5:8d58d20699dd 147 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 148 }
AzureIoTClient 5:8d58d20699dd 149 else
AzureIoTClient 5:8d58d20699dd 150 {
AzureIoTClient 5:8d58d20699dd 151 if (IoTHubMessage_SetCorrelationId(ioTMessage, corrIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK)
AzureIoTClient 5:8d58d20699dd 152 {
AzureIoTClient 5:8d58d20699dd 153 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 154 }
AzureIoTClient 5:8d58d20699dd 155 else
AzureIoTClient 5:8d58d20699dd 156 {
AzureIoTClient 5:8d58d20699dd 157 result = 0;
AzureIoTClient 5:8d58d20699dd 158 }
AzureIoTClient 5:8d58d20699dd 159 }
AzureIoTClient 5:8d58d20699dd 160 }
AzureIoTClient 5:8d58d20699dd 161 else
AzureIoTClient 5:8d58d20699dd 162 {
AzureIoTClient 5:8d58d20699dd 163 // NULL values mean it's not set
AzureIoTClient 5:8d58d20699dd 164 result = 0;
AzureIoTClient 5:8d58d20699dd 165 }
AzureIoTClient 5:8d58d20699dd 166 }
AzureIoTClient 5:8d58d20699dd 167 return result;
AzureIoTClient 5:8d58d20699dd 168 }
AzureIoTClient 5:8d58d20699dd 169
AzureIoTClient 5:8d58d20699dd 170 static int setSendMessageIds(PDLIST_ENTRY messagesEntry, pn_message_t* msg)
AzureIoTClient 5:8d58d20699dd 171 {
AzureIoTClient 5:8d58d20699dd 172 int result;
AzureIoTClient 5:8d58d20699dd 173
AzureIoTClient 5:8d58d20699dd 174 IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesEntry->Flink, IOTHUB_MESSAGE_LIST, entry);
AzureIoTClient 5:8d58d20699dd 175 const char* messageId = IoTHubMessage_GetMessageId(currentMessage->messageHandle);
AzureIoTClient 5:8d58d20699dd 176 if (messageId != NULL)
AzureIoTClient 5:8d58d20699dd 177 {
AzureIoTClient 5:8d58d20699dd 178 pn_bytes_t dataBytes = pn_bytes(strlen(messageId), messageId);
AzureIoTClient 5:8d58d20699dd 179 pn_atom_t pnMsgId;
AzureIoTClient 5:8d58d20699dd 180 pnMsgId.type = PN_STRING;
AzureIoTClient 5:8d58d20699dd 181 pnMsgId.u.as_bytes = dataBytes;
AzureIoTClient 5:8d58d20699dd 182 if (pn_message_set_id(msg, pnMsgId) == 0)
AzureIoTClient 5:8d58d20699dd 183 {
AzureIoTClient 5:8d58d20699dd 184 result = 0;
AzureIoTClient 5:8d58d20699dd 185 }
AzureIoTClient 5:8d58d20699dd 186 else
AzureIoTClient 5:8d58d20699dd 187 {
AzureIoTClient 5:8d58d20699dd 188 LogError("Failure setting pn_message_set_id.\r\n");
AzureIoTClient 5:8d58d20699dd 189 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 190 }
AzureIoTClient 5:8d58d20699dd 191 }
AzureIoTClient 5:8d58d20699dd 192 else
AzureIoTClient 5:8d58d20699dd 193 {
AzureIoTClient 5:8d58d20699dd 194 result = 0;
AzureIoTClient 5:8d58d20699dd 195 }
AzureIoTClient 5:8d58d20699dd 196
AzureIoTClient 5:8d58d20699dd 197 if (result == 0)
AzureIoTClient 5:8d58d20699dd 198 {
AzureIoTClient 5:8d58d20699dd 199 const char* correlationId = IoTHubMessage_GetCorrelationId(currentMessage->messageHandle);
AzureIoTClient 5:8d58d20699dd 200 if (correlationId != NULL)
AzureIoTClient 5:8d58d20699dd 201 {
AzureIoTClient 5:8d58d20699dd 202 pn_bytes_t dataBytes = pn_bytes(strlen(correlationId), correlationId);
AzureIoTClient 5:8d58d20699dd 203 pn_atom_t pnCorrelationId;
AzureIoTClient 5:8d58d20699dd 204 pnCorrelationId.type = PN_STRING;
AzureIoTClient 5:8d58d20699dd 205 pnCorrelationId.u.as_bytes = dataBytes;
AzureIoTClient 5:8d58d20699dd 206 if (pn_message_set_correlation_id(msg, pnCorrelationId) == 0)
AzureIoTClient 5:8d58d20699dd 207 {
AzureIoTClient 5:8d58d20699dd 208 result = 0;
AzureIoTClient 5:8d58d20699dd 209 }
AzureIoTClient 5:8d58d20699dd 210 else
AzureIoTClient 5:8d58d20699dd 211 {
AzureIoTClient 5:8d58d20699dd 212 LogError("Failure setting pn_message_set_correlation_id.\r\n");
AzureIoTClient 5:8d58d20699dd 213 result = __LINE__;
AzureIoTClient 5:8d58d20699dd 214 }
AzureIoTClient 5:8d58d20699dd 215 }
AzureIoTClient 5:8d58d20699dd 216 else
AzureIoTClient 5:8d58d20699dd 217 {
AzureIoTClient 5:8d58d20699dd 218 result = 0;
AzureIoTClient 5:8d58d20699dd 219 }
AzureIoTClient 5:8d58d20699dd 220 }
AzureIoTClient 5:8d58d20699dd 221 return result;
AzureIoTClient 5:8d58d20699dd 222 }
AzureIoTClient 5:8d58d20699dd 223
AzureIoTClient 4:57e049bce51e 224 /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */
AzureIoTClient 4:57e049bce51e 225 static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg)
AzureIoTClient 4:57e049bce51e 226 {
AzureIoTClient 4:57e049bce51e 227 int result = 0;
AzureIoTClient 4:57e049bce51e 228 size_t index;
AzureIoTClient 4:57e049bce51e 229
AzureIoTClient 4:57e049bce51e 230 pn_data_t* propData;
AzureIoTClient 4:57e049bce51e 231 const char*const* keys;
AzureIoTClient 4:57e049bce51e 232 const char*const* values;
AzureIoTClient 4:57e049bce51e 233 size_t propertyCount = 0;
AzureIoTClient 4:57e049bce51e 234
AzureIoTClient 4:57e049bce51e 235 IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry);
AzureIoTClient 4:57e049bce51e 236 MAP_HANDLE mapProperties = IoTHubMessage_Properties(currentMessage->messageHandle);
AzureIoTClient 4:57e049bce51e 237 if (mapProperties == NULL)
AzureIoTClient 4:57e049bce51e 238 {
AzureIoTClient 4:57e049bce51e 239 LogError("Failure IoTHubMessage_Properties.\r\n");
AzureIoTClient 4:57e049bce51e 240 result = __LINE__;
AzureIoTClient 4:57e049bce51e 241 }
AzureIoTClient 4:57e049bce51e 242 else if (Map_GetInternals(mapProperties, &keys, &values, &propertyCount) != MAP_OK)
AzureIoTClient 4:57e049bce51e 243 {
AzureIoTClient 4:57e049bce51e 244 LogError("Failure retrieving Map_GetInternals.\r\n");
AzureIoTClient 4:57e049bce51e 245 result = __LINE__;
AzureIoTClient 4:57e049bce51e 246 }
AzureIoTClient 4:57e049bce51e 247 else if (propertyCount > 0)
AzureIoTClient 4:57e049bce51e 248 {
AzureIoTClient 4:57e049bce51e 249 if ( (propData = pn_message_properties(msg) ) == NULL)
AzureIoTClient 4:57e049bce51e 250 {
AzureIoTClient 4:57e049bce51e 251 LogError("Failure pn_message_properties.\r\n");
AzureIoTClient 4:57e049bce51e 252 result = __LINE__;
AzureIoTClient 4:57e049bce51e 253 }
AzureIoTClient 4:57e049bce51e 254 else if (pn_data_put_map(propData) != 0)
AzureIoTClient 4:57e049bce51e 255 {
AzureIoTClient 4:57e049bce51e 256 LogError("Failure pn_data_put_map.\r\n");
AzureIoTClient 4:57e049bce51e 257 result = __LINE__;
AzureIoTClient 4:57e049bce51e 258 }
AzureIoTClient 4:57e049bce51e 259 else if (pn_data_enter(propData) == false )
AzureIoTClient 4:57e049bce51e 260 {
AzureIoTClient 4:57e049bce51e 261 LogError("Failure pn_data_enter.\r\n");
AzureIoTClient 4:57e049bce51e 262 result = __LINE__;
AzureIoTClient 4:57e049bce51e 263 }
AzureIoTClient 4:57e049bce51e 264 else
AzureIoTClient 4:57e049bce51e 265 {
AzureIoTClient 4:57e049bce51e 266 for (index = 0; index < propertyCount; index++)
AzureIoTClient 4:57e049bce51e 267 {
AzureIoTClient 4:57e049bce51e 268 if (pn_data_put_symbol(propData, pn_bytes(strlen(keys[index]), keys[index])) != 0)
AzureIoTClient 4:57e049bce51e 269 {
AzureIoTClient 4:57e049bce51e 270 LogError("Failure pn_data_put_symbol.\r\n");
AzureIoTClient 4:57e049bce51e 271 break;
AzureIoTClient 4:57e049bce51e 272 }
AzureIoTClient 4:57e049bce51e 273 else if (pn_data_put_string(propData, pn_bytes(strlen(values[index]), values[index])) != 0)
AzureIoTClient 4:57e049bce51e 274 {
AzureIoTClient 4:57e049bce51e 275 LogError("Failure pn_data_put_string.\r\n");
AzureIoTClient 4:57e049bce51e 276 break;
AzureIoTClient 4:57e049bce51e 277 }
AzureIoTClient 4:57e049bce51e 278 }
AzureIoTClient 4:57e049bce51e 279 if (index != propertyCount)
AzureIoTClient 4:57e049bce51e 280 {
AzureIoTClient 4:57e049bce51e 281 result = __LINE__;
AzureIoTClient 4:57e049bce51e 282 }
AzureIoTClient 4:57e049bce51e 283 else if (!pn_data_exit(propData) )
AzureIoTClient 4:57e049bce51e 284 {
AzureIoTClient 4:57e049bce51e 285 LogError("Failure pn_data_exit.\r\n");
AzureIoTClient 4:57e049bce51e 286 result = __LINE__;
AzureIoTClient 4:57e049bce51e 287 }
AzureIoTClient 4:57e049bce51e 288 else
AzureIoTClient 4:57e049bce51e 289 {
AzureIoTClient 4:57e049bce51e 290 result = 0;
AzureIoTClient 4:57e049bce51e 291 }
AzureIoTClient 4:57e049bce51e 292 }
AzureIoTClient 4:57e049bce51e 293 }
AzureIoTClient 4:57e049bce51e 294 return result;
AzureIoTClient 4:57e049bce51e 295 }
AzureIoTClient 4:57e049bce51e 296
AzureIoTClient 4:57e049bce51e 297 static int getMapString(pn_data_t* propData, pn_bytes_t* mapValue)
AzureIoTClient 4:57e049bce51e 298 {
AzureIoTClient 4:57e049bce51e 299 int result;
AzureIoTClient 4:57e049bce51e 300 if (pn_data_next(propData) == true)
AzureIoTClient 4:57e049bce51e 301 {
AzureIoTClient 4:57e049bce51e 302 if (pn_data_type(propData) == PN_STRING)
AzureIoTClient 4:57e049bce51e 303 {
AzureIoTClient 4:57e049bce51e 304 *mapValue = pn_data_get_string(propData);
AzureIoTClient 4:57e049bce51e 305 result = 0;
AzureIoTClient 4:57e049bce51e 306 }
AzureIoTClient 4:57e049bce51e 307 else
AzureIoTClient 4:57e049bce51e 308 {
AzureIoTClient 4:57e049bce51e 309 LogError("Failure pn_data_type.\r\n");
AzureIoTClient 4:57e049bce51e 310 result = __LINE__;
AzureIoTClient 4:57e049bce51e 311 }
AzureIoTClient 4:57e049bce51e 312 }
AzureIoTClient 4:57e049bce51e 313 else
AzureIoTClient 4:57e049bce51e 314 {
AzureIoTClient 4:57e049bce51e 315 LogError("Failure pn_data_next.\r\n");
AzureIoTClient 4:57e049bce51e 316 result = __LINE__;
AzureIoTClient 4:57e049bce51e 317 }
AzureIoTClient 4:57e049bce51e 318 return result;
AzureIoTClient 4:57e049bce51e 319 }
AzureIoTClient 4:57e049bce51e 320
AzureIoTClient 4:57e049bce51e 321 static int getMapInt(pn_data_t* propData, int32_t* mapValue)
AzureIoTClient 4:57e049bce51e 322 {
AzureIoTClient 4:57e049bce51e 323 int result;
AzureIoTClient 4:57e049bce51e 324 if (pn_data_next(propData) == true)
AzureIoTClient 4:57e049bce51e 325 {
AzureIoTClient 4:57e049bce51e 326 if (pn_data_type(propData) == PN_INT)
AzureIoTClient 4:57e049bce51e 327 {
AzureIoTClient 4:57e049bce51e 328 *mapValue = pn_data_get_int(propData);
AzureIoTClient 4:57e049bce51e 329 result = 0;
AzureIoTClient 4:57e049bce51e 330 }
AzureIoTClient 4:57e049bce51e 331 else
AzureIoTClient 4:57e049bce51e 332 {
AzureIoTClient 4:57e049bce51e 333 LogError("Failure pn_data_type.\r\n");
AzureIoTClient 4:57e049bce51e 334 result = __LINE__;
AzureIoTClient 4:57e049bce51e 335 }
AzureIoTClient 4:57e049bce51e 336 }
AzureIoTClient 4:57e049bce51e 337 else
AzureIoTClient 4:57e049bce51e 338 {
AzureIoTClient 4:57e049bce51e 339 LogError("Failure pn_data_next.\r\n");
AzureIoTClient 4:57e049bce51e 340 result = __LINE__;
AzureIoTClient 4:57e049bce51e 341 }
AzureIoTClient 4:57e049bce51e 342 return result;
AzureIoTClient 4:57e049bce51e 343 }
AzureIoTClient 4:57e049bce51e 344
AzureIoTClient 4:57e049bce51e 345 /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */
AzureIoTClient 4:57e049bce51e 346 static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg)
AzureIoTClient 4:57e049bce51e 347 {
AzureIoTClient 4:57e049bce51e 348 int result = 0;
AzureIoTClient 4:57e049bce51e 349 size_t index = 0;
AzureIoTClient 4:57e049bce51e 350 pn_data_t* propData = pn_message_properties(msg);
AzureIoTClient 4:57e049bce51e 351 if (propData == NULL)
AzureIoTClient 4:57e049bce51e 352 {
AzureIoTClient 4:57e049bce51e 353 LogError("Failure pn_message_properties.\r\n");
AzureIoTClient 4:57e049bce51e 354 result = __LINE__;
AzureIoTClient 4:57e049bce51e 355 }
AzureIoTClient 4:57e049bce51e 356 else if (pn_data_next(propData) == false )
AzureIoTClient 4:57e049bce51e 357 {
AzureIoTClient 4:57e049bce51e 358 // This should fail if there is no next sibling which means that there
AzureIoTClient 4:57e049bce51e 359 // are no property and it should continue
AzureIoTClient 4:57e049bce51e 360 result = 0;
AzureIoTClient 4:57e049bce51e 361 }
AzureIoTClient 4:57e049bce51e 362 else
AzureIoTClient 4:57e049bce51e 363 {
AzureIoTClient 4:57e049bce51e 364 size_t propertyCount = pn_data_get_map(propData)/2;
AzureIoTClient 4:57e049bce51e 365 if (propertyCount > 0)
AzureIoTClient 4:57e049bce51e 366 {
AzureIoTClient 4:57e049bce51e 367 if (pn_data_enter(propData) == false)
AzureIoTClient 4:57e049bce51e 368 {
AzureIoTClient 4:57e049bce51e 369 LogError("Failure pn_data_enter.\r\n");
AzureIoTClient 4:57e049bce51e 370 result = __LINE__;
AzureIoTClient 4:57e049bce51e 371 }
AzureIoTClient 4:57e049bce51e 372 else
AzureIoTClient 4:57e049bce51e 373 {
AzureIoTClient 4:57e049bce51e 374 // Get the Properties from the Message
AzureIoTClient 4:57e049bce51e 375 MAP_HANDLE properties = IoTHubMessage_Properties(ioTMessage);
AzureIoTClient 4:57e049bce51e 376 if (properties == NULL)
AzureIoTClient 4:57e049bce51e 377 {
AzureIoTClient 4:57e049bce51e 378 result = __LINE__;
AzureIoTClient 4:57e049bce51e 379 }
AzureIoTClient 4:57e049bce51e 380 else
AzureIoTClient 4:57e049bce51e 381 {
AzureIoTClient 4:57e049bce51e 382 for (index = 0; index < propertyCount; index++)
AzureIoTClient 4:57e049bce51e 383 {
AzureIoTClient 4:57e049bce51e 384 pn_bytes_t propValue;
AzureIoTClient 4:57e049bce51e 385 pn_bytes_t propName;
AzureIoTClient 4:57e049bce51e 386 if (getMapString(propData, &propValue) != 0)
AzureIoTClient 4:57e049bce51e 387 {
AzureIoTClient 4:57e049bce51e 388 result = __LINE__;
AzureIoTClient 4:57e049bce51e 389 break;
AzureIoTClient 4:57e049bce51e 390 }
AzureIoTClient 4:57e049bce51e 391 if (getMapString(propData, &propName) != 0)
AzureIoTClient 4:57e049bce51e 392 {
AzureIoTClient 4:57e049bce51e 393 result = __LINE__;
AzureIoTClient 4:57e049bce51e 394 break;
AzureIoTClient 4:57e049bce51e 395 }
AzureIoTClient 4:57e049bce51e 396
AzureIoTClient 4:57e049bce51e 397 if (Map_AddOrUpdate(properties, propValue.start, propName.start) != MAP_OK)
AzureIoTClient 4:57e049bce51e 398 {
AzureIoTClient 4:57e049bce51e 399 LogError("Failure Map_AddOrUpdate.\r\n");
AzureIoTClient 4:57e049bce51e 400 result = __LINE__;
AzureIoTClient 4:57e049bce51e 401 break;
AzureIoTClient 4:57e049bce51e 402 }
AzureIoTClient 4:57e049bce51e 403 }
AzureIoTClient 4:57e049bce51e 404 if (index != propertyCount)
AzureIoTClient 4:57e049bce51e 405 {
AzureIoTClient 4:57e049bce51e 406 result = __LINE__;
AzureIoTClient 4:57e049bce51e 407 }
AzureIoTClient 4:57e049bce51e 408 else if (!pn_data_exit(propData) )
AzureIoTClient 4:57e049bce51e 409 {
AzureIoTClient 4:57e049bce51e 410 LogError("Failure pn_data_exit.\r\n");
AzureIoTClient 4:57e049bce51e 411 result = __LINE__;
AzureIoTClient 4:57e049bce51e 412 }
AzureIoTClient 4:57e049bce51e 413 else
AzureIoTClient 4:57e049bce51e 414 {
AzureIoTClient 4:57e049bce51e 415 result = 0;
AzureIoTClient 4:57e049bce51e 416 }
AzureIoTClient 4:57e049bce51e 417 }
AzureIoTClient 4:57e049bce51e 418 }
AzureIoTClient 4:57e049bce51e 419 }
AzureIoTClient 4:57e049bce51e 420 }
AzureIoTClient 4:57e049bce51e 421 return result;
AzureIoTClient 4:57e049bce51e 422 }
AzureIoTClient 4:57e049bce51e 423
AzureIoTClient 4:57e049bce51e 424 static void putSecondListAfterHeadOfFirst(PDLIST_ENTRY first, PDLIST_ENTRY second)
AzureIoTClient 4:57e049bce51e 425 {
AzureIoTClient 4:57e049bce51e 426 DList_AppendTailList(first->Flink, second);
AzureIoTClient 4:57e049bce51e 427 DList_RemoveEntryList(second);
AzureIoTClient 4:57e049bce51e 428 DList_InitializeListHead(second); // Just to be tidy.
AzureIoTClient 4:57e049bce51e 429 }
AzureIoTClient 4:57e049bce51e 430
AzureIoTClient 4:57e049bce51e 431 static bool stopMessenger(pn_messenger_t* messenger)
AzureIoTClient 4:57e049bce51e 432 {
AzureIoTClient 4:57e049bce51e 433 bool succeeded;
AzureIoTClient 4:57e049bce51e 434 int result;
AzureIoTClient 4:57e049bce51e 435 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_021: [Invoke pn_messenger_stop on the messenger.]*/
AzureIoTClient 4:57e049bce51e 436 result = pn_messenger_stop(messenger);
AzureIoTClient 4:57e049bce51e 437 if (result == 0)
AzureIoTClient 4:57e049bce51e 438 {
AzureIoTClient 4:57e049bce51e 439 succeeded = true;
AzureIoTClient 4:57e049bce51e 440 }
AzureIoTClient 4:57e049bce51e 441 else if (result != PN_INPROGRESS)
AzureIoTClient 4:57e049bce51e 442 {
AzureIoTClient 4:57e049bce51e 443 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 4:57e049bce51e 444 succeeded = false;
AzureIoTClient 4:57e049bce51e 445 }
AzureIoTClient 4:57e049bce51e 446 else
AzureIoTClient 4:57e049bce51e 447 {
AzureIoTClient 4:57e049bce51e 448 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_024: [If pn_messenger_stop returns PN_INPROGRESS invoke pn_messenger_stopped a fixed number of time, seeking a return of true.]*/
AzureIoTClient 4:57e049bce51e 449 size_t numberOfTrys;
AzureIoTClient 4:57e049bce51e 450 succeeded = false;
AzureIoTClient 4:57e049bce51e 451 for (numberOfTrys = NUMBER_OF_MESSENGER_STOP_TRIES; numberOfTrys > 0; numberOfTrys--)
AzureIoTClient 4:57e049bce51e 452 {
AzureIoTClient 4:57e049bce51e 453 pn_messenger_work(messenger,100);
AzureIoTClient 4:57e049bce51e 454 if (pn_messenger_stopped(messenger))
AzureIoTClient 4:57e049bce51e 455 {
AzureIoTClient 4:57e049bce51e 456 succeeded = true;
AzureIoTClient 4:57e049bce51e 457 break;
AzureIoTClient 4:57e049bce51e 458 }
AzureIoTClient 4:57e049bce51e 459 }
AzureIoTClient 4:57e049bce51e 460 }
AzureIoTClient 4:57e049bce51e 461 return succeeded;
AzureIoTClient 4:57e049bce51e 462 }
AzureIoTClient 4:57e049bce51e 463
AzureIoTClient 4:57e049bce51e 464 static void herdTheMessengers(PAMQP_TRANSPORT_STATE state, bool logMessengerStopFailures)
AzureIoTClient 4:57e049bce51e 465 {
AzureIoTClient 4:57e049bce51e 466 PDLIST_ENTRY currentMessengerLink = state->messengerCorral.Flink;
AzureIoTClient 4:57e049bce51e 467
AzureIoTClient 4:57e049bce51e 468 //
AzureIoTClient 4:57e049bce51e 469 // We walk the list of messenger containers. We try to stop each one in turn.
AzureIoTClient 4:57e049bce51e 470 //
AzureIoTClient 4:57e049bce51e 471 // If we are able to stop the contained messenger, we will free the actual messenger. We will then free the messenger container.
AzureIoTClient 4:57e049bce51e 472 //
AzureIoTClient 4:57e049bce51e 473 while (currentMessengerLink != &state->messengerCorral)
AzureIoTClient 4:57e049bce51e 474 {
AzureIoTClient 4:57e049bce51e 475 PDLIST_ENTRY nextLink = currentMessengerLink->Flink;
AzureIoTClient 4:57e049bce51e 476 pn_messenger_t* oldMessenger = containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)->messengerThatDidNotStop;
AzureIoTClient 4:57e049bce51e 477 if (stopMessenger(oldMessenger))
AzureIoTClient 4:57e049bce51e 478 {
AzureIoTClient 4:57e049bce51e 479 (void)(DList_RemoveEntryList(currentMessengerLink));
AzureIoTClient 4:57e049bce51e 480 free(containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry));
AzureIoTClient 4:57e049bce51e 481 pn_messenger_free(oldMessenger);
AzureIoTClient 4:57e049bce51e 482 }
AzureIoTClient 4:57e049bce51e 483 else
AzureIoTClient 4:57e049bce51e 484 {
AzureIoTClient 4:57e049bce51e 485 if (logMessengerStopFailures == true)
AzureIoTClient 4:57e049bce51e 486 {
AzureIoTClient 4:57e049bce51e 487 LogError("Unable to free the messenger at address: %p\r\n", oldMessenger);
AzureIoTClient 4:57e049bce51e 488 }
AzureIoTClient 4:57e049bce51e 489 }
AzureIoTClient 4:57e049bce51e 490 currentMessengerLink = nextLink;
AzureIoTClient 4:57e049bce51e 491 }
AzureIoTClient 4:57e049bce51e 492 }
AzureIoTClient 4:57e049bce51e 493
AzureIoTClient 4:57e049bce51e 494 static void disposeOfOldMessenger(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 4:57e049bce51e 495 {
AzureIoTClient 4:57e049bce51e 496 if (state->messenger)
AzureIoTClient 4:57e049bce51e 497 {
AzureIoTClient 4:57e049bce51e 498 if (stopMessenger(state->messenger))
AzureIoTClient 4:57e049bce51e 499 {
AzureIoTClient 4:57e049bce51e 500 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_022: [If pn_messenger_stop returns 0 then invoke pn_messenger_free on the messenger.]*/
AzureIoTClient 4:57e049bce51e 501 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_025: [If pn_messenger_stopped returns true, then call pn_messenger_free.]*/
AzureIoTClient 4:57e049bce51e 502 //
AzureIoTClient 4:57e049bce51e 503 // Yay. It successfully stopped.
AzureIoTClient 4:57e049bce51e 504 //
AzureIoTClient 4:57e049bce51e 505 pn_messenger_free(state->messenger);
AzureIoTClient 4:57e049bce51e 506 }
AzureIoTClient 4:57e049bce51e 507 else
AzureIoTClient 4:57e049bce51e 508 {
AzureIoTClient 4:57e049bce51e 509 //
AzureIoTClient 4:57e049bce51e 510 // If we couldn't destroy it, then put it on a list and try to do it later in our "spare" time.
AzureIoTClient 4:57e049bce51e 511 //
AzureIoTClient 4:57e049bce51e 512 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 4:57e049bce51e 513 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_026: [If pn_messenger_stopped never returns true, then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 4:57e049bce51e 514 PMESSENGER_CONTAINER newContainer = malloc(sizeof(MESSENGER_CONTAINER));
AzureIoTClient 4:57e049bce51e 515 if (newContainer == NULL)
AzureIoTClient 4:57e049bce51e 516 {
AzureIoTClient 4:57e049bce51e 517 //
AzureIoTClient 4:57e049bce51e 518 // Darn! This is not good. These containers are not very big. If we can't allocate one, things are pretty bad.
AzureIoTClient 4:57e049bce51e 519 // However, it doesn't make sense to actually free this messenger. It hasn't been stopped and we can NOT be sure who
AzureIoTClient 4:57e049bce51e 520 // is referencing the memory that it has allocated. The safest thing to do in this very bad situation is to drop
AzureIoTClient 4:57e049bce51e 521 // it on the floor. It's a leak, but a leak is better than anything else. There is a pretty good likelyhood that
AzureIoTClient 4:57e049bce51e 522 // the app is exiting soon. The dropped memory will be cleaned up by exit.
AzureIoTClient 4:57e049bce51e 523 ;
AzureIoTClient 4:57e049bce51e 524 }
AzureIoTClient 4:57e049bce51e 525 else
AzureIoTClient 4:57e049bce51e 526 {
AzureIoTClient 4:57e049bce51e 527 newContainer->messengerThatDidNotStop = state->messenger;
AzureIoTClient 4:57e049bce51e 528 DList_InsertTailList(&state->messengerCorral, &newContainer->entry);
AzureIoTClient 4:57e049bce51e 529 }
AzureIoTClient 4:57e049bce51e 530 }
AzureIoTClient 4:57e049bce51e 531 state->messenger = NULL;
AzureIoTClient 4:57e049bce51e 532 }
AzureIoTClient 4:57e049bce51e 533 }
AzureIoTClient 4:57e049bce51e 534
AzureIoTClient 4:57e049bce51e 535 static void clientTransportAMQP_Destroy(TRANSPORT_HANDLE handle)
AzureIoTClient 4:57e049bce51e 536 {
AzureIoTClient 4:57e049bce51e 537 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 538
AzureIoTClient 4:57e049bce51e 539 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_010: [clientTransportAMQP_Destroy shall do nothing if the handle is NULL.]*/
AzureIoTClient 4:57e049bce51e 540 if (transportState)
AzureIoTClient 4:57e049bce51e 541 {
AzureIoTClient 4:57e049bce51e 542 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_018: [clientTransportAMQP_Destroy shall free all the resources currently in use.]*/
AzureIoTClient 4:57e049bce51e 543 //
AzureIoTClient 4:57e049bce51e 544 // Go through all of the active work items. Complete their associated messages.
AzureIoTClient 4:57e049bce51e 545 //
AzureIoTClient 4:57e049bce51e 546 while (!DList_IsListEmpty(&transportState->workInProgress))
AzureIoTClient 4:57e049bce51e 547 {
AzureIoTClient 4:57e049bce51e 548 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->workInProgress);
AzureIoTClient 4:57e049bce51e 549 PAMQP_WORK_ITEM currentItem = containingRecord(currentEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 4:57e049bce51e 550 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentItem->eventMessages, IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 4:57e049bce51e 551 DList_InsertTailList(&transportState->availableWorkItems, &currentItem->link);
AzureIoTClient 4:57e049bce51e 552 }
AzureIoTClient 4:57e049bce51e 553
AzureIoTClient 4:57e049bce51e 554 //
AzureIoTClient 4:57e049bce51e 555 // Get rid of work items that aren't being used, and that should be everything given the above loop!
AzureIoTClient 4:57e049bce51e 556 //
AzureIoTClient 4:57e049bce51e 557 while (!DList_IsListEmpty(&transportState->availableWorkItems))
AzureIoTClient 4:57e049bce51e 558 {
AzureIoTClient 4:57e049bce51e 559 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->availableWorkItems);
AzureIoTClient 4:57e049bce51e 560 free(containingRecord(currentEntry, AMQP_WORK_ITEM, link));
AzureIoTClient 4:57e049bce51e 561 }
AzureIoTClient 4:57e049bce51e 562
AzureIoTClient 4:57e049bce51e 563 disposeOfOldMessenger(transportState);
AzureIoTClient 4:57e049bce51e 564
AzureIoTClient 4:57e049bce51e 565 //
AzureIoTClient 4:57e049bce51e 566 // herdTheMessengers will NOT free a messenger that couldn't be stopped. Therefore the following
AzureIoTClient 4:57e049bce51e 567 // function could result in a memory leak of old messengers. This is by FAR less of a problem then
AzureIoTClient 4:57e049bce51e 568 // having memory returned in to pool that something else might still be referring to. (To get all folksy
AzureIoTClient 4:57e049bce51e 569 // here, don't put a sick chicken back in the hen house.)
AzureIoTClient 4:57e049bce51e 570 //
AzureIoTClient 4:57e049bce51e 571
AzureIoTClient 4:57e049bce51e 572 herdTheMessengers(transportState, true);
AzureIoTClient 4:57e049bce51e 573 if (transportState->message)
AzureIoTClient 4:57e049bce51e 574 {
AzureIoTClient 4:57e049bce51e 575 pn_message_free(transportState->message);
AzureIoTClient 4:57e049bce51e 576 }
AzureIoTClient 4:57e049bce51e 577 STRING_delete(transportState->messageAddress);
AzureIoTClient 4:57e049bce51e 578 STRING_delete(transportState->eventAddress);
AzureIoTClient 4:57e049bce51e 579 STRING_delete(transportState->urledDeviceId);
AzureIoTClient 4:57e049bce51e 580 STRING_delete(transportState->devicesPortionPath);
AzureIoTClient 4:57e049bce51e 581 STRING_delete(transportState->cbsAddress);
AzureIoTClient 4:57e049bce51e 582 STRING_delete(transportState->deviceKey);
AzureIoTClient 4:57e049bce51e 583 STRING_delete(transportState->zeroLengthString);
AzureIoTClient 5:8d58d20699dd 584 if (transportState->trustedCertificates != NULL)
AzureIoTClient 5:8d58d20699dd 585 {
AzureIoTClient 5:8d58d20699dd 586 free(transportState->trustedCertificates);
AzureIoTClient 5:8d58d20699dd 587 }
AzureIoTClient 4:57e049bce51e 588
AzureIoTClient 5:8d58d20699dd 589 free(transportState);
AzureIoTClient 4:57e049bce51e 590 }
AzureIoTClient 4:57e049bce51e 591 }
AzureIoTClient 4:57e049bce51e 592
AzureIoTClient 4:57e049bce51e 593 static int putToken(PAMQP_TRANSPORT_STATE transportState, STRING_HANDLE token)
AzureIoTClient 4:57e049bce51e 594 {
AzureIoTClient 4:57e049bce51e 595 int result = 0;
AzureIoTClient 4:57e049bce51e 596
AzureIoTClient 4:57e049bce51e 597 pn_data_t *properties;
AzureIoTClient 4:57e049bce51e 598 pn_atom_t messageId;
AzureIoTClient 4:57e049bce51e 599
AzureIoTClient 4:57e049bce51e 600 messageId.u.as_ulong = transportState->sendTokenMessageId;
AzureIoTClient 4:57e049bce51e 601 messageId.type = PN_ULONG;
AzureIoTClient 4:57e049bce51e 602 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_130: [putToken clears the single messages that is used for all communication via proton.]*/
AzureIoTClient 4:57e049bce51e 603 pn_message_clear(transportState->message);
AzureIoTClient 4:57e049bce51e 604 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_131: [putToken obtains the properties of the message.]*/
AzureIoTClient 4:57e049bce51e 605 properties = pn_message_properties(transportState->message);
AzureIoTClient 4:57e049bce51e 606 if (properties == NULL)
AzureIoTClient 4:57e049bce51e 607 {
AzureIoTClient 4:57e049bce51e 608 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_132: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 609 LogError("unable to pn_message_properties\r\n");
AzureIoTClient 4:57e049bce51e 610 result = __LINE__;
AzureIoTClient 4:57e049bce51e 611 }
AzureIoTClient 4:57e049bce51e 612 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_133: [putToken constructs a proton map which describes the put token operation. If this construction fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 613 else if (!((pn_data_put_map(properties) == 0) &&
AzureIoTClient 4:57e049bce51e 614 (pn_data_enter(properties) == true) &&
AzureIoTClient 4:57e049bce51e 615 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_OPERATIONS_KEY), PROTON_MAP_OPERATIONS_KEY)) == 0) && // key
AzureIoTClient 4:57e049bce51e 616 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_PUT_TOKEN_OPERATION), PROTON_MAP_PUT_TOKEN_OPERATION)) == 0) && // value
AzureIoTClient 4:57e049bce51e 617 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TYPE_KEY), PROTON_MAP_TYPE_KEY)) == 0) && // key
AzureIoTClient 4:57e049bce51e 618 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TOKEN_TYPE), PROTON_MAP_TOKEN_TYPE)) == 0) && // value
AzureIoTClient 4:57e049bce51e 619 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_NAME_KEY), PROTON_MAP_NAME_KEY)) == 0) && // key
AzureIoTClient 4:57e049bce51e 620 (pn_data_put_string(properties, pn_bytes(STRING_length(transportState->devicesPortionPath), STRING_c_str(transportState->devicesPortionPath))) == 0) && // value
AzureIoTClient 4:57e049bce51e 621 (pn_data_exit(properties))
AzureIoTClient 4:57e049bce51e 622 ))
AzureIoTClient 4:57e049bce51e 623 {
AzureIoTClient 4:57e049bce51e 624 LogError("unable to build CBS put token map\r\n");
AzureIoTClient 4:57e049bce51e 625 result = __LINE__;
AzureIoTClient 4:57e049bce51e 626 }
AzureIoTClient 4:57e049bce51e 627 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_134: [putToken sets the address (CBS endpoint) that this put token operation is targeted to.]*/
AzureIoTClient 4:57e049bce51e 628 else if (pn_message_set_reply_to(transportState->message, CBS_REPLY_TO) != 0)
AzureIoTClient 4:57e049bce51e 629 {
AzureIoTClient 4:57e049bce51e 630 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_135: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 631 LogError("unable to pn_message_set_reply_to\r\n");
AzureIoTClient 4:57e049bce51e 632 result = __LINE__;
AzureIoTClient 4:57e049bce51e 633 }
AzureIoTClient 4:57e049bce51e 634 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_136: [putToken sets the reply to address for the put token operation.]*/
AzureIoTClient 4:57e049bce51e 635 else if (pn_message_set_address(transportState->message, STRING_c_str(transportState->cbsAddress)) != 0)
AzureIoTClient 4:57e049bce51e 636 {
AzureIoTClient 4:57e049bce51e 637 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_137: [If this fails then putToken fails.] */
AzureIoTClient 4:57e049bce51e 638 LogError("unable to pn_message_set_address\r\n");
AzureIoTClient 4:57e049bce51e 639 result = __LINE__;
AzureIoTClient 4:57e049bce51e 640 }
AzureIoTClient 4:57e049bce51e 641 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_138: [putToken sets the message to not be inferred.]*/
AzureIoTClient 4:57e049bce51e 642 else if (pn_message_set_inferred(transportState->message, false) != 0)
AzureIoTClient 4:57e049bce51e 643 {
AzureIoTClient 4:57e049bce51e 644 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_139: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 645 LogError("unable to pn_message_set_inferred\r\n");
AzureIoTClient 4:57e049bce51e 646 result = __LINE__;
AzureIoTClient 4:57e049bce51e 647 }
AzureIoTClient 4:57e049bce51e 648 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_140: [putToken sets the messageId of the message to the expiry of the token.]*/
AzureIoTClient 4:57e049bce51e 649 else if (pn_message_set_id(transportState->message, messageId) != 0)
AzureIoTClient 4:57e049bce51e 650 {
AzureIoTClient 4:57e049bce51e 651 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_141: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 652 LogError("Unable to set the message id on the transfer of the token to the CBS\r\n");
AzureIoTClient 4:57e049bce51e 653 result = __LINE__;
AzureIoTClient 4:57e049bce51e 654 }
AzureIoTClient 4:57e049bce51e 655 else
AzureIoTClient 4:57e049bce51e 656 {
AzureIoTClient 4:57e049bce51e 657 pn_data_t* body;
AzureIoTClient 4:57e049bce51e 658 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_142: [putToken obtains the body of the message.]*/
AzureIoTClient 4:57e049bce51e 659 body = pn_message_body(transportState->message);
AzureIoTClient 4:57e049bce51e 660 if (body == NULL)
AzureIoTClient 4:57e049bce51e 661 {
AzureIoTClient 4:57e049bce51e 662 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_143: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 663 LogError("Unable to pn_message_body\r\n");
AzureIoTClient 4:57e049bce51e 664 result = __LINE__;
AzureIoTClient 4:57e049bce51e 665 }
AzureIoTClient 4:57e049bce51e 666 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_144: [putToken places the actual SAS token into the body of the message.]*/
AzureIoTClient 4:57e049bce51e 667 else if (pn_data_put_string(body, pn_bytes(STRING_length(token), STRING_c_str(token))) != 0)
AzureIoTClient 4:57e049bce51e 668 {
AzureIoTClient 4:57e049bce51e 669 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_145: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 670 LogError("Unable to pn_data_put_string\r\n");
AzureIoTClient 4:57e049bce51e 671 result = __LINE__;
AzureIoTClient 4:57e049bce51e 672 }
AzureIoTClient 4:57e049bce51e 673 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_146: [putToken invokes pn_messenger_put.]*/
AzureIoTClient 4:57e049bce51e 674 else if (pn_messenger_put(transportState->messenger, transportState->message) != 0)
AzureIoTClient 4:57e049bce51e 675 {
AzureIoTClient 4:57e049bce51e 676 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_147: [If this fails then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 677 LogError("Unable to pn_messenger_put\r\n");
AzureIoTClient 4:57e049bce51e 678 result = __LINE__;
AzureIoTClient 4:57e049bce51e 679 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 680 }
AzureIoTClient 4:57e049bce51e 681 else
AzureIoTClient 4:57e049bce51e 682 {
AzureIoTClient 4:57e049bce51e 683 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_148: [A proton tracker is obtained.]*/
AzureIoTClient 4:57e049bce51e 684 pn_tracker_t tracker = pn_messenger_outgoing_tracker(transportState->messenger);
AzureIoTClient 4:57e049bce51e 685 time_t beginningOfWaitLoop = get_time(NULL);
AzureIoTClient 4:57e049bce51e 686 time_t currentTime;
AzureIoTClient 4:57e049bce51e 687 result = __LINE__;
AzureIoTClient 4:57e049bce51e 688 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_149: [putToken will loop invoking pn_messenger_work waiting for the message to reach a terminal delivery state.]*/
AzureIoTClient 4:57e049bce51e 689 do
AzureIoTClient 4:57e049bce51e 690 {
AzureIoTClient 4:57e049bce51e 691 int protonResult;
AzureIoTClient 4:57e049bce51e 692 if (checkForErrorsThenWork(transportState) == false)
AzureIoTClient 4:57e049bce51e 693 {
AzureIoTClient 4:57e049bce51e 694 break;
AzureIoTClient 4:57e049bce51e 695 }
AzureIoTClient 4:57e049bce51e 696 else
AzureIoTClient 4:57e049bce51e 697 {
AzureIoTClient 4:57e049bce51e 698 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_150: [If a terminal delivery state is not reached then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 699 if ((protonResult = pn_messenger_status(transportState->messenger, tracker)) != PN_STATUS_PENDING)
AzureIoTClient 4:57e049bce51e 700 {
AzureIoTClient 4:57e049bce51e 701 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_151: [On reaching a terminal delivery state, if the message status is NOT accepted then putToken fails.]*/
AzureIoTClient 4:57e049bce51e 702 if (protonResult != PN_STATUS_ACCEPTED)
AzureIoTClient 4:57e049bce51e 703 {
AzureIoTClient 4:57e049bce51e 704 LogError("Error sending the token: %s\r\n", pn_error_text(pn_messenger_error(transportState->messenger)));
AzureIoTClient 4:57e049bce51e 705 result = __LINE__;
AzureIoTClient 4:57e049bce51e 706 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 707 }
AzureIoTClient 4:57e049bce51e 708 else
AzureIoTClient 4:57e049bce51e 709 {
AzureIoTClient 4:57e049bce51e 710 result = 0;
AzureIoTClient 4:57e049bce51e 711 }
AzureIoTClient 4:57e049bce51e 712 break;
AzureIoTClient 4:57e049bce51e 713 }
AzureIoTClient 4:57e049bce51e 714 }
AzureIoTClient 4:57e049bce51e 715 currentTime = get_time(NULL);
AzureIoTClient 4:57e049bce51e 716 } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsRequestAcceptTime);
AzureIoTClient 4:57e049bce51e 717
AzureIoTClient 4:57e049bce51e 718 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_152: [The tracker is settled.]*/
AzureIoTClient 4:57e049bce51e 719 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 720 if (result == 0)
AzureIoTClient 4:57e049bce51e 721 {
AzureIoTClient 4:57e049bce51e 722 //
AzureIoTClient 4:57e049bce51e 723 // Successfully sent the token. Wait around for a reply.
AzureIoTClient 4:57e049bce51e 724 //
AzureIoTClient 4:57e049bce51e 725 transportState->waitingForPutTokenReply = true;
AzureIoTClient 4:57e049bce51e 726 beginningOfWaitLoop = get_time(NULL);
AzureIoTClient 4:57e049bce51e 727 result = __LINE__;
AzureIoTClient 4:57e049bce51e 728 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_153: [putToken will again loop waiting for a reply.]*/
AzureIoTClient 4:57e049bce51e 729 do
AzureIoTClient 4:57e049bce51e 730 {
AzureIoTClient 4:57e049bce51e 731 processReceives(transportState);
AzureIoTClient 4:57e049bce51e 732 if (transportState->waitingForPutTokenReply == true)
AzureIoTClient 4:57e049bce51e 733 {
AzureIoTClient 4:57e049bce51e 734 if (checkForErrorsThenWork(transportState) == false)
AzureIoTClient 4:57e049bce51e 735 {
AzureIoTClient 4:57e049bce51e 736 break;
AzureIoTClient 4:57e049bce51e 737 }
AzureIoTClient 4:57e049bce51e 738 }
AzureIoTClient 4:57e049bce51e 739 else
AzureIoTClient 4:57e049bce51e 740 {
AzureIoTClient 4:57e049bce51e 741 result = 0;
AzureIoTClient 4:57e049bce51e 742 break;
AzureIoTClient 4:57e049bce51e 743 }
AzureIoTClient 4:57e049bce51e 744 currentTime = get_time(NULL);
AzureIoTClient 4:57e049bce51e 745 } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsReplyTime);
AzureIoTClient 4:57e049bce51e 746 }
AzureIoTClient 4:57e049bce51e 747 else
AzureIoTClient 4:57e049bce51e 748 {
AzureIoTClient 4:57e049bce51e 749 LogError("An network error occured while waiting for a CBS reply.\r\n");
AzureIoTClient 4:57e049bce51e 750 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 751 }
AzureIoTClient 4:57e049bce51e 752 }
AzureIoTClient 4:57e049bce51e 753 }
AzureIoTClient 4:57e049bce51e 754 pn_message_clear(transportState->message);
AzureIoTClient 4:57e049bce51e 755 return result;
AzureIoTClient 4:57e049bce51e 756 }
AzureIoTClient 4:57e049bce51e 757
AzureIoTClient 4:57e049bce51e 758 static void renewIfNecessaryTheCBS(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 4:57e049bce51e 759 {
AzureIoTClient 4:57e049bce51e 760
AzureIoTClient 4:57e049bce51e 761 //
AzureIoTClient 4:57e049bce51e 762 // There is an expiry stored in the state. It has the last expiry value used to define a
AzureIoTClient 4:57e049bce51e 763 // sas token. If we are within 20 minutes OR we're PAST that expiration generate a new
AzureIoTClient 4:57e049bce51e 764 // expiry time and create a new sas token and "put" it to the cbs.
AzureIoTClient 4:57e049bce51e 765 //
AzureIoTClient 4:57e049bce51e 766
AzureIoTClient 4:57e049bce51e 767 size_t secondsSinceEpoch;
AzureIoTClient 4:57e049bce51e 768 int differenceWithLastExpiry;
AzureIoTClient 4:57e049bce51e 769
AzureIoTClient 4:57e049bce51e 770 transportState->putTokenWasSuccessful = false;
AzureIoTClient 4:57e049bce51e 771 //
AzureIoTClient 4:57e049bce51e 772 // Get the number of seconds since the epoch.
AzureIoTClient 4:57e049bce51e 773 //
AzureIoTClient 4:57e049bce51e 774 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_163: [Invocation of renewIfNecessaryTheCBS will first obtain the current number of seconds since the epoch.*/
AzureIoTClient 4:57e049bce51e 775 secondsSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE)+0); // adding zero because a compiler feels it's necessary.
AzureIoTClient 4:57e049bce51e 776 differenceWithLastExpiry = transportState->lastExpiryUsed - secondsSinceEpoch;
AzureIoTClient 4:57e049bce51e 777 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_164: [If the difference between lastExpiryUsed and the current value is less than refresh seconds then renewIfNecessaryTheCBS will attempt to renew the SAS.]*/
AzureIoTClient 4:57e049bce51e 778 if ((differenceWithLastExpiry <= 0) ||
AzureIoTClient 4:57e049bce51e 779 (((size_t)differenceWithLastExpiry) < transportState->sasRefreshLine)) // Within refresh minutes (or past if negative) of the expiration
AzureIoTClient 4:57e049bce51e 780 {
AzureIoTClient 4:57e049bce51e 781 //
AzureIoTClient 4:57e049bce51e 782 // We already have the seconds since the epoch. Add an hour to it and generate the new SAS.
AzureIoTClient 4:57e049bce51e 783 //
AzureIoTClient 4:57e049bce51e 784 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_165: [If SAS renewal is to occur then lifetime seconds is added to the current number of seconds.]*/
AzureIoTClient 4:57e049bce51e 785 size_t possibleNewExpiry = secondsSinceEpoch + transportState->sasTokenLifetime;
AzureIoTClient 4:57e049bce51e 786 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_169: [SASToken_Create is invoked.] */
AzureIoTClient 4:57e049bce51e 787 STRING_HANDLE newSASToken = SASToken_Create(transportState->deviceKey, transportState->devicesPortionPath, transportState->zeroLengthString, possibleNewExpiry);
AzureIoTClient 4:57e049bce51e 788 if (newSASToken == NULL)
AzureIoTClient 4:57e049bce51e 789 {
AzureIoTClient 4:57e049bce51e 790 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_170: [If SASToken_Create returns NULL then renewal is considered a failure and we move on to the next phase of DoWork.]*/
AzureIoTClient 4:57e049bce51e 791 LogError("Could not generate a new SAS token for the CBS\r\n");
AzureIoTClient 4:57e049bce51e 792 }
AzureIoTClient 4:57e049bce51e 793 else
AzureIoTClient 4:57e049bce51e 794 {
AzureIoTClient 4:57e049bce51e 795 //
AzureIoTClient 4:57e049bce51e 796 // We are going to save off in the transport state this POSSIBLE expiry value. Sending token and response processing code
AzureIoTClient 4:57e049bce51e 797 // will use it as the message id and correlation id for the request/response. This will work
AzureIoTClient 4:57e049bce51e 798 // because the code that sends the token will wait for AT LEAST one second for a valid response. If a valid response comes
AzureIoTClient 4:57e049bce51e 799 // back in that period of time then even if for some bizarre reason we need to renew again in less than a second, we know
AzureIoTClient 4:57e049bce51e 800 // there won't be a spurious response (because we just got the resonse!).
AzureIoTClient 4:57e049bce51e 801 //
AzureIoTClient 4:57e049bce51e 802 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_166: [This value is saved in the state as sendTokenMessageId.]*/
AzureIoTClient 4:57e049bce51e 803 transportState->sendTokenMessageId = possibleNewExpiry;
AzureIoTClient 4:57e049bce51e 804 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_171: [renewIfNecessaryTheCBS will invoke a local static function putToken.]*/
AzureIoTClient 4:57e049bce51e 805 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_172: [If putToken fails then the renewal is considered a failure and we move on to the next phase of DoWork.]*/
AzureIoTClient 4:57e049bce51e 806 if (putToken(transportState, newSASToken) == 0)
AzureIoTClient 4:57e049bce51e 807 {
AzureIoTClient 4:57e049bce51e 808 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_173: [Note that putToken actually sets a state value which will indicate whether the CBS accepted the token and responded with success.]*/
AzureIoTClient 4:57e049bce51e 809 if (transportState->putTokenWasSuccessful == true)
AzureIoTClient 4:57e049bce51e 810 {
AzureIoTClient 4:57e049bce51e 811 transportState->lastExpiryUsed = possibleNewExpiry;
AzureIoTClient 4:57e049bce51e 812 }
AzureIoTClient 4:57e049bce51e 813 }
AzureIoTClient 4:57e049bce51e 814 STRING_delete(newSASToken);
AzureIoTClient 4:57e049bce51e 815 }
AzureIoTClient 4:57e049bce51e 816 }
AzureIoTClient 4:57e049bce51e 817 }
AzureIoTClient 4:57e049bce51e 818
AzureIoTClient 4:57e049bce51e 819 static bool protonMessengerInit(PAMQP_TRANSPORT_STATE amqpState)
AzureIoTClient 4:57e049bce51e 820 {
AzureIoTClient 4:57e049bce51e 821 if (!amqpState->messengerInitialized)
AzureIoTClient 4:57e049bce51e 822 {
AzureIoTClient 4:57e049bce51e 823 rollbackEvent(amqpState);
AzureIoTClient 4:57e049bce51e 824 disposeOfOldMessenger(amqpState);
AzureIoTClient 4:57e049bce51e 825 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_027: [DoWork shall invoke pn_messenger to create a new messenger.]*/
AzureIoTClient 4:57e049bce51e 826 if ((amqpState->messenger = pn_messenger(NULL)) == NULL)
AzureIoTClient 4:57e049bce51e 827 {
AzureIoTClient 4:57e049bce51e 828 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 829 LogError("The messenger is not able to be created.\r\n");
AzureIoTClient 4:57e049bce51e 830 }
AzureIoTClient 5:8d58d20699dd 831 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */
AzureIoTClient 5:8d58d20699dd 832 else if ((amqpState->trustedCertificates != NULL) &&
AzureIoTClient 5:8d58d20699dd 833 (strlen(amqpState->trustedCertificates) > 0) &&
AzureIoTClient 5:8d58d20699dd 834 (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0))
AzureIoTClient 5:8d58d20699dd 835 {
AzureIoTClient 5:8d58d20699dd 836 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */
AzureIoTClient 5:8d58d20699dd 837 LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n");
AzureIoTClient 5:8d58d20699dd 838 }
AzureIoTClient 5:8d58d20699dd 839 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 840 else if (pn_messenger_start(amqpState->messenger) != 0)
AzureIoTClient 4:57e049bce51e 841 {
AzureIoTClient 4:57e049bce51e 842 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 843 LogError("unable to pn_messenger_start\r\n");
AzureIoTClient 4:57e049bce51e 844 }
AzureIoTClient 4:57e049bce51e 845 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_031: [pn_messenger_set_blocking shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 846 else if (pn_messenger_set_blocking(amqpState->messenger, false) != 0)
AzureIoTClient 4:57e049bce51e 847 {
AzureIoTClient 4:57e049bce51e 848 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_032: [If pn_messenger_set_blocking returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 849 LogError("unable to pn_messenger_set_blocking\r\n");
AzureIoTClient 4:57e049bce51e 850 }
AzureIoTClient 4:57e049bce51e 851 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_033: [pn_messenger_set_snd_settle_mode shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 852 else if (pn_messenger_set_snd_settle_mode(amqpState->messenger, PN_SND_UNSETTLED) != 0)
AzureIoTClient 4:57e049bce51e 853 {
AzureIoTClient 4:57e049bce51e 854 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_034: [If pn_messenger_set_snd_settle_mode returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 855 LogError("unable to set the send settle mode\r\n");
AzureIoTClient 4:57e049bce51e 856 }
AzureIoTClient 4:57e049bce51e 857 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_035: [pn_messenger_set_rcv_settle_mode shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 858 else if (pn_messenger_set_rcv_settle_mode(amqpState->messenger, PN_RCV_FIRST) != 0)
AzureIoTClient 4:57e049bce51e 859 {
AzureIoTClient 4:57e049bce51e 860 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_036: [If pn_messenger_set_rcv_settle_mode returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 861 LogError("unable to set the receive settle mode\r\n");
AzureIoTClient 4:57e049bce51e 862 }
AzureIoTClient 4:57e049bce51e 863 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_037: [pn_messenger_set_outgoing_window shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 864 else if (pn_messenger_set_outgoing_window(amqpState->messenger, PROTON_OUTGOING_WINDOW_SIZE) != 0)
AzureIoTClient 4:57e049bce51e 865 {
AzureIoTClient 4:57e049bce51e 866 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_038: [If pn_messenger_set_outgoing_window returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 867 LogError("unable to pn_messenger_set_outgoing_window\r\n");
AzureIoTClient 4:57e049bce51e 868 }
AzureIoTClient 4:57e049bce51e 869 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_039: [pn_messenger_set_incoming_window shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 870 else if (pn_messenger_set_incoming_window(amqpState->messenger, PROTON_INCOMING_WINDOW_SIZE) != 0)
AzureIoTClient 4:57e049bce51e 871 {
AzureIoTClient 4:57e049bce51e 872 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_040: [If pn_messenger_set_incoming_window returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 873 LogError("unable to pn_messenger_set_incoming_window\r\n");
AzureIoTClient 4:57e049bce51e 874 }
AzureIoTClient 4:57e049bce51e 875 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_156: [pn_messenger_subscribe will be invoked on the cbs address.]*/
AzureIoTClient 4:57e049bce51e 876 else if ((amqpState->cbsSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->cbsAddress))) == NULL)
AzureIoTClient 4:57e049bce51e 877 {
AzureIoTClient 4:57e049bce51e 878 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_157: [If the pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 879 LogError("unable to create a subscription to the cbs address\r\n");
AzureIoTClient 4:57e049bce51e 880 }
AzureIoTClient 4:57e049bce51e 881 else
AzureIoTClient 4:57e049bce51e 882 {
AzureIoTClient 4:57e049bce51e 883 //We've got the subscription. Now we attempt to connect to the cbs.
AzureIoTClient 4:57e049bce51e 884 /*Codes_During messenger initialization a state variable known as lastExpiryUsed will be set to zero.*/
AzureIoTClient 4:57e049bce51e 885 amqpState->lastExpiryUsed = 0;
AzureIoTClient 4:57e049bce51e 886 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_159: [renewIfNecessaryCBS will be invoked.]*/
AzureIoTClient 4:57e049bce51e 887 renewIfNecessaryTheCBS(amqpState);
AzureIoTClient 4:57e049bce51e 888 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_160: [If after renewIfNecessaryCBS is invoked, the state variable putTokenWasSuccessful is false then the messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 889 if (amqpState->putTokenWasSuccessful == true)
AzureIoTClient 4:57e049bce51e 890 {
AzureIoTClient 4:57e049bce51e 891 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/
AzureIoTClient 4:57e049bce51e 892 if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL)
AzureIoTClient 4:57e049bce51e 893 {
AzureIoTClient 4:57e049bce51e 894 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 4:57e049bce51e 895 LogError("unable to create a subscription to the message address\r\n");
AzureIoTClient 4:57e049bce51e 896 }
AzureIoTClient 4:57e049bce51e 897 else
AzureIoTClient 4:57e049bce51e 898 {
AzureIoTClient 4:57e049bce51e 899 amqpState->messengerInitialized = true;
AzureIoTClient 4:57e049bce51e 900 }
AzureIoTClient 4:57e049bce51e 901 }
AzureIoTClient 4:57e049bce51e 902 }
AzureIoTClient 4:57e049bce51e 903 }
AzureIoTClient 4:57e049bce51e 904 return amqpState->messengerInitialized;
AzureIoTClient 4:57e049bce51e 905 }
AzureIoTClient 4:57e049bce51e 906
AzureIoTClient 4:57e049bce51e 907 static amqp_batch_result prepareBatch(size_t maximumPayload, PDLIST_ENTRY waitingToSend, PDLIST_ENTRY messagesMakingUpBatch, size_t* payloadSize, const unsigned char** payload)
AzureIoTClient 4:57e049bce51e 908 {
AzureIoTClient 4:57e049bce51e 909 amqp_batch_result result;
AzureIoTClient 4:57e049bce51e 910 if (DList_IsListEmpty(waitingToSend))
AzureIoTClient 4:57e049bce51e 911 {
AzureIoTClient 4:57e049bce51e 912 result = amqp_batch_nowork;
AzureIoTClient 4:57e049bce51e 913 }
AzureIoTClient 4:57e049bce51e 914 else
AzureIoTClient 4:57e049bce51e 915 {
AzureIoTClient 4:57e049bce51e 916 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_074: [If there is a waitingToSend entry remove it from the list.]*/
AzureIoTClient 4:57e049bce51e 917 IOTHUB_MESSAGE_LIST* currentMessage;
AzureIoTClient 4:57e049bce51e 918 IOTHUBMESSAGE_CONTENT_TYPE contentType;
AzureIoTClient 4:57e049bce51e 919 size_t messageLength;
AzureIoTClient 4:57e049bce51e 920 const unsigned char* messagePayload;
AzureIoTClient 4:57e049bce51e 921 DList_InitializeListHead(messagesMakingUpBatch);
AzureIoTClient 4:57e049bce51e 922 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then DoWork will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/
AzureIoTClient 4:57e049bce51e 923 DList_InsertTailList(messagesMakingUpBatch, DList_RemoveHeadList(waitingToSend));
AzureIoTClient 4:57e049bce51e 924 currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry);
AzureIoTClient 4:57e049bce51e 925 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_075: [Get its size and payload address by calling IoTHubMessage_GetByteArray.]*/
AzureIoTClient 4:57e049bce51e 926 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_008: [If the message type is IOTHUBMESSAGE_STRING then get the data by using IoTHubMesage_GetString and set the size to the length of the string.] */
AzureIoTClient 4:57e049bce51e 927 contentType = IoTHubMessage_GetContentType(currentMessage->messageHandle);
AzureIoTClient 4:57e049bce51e 928 if (!(
AzureIoTClient 4:57e049bce51e 929 ((contentType == IOTHUBMESSAGE_BYTEARRAY) && (IoTHubMessage_GetByteArray(currentMessage->messageHandle, &messagePayload, &messageLength) == IOTHUB_MESSAGE_OK)) ||
AzureIoTClient 4:57e049bce51e 930 ((contentType == IOTHUBMESSAGE_STRING) && (
AzureIoTClient 4:57e049bce51e 931 messagePayload = (const unsigned char*)IoTHubMessage_GetString(currentMessage->messageHandle),
AzureIoTClient 4:57e049bce51e 932 (messageLength = (messagePayload == NULL) ? 0 : strlen((const char*)messagePayload)),
AzureIoTClient 4:57e049bce51e 933 messagePayload != NULL)
AzureIoTClient 4:57e049bce51e 934 )
AzureIoTClient 4:57e049bce51e 935 ))
AzureIoTClient 4:57e049bce51e 936 {
AzureIoTClient 4:57e049bce51e 937 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_076: [If that fails then fail that event.]*/
AzureIoTClient 4:57e049bce51e 938 LogError("Failure in getting message content. Type had decimal value = %d\r\n", contentType);
AzureIoTClient 4:57e049bce51e 939 result = amqp_batch_error;
AzureIoTClient 4:57e049bce51e 940 }
AzureIoTClient 4:57e049bce51e 941 else if (messageLength > maximumPayload)
AzureIoTClient 4:57e049bce51e 942 {
AzureIoTClient 4:57e049bce51e 943 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_077: [If the size of the payload is greater than the maximum (currently 256*1024) fail that event.]*/
AzureIoTClient 4:57e049bce51e 944 LogError("Batch length is too large.\r\n");
AzureIoTClient 4:57e049bce51e 945 result = amqp_batch_error;
AzureIoTClient 4:57e049bce51e 946 }
AzureIoTClient 4:57e049bce51e 947 else
AzureIoTClient 4:57e049bce51e 948 {
AzureIoTClient 4:57e049bce51e 949 *payloadSize = messageLength;
AzureIoTClient 4:57e049bce51e 950 *payload = messagePayload;
AzureIoTClient 4:57e049bce51e 951 result = amqp_batch_nobatch;
AzureIoTClient 4:57e049bce51e 952 }
AzureIoTClient 4:57e049bce51e 953 }
AzureIoTClient 4:57e049bce51e 954 return result;
AzureIoTClient 4:57e049bce51e 955 }
AzureIoTClient 4:57e049bce51e 956
AzureIoTClient 4:57e049bce51e 957 static bool prepareSimpleProtonMessage(pn_message_t* message, STRING_HANDLE endpoint, size_t payloadLength, const unsigned char* payload)
AzureIoTClient 4:57e049bce51e 958 {
AzureIoTClient 4:57e049bce51e 959 bool result = false;
AzureIoTClient 4:57e049bce51e 960 pn_data_t * body;
AzureIoTClient 4:57e049bce51e 961
AzureIoTClient 4:57e049bce51e 962 pn_message_clear(message);
AzureIoTClient 4:57e049bce51e 963 if (pn_message_set_address(message, STRING_c_str(endpoint)) != 0)
AzureIoTClient 4:57e049bce51e 964 {
AzureIoTClient 4:57e049bce51e 965 LogError("Unable to set amqp address for proton message.\r\n");
AzureIoTClient 4:57e049bce51e 966 }
AzureIoTClient 4:57e049bce51e 967 else if (pn_message_set_inferred(message, true) != 0)
AzureIoTClient 4:57e049bce51e 968 {
AzureIoTClient 4:57e049bce51e 969 LogError("Unable to set inferred to true for proton message.\r\n");
AzureIoTClient 4:57e049bce51e 970 }
AzureIoTClient 4:57e049bce51e 971 else if ((body = pn_message_body(message)) == NULL)
AzureIoTClient 4:57e049bce51e 972 {
AzureIoTClient 4:57e049bce51e 973 LogError("Unable to set proton message body.\r\n");
AzureIoTClient 4:57e049bce51e 974 }
AzureIoTClient 4:57e049bce51e 975 else if (pn_data_put_binary(body, pn_bytes(payloadLength, (const char*)payload)) != 0)
AzureIoTClient 4:57e049bce51e 976 {
AzureIoTClient 4:57e049bce51e 977 LogError("Unable to set binary data for message body.\r\n");
AzureIoTClient 4:57e049bce51e 978 }
AzureIoTClient 4:57e049bce51e 979 else
AzureIoTClient 4:57e049bce51e 980 {
AzureIoTClient 4:57e049bce51e 981 result = true;
AzureIoTClient 4:57e049bce51e 982 }
AzureIoTClient 4:57e049bce51e 983
AzureIoTClient 4:57e049bce51e 984 return result;
AzureIoTClient 4:57e049bce51e 985 }
AzureIoTClient 4:57e049bce51e 986
AzureIoTClient 4:57e049bce51e 987 static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 4:57e049bce51e 988 {
AzureIoTClient 4:57e049bce51e 989 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_175: [The list of work in progress is tested to see if it is empty.]*/
AzureIoTClient 4:57e049bce51e 990 while (!DList_IsListEmpty(&transportState->workInProgress))
AzureIoTClient 4:57e049bce51e 991 {
AzureIoTClient 4:57e049bce51e 992 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_176: [If it is not empty then the list is traversed.]*/
AzureIoTClient 4:57e049bce51e 993 int settleResult;
AzureIoTClient 4:57e049bce51e 994 int protonResult;
AzureIoTClient 4:57e049bce51e 995 PDLIST_ENTRY newestEntry = transportState->workInProgress.Blink;
AzureIoTClient 4:57e049bce51e 996 PAMQP_WORK_ITEM currentWork = containingRecord(newestEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 4:57e049bce51e 997 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_179: [The status of the work item is acquired.]*/
AzureIoTClient 4:57e049bce51e 998 protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker);
AzureIoTClient 4:57e049bce51e 999 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_180: [The item is settled.]*/
AzureIoTClient 4:57e049bce51e 1000 if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0)
AzureIoTClient 4:57e049bce51e 1001 {
AzureIoTClient 4:57e049bce51e 1002 LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult);
AzureIoTClient 4:57e049bce51e 1003 }
AzureIoTClient 4:57e049bce51e 1004 if ((protonResult == PN_STATUS_ACCEPTED) ||
AzureIoTClient 4:57e049bce51e 1005 (protonResult == PN_STATUS_REJECTED) ||
AzureIoTClient 4:57e049bce51e 1006 (protonResult == PN_STATUS_RELEASED))
AzureIoTClient 4:57e049bce51e 1007 {
AzureIoTClient 4:57e049bce51e 1008 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_182: [Items that have been settled by the service will have their IOTHUB_MESSAGE_LIST completed.]*/
AzureIoTClient 4:57e049bce51e 1009 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 4:57e049bce51e 1010 }
AzureIoTClient 4:57e049bce51e 1011 else
AzureIoTClient 4:57e049bce51e 1012 {
AzureIoTClient 4:57e049bce51e 1013 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_178: [Items that are not settled by the service will have their IOTHUB_MESSAGE_LIST removed from the work item and placed back on the waiting to send.]*/
AzureIoTClient 4:57e049bce51e 1014 putSecondListAfterHeadOfFirst(transportState->waitingToSend->Flink, &currentWork->eventMessages);
AzureIoTClient 4:57e049bce51e 1015 }
AzureIoTClient 4:57e049bce51e 1016 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_181: [The work item is removed from the in progress list and placed on the available list.]*/
AzureIoTClient 4:57e049bce51e 1017 DList_RemoveEntryList(newestEntry);
AzureIoTClient 4:57e049bce51e 1018 DList_InsertTailList(&transportState->availableWorkItems, newestEntry);
AzureIoTClient 4:57e049bce51e 1019 }
AzureIoTClient 4:57e049bce51e 1020 }
AzureIoTClient 4:57e049bce51e 1021 static void reclaimEventResources(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 4:57e049bce51e 1022 {
AzureIoTClient 4:57e049bce51e 1023 PDLIST_ENTRY currentListEntry;
AzureIoTClient 4:57e049bce51e 1024 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_064: [Obtain the head of the workInProgress]*/
AzureIoTClient 4:57e049bce51e 1025 currentListEntry = transportState->workInProgress.Flink;
AzureIoTClient 4:57e049bce51e 1026 while (currentListEntry != &transportState->workInProgress)
AzureIoTClient 4:57e049bce51e 1027 {
AzureIoTClient 4:57e049bce51e 1028 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_065: [While head != listhead reclaimEventResources will do as follows.] */
AzureIoTClient 4:57e049bce51e 1029 int protonResult;
AzureIoTClient 4:57e049bce51e 1030 PAMQP_WORK_ITEM currentWork = containingRecord(currentListEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 4:57e049bce51e 1031 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_066: [Check the current status via pn_messenger_status via the tracker stored in the AMQP_WORK_ITEM.]*/
AzureIoTClient 4:57e049bce51e 1032 if ((protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker)) == PN_STATUS_PENDING)
AzureIoTClient 4:57e049bce51e 1033 {
AzureIoTClient 4:57e049bce51e 1034 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_183: [if the amount of time that the work item has been pending exceeds a timeout stored in the work item, the messenger will be marked as not initialized.]*/
AzureIoTClient 4:57e049bce51e 1035 size_t secondsSinceTheEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0);
AzureIoTClient 4:57e049bce51e 1036 if (currentWork->expiry < secondsSinceTheEpoch)
AzureIoTClient 4:57e049bce51e 1037 {
AzureIoTClient 4:57e049bce51e 1038 LogError("A event operation timed out.\r\n");
AzureIoTClient 4:57e049bce51e 1039 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 1040 }
AzureIoTClient 4:57e049bce51e 1041 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_067: [reclaimEventResources shall go onto the next work item if the proton status is PN_STATUS_PENDING.]*/
AzureIoTClient 4:57e049bce51e 1042 currentListEntry = currentListEntry->Flink;
AzureIoTClient 4:57e049bce51e 1043 }
AzureIoTClient 4:57e049bce51e 1044 else
AzureIoTClient 4:57e049bce51e 1045 {
AzureIoTClient 4:57e049bce51e 1046 DLIST_ENTRY savedFromCurrentListEntry;
AzureIoTClient 4:57e049bce51e 1047 int settleResult;
AzureIoTClient 4:57e049bce51e 1048 // Yay! It's finished. Take it off the in progress list.
AzureIoTClient 4:57e049bce51e 1049 savedFromCurrentListEntry.Flink = currentListEntry->Flink; // We need to save this because it will be stomped on when we remove it from the work in progress.
AzureIoTClient 4:57e049bce51e 1050
AzureIoTClient 4:57e049bce51e 1051 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_068: [Otherwise reclaimEventResources will use the result of IOTHUB_BATCHSTATE_SUCCESS if the proton status was PN_STATUS_ACCEPTED.]*/
AzureIoTClient 4:57e049bce51e 1052 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_069: [Otherwise reclaimEventResources will use the status of IOTHUB_BATCHSTATE_FAILED if the proton status was NOT PN_STATUS_ACCEPTED.] */
AzureIoTClient 4:57e049bce51e 1053 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_070: [reclaimEventResources will invoke IotHubClient_SendBatchComplete with the IOTHUB_CLIENT_LL_HANDLE, supplied to reclaimEventResources, also pass the eventMessages stored in the AMQP_WORK_ITEM, and finally pass the above IOTHUB_BATCH status.]*/
AzureIoTClient 4:57e049bce51e 1054 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 4:57e049bce51e 1055 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_090: [reclaimEventResources will release the tracker by invoking pn_messenger_settle.]*/
AzureIoTClient 4:57e049bce51e 1056 if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0)
AzureIoTClient 4:57e049bce51e 1057 {
AzureIoTClient 4:57e049bce51e 1058 LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult);
AzureIoTClient 4:57e049bce51e 1059 }
AzureIoTClient 4:57e049bce51e 1060 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_071: [reclaimEventResources will take the current work item from the workInProgress list and insert it on the tail of the availableWorkItems list.]*/
AzureIoTClient 4:57e049bce51e 1061 DList_RemoveEntryList(currentListEntry);
AzureIoTClient 4:57e049bce51e 1062 DList_InsertTailList(&transportState->availableWorkItems, currentListEntry);
AzureIoTClient 4:57e049bce51e 1063 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_072: [reclaimEventResources will continue to iterate over the workInProgress list until it returns to the list head.]*/
AzureIoTClient 4:57e049bce51e 1064 currentListEntry = savedFromCurrentListEntry.Flink;
AzureIoTClient 4:57e049bce51e 1065 }
AzureIoTClient 4:57e049bce51e 1066 }
AzureIoTClient 4:57e049bce51e 1067 }
AzureIoTClient 4:57e049bce51e 1068
AzureIoTClient 4:57e049bce51e 1069 static void sendEvent(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 4:57e049bce51e 1070 {
AzureIoTClient 4:57e049bce51e 1071 while (!DList_IsListEmpty(&transportState->availableWorkItems))
AzureIoTClient 4:57e049bce51e 1072 {
AzureIoTClient 4:57e049bce51e 1073 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_073: [While the availableWorkItems is NOT empty loop.] */
AzureIoTClient 4:57e049bce51e 1074 PAMQP_WORK_ITEM headOfAvailable = containingRecord(transportState->availableWorkItems.Flink, AMQP_WORK_ITEM, link);
AzureIoTClient 4:57e049bce51e 1075 size_t payloadSize;
AzureIoTClient 4:57e049bce51e 1076 const unsigned char* payloadAddress;
AzureIoTClient 4:57e049bce51e 1077 amqp_batch_result amqpBatchResult;
AzureIoTClient 4:57e049bce51e 1078 amqpBatchResult = prepareBatch(MAXIMUM_EVENT_LENGTH, transportState->waitingToSend, &headOfAvailable->eventMessages, &payloadSize, &payloadAddress);
AzureIoTClient 4:57e049bce51e 1079 if (amqpBatchResult == amqp_batch_nowork)
AzureIoTClient 4:57e049bce51e 1080 {
AzureIoTClient 4:57e049bce51e 1081 // no work to do. Stop trying.
AzureIoTClient 4:57e049bce51e 1082 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/
AzureIoTClient 4:57e049bce51e 1083 break;
AzureIoTClient 4:57e049bce51e 1084 }
AzureIoTClient 4:57e049bce51e 1085 else if (amqpBatchResult == amqp_batch_error)
AzureIoTClient 4:57e049bce51e 1086 {
AzureIoTClient 4:57e049bce51e 1087 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_078: [If the event failed call invoke IotHubClient_SendBatchComplete as above utilizing the eventMessages list of the head of the availableWorkItems and a status of IOTHUB_BATCHSTATE_FAILED.]*/
AzureIoTClient 4:57e049bce51e 1088 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &headOfAvailable->eventMessages, IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 4:57e049bce51e 1089 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_079: [Continue this send loop.]*/
AzureIoTClient 4:57e049bce51e 1090 }
AzureIoTClient 4:57e049bce51e 1091 else
AzureIoTClient 4:57e049bce51e 1092 {
AzureIoTClient 4:57e049bce51e 1093 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_080: [If the event did not fail then prepare a proton message using the following proton API: pn_message_set_address, pn_message_set_inferred, pn_message_body and pn_data_put_binary with the above saved size and address values.]*/
AzureIoTClient 4:57e049bce51e 1094 if (prepareSimpleProtonMessage(transportState->message, transportState->eventAddress, payloadSize, payloadAddress) == false)
AzureIoTClient 4:57e049bce51e 1095 {
AzureIoTClient 4:57e049bce51e 1096 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 4:57e049bce51e 1097 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 4:57e049bce51e 1098 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 4:57e049bce51e 1099 break;
AzureIoTClient 4:57e049bce51e 1100 }
AzureIoTClient 4:57e049bce51e 1101 else if (setProperties(&headOfAvailable->eventMessages, transportState->message) != 0)
AzureIoTClient 4:57e049bce51e 1102 {
AzureIoTClient 4:57e049bce51e 1103 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 4:57e049bce51e 1104 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 4:57e049bce51e 1105 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 4:57e049bce51e 1106 break;
AzureIoTClient 4:57e049bce51e 1107 }
AzureIoTClient 5:8d58d20699dd 1108 else if (setSendMessageIds(&headOfAvailable->eventMessages, transportState->message))
AzureIoTClient 5:8d58d20699dd 1109 {
AzureIoTClient 5:8d58d20699dd 1110 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 5:8d58d20699dd 1111 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 5:8d58d20699dd 1112 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 5:8d58d20699dd 1113 break;
AzureIoTClient 5:8d58d20699dd 1114 }
AzureIoTClient 4:57e049bce51e 1115 else
AzureIoTClient 4:57e049bce51e 1116 {
AzureIoTClient 4:57e049bce51e 1117 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/
AzureIoTClient 4:57e049bce51e 1118 int protonResult;
AzureIoTClient 4:57e049bce51e 1119 protonResult = pn_messenger_put(transportState->messenger, transportState->message);
AzureIoTClient 4:57e049bce51e 1120 if (protonResult != 0)
AzureIoTClient 4:57e049bce51e 1121 {
AzureIoTClient 4:57e049bce51e 1122 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_084: [If that fails, sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 4:57e049bce51e 1123 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 4:57e049bce51e 1124 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_085: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 4:57e049bce51e 1125 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 1126 break;
AzureIoTClient 4:57e049bce51e 1127 }
AzureIoTClient 4:57e049bce51e 1128 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_184: [The work item will also contain a timeout that shall denote how long the transport will wait before initiating error recovery.]*/
AzureIoTClient 4:57e049bce51e 1129 headOfAvailable->expiry = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + transportState->eventTimeout);
AzureIoTClient 4:57e049bce51e 1130 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then sendEvent will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/
AzureIoTClient 4:57e049bce51e 1131 headOfAvailable->tracker = pn_messenger_outgoing_tracker(transportState->messenger);
AzureIoTClient 4:57e049bce51e 1132 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_087: [The head of the availableWorkItems will be removed and placed on the workInProgress list.]*/
AzureIoTClient 4:57e049bce51e 1133 DList_InsertTailList(&transportState->workInProgress, DList_RemoveHeadList(&transportState->availableWorkItems));
AzureIoTClient 4:57e049bce51e 1134 }
AzureIoTClient 4:57e049bce51e 1135 }
AzureIoTClient 4:57e049bce51e 1136 }
AzureIoTClient 4:57e049bce51e 1137 }
AzureIoTClient 4:57e049bce51e 1138 static void processCBSReply(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
AzureIoTClient 4:57e049bce51e 1139 {
AzureIoTClient 4:57e049bce51e 1140 bool goodMessage = false;
AzureIoTClient 4:57e049bce51e 1141 pn_atom_t correlationId;
AzureIoTClient 4:57e049bce51e 1142
AzureIoTClient 4:57e049bce51e 1143 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_112: [The correlationId message property is obtained via a call to pn_message_get_correlation_id.]*/
AzureIoTClient 4:57e049bce51e 1144 correlationId = pn_message_get_correlation_id(transportState->message);
AzureIoTClient 4:57e049bce51e 1145
AzureIoTClient 4:57e049bce51e 1146 //
AzureIoTClient 4:57e049bce51e 1147 // If it isn't the one we're looking for, simply accept it, because we don't want to see it and we can't do anything with it.
AzureIoTClient 4:57e049bce51e 1148 //
AzureIoTClient 4:57e049bce51e 1149
AzureIoTClient 4:57e049bce51e 1150 if (correlationId.u.as_ulong != transportState->sendTokenMessageId)
AzureIoTClient 4:57e049bce51e 1151 {
AzureIoTClient 4:57e049bce51e 1152 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_113: [The id is compared to the state variable sendTokenMessageId. If there is no match, an error is logged, the message is abandoned and processCBSReply returns.]*/
AzureIoTClient 4:57e049bce51e 1153 LogError("An old reply from a CBS renewal drifed in. Moving on\r\n");
AzureIoTClient 4:57e049bce51e 1154 }
AzureIoTClient 4:57e049bce51e 1155 else
AzureIoTClient 4:57e049bce51e 1156 {
AzureIoTClient 4:57e049bce51e 1157 //
AzureIoTClient 4:57e049bce51e 1158 // This is the response we're looking for!
AzureIoTClient 4:57e049bce51e 1159 //
AzureIoTClient 4:57e049bce51e 1160
AzureIoTClient 4:57e049bce51e 1161 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_114: [Obtain the properties of the message by invoking pn_message_properties.]*/
AzureIoTClient 4:57e049bce51e 1162 pn_data_t* propertyData = pn_message_properties(transportState->message);
AzureIoTClient 4:57e049bce51e 1163
AzureIoTClient 4:57e049bce51e 1164 transportState->waitingForPutTokenReply = false;
AzureIoTClient 4:57e049bce51e 1165 if (propertyData == NULL)
AzureIoTClient 4:57e049bce51e 1166 {
AzureIoTClient 4:57e049bce51e 1167 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_115: [If NULL then abandon the message and return.]*/
AzureIoTClient 4:57e049bce51e 1168 LogError("Invalid response back from the CBS - no application properties\r\n");
AzureIoTClient 4:57e049bce51e 1169 }
AzureIoTClient 4:57e049bce51e 1170 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_116: [Advance to the first property of the message via pn_data_next.]*/
AzureIoTClient 4:57e049bce51e 1171 else if (pn_data_next(propertyData) == false)
AzureIoTClient 4:57e049bce51e 1172 {
AzureIoTClient 4:57e049bce51e 1173 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_117: [If this fails, abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1174 LogError("Invalid response back from the CBS - application properties malformed\r\n");
AzureIoTClient 4:57e049bce51e 1175 }
AzureIoTClient 4:57e049bce51e 1176 else
AzureIoTClient 4:57e049bce51e 1177 {
AzureIoTClient 4:57e049bce51e 1178 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_118: [Invoke pn_data_get_map to obtain the number of properties.]*/
AzureIoTClient 4:57e049bce51e 1179 size_t numberOfProperties = pn_data_get_map(propertyData) / 2;
AzureIoTClient 4:57e049bce51e 1180 if (numberOfProperties < 1) // We MUST have at least the status property
AzureIoTClient 4:57e049bce51e 1181 {
AzureIoTClient 4:57e049bce51e 1182 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_119: [If the number of properties is 0 then abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1183 LogError("Invalid response back from the CBS - invalid number of properties: %zu\r\n", pn_data_get_map(propertyData));
AzureIoTClient 4:57e049bce51e 1184 }
AzureIoTClient 4:57e049bce51e 1185 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_120: [Enter the property map by invoking pn_data_enter.]*/
AzureIoTClient 4:57e049bce51e 1186 else if (pn_data_enter(propertyData) == false)
AzureIoTClient 4:57e049bce51e 1187 {
AzureIoTClient 4:57e049bce51e 1188 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_121: [If that fails then abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1189 LogError("Invalid response back from the CBS - unable to access the sent properties\r\n");
AzureIoTClient 4:57e049bce51e 1190 }
AzureIoTClient 4:57e049bce51e 1191 else
AzureIoTClient 4:57e049bce51e 1192 {
AzureIoTClient 4:57e049bce51e 1193 pn_bytes_t propertyName;
AzureIoTClient 4:57e049bce51e 1194 int32_t resultStatus;
AzureIoTClient 4:57e049bce51e 1195 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_122: [Get the property name.]*/
AzureIoTClient 4:57e049bce51e 1196 if (getMapString(propertyData, &propertyName) != 0)
AzureIoTClient 4:57e049bce51e 1197 {
AzureIoTClient 4:57e049bce51e 1198 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_123: [If that fails then abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1199 LogError("Unable to acquire the status value from the cbs request\r\n");
AzureIoTClient 4:57e049bce51e 1200 }
AzureIoTClient 4:57e049bce51e 1201 else if (strcmp(propertyName.start, PROTON_MAP_STATUS_CODE) != 0)
AzureIoTClient 4:57e049bce51e 1202 {
AzureIoTClient 4:57e049bce51e 1203 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_124: [If the property name is not "status-code" than abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1204 LogError("Unknown application property returned: %s\r\n", propertyName.start);
AzureIoTClient 4:57e049bce51e 1205 }
AzureIoTClient 4:57e049bce51e 1206 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_125: [Get the actual status code.]*/
AzureIoTClient 4:57e049bce51e 1207 else if (getMapInt(propertyData, &resultStatus) != 0)
AzureIoTClient 4:57e049bce51e 1208 {
AzureIoTClient 4:57e049bce51e 1209 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_126: [If that fails then abandon and return.]*/
AzureIoTClient 4:57e049bce51e 1210 LogError("Unable to acquire the actual status of the CBS put token\r\n");
AzureIoTClient 4:57e049bce51e 1211 }
AzureIoTClient 4:57e049bce51e 1212 else if (resultStatus != 200)
AzureIoTClient 4:57e049bce51e 1213 {
AzureIoTClient 4:57e049bce51e 1214 pn_bytes_t statusDescription;
AzureIoTClient 4:57e049bce51e 1215 //
AzureIoTClient 4:57e049bce51e 1216 // Yeah, it's not what we want to hear but it's not as though anything was wrong with
AzureIoTClient 4:57e049bce51e 1217 // the message.
AzureIoTClient 4:57e049bce51e 1218 //
AzureIoTClient 4:57e049bce51e 1219 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_127: [If the status code is not equal to 200 then attempt to acquire the explanation from the properties.]*/
AzureIoTClient 4:57e049bce51e 1220 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_128: [Accept the message and return.]*/
AzureIoTClient 4:57e049bce51e 1221 goodMessage = true;
AzureIoTClient 4:57e049bce51e 1222 LogError("The CBS put token operation failed: %d\r\n", resultStatus);
AzureIoTClient 4:57e049bce51e 1223 if (numberOfProperties >= 2)
AzureIoTClient 4:57e049bce51e 1224 {
AzureIoTClient 4:57e049bce51e 1225 //
AzureIoTClient 4:57e049bce51e 1226 // Well we have an error. See if the next property is the status description.
AzureIoTClient 4:57e049bce51e 1227 // If it is, then log the value as further explanation
AzureIoTClient 4:57e049bce51e 1228 //
AzureIoTClient 4:57e049bce51e 1229 if (getMapString(propertyData, &propertyName) == 0)
AzureIoTClient 4:57e049bce51e 1230 {
AzureIoTClient 4:57e049bce51e 1231 if (strcmp(propertyName.start, PROTON_MAP_STATUS_DESCRIPTION) == 0)
AzureIoTClient 4:57e049bce51e 1232 {
AzureIoTClient 4:57e049bce51e 1233 if (getMapString(propertyData, &statusDescription) == 0)
AzureIoTClient 4:57e049bce51e 1234 {
AzureIoTClient 4:57e049bce51e 1235 LogError("The CBS put token operation failed due to: %s\r\n", statusDescription.start);
AzureIoTClient 4:57e049bce51e 1236 }
AzureIoTClient 4:57e049bce51e 1237 }
AzureIoTClient 4:57e049bce51e 1238 }
AzureIoTClient 4:57e049bce51e 1239 }
AzureIoTClient 4:57e049bce51e 1240 }
AzureIoTClient 4:57e049bce51e 1241 else
AzureIoTClient 4:57e049bce51e 1242 {
AzureIoTClient 4:57e049bce51e 1243 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_129: [If the status code was 200 then set the state variable putTokenWasSuccessful, accept the message and return.]*/
AzureIoTClient 4:57e049bce51e 1244 goodMessage = true;
AzureIoTClient 4:57e049bce51e 1245 transportState->putTokenWasSuccessful = true;
AzureIoTClient 4:57e049bce51e 1246 }
AzureIoTClient 4:57e049bce51e 1247 }
AzureIoTClient 4:57e049bce51e 1248 }
AzureIoTClient 4:57e049bce51e 1249 }
AzureIoTClient 4:57e049bce51e 1250 if (goodMessage == true)
AzureIoTClient 4:57e049bce51e 1251 {
AzureIoTClient 4:57e049bce51e 1252 pn_messenger_accept(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1253 }
AzureIoTClient 4:57e049bce51e 1254 else
AzureIoTClient 4:57e049bce51e 1255 {
AzureIoTClient 4:57e049bce51e 1256 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1257 }
AzureIoTClient 4:57e049bce51e 1258 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1259 }
AzureIoTClient 4:57e049bce51e 1260
AzureIoTClient 4:57e049bce51e 1261 static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
AzureIoTClient 4:57e049bce51e 1262 {
AzureIoTClient 4:57e049bce51e 1263 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/
AzureIoTClient 4:57e049bce51e 1264 if (transportState->DoWork_PullMessages == false)
AzureIoTClient 4:57e049bce51e 1265 {
AzureIoTClient 4:57e049bce51e 1266 //
AzureIoTClient 4:57e049bce51e 1267 // Nothing to do with the message. We reject it. In that way it can come back later.
AzureIoTClient 4:57e049bce51e 1268 //
AzureIoTClient 4:57e049bce51e 1269 pn_messenger_reject(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1270 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1271 }
AzureIoTClient 4:57e049bce51e 1272 else
AzureIoTClient 4:57e049bce51e 1273 {
AzureIoTClient 4:57e049bce51e 1274 pn_data_t *body;
AzureIoTClient 4:57e049bce51e 1275
AzureIoTClient 4:57e049bce51e 1276 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/
AzureIoTClient 4:57e049bce51e 1277 if ((body = pn_message_body(transportState->message)) == NULL)
AzureIoTClient 4:57e049bce51e 1278 {
AzureIoTClient 4:57e049bce51e 1279 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/
AzureIoTClient 4:57e049bce51e 1280 LogError("Failed to get the proton message body\r\n");
AzureIoTClient 4:57e049bce51e 1281 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1282 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1283 }
AzureIoTClient 4:57e049bce51e 1284 else
AzureIoTClient 4:57e049bce51e 1285 {
AzureIoTClient 4:57e049bce51e 1286 pn_bytes_t sizeAndData;
AzureIoTClient 4:57e049bce51e 1287 bool keepProcessingThisMessage = true;
AzureIoTClient 4:57e049bce51e 1288 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/
AzureIoTClient 4:57e049bce51e 1289 if (pn_data_next(body) == false)
AzureIoTClient 4:57e049bce51e 1290 {
AzureIoTClient 4:57e049bce51e 1291 //
AzureIoTClient 4:57e049bce51e 1292 // We have a null body. This is reasonable. It should also be pointed out that
AzureIoTClient 4:57e049bce51e 1293 // pn_data_next call shouldn't be thought of as checking for the existence of something.
AzureIoTClient 4:57e049bce51e 1294 // It should be thought of as actually MOVING through a tree structure in the message.
AzureIoTClient 4:57e049bce51e 1295 //
AzureIoTClient 4:57e049bce51e 1296 sizeAndData.size = 0;
AzureIoTClient 4:57e049bce51e 1297 sizeAndData.start = NULL;
AzureIoTClient 4:57e049bce51e 1298 }
AzureIoTClient 4:57e049bce51e 1299 else
AzureIoTClient 4:57e049bce51e 1300 {
AzureIoTClient 4:57e049bce51e 1301 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/
AzureIoTClient 4:57e049bce51e 1302 if (PN_BINARY != pn_data_type(body))
AzureIoTClient 4:57e049bce51e 1303 {
AzureIoTClient 4:57e049bce51e 1304 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/
AzureIoTClient 4:57e049bce51e 1305 LogError("Message received via AMQP was not PN_BINARY\r\n");
AzureIoTClient 4:57e049bce51e 1306 sizeAndData.size = 0; // Compilers get upset.
AzureIoTClient 4:57e049bce51e 1307 sizeAndData.start = NULL;
AzureIoTClient 4:57e049bce51e 1308 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1309 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1310 keepProcessingThisMessage = false;
AzureIoTClient 4:57e049bce51e 1311 }
AzureIoTClient 4:57e049bce51e 1312 else
AzureIoTClient 4:57e049bce51e 1313 {
AzureIoTClient 4:57e049bce51e 1314 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_057: [The length and a pointer to the payload will be acquired and saved by invoking pn_data_get_binary.]*/
AzureIoTClient 4:57e049bce51e 1315 sizeAndData = pn_data_get_binary(body);
AzureIoTClient 4:57e049bce51e 1316 }
AzureIoTClient 4:57e049bce51e 1317 }
AzureIoTClient 4:57e049bce51e 1318 if (keepProcessingThisMessage == true)
AzureIoTClient 4:57e049bce51e 1319 {
AzureIoTClient 4:57e049bce51e 1320 IOTHUB_MESSAGE_HANDLE ioTMessage;
AzureIoTClient 4:57e049bce51e 1321 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/
AzureIoTClient 4:57e049bce51e 1322 if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL)
AzureIoTClient 4:57e049bce51e 1323 {
AzureIoTClient 4:57e049bce51e 1324 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/
AzureIoTClient 4:57e049bce51e 1325 LogError("Failed to set data for IoT hub message\r\n");
AzureIoTClient 4:57e049bce51e 1326 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1327 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1328 }
AzureIoTClient 4:57e049bce51e 1329 else if (cloneProperties(ioTMessage, transportState->message) != 0)
AzureIoTClient 4:57e049bce51e 1330 {
AzureIoTClient 5:8d58d20699dd 1331 LogError("Failed to clone properties for IoT hub message\r\n");
AzureIoTClient 5:8d58d20699dd 1332 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 5:8d58d20699dd 1333 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 5:8d58d20699dd 1334 }
AzureIoTClient 5:8d58d20699dd 1335 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_07_003: [If the pn_message_get_id value is not NULL then the value will be set by calling IotHubMessage_SetMessageId.] */
AzureIoTClient 5:8d58d20699dd 1336 else if (setRecvMessageIds(ioTMessage, transportState->message) != 0)
AzureIoTClient 5:8d58d20699dd 1337 {
AzureIoTClient 5:8d58d20699dd 1338 LogError("Failed to set MessageId for IoT hub message\r\n");
AzureIoTClient 4:57e049bce51e 1339 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1340 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1341 }
AzureIoTClient 4:57e049bce51e 1342 else
AzureIoTClient 4:57e049bce51e 1343 {
AzureIoTClient 4:57e049bce51e 1344 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/
AzureIoTClient 4:57e049bce51e 1345 IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage);
AzureIoTClient 4:57e049bce51e 1346
AzureIoTClient 4:57e049bce51e 1347 if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED)
AzureIoTClient 4:57e049bce51e 1348 {
AzureIoTClient 4:57e049bce51e 1349 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */
AzureIoTClient 4:57e049bce51e 1350 pn_messenger_accept(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1351 }
AzureIoTClient 4:57e049bce51e 1352 else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED)
AzureIoTClient 4:57e049bce51e 1353 {
AzureIoTClient 4:57e049bce51e 1354 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */
AzureIoTClient 4:57e049bce51e 1355 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1356 }
AzureIoTClient 4:57e049bce51e 1357 else
AzureIoTClient 4:57e049bce51e 1358 {
AzureIoTClient 4:57e049bce51e 1359 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/
AzureIoTClient 4:57e049bce51e 1360 pn_messenger_reject(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1361 }
AzureIoTClient 4:57e049bce51e 1362 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1363 }
AzureIoTClient 4:57e049bce51e 1364 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/
AzureIoTClient 4:57e049bce51e 1365 IoTHubMessage_Destroy(ioTMessage);
AzureIoTClient 4:57e049bce51e 1366 }
AzureIoTClient 4:57e049bce51e 1367 }
AzureIoTClient 4:57e049bce51e 1368 }
AzureIoTClient 4:57e049bce51e 1369 }
AzureIoTClient 4:57e049bce51e 1370
AzureIoTClient 4:57e049bce51e 1371 static void processReceives(TRANSPORT_HANDLE handle)
AzureIoTClient 4:57e049bce51e 1372 {
AzureIoTClient 4:57e049bce51e 1373 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 1374 int receiveResult;
AzureIoTClient 4:57e049bce51e 1375
AzureIoTClient 4:57e049bce51e 1376 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/
AzureIoTClient 4:57e049bce51e 1377
AzureIoTClient 4:57e049bce51e 1378 if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false))
AzureIoTClient 4:57e049bce51e 1379 {
AzureIoTClient 4:57e049bce51e 1380 if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0)
AzureIoTClient 4:57e049bce51e 1381 {
AzureIoTClient 4:57e049bce51e 1382 if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
AzureIoTClient 4:57e049bce51e 1383 {
AzureIoTClient 4:57e049bce51e 1384 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
AzureIoTClient 4:57e049bce51e 1385 LogError("Error attempting to receive messages: %d\r\n", receiveResult);
AzureIoTClient 4:57e049bce51e 1386 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
AzureIoTClient 4:57e049bce51e 1387 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 1388 }
AzureIoTClient 4:57e049bce51e 1389 }
AzureIoTClient 4:57e049bce51e 1390 }
AzureIoTClient 4:57e049bce51e 1391 else
AzureIoTClient 4:57e049bce51e 1392 {
AzureIoTClient 4:57e049bce51e 1393 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/
AzureIoTClient 4:57e049bce51e 1394 if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0)
AzureIoTClient 4:57e049bce51e 1395 {
AzureIoTClient 4:57e049bce51e 1396 if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
AzureIoTClient 4:57e049bce51e 1397 {
AzureIoTClient 4:57e049bce51e 1398 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
AzureIoTClient 4:57e049bce51e 1399 LogError("Error attempting to receive messages: %d\r\n", receiveResult);
AzureIoTClient 4:57e049bce51e 1400 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
AzureIoTClient 4:57e049bce51e 1401 transportState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 1402 }
AzureIoTClient 4:57e049bce51e 1403 }
AzureIoTClient 4:57e049bce51e 1404 else
AzureIoTClient 4:57e049bce51e 1405 {
AzureIoTClient 4:57e049bce51e 1406 //
AzureIoTClient 4:57e049bce51e 1407 // If any are present we need to dispose of messages in the incoming queue.
AzureIoTClient 4:57e049bce51e 1408 //
AzureIoTClient 4:57e049bce51e 1409 size_t depthOfReceiveQueue;
AzureIoTClient 4:57e049bce51e 1410 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/
AzureIoTClient 4:57e049bce51e 1411 for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--)
AzureIoTClient 4:57e049bce51e 1412 {
AzureIoTClient 4:57e049bce51e 1413 int result;
AzureIoTClient 4:57e049bce51e 1414 pn_message_clear(transportState->message);
AzureIoTClient 4:57e049bce51e 1415 //
AzureIoTClient 4:57e049bce51e 1416 // Ok something is there. First thing is to get it.
AzureIoTClient 4:57e049bce51e 1417 //
AzureIoTClient 4:57e049bce51e 1418 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_047: [processReceives will retrieve a message with pn_messenger_get.]*/
AzureIoTClient 4:57e049bce51e 1419 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_048: [If pn_messenger_get fails we log and break the receive loop.]*/
AzureIoTClient 4:57e049bce51e 1420 if ((result = pn_messenger_get(transportState->messenger, transportState->message)) != 0)
AzureIoTClient 4:57e049bce51e 1421 {
AzureIoTClient 4:57e049bce51e 1422 //
AzureIoTClient 4:57e049bce51e 1423 // Well that's pretty odd. We shouldn't have gotten here if the message queue was empty!
AzureIoTClient 4:57e049bce51e 1424 //
AzureIoTClient 4:57e049bce51e 1425 LogError("Message reception queue unexpectedly empty! Code: %d\r\n", result);
AzureIoTClient 4:57e049bce51e 1426 break;
AzureIoTClient 4:57e049bce51e 1427 }
AzureIoTClient 4:57e049bce51e 1428 else
AzureIoTClient 4:57e049bce51e 1429 {
AzureIoTClient 4:57e049bce51e 1430 //
AzureIoTClient 4:57e049bce51e 1431 // We'll need a tracker to dispose of the message. We need the subscription to determine who should process it.
AzureIoTClient 4:57e049bce51e 1432 //
AzureIoTClient 4:57e049bce51e 1433 pn_subscription_t* theMessageSource;
AzureIoTClient 4:57e049bce51e 1434 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_049: [processReceives shall acquire a tracker by invoking pn_messenger_incoming_tracker.]*/
AzureIoTClient 4:57e049bce51e 1435 pn_tracker_t tracker = pn_messenger_incoming_tracker(transportState->messenger);
AzureIoTClient 4:57e049bce51e 1436
AzureIoTClient 4:57e049bce51e 1437 //
AzureIoTClient 4:57e049bce51e 1438 // Currently we can receive two types of messages. We can recieve status messages from the $CBS that indicates the success
AzureIoTClient 4:57e049bce51e 1439 // or failure of renewing the SAS for the device.
AzureIoTClient 4:57e049bce51e 1440 //
AzureIoTClient 4:57e049bce51e 1441 // Otherwise we can recieve a simple message. We can get a pointer to the subscription from the tracker. We can compare
AzureIoTClient 4:57e049bce51e 1442 // that pointer to the subscriptions that we stored when we initialized the messenger. We then dispatch appropriately.
AzureIoTClient 4:57e049bce51e 1443 //
AzureIoTClient 4:57e049bce51e 1444
AzureIoTClient 4:57e049bce51e 1445 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/
AzureIoTClient 4:57e049bce51e 1446 theMessageSource = pn_messenger_incoming_subscription(transportState->messenger);
AzureIoTClient 4:57e049bce51e 1447 if (theMessageSource == transportState->messageSubscription)
AzureIoTClient 4:57e049bce51e 1448 {
AzureIoTClient 4:57e049bce51e 1449 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/
AzureIoTClient 4:57e049bce51e 1450 processMessage(transportState, tracker);
AzureIoTClient 4:57e049bce51e 1451 }
AzureIoTClient 4:57e049bce51e 1452 else if (theMessageSource == transportState->cbsSubscription)
AzureIoTClient 4:57e049bce51e 1453 {
AzureIoTClient 4:57e049bce51e 1454 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/
AzureIoTClient 4:57e049bce51e 1455 processCBSReply(transportState, tracker);
AzureIoTClient 4:57e049bce51e 1456 }
AzureIoTClient 4:57e049bce51e 1457 else
AzureIoTClient 4:57e049bce51e 1458 {
AzureIoTClient 4:57e049bce51e 1459 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_109: [Otherwise, log an error and abandon the message so that it never returns.]*/
AzureIoTClient 4:57e049bce51e 1460 LogError("Message received from an unknown sender\r\n");
AzureIoTClient 4:57e049bce51e 1461 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1462 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 4:57e049bce51e 1463 }
AzureIoTClient 4:57e049bce51e 1464 }
AzureIoTClient 4:57e049bce51e 1465 }
AzureIoTClient 4:57e049bce51e 1466 pn_message_clear(transportState->message);
AzureIoTClient 4:57e049bce51e 1467 }
AzureIoTClient 4:57e049bce51e 1468 }
AzureIoTClient 4:57e049bce51e 1469 }
AzureIoTClient 4:57e049bce51e 1470
AzureIoTClient 4:57e049bce51e 1471 static void clientTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle)
AzureIoTClient 4:57e049bce51e 1472 {
AzureIoTClient 4:57e049bce51e 1473 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 1474
AzureIoTClient 4:57e049bce51e 1475 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_019: [clientTransportAMQP_Dowork shall do no work if handle is NULL.]*/
AzureIoTClient 4:57e049bce51e 1476 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_020: [clientTransportAMQP_Dowork shall do no work if iotHubClientHandle is NULL.]*/
AzureIoTClient 4:57e049bce51e 1477 if (transportState && iotHubClientHandle)
AzureIoTClient 4:57e049bce51e 1478 {
AzureIoTClient 4:57e049bce51e 1479 transportState->savedClientHandle = iotHubClientHandle;
AzureIoTClient 4:57e049bce51e 1480 if (protonMessengerInit(transportState))
AzureIoTClient 4:57e049bce51e 1481 {
AzureIoTClient 4:57e049bce51e 1482 renewIfNecessaryTheCBS(transportState);
AzureIoTClient 4:57e049bce51e 1483 if (transportState->messengerInitialized == true) reclaimEventResources(transportState);
AzureIoTClient 4:57e049bce51e 1484 if (transportState->messengerInitialized == true) sendEvent(transportState);
AzureIoTClient 4:57e049bce51e 1485 if (transportState->messengerInitialized == true) processReceives(transportState);
AzureIoTClient 4:57e049bce51e 1486
AzureIoTClient 4:57e049bce51e 1487 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_088: [pn_messenger_work shall be invoked following all the above actions.]*/
AzureIoTClient 4:57e049bce51e 1488 if (transportState->messengerInitialized == true) checkForErrorsThenWork(transportState);
AzureIoTClient 4:57e049bce51e 1489 }
AzureIoTClient 4:57e049bce51e 1490
AzureIoTClient 4:57e049bce51e 1491 //
AzureIoTClient 4:57e049bce51e 1492 // Try to get rid of any old messengers that we were NOT able to stop.
AzureIoTClient 4:57e049bce51e 1493 //
AzureIoTClient 4:57e049bce51e 1494 herdTheMessengers(transportState, false);
AzureIoTClient 4:57e049bce51e 1495 }
AzureIoTClient 4:57e049bce51e 1496 return;
AzureIoTClient 4:57e049bce51e 1497 }
AzureIoTClient 4:57e049bce51e 1498
AzureIoTClient 4:57e049bce51e 1499 static bool createUrledDeviceId(PAMQP_TRANSPORT_STATE state, const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 4:57e049bce51e 1500 {
AzureIoTClient 4:57e049bce51e 1501 return ((state->urledDeviceId = URL_EncodeString(config->upperConfig->deviceId)) == NULL) ? (false) : (true);
AzureIoTClient 4:57e049bce51e 1502 }
AzureIoTClient 4:57e049bce51e 1503 static bool createDevicesPortionPath(PAMQP_TRANSPORT_STATE state, const char* host)
AzureIoTClient 4:57e049bce51e 1504 {
AzureIoTClient 4:57e049bce51e 1505 return (((state->devicesPortionPath = STRING_construct(host)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1506 (STRING_concat(state->devicesPortionPath, "/devices/") != 0) ||
AzureIoTClient 4:57e049bce51e 1507 (STRING_concat_with_STRING(state->devicesPortionPath, state->urledDeviceId) != 0)) ? (false) : (true);
AzureIoTClient 4:57e049bce51e 1508 }
AzureIoTClient 4:57e049bce51e 1509 static bool createEventAddress(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 4:57e049bce51e 1510 {
AzureIoTClient 4:57e049bce51e 1511 return (((state->eventAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1512 (STRING_concat_with_STRING(state->eventAddress, state->devicesPortionPath) != 0) ||
AzureIoTClient 4:57e049bce51e 1513 (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 4:57e049bce51e 1514 }
AzureIoTClient 4:57e049bce51e 1515
AzureIoTClient 4:57e049bce51e 1516 static bool createMessageAddress(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 4:57e049bce51e 1517 {
AzureIoTClient 4:57e049bce51e 1518 return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1519 (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) ||
AzureIoTClient 4:57e049bce51e 1520 (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 4:57e049bce51e 1521 }
AzureIoTClient 4:57e049bce51e 1522
AzureIoTClient 4:57e049bce51e 1523 static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host)
AzureIoTClient 4:57e049bce51e 1524 {
AzureIoTClient 4:57e049bce51e 1525 return (((state->cbsAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1526 (STRING_concat(state->cbsAddress,host) != 0) ||
AzureIoTClient 4:57e049bce51e 1527 (STRING_concat(state->cbsAddress, CBS_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 4:57e049bce51e 1528 }
AzureIoTClient 4:57e049bce51e 1529
AzureIoTClient 4:57e049bce51e 1530 static STRING_HANDLE createHost(const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 4:57e049bce51e 1531 {
AzureIoTClient 4:57e049bce51e 1532 STRING_HANDLE result = NULL;
AzureIoTClient 4:57e049bce51e 1533 if (((result = STRING_construct(config->upperConfig->iotHubName)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1534 (STRING_concat(result, ".") != 0) ||
AzureIoTClient 4:57e049bce51e 1535 (STRING_concat(result, config->upperConfig->iotHubSuffix) != 0))
AzureIoTClient 4:57e049bce51e 1536 {
AzureIoTClient 4:57e049bce51e 1537 if (result != NULL)
AzureIoTClient 4:57e049bce51e 1538 {
AzureIoTClient 4:57e049bce51e 1539 STRING_delete(result);
AzureIoTClient 4:57e049bce51e 1540 result = NULL;
AzureIoTClient 4:57e049bce51e 1541 }
AzureIoTClient 4:57e049bce51e 1542 }
AzureIoTClient 4:57e049bce51e 1543 return result;
AzureIoTClient 4:57e049bce51e 1544 }
AzureIoTClient 4:57e049bce51e 1545
AzureIoTClient 4:57e049bce51e 1546 static TRANSPORT_HANDLE clientTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 4:57e049bce51e 1547 {
AzureIoTClient 4:57e049bce51e 1548 PAMQP_TRANSPORT_STATE amqpState = NULL;
AzureIoTClient 4:57e049bce51e 1549 if (!config)
AzureIoTClient 4:57e049bce51e 1550 {
AzureIoTClient 4:57e049bce51e 1551 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_001: [If parameter config is NULL then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1552 LogError("IoTHub amqp client tranport null configuration parameter.\r\n");
AzureIoTClient 4:57e049bce51e 1553 }
AzureIoTClient 4:57e049bce51e 1554 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_002: [clientTransportAMQP_Create shall fail and return NULL if any other fields of the config structure are NULL.]*/
AzureIoTClient 4:57e049bce51e 1555 else if (config->upperConfig->protocol == NULL)
AzureIoTClient 4:57e049bce51e 1556 {
AzureIoTClient 4:57e049bce51e 1557 LogError("invalid configuration (NULL protocol detected)\r\n");
AzureIoTClient 4:57e049bce51e 1558 }
AzureIoTClient 4:57e049bce51e 1559 else if (config->upperConfig->deviceId == NULL)
AzureIoTClient 4:57e049bce51e 1560 {
AzureIoTClient 4:57e049bce51e 1561 LogError("invalid configuration (NULL deviceId detected)\r\n");
AzureIoTClient 4:57e049bce51e 1562 }
AzureIoTClient 4:57e049bce51e 1563 else if (config->upperConfig->deviceKey == NULL)
AzureIoTClient 4:57e049bce51e 1564 {
AzureIoTClient 4:57e049bce51e 1565 LogError("invalid configuration (NULL deviceKey detected)\r\n");
AzureIoTClient 4:57e049bce51e 1566 }
AzureIoTClient 4:57e049bce51e 1567 else if (config->upperConfig->iotHubName == NULL)
AzureIoTClient 4:57e049bce51e 1568 {
AzureIoTClient 4:57e049bce51e 1569 LogError("invalid configuration (NULL iotHubName detected)\r\n");
AzureIoTClient 4:57e049bce51e 1570 }
AzureIoTClient 4:57e049bce51e 1571 else if (config->upperConfig->iotHubSuffix == NULL)
AzureIoTClient 4:57e049bce51e 1572 {
AzureIoTClient 4:57e049bce51e 1573 LogError("invalid configuration (NULL iotHubSuffix detected)\r\n");
AzureIoTClient 4:57e049bce51e 1574 }
AzureIoTClient 4:57e049bce51e 1575 else if (!config->waitingToSend)
AzureIoTClient 4:57e049bce51e 1576 {
AzureIoTClient 4:57e049bce51e 1577 LogError("invalid configuration (NULL waitingToSend list detected)\r\n");
AzureIoTClient 4:57e049bce51e 1578 }
AzureIoTClient 4:57e049bce51e 1579 else if (strlen(config->upperConfig->deviceId) > 128U)
AzureIoTClient 4:57e049bce51e 1580 {
AzureIoTClient 4:57e049bce51e 1581 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_016: [clientTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.]*/
AzureIoTClient 4:57e049bce51e 1582 LogError("deviceName is too long\r\n");
AzureIoTClient 4:57e049bce51e 1583 }
AzureIoTClient 4:57e049bce51e 1584 else if ((strlen(config->upperConfig->deviceId) == 0) ||
AzureIoTClient 4:57e049bce51e 1585 (strlen(config->upperConfig->deviceKey) == 0) ||
AzureIoTClient 4:57e049bce51e 1586 (strlen(config->upperConfig->iotHubName) == 0) ||
AzureIoTClient 4:57e049bce51e 1587 (strlen(config->upperConfig->iotHubSuffix) == 0))
AzureIoTClient 4:57e049bce51e 1588 {
AzureIoTClient 4:57e049bce51e 1589 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_017: [clientTransportAMQP_Create shall fail and return NULL if any string config field is zero length.]*/
AzureIoTClient 4:57e049bce51e 1590 LogError("zero length config parameter\r\n");
AzureIoTClient 4:57e049bce51e 1591 }
AzureIoTClient 4:57e049bce51e 1592 else
AzureIoTClient 4:57e049bce51e 1593 {
AzureIoTClient 4:57e049bce51e 1594 amqpState = malloc(sizeof(AMQP_TRANSPORT_STATE));
AzureIoTClient 4:57e049bce51e 1595 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 4:57e049bce51e 1596 if (amqpState == NULL)
AzureIoTClient 4:57e049bce51e 1597 {
AzureIoTClient 4:57e049bce51e 1598 LogError("Could not allocate AMQP transport state\r\n");
AzureIoTClient 4:57e049bce51e 1599 }
AzureIoTClient 4:57e049bce51e 1600 else
AzureIoTClient 4:57e049bce51e 1601 {
AzureIoTClient 4:57e049bce51e 1602 amqpState->messageAddress = NULL;
AzureIoTClient 4:57e049bce51e 1603 amqpState->eventAddress = NULL;
AzureIoTClient 4:57e049bce51e 1604 amqpState->urledDeviceId = NULL;
AzureIoTClient 4:57e049bce51e 1605 amqpState->devicesPortionPath = NULL;
AzureIoTClient 4:57e049bce51e 1606 amqpState->cbsAddress = NULL;
AzureIoTClient 4:57e049bce51e 1607 amqpState->deviceKey = NULL;
AzureIoTClient 4:57e049bce51e 1608 amqpState->savedClientHandle = NULL;
AzureIoTClient 4:57e049bce51e 1609 amqpState->messenger = NULL;
AzureIoTClient 4:57e049bce51e 1610 amqpState->messageSubscription = NULL;
AzureIoTClient 4:57e049bce51e 1611 amqpState->cbsSubscription = NULL;
AzureIoTClient 4:57e049bce51e 1612 amqpState->zeroLengthString = NULL;
AzureIoTClient 4:57e049bce51e 1613 amqpState->messengerInitialized = false;
AzureIoTClient 4:57e049bce51e 1614 amqpState->waitingForPutTokenReply = false;
AzureIoTClient 4:57e049bce51e 1615 amqpState->cbsRequestAcceptTime = CBS_DEFAULT_REQUEST_ACCEPT_TIME;
AzureIoTClient 4:57e049bce51e 1616 amqpState->cbsReplyTime = CBS_DEFAULT_REPLY_TIME;
AzureIoTClient 4:57e049bce51e 1617 amqpState->sasTokenLifetime = SAS_TOKEN_DEFAULT_LIFETIME;
AzureIoTClient 4:57e049bce51e 1618 amqpState->sasRefreshLine = SAS_TOKEN_DEFAULT_REFRESH_LINE;
AzureIoTClient 4:57e049bce51e 1619 amqpState->eventTimeout = EVENT_TIMEOUT_DEFAULT;
AzureIoTClient 4:57e049bce51e 1620 amqpState->waitingToSend = config->waitingToSend;
AzureIoTClient 4:57e049bce51e 1621 DList_InitializeListHead(&amqpState->workInProgress);
AzureIoTClient 4:57e049bce51e 1622 DList_InitializeListHead(&amqpState->availableWorkItems);
AzureIoTClient 4:57e049bce51e 1623 DList_InitializeListHead(&amqpState->messengerCorral);
AzureIoTClient 4:57e049bce51e 1624 amqpState->DoWork_PullMessages = false;
AzureIoTClient 5:8d58d20699dd 1625 amqpState->trustedCertificates = NULL;
AzureIoTClient 4:57e049bce51e 1626 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 4:57e049bce51e 1627 if ((amqpState->message = pn_message()) == NULL)
AzureIoTClient 4:57e049bce51e 1628 {
AzureIoTClient 4:57e049bce51e 1629 LogError("Unable to preallocate the proton message!\r\n");
AzureIoTClient 4:57e049bce51e 1630 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 4:57e049bce51e 1631 amqpState = NULL;
AzureIoTClient 4:57e049bce51e 1632 }
AzureIoTClient 4:57e049bce51e 1633 else
AzureIoTClient 4:57e049bce51e 1634 {
AzureIoTClient 4:57e049bce51e 1635 size_t workItemsToPreallocate;
AzureIoTClient 4:57e049bce51e 1636 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 4:57e049bce51e 1637 for (workItemsToPreallocate = PROTON_EVENT_OUTGOING_WINDOW_SIZE; workItemsToPreallocate > 0; workItemsToPreallocate--)
AzureIoTClient 4:57e049bce51e 1638 {
AzureIoTClient 4:57e049bce51e 1639 PAMQP_WORK_ITEM currentItem = malloc(sizeof(AMQP_WORK_ITEM));
AzureIoTClient 4:57e049bce51e 1640 if (!currentItem)
AzureIoTClient 4:57e049bce51e 1641 {
AzureIoTClient 4:57e049bce51e 1642 //
AzureIoTClient 4:57e049bce51e 1643 // Not a particularly good sign. Fail the create.
AzureIoTClient 4:57e049bce51e 1644 //
AzureIoTClient 4:57e049bce51e 1645 LogError("Unable to preallocate work item!\r\n");
AzureIoTClient 4:57e049bce51e 1646 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 4:57e049bce51e 1647 amqpState = NULL;
AzureIoTClient 4:57e049bce51e 1648 break;
AzureIoTClient 4:57e049bce51e 1649 }
AzureIoTClient 4:57e049bce51e 1650 else
AzureIoTClient 4:57e049bce51e 1651 {
AzureIoTClient 4:57e049bce51e 1652 DList_InsertTailList(&amqpState->availableWorkItems, &currentItem->link);
AzureIoTClient 4:57e049bce51e 1653 }
AzureIoTClient 4:57e049bce51e 1654 }
AzureIoTClient 4:57e049bce51e 1655 if (amqpState != NULL)
AzureIoTClient 4:57e049bce51e 1656 {
AzureIoTClient 4:57e049bce51e 1657 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_095: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as host, from the following pieces: "config->iotHubName + "." + config->iotHubSuffix.]*/
AzureIoTClient 4:57e049bce51e 1658 STRING_HANDLE host = createHost(config);
AzureIoTClient 4:57e049bce51e 1659 if (host != NULL)
AzureIoTClient 4:57e049bce51e 1660 {
AzureIoTClient 4:57e049bce51e 1661 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/
AzureIoTClient 4:57e049bce51e 1662 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/
AzureIoTClient 4:57e049bce51e 1663 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/
AzureIoTClient 4:57e049bce51e 1664 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/
AzureIoTClient 4:57e049bce51e 1665 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/
AzureIoTClient 4:57e049bce51e 1666 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/
AzureIoTClient 4:57e049bce51e 1667 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created. If this fails then clientTransportAMQP shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1668 if ((!createUrledDeviceId(amqpState, config)) ||
AzureIoTClient 4:57e049bce51e 1669 (!createDevicesPortionPath(amqpState, STRING_c_str(host))) ||
AzureIoTClient 4:57e049bce51e 1670 (!createEventAddress(amqpState)) ||
AzureIoTClient 4:57e049bce51e 1671 (!createMessageAddress(amqpState)) ||
AzureIoTClient 4:57e049bce51e 1672 (!createCbsAddress(amqpState, STRING_c_str(host))) ||
AzureIoTClient 4:57e049bce51e 1673 ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) ||
AzureIoTClient 4:57e049bce51e 1674 ((amqpState->zeroLengthString = STRING_new()) == NULL))
AzureIoTClient 4:57e049bce51e 1675 {
AzureIoTClient 4:57e049bce51e 1676 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1677 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1678 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1679 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1680 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1681 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1682 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 4:57e049bce51e 1683 amqpState = NULL;
AzureIoTClient 4:57e049bce51e 1684 }
AzureIoTClient 4:57e049bce51e 1685 STRING_delete(host);
AzureIoTClient 4:57e049bce51e 1686 }
AzureIoTClient 4:57e049bce51e 1687 else
AzureIoTClient 4:57e049bce51e 1688 {
AzureIoTClient 4:57e049bce51e 1689 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_096: [If creating the schemeAndHost fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 4:57e049bce51e 1690 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 4:57e049bce51e 1691 amqpState = NULL;
AzureIoTClient 4:57e049bce51e 1692 }
AzureIoTClient 4:57e049bce51e 1693 }
AzureIoTClient 4:57e049bce51e 1694 }
AzureIoTClient 4:57e049bce51e 1695 }
AzureIoTClient 4:57e049bce51e 1696 }
AzureIoTClient 4:57e049bce51e 1697 return amqpState;
AzureIoTClient 4:57e049bce51e 1698 }
AzureIoTClient 4:57e049bce51e 1699
AzureIoTClient 4:57e049bce51e 1700 static int clientTransportAMQP_Subscribe(TRANSPORT_HANDLE handle)
AzureIoTClient 4:57e049bce51e 1701 {
AzureIoTClient 4:57e049bce51e 1702 int result;
AzureIoTClient 4:57e049bce51e 1703 if (handle == NULL)
AzureIoTClient 4:57e049bce51e 1704 {
AzureIoTClient 4:57e049bce51e 1705 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_013: [If the parameter handle is NULL then clientTransportAMQP_Subscribe shall fail and return a non-zero value.]*/
AzureIoTClient 4:57e049bce51e 1706 LogError("Invalid handle to IoTHubClient amqp subscribe.\r\n");
AzureIoTClient 4:57e049bce51e 1707 result = 1;
AzureIoTClient 4:57e049bce51e 1708 }
AzureIoTClient 4:57e049bce51e 1709 else
AzureIoTClient 4:57e049bce51e 1710 {
AzureIoTClient 4:57e049bce51e 1711 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/
AzureIoTClient 4:57e049bce51e 1712 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 1713 transportState->DoWork_PullMessages = true;
AzureIoTClient 4:57e049bce51e 1714 result = 0;
AzureIoTClient 4:57e049bce51e 1715 }
AzureIoTClient 4:57e049bce51e 1716 return result;
AzureIoTClient 4:57e049bce51e 1717 }
AzureIoTClient 4:57e049bce51e 1718
AzureIoTClient 4:57e049bce51e 1719 static void clientTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle)
AzureIoTClient 4:57e049bce51e 1720 {
AzureIoTClient 4:57e049bce51e 1721 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 1722 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/
AzureIoTClient 4:57e049bce51e 1723 if (transportState)
AzureIoTClient 4:57e049bce51e 1724 {
AzureIoTClient 4:57e049bce51e 1725 transportState->DoWork_PullMessages = false;
AzureIoTClient 4:57e049bce51e 1726 }
AzureIoTClient 4:57e049bce51e 1727 }
AzureIoTClient 4:57e049bce51e 1728
AzureIoTClient 4:57e049bce51e 1729 static IOTHUB_CLIENT_RESULT clientTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
AzureIoTClient 4:57e049bce51e 1730 {
AzureIoTClient 4:57e049bce51e 1731 IOTHUB_CLIENT_RESULT result;
AzureIoTClient 4:57e049bce51e 1732
AzureIoTClient 4:57e049bce51e 1733 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_001: [clientTransportAMQP_GetSendStatus IoTHubTransportHttp_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter] */
AzureIoTClient 4:57e049bce51e 1734 if (handle == NULL)
AzureIoTClient 4:57e049bce51e 1735 {
AzureIoTClient 4:57e049bce51e 1736 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 4:57e049bce51e 1737 LogError("Invalid handle to IoTHubClient AMQP transport instance.\r\n");
AzureIoTClient 4:57e049bce51e 1738 }
AzureIoTClient 4:57e049bce51e 1739 else if (iotHubClientStatus == NULL)
AzureIoTClient 4:57e049bce51e 1740 {
AzureIoTClient 4:57e049bce51e 1741 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 4:57e049bce51e 1742 LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.\r\n");
AzureIoTClient 4:57e049bce51e 1743 }
AzureIoTClient 4:57e049bce51e 1744 else
AzureIoTClient 4:57e049bce51e 1745 {
AzureIoTClient 4:57e049bce51e 1746 PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 4:57e049bce51e 1747
AzureIoTClient 4:57e049bce51e 1748 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_002: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent] */
AzureIoTClient 4:57e049bce51e 1749 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->workInProgress)))
AzureIoTClient 4:57e049bce51e 1750 {
AzureIoTClient 4:57e049bce51e 1751 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
AzureIoTClient 4:57e049bce51e 1752 }
AzureIoTClient 4:57e049bce51e 1753 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_003: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent] */
AzureIoTClient 4:57e049bce51e 1754 else
AzureIoTClient 4:57e049bce51e 1755 {
AzureIoTClient 4:57e049bce51e 1756 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
AzureIoTClient 4:57e049bce51e 1757 }
AzureIoTClient 4:57e049bce51e 1758
AzureIoTClient 4:57e049bce51e 1759 result = IOTHUB_CLIENT_OK;
AzureIoTClient 4:57e049bce51e 1760 }
AzureIoTClient 4:57e049bce51e 1761
AzureIoTClient 4:57e049bce51e 1762 return result;
AzureIoTClient 4:57e049bce51e 1763 }
AzureIoTClient 4:57e049bce51e 1764
AzureIoTClient 4:57e049bce51e 1765 static IOTHUB_CLIENT_RESULT clientTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value)
AzureIoTClient 4:57e049bce51e 1766 {
AzureIoTClient 4:57e049bce51e 1767 IOTHUB_CLIENT_RESULT result;
AzureIoTClient 4:57e049bce51e 1768 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_001: [If handle parameter is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 4:57e049bce51e 1769 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 4:57e049bce51e 1770 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 4:57e049bce51e 1771
AzureIoTClient 5:8d58d20699dd 1772 if (
AzureIoTClient 5:8d58d20699dd 1773 (handle == NULL) ||
AzureIoTClient 5:8d58d20699dd 1774 (option == NULL) ||
AzureIoTClient 5:8d58d20699dd 1775 (value == NULL)
AzureIoTClient 5:8d58d20699dd 1776 )
AzureIoTClient 5:8d58d20699dd 1777 {
AzureIoTClient 5:8d58d20699dd 1778 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 5:8d58d20699dd 1779 LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n");
AzureIoTClient 5:8d58d20699dd 1780 }
AzureIoTClient 5:8d58d20699dd 1781 else
AzureIoTClient 5:8d58d20699dd 1782 {
AzureIoTClient 5:8d58d20699dd 1783 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */
AzureIoTClient 5:8d58d20699dd 1784 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */
AzureIoTClient 5:8d58d20699dd 1785 if (strcmp("TrustedCerts", option) == 0)
AzureIoTClient 5:8d58d20699dd 1786 {
AzureIoTClient 5:8d58d20699dd 1787 PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 5:8d58d20699dd 1788 char* newTrsutedCertificates;
AzureIoTClient 4:57e049bce51e 1789
AzureIoTClient 5:8d58d20699dd 1790 if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0)
AzureIoTClient 5:8d58d20699dd 1791 {
AzureIoTClient 5:8d58d20699dd 1792 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */
AzureIoTClient 5:8d58d20699dd 1793 result = IOTHUB_CLIENT_ERROR;
AzureIoTClient 5:8d58d20699dd 1794 }
AzureIoTClient 5:8d58d20699dd 1795 else
AzureIoTClient 5:8d58d20699dd 1796 {
AzureIoTClient 5:8d58d20699dd 1797 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */
AzureIoTClient 5:8d58d20699dd 1798 if (handleData->trustedCertificates != NULL)
AzureIoTClient 5:8d58d20699dd 1799 {
AzureIoTClient 5:8d58d20699dd 1800 free(handleData->trustedCertificates);
AzureIoTClient 5:8d58d20699dd 1801 }
AzureIoTClient 4:57e049bce51e 1802
AzureIoTClient 5:8d58d20699dd 1803 handleData->trustedCertificates = newTrsutedCertificates;
AzureIoTClient 5:8d58d20699dd 1804 result = IOTHUB_CLIENT_OK;
AzureIoTClient 5:8d58d20699dd 1805 }
AzureIoTClient 5:8d58d20699dd 1806 }
AzureIoTClient 5:8d58d20699dd 1807 else
AzureIoTClient 5:8d58d20699dd 1808 {
AzureIoTClient 5:8d58d20699dd 1809 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/
AzureIoTClient 5:8d58d20699dd 1810 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 5:8d58d20699dd 1811 }
AzureIoTClient 5:8d58d20699dd 1812 }
AzureIoTClient 4:57e049bce51e 1813
AzureIoTClient 4:57e049bce51e 1814 return result;
AzureIoTClient 4:57e049bce51e 1815 }
AzureIoTClient 4:57e049bce51e 1816
AzureIoTClient 4:57e049bce51e 1817 static TRANSPORT_PROVIDER myfunc = {
AzureIoTClient 4:57e049bce51e 1818 clientTransportAMQP_SetOption,
AzureIoTClient 4:57e049bce51e 1819 clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus
AzureIoTClient 4:57e049bce51e 1820 };
AzureIoTClient 4:57e049bce51e 1821
AzureIoTClient 4:57e049bce51e 1822 extern const void* AMQP_Protocol(void)
AzureIoTClient 4:57e049bce51e 1823 {
AzureIoTClient 4:57e049bce51e 1824 return &myfunc;
AzureIoTClient 4:57e049bce51e 1825 }