Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
iothubtransportamqp.c@5:8d58d20699dd, 2015-11-02 (annotated)
- Committer:
- AzureIoTClient
- Date:
- Mon Nov 02 19:21:13 2015 -0800
- Revision:
- 5:8d58d20699dd
- Parent:
- 4:57e049bce51e
- Child:
- 7:07bc440836b3
v1.0.0-preview.4
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
AzureIoTClient | 4:57e049bce51e | 1 | // Copyright (c) Microsoft. All rights reserved. |
AzureIoTClient | 4:57e049bce51e | 2 | // Licensed under the MIT license. See LICENSE file in the project root for full license information. |
AzureIoTClient | 4:57e049bce51e | 3 | |
AzureIoTClient | 4:57e049bce51e | 4 | #include <stdlib.h> |
AzureIoTClient | 4:57e049bce51e | 5 | #ifdef _CRTDBG_MAP_ALLOC |
AzureIoTClient | 4:57e049bce51e | 6 | #include <crtdbg.h> |
AzureIoTClient | 4:57e049bce51e | 7 | #endif |
AzureIoTClient | 4:57e049bce51e | 8 | #include "gballoc.h" |
AzureIoTClient | 4:57e049bce51e | 9 | |
AzureIoTClient | 4:57e049bce51e | 10 | #include "proton/message.h" |
AzureIoTClient | 4:57e049bce51e | 11 | #include "proton/messenger.h" |
AzureIoTClient | 4:57e049bce51e | 12 | |
AzureIoTClient | 4:57e049bce51e | 13 | #include "iot_logging.h" |
AzureIoTClient | 4:57e049bce51e | 14 | #include "strings.h" |
AzureIoTClient | 4:57e049bce51e | 15 | #include "urlencode.h" |
AzureIoTClient | 4:57e049bce51e | 16 | #include "doublylinkedlist.h" |
AzureIoTClient | 4:57e049bce51e | 17 | #include "crt_abstractions.h" |
AzureIoTClient | 4:57e049bce51e | 18 | #include "sastoken.h" |
AzureIoTClient | 4:57e049bce51e | 19 | |
AzureIoTClient | 4:57e049bce51e | 20 | #include "iothub_client_ll.h" |
AzureIoTClient | 4:57e049bce51e | 21 | #include "iothub_client_private.h" |
AzureIoTClient | 4:57e049bce51e | 22 | #include "iothubtransportamqp.h" |
AzureIoTClient | 4:57e049bce51e | 23 | #include "iothub_client_amqp_internal.h" |
AzureIoTClient | 4:57e049bce51e | 24 | |
AzureIoTClient | 4:57e049bce51e | 25 | static const size_t NUMBER_OF_MESSENGER_STOP_TRIES = PROTON_MESSENGER_STOP_TRIES; |
AzureIoTClient | 4:57e049bce51e | 26 | static const size_t MAXIMUM_EVENT_LENGTH = PROTON_MAXIMUM_EVENT_LENGTH; |
AzureIoTClient | 4:57e049bce51e | 27 | |
AzureIoTClient | 4:57e049bce51e | 28 | |
AzureIoTClient | 4:57e049bce51e | 29 | #define amqp_batch_result_values \ |
AzureIoTClient | 4:57e049bce51e | 30 | amqp_batch_nowork, \ |
AzureIoTClient | 4:57e049bce51e | 31 | amqp_batch_batch, \ |
AzureIoTClient | 4:57e049bce51e | 32 | amqp_batch_nobatch, \ |
AzureIoTClient | 4:57e049bce51e | 33 | amqp_batch_error \ |
AzureIoTClient | 4:57e049bce51e | 34 | |
AzureIoTClient | 4:57e049bce51e | 35 | DEFINE_ENUM(amqp_batch_result, amqp_batch_result_values); |
AzureIoTClient | 4:57e049bce51e | 36 | |
AzureIoTClient | 4:57e049bce51e | 37 | static const int IO_PROCESSING_YIELD_IN_MILLISECONDS = PROTON_PROCESSING_YIELD_IN_MILLISECONDS; |
AzureIoTClient | 4:57e049bce51e | 38 | |
AzureIoTClient | 4:57e049bce51e | 39 | static void processReceives(TRANSPORT_HANDLE handle); |
AzureIoTClient | 4:57e049bce51e | 40 | |
AzureIoTClient | 4:57e049bce51e | 41 | typedef struct MESSENGER_CONTAINER_TAG |
AzureIoTClient | 4:57e049bce51e | 42 | { |
AzureIoTClient | 4:57e049bce51e | 43 | // |
AzureIoTClient | 4:57e049bce51e | 44 | // This links all of these records onto the messageCorral in the transport state. |
AzureIoTClient | 4:57e049bce51e | 45 | DLIST_ENTRY entry; |
AzureIoTClient | 4:57e049bce51e | 46 | |
AzureIoTClient | 4:57e049bce51e | 47 | // |
AzureIoTClient | 4:57e049bce51e | 48 | // This is a messenger that couldn't be stopped. (You have to admire its initiative.) We'll keep trying to stop it when |
AzureIoTClient | 4:57e049bce51e | 49 | // we have a spare moment. |
AzureIoTClient | 4:57e049bce51e | 50 | pn_messenger_t* messengerThatDidNotStop; |
AzureIoTClient | 4:57e049bce51e | 51 | } MESSENGER_CONTAINER, *PMESSENGER_CONTAINER; |
AzureIoTClient | 4:57e049bce51e | 52 | |
AzureIoTClient | 4:57e049bce51e | 53 | // |
AzureIoTClient | 4:57e049bce51e | 54 | // Transport specific structure used to contain everything needed to send an event item. |
AzureIoTClient | 4:57e049bce51e | 55 | // |
AzureIoTClient | 4:57e049bce51e | 56 | typedef struct AMQP_WORK_ITEM_TAG |
AzureIoTClient | 4:57e049bce51e | 57 | { |
AzureIoTClient | 4:57e049bce51e | 58 | // |
AzureIoTClient | 4:57e049bce51e | 59 | // At what time should we give up on sending this message. |
AzureIoTClient | 4:57e049bce51e | 60 | size_t expiry; |
AzureIoTClient | 4:57e049bce51e | 61 | |
AzureIoTClient | 4:57e049bce51e | 62 | // |
AzureIoTClient | 4:57e049bce51e | 63 | // Used by the transport to determine if the IO operation is complete. |
AzureIoTClient | 4:57e049bce51e | 64 | pn_tracker_t tracker; |
AzureIoTClient | 4:57e049bce51e | 65 | |
AzureIoTClient | 4:57e049bce51e | 66 | // |
AzureIoTClient | 4:57e049bce51e | 67 | // This acts as a listhead for all of the messages that make up one transfer frame. |
AzureIoTClient | 4:57e049bce51e | 68 | // If no batching is being done then this list will have at most one item. If batching |
AzureIoTClient | 4:57e049bce51e | 69 | // is being done then this could be lengthy! |
AzureIoTClient | 4:57e049bce51e | 70 | DLIST_ENTRY eventMessages; |
AzureIoTClient | 4:57e049bce51e | 71 | |
AzureIoTClient | 4:57e049bce51e | 72 | // |
AzureIoTClient | 4:57e049bce51e | 73 | // This is used to hold this work item either on the work in progress list or on the available work item list. |
AzureIoTClient | 4:57e049bce51e | 74 | DLIST_ENTRY link; |
AzureIoTClient | 4:57e049bce51e | 75 | } AMQP_WORK_ITEM, *PAMQP_WORK_ITEM; |
AzureIoTClient | 4:57e049bce51e | 76 | |
AzureIoTClient | 4:57e049bce51e | 77 | static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState); |
AzureIoTClient | 4:57e049bce51e | 78 | |
AzureIoTClient | 4:57e049bce51e | 79 | static bool checkForErrorsThenWork(PAMQP_TRANSPORT_STATE transportState) |
AzureIoTClient | 4:57e049bce51e | 80 | { |
AzureIoTClient | 4:57e049bce51e | 81 | pn_error_t* errorStruct; |
AzureIoTClient | 4:57e049bce51e | 82 | bool result; |
AzureIoTClient | 4:57e049bce51e | 83 | errorStruct = pn_messenger_error(transportState->messenger); |
AzureIoTClient | 4:57e049bce51e | 84 | if (pn_error_code(errorStruct) != 0) |
AzureIoTClient | 4:57e049bce51e | 85 | { |
AzureIoTClient | 4:57e049bce51e | 86 | LogError("An error was detected in the networking manager: %d\r\n", pn_error_code(errorStruct)); |
AzureIoTClient | 4:57e049bce51e | 87 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 88 | result = false; |
AzureIoTClient | 4:57e049bce51e | 89 | } |
AzureIoTClient | 4:57e049bce51e | 90 | else |
AzureIoTClient | 4:57e049bce51e | 91 | { |
AzureIoTClient | 4:57e049bce51e | 92 | int protonResult; |
AzureIoTClient | 4:57e049bce51e | 93 | protonResult = pn_messenger_work(transportState->messenger, IO_PROCESSING_YIELD_IN_MILLISECONDS); |
AzureIoTClient | 4:57e049bce51e | 94 | if ((protonResult < 0) && (protonResult != PN_TIMEOUT)) |
AzureIoTClient | 4:57e049bce51e | 95 | { |
AzureIoTClient | 4:57e049bce51e | 96 | LogError("A networking error occured. Proton error code: %d\r\n", protonResult); |
AzureIoTClient | 4:57e049bce51e | 97 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 98 | result = false; |
AzureIoTClient | 4:57e049bce51e | 99 | } |
AzureIoTClient | 4:57e049bce51e | 100 | else |
AzureIoTClient | 4:57e049bce51e | 101 | { |
AzureIoTClient | 4:57e049bce51e | 102 | result = true; |
AzureIoTClient | 4:57e049bce51e | 103 | } |
AzureIoTClient | 4:57e049bce51e | 104 | } |
AzureIoTClient | 4:57e049bce51e | 105 | return result; |
AzureIoTClient | 4:57e049bce51e | 106 | } |
AzureIoTClient | 5:8d58d20699dd | 107 | |
AzureIoTClient | 5:8d58d20699dd | 108 | static int setRecvMessageIds(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) |
AzureIoTClient | 5:8d58d20699dd | 109 | { |
AzureIoTClient | 5:8d58d20699dd | 110 | int result; |
AzureIoTClient | 5:8d58d20699dd | 111 | |
AzureIoTClient | 5:8d58d20699dd | 112 | // Function can not fail |
AzureIoTClient | 5:8d58d20699dd | 113 | pn_atom_t msgIdInfo = pn_message_get_id(msg); |
AzureIoTClient | 5:8d58d20699dd | 114 | if (msgIdInfo.type != PN_NULL) |
AzureIoTClient | 5:8d58d20699dd | 115 | { |
AzureIoTClient | 5:8d58d20699dd | 116 | if (msgIdInfo.type != PN_STRING) |
AzureIoTClient | 5:8d58d20699dd | 117 | { |
AzureIoTClient | 5:8d58d20699dd | 118 | LogError("Message Id Failure: Invalid message id Type %d\r\n", msgIdInfo.type); |
AzureIoTClient | 5:8d58d20699dd | 119 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 120 | } |
AzureIoTClient | 5:8d58d20699dd | 121 | else |
AzureIoTClient | 5:8d58d20699dd | 122 | { |
AzureIoTClient | 5:8d58d20699dd | 123 | if (IoTHubMessage_SetMessageId(ioTMessage, msgIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) |
AzureIoTClient | 5:8d58d20699dd | 124 | { |
AzureIoTClient | 5:8d58d20699dd | 125 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 126 | } |
AzureIoTClient | 5:8d58d20699dd | 127 | else |
AzureIoTClient | 5:8d58d20699dd | 128 | { |
AzureIoTClient | 5:8d58d20699dd | 129 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 130 | } |
AzureIoTClient | 5:8d58d20699dd | 131 | } |
AzureIoTClient | 5:8d58d20699dd | 132 | } |
AzureIoTClient | 5:8d58d20699dd | 133 | else |
AzureIoTClient | 5:8d58d20699dd | 134 | { |
AzureIoTClient | 5:8d58d20699dd | 135 | // NULL values mean it's not set |
AzureIoTClient | 5:8d58d20699dd | 136 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 137 | } |
AzureIoTClient | 5:8d58d20699dd | 138 | |
AzureIoTClient | 5:8d58d20699dd | 139 | if (result == 0) |
AzureIoTClient | 5:8d58d20699dd | 140 | { |
AzureIoTClient | 5:8d58d20699dd | 141 | pn_atom_t corrIdInfo = pn_message_get_correlation_id(msg); |
AzureIoTClient | 5:8d58d20699dd | 142 | if (corrIdInfo.type != PN_NULL) |
AzureIoTClient | 5:8d58d20699dd | 143 | { |
AzureIoTClient | 5:8d58d20699dd | 144 | if (corrIdInfo.type != PN_STRING) |
AzureIoTClient | 5:8d58d20699dd | 145 | { |
AzureIoTClient | 5:8d58d20699dd | 146 | LogError("Message Id Failure: Invalid correlation id Type %d\r\n", corrIdInfo.type); |
AzureIoTClient | 5:8d58d20699dd | 147 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 148 | } |
AzureIoTClient | 5:8d58d20699dd | 149 | else |
AzureIoTClient | 5:8d58d20699dd | 150 | { |
AzureIoTClient | 5:8d58d20699dd | 151 | if (IoTHubMessage_SetCorrelationId(ioTMessage, corrIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) |
AzureIoTClient | 5:8d58d20699dd | 152 | { |
AzureIoTClient | 5:8d58d20699dd | 153 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 154 | } |
AzureIoTClient | 5:8d58d20699dd | 155 | else |
AzureIoTClient | 5:8d58d20699dd | 156 | { |
AzureIoTClient | 5:8d58d20699dd | 157 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 158 | } |
AzureIoTClient | 5:8d58d20699dd | 159 | } |
AzureIoTClient | 5:8d58d20699dd | 160 | } |
AzureIoTClient | 5:8d58d20699dd | 161 | else |
AzureIoTClient | 5:8d58d20699dd | 162 | { |
AzureIoTClient | 5:8d58d20699dd | 163 | // NULL values mean it's not set |
AzureIoTClient | 5:8d58d20699dd | 164 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 165 | } |
AzureIoTClient | 5:8d58d20699dd | 166 | } |
AzureIoTClient | 5:8d58d20699dd | 167 | return result; |
AzureIoTClient | 5:8d58d20699dd | 168 | } |
AzureIoTClient | 5:8d58d20699dd | 169 | |
AzureIoTClient | 5:8d58d20699dd | 170 | static int setSendMessageIds(PDLIST_ENTRY messagesEntry, pn_message_t* msg) |
AzureIoTClient | 5:8d58d20699dd | 171 | { |
AzureIoTClient | 5:8d58d20699dd | 172 | int result; |
AzureIoTClient | 5:8d58d20699dd | 173 | |
AzureIoTClient | 5:8d58d20699dd | 174 | IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesEntry->Flink, IOTHUB_MESSAGE_LIST, entry); |
AzureIoTClient | 5:8d58d20699dd | 175 | const char* messageId = IoTHubMessage_GetMessageId(currentMessage->messageHandle); |
AzureIoTClient | 5:8d58d20699dd | 176 | if (messageId != NULL) |
AzureIoTClient | 5:8d58d20699dd | 177 | { |
AzureIoTClient | 5:8d58d20699dd | 178 | pn_bytes_t dataBytes = pn_bytes(strlen(messageId), messageId); |
AzureIoTClient | 5:8d58d20699dd | 179 | pn_atom_t pnMsgId; |
AzureIoTClient | 5:8d58d20699dd | 180 | pnMsgId.type = PN_STRING; |
AzureIoTClient | 5:8d58d20699dd | 181 | pnMsgId.u.as_bytes = dataBytes; |
AzureIoTClient | 5:8d58d20699dd | 182 | if (pn_message_set_id(msg, pnMsgId) == 0) |
AzureIoTClient | 5:8d58d20699dd | 183 | { |
AzureIoTClient | 5:8d58d20699dd | 184 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 185 | } |
AzureIoTClient | 5:8d58d20699dd | 186 | else |
AzureIoTClient | 5:8d58d20699dd | 187 | { |
AzureIoTClient | 5:8d58d20699dd | 188 | LogError("Failure setting pn_message_set_id.\r\n"); |
AzureIoTClient | 5:8d58d20699dd | 189 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 190 | } |
AzureIoTClient | 5:8d58d20699dd | 191 | } |
AzureIoTClient | 5:8d58d20699dd | 192 | else |
AzureIoTClient | 5:8d58d20699dd | 193 | { |
AzureIoTClient | 5:8d58d20699dd | 194 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 195 | } |
AzureIoTClient | 5:8d58d20699dd | 196 | |
AzureIoTClient | 5:8d58d20699dd | 197 | if (result == 0) |
AzureIoTClient | 5:8d58d20699dd | 198 | { |
AzureIoTClient | 5:8d58d20699dd | 199 | const char* correlationId = IoTHubMessage_GetCorrelationId(currentMessage->messageHandle); |
AzureIoTClient | 5:8d58d20699dd | 200 | if (correlationId != NULL) |
AzureIoTClient | 5:8d58d20699dd | 201 | { |
AzureIoTClient | 5:8d58d20699dd | 202 | pn_bytes_t dataBytes = pn_bytes(strlen(correlationId), correlationId); |
AzureIoTClient | 5:8d58d20699dd | 203 | pn_atom_t pnCorrelationId; |
AzureIoTClient | 5:8d58d20699dd | 204 | pnCorrelationId.type = PN_STRING; |
AzureIoTClient | 5:8d58d20699dd | 205 | pnCorrelationId.u.as_bytes = dataBytes; |
AzureIoTClient | 5:8d58d20699dd | 206 | if (pn_message_set_correlation_id(msg, pnCorrelationId) == 0) |
AzureIoTClient | 5:8d58d20699dd | 207 | { |
AzureIoTClient | 5:8d58d20699dd | 208 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 209 | } |
AzureIoTClient | 5:8d58d20699dd | 210 | else |
AzureIoTClient | 5:8d58d20699dd | 211 | { |
AzureIoTClient | 5:8d58d20699dd | 212 | LogError("Failure setting pn_message_set_correlation_id.\r\n"); |
AzureIoTClient | 5:8d58d20699dd | 213 | result = __LINE__; |
AzureIoTClient | 5:8d58d20699dd | 214 | } |
AzureIoTClient | 5:8d58d20699dd | 215 | } |
AzureIoTClient | 5:8d58d20699dd | 216 | else |
AzureIoTClient | 5:8d58d20699dd | 217 | { |
AzureIoTClient | 5:8d58d20699dd | 218 | result = 0; |
AzureIoTClient | 5:8d58d20699dd | 219 | } |
AzureIoTClient | 5:8d58d20699dd | 220 | } |
AzureIoTClient | 5:8d58d20699dd | 221 | return result; |
AzureIoTClient | 5:8d58d20699dd | 222 | } |
AzureIoTClient | 5:8d58d20699dd | 223 | |
AzureIoTClient | 4:57e049bce51e | 224 | /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */ |
AzureIoTClient | 4:57e049bce51e | 225 | static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg) |
AzureIoTClient | 4:57e049bce51e | 226 | { |
AzureIoTClient | 4:57e049bce51e | 227 | int result = 0; |
AzureIoTClient | 4:57e049bce51e | 228 | size_t index; |
AzureIoTClient | 4:57e049bce51e | 229 | |
AzureIoTClient | 4:57e049bce51e | 230 | pn_data_t* propData; |
AzureIoTClient | 4:57e049bce51e | 231 | const char*const* keys; |
AzureIoTClient | 4:57e049bce51e | 232 | const char*const* values; |
AzureIoTClient | 4:57e049bce51e | 233 | size_t propertyCount = 0; |
AzureIoTClient | 4:57e049bce51e | 234 | |
AzureIoTClient | 4:57e049bce51e | 235 | IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry); |
AzureIoTClient | 4:57e049bce51e | 236 | MAP_HANDLE mapProperties = IoTHubMessage_Properties(currentMessage->messageHandle); |
AzureIoTClient | 4:57e049bce51e | 237 | if (mapProperties == NULL) |
AzureIoTClient | 4:57e049bce51e | 238 | { |
AzureIoTClient | 4:57e049bce51e | 239 | LogError("Failure IoTHubMessage_Properties.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 240 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 241 | } |
AzureIoTClient | 4:57e049bce51e | 242 | else if (Map_GetInternals(mapProperties, &keys, &values, &propertyCount) != MAP_OK) |
AzureIoTClient | 4:57e049bce51e | 243 | { |
AzureIoTClient | 4:57e049bce51e | 244 | LogError("Failure retrieving Map_GetInternals.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 245 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 246 | } |
AzureIoTClient | 4:57e049bce51e | 247 | else if (propertyCount > 0) |
AzureIoTClient | 4:57e049bce51e | 248 | { |
AzureIoTClient | 4:57e049bce51e | 249 | if ( (propData = pn_message_properties(msg) ) == NULL) |
AzureIoTClient | 4:57e049bce51e | 250 | { |
AzureIoTClient | 4:57e049bce51e | 251 | LogError("Failure pn_message_properties.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 252 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 253 | } |
AzureIoTClient | 4:57e049bce51e | 254 | else if (pn_data_put_map(propData) != 0) |
AzureIoTClient | 4:57e049bce51e | 255 | { |
AzureIoTClient | 4:57e049bce51e | 256 | LogError("Failure pn_data_put_map.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 257 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 258 | } |
AzureIoTClient | 4:57e049bce51e | 259 | else if (pn_data_enter(propData) == false ) |
AzureIoTClient | 4:57e049bce51e | 260 | { |
AzureIoTClient | 4:57e049bce51e | 261 | LogError("Failure pn_data_enter.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 262 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 263 | } |
AzureIoTClient | 4:57e049bce51e | 264 | else |
AzureIoTClient | 4:57e049bce51e | 265 | { |
AzureIoTClient | 4:57e049bce51e | 266 | for (index = 0; index < propertyCount; index++) |
AzureIoTClient | 4:57e049bce51e | 267 | { |
AzureIoTClient | 4:57e049bce51e | 268 | if (pn_data_put_symbol(propData, pn_bytes(strlen(keys[index]), keys[index])) != 0) |
AzureIoTClient | 4:57e049bce51e | 269 | { |
AzureIoTClient | 4:57e049bce51e | 270 | LogError("Failure pn_data_put_symbol.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 271 | break; |
AzureIoTClient | 4:57e049bce51e | 272 | } |
AzureIoTClient | 4:57e049bce51e | 273 | else if (pn_data_put_string(propData, pn_bytes(strlen(values[index]), values[index])) != 0) |
AzureIoTClient | 4:57e049bce51e | 274 | { |
AzureIoTClient | 4:57e049bce51e | 275 | LogError("Failure pn_data_put_string.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 276 | break; |
AzureIoTClient | 4:57e049bce51e | 277 | } |
AzureIoTClient | 4:57e049bce51e | 278 | } |
AzureIoTClient | 4:57e049bce51e | 279 | if (index != propertyCount) |
AzureIoTClient | 4:57e049bce51e | 280 | { |
AzureIoTClient | 4:57e049bce51e | 281 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 282 | } |
AzureIoTClient | 4:57e049bce51e | 283 | else if (!pn_data_exit(propData) ) |
AzureIoTClient | 4:57e049bce51e | 284 | { |
AzureIoTClient | 4:57e049bce51e | 285 | LogError("Failure pn_data_exit.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 286 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 287 | } |
AzureIoTClient | 4:57e049bce51e | 288 | else |
AzureIoTClient | 4:57e049bce51e | 289 | { |
AzureIoTClient | 4:57e049bce51e | 290 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 291 | } |
AzureIoTClient | 4:57e049bce51e | 292 | } |
AzureIoTClient | 4:57e049bce51e | 293 | } |
AzureIoTClient | 4:57e049bce51e | 294 | return result; |
AzureIoTClient | 4:57e049bce51e | 295 | } |
AzureIoTClient | 4:57e049bce51e | 296 | |
AzureIoTClient | 4:57e049bce51e | 297 | static int getMapString(pn_data_t* propData, pn_bytes_t* mapValue) |
AzureIoTClient | 4:57e049bce51e | 298 | { |
AzureIoTClient | 4:57e049bce51e | 299 | int result; |
AzureIoTClient | 4:57e049bce51e | 300 | if (pn_data_next(propData) == true) |
AzureIoTClient | 4:57e049bce51e | 301 | { |
AzureIoTClient | 4:57e049bce51e | 302 | if (pn_data_type(propData) == PN_STRING) |
AzureIoTClient | 4:57e049bce51e | 303 | { |
AzureIoTClient | 4:57e049bce51e | 304 | *mapValue = pn_data_get_string(propData); |
AzureIoTClient | 4:57e049bce51e | 305 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 306 | } |
AzureIoTClient | 4:57e049bce51e | 307 | else |
AzureIoTClient | 4:57e049bce51e | 308 | { |
AzureIoTClient | 4:57e049bce51e | 309 | LogError("Failure pn_data_type.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 310 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 311 | } |
AzureIoTClient | 4:57e049bce51e | 312 | } |
AzureIoTClient | 4:57e049bce51e | 313 | else |
AzureIoTClient | 4:57e049bce51e | 314 | { |
AzureIoTClient | 4:57e049bce51e | 315 | LogError("Failure pn_data_next.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 316 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 317 | } |
AzureIoTClient | 4:57e049bce51e | 318 | return result; |
AzureIoTClient | 4:57e049bce51e | 319 | } |
AzureIoTClient | 4:57e049bce51e | 320 | |
AzureIoTClient | 4:57e049bce51e | 321 | static int getMapInt(pn_data_t* propData, int32_t* mapValue) |
AzureIoTClient | 4:57e049bce51e | 322 | { |
AzureIoTClient | 4:57e049bce51e | 323 | int result; |
AzureIoTClient | 4:57e049bce51e | 324 | if (pn_data_next(propData) == true) |
AzureIoTClient | 4:57e049bce51e | 325 | { |
AzureIoTClient | 4:57e049bce51e | 326 | if (pn_data_type(propData) == PN_INT) |
AzureIoTClient | 4:57e049bce51e | 327 | { |
AzureIoTClient | 4:57e049bce51e | 328 | *mapValue = pn_data_get_int(propData); |
AzureIoTClient | 4:57e049bce51e | 329 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 330 | } |
AzureIoTClient | 4:57e049bce51e | 331 | else |
AzureIoTClient | 4:57e049bce51e | 332 | { |
AzureIoTClient | 4:57e049bce51e | 333 | LogError("Failure pn_data_type.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 334 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 335 | } |
AzureIoTClient | 4:57e049bce51e | 336 | } |
AzureIoTClient | 4:57e049bce51e | 337 | else |
AzureIoTClient | 4:57e049bce51e | 338 | { |
AzureIoTClient | 4:57e049bce51e | 339 | LogError("Failure pn_data_next.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 340 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 341 | } |
AzureIoTClient | 4:57e049bce51e | 342 | return result; |
AzureIoTClient | 4:57e049bce51e | 343 | } |
AzureIoTClient | 4:57e049bce51e | 344 | |
AzureIoTClient | 4:57e049bce51e | 345 | /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */ |
AzureIoTClient | 4:57e049bce51e | 346 | static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) |
AzureIoTClient | 4:57e049bce51e | 347 | { |
AzureIoTClient | 4:57e049bce51e | 348 | int result = 0; |
AzureIoTClient | 4:57e049bce51e | 349 | size_t index = 0; |
AzureIoTClient | 4:57e049bce51e | 350 | pn_data_t* propData = pn_message_properties(msg); |
AzureIoTClient | 4:57e049bce51e | 351 | if (propData == NULL) |
AzureIoTClient | 4:57e049bce51e | 352 | { |
AzureIoTClient | 4:57e049bce51e | 353 | LogError("Failure pn_message_properties.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 354 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 355 | } |
AzureIoTClient | 4:57e049bce51e | 356 | else if (pn_data_next(propData) == false ) |
AzureIoTClient | 4:57e049bce51e | 357 | { |
AzureIoTClient | 4:57e049bce51e | 358 | // This should fail if there is no next sibling which means that there |
AzureIoTClient | 4:57e049bce51e | 359 | // are no property and it should continue |
AzureIoTClient | 4:57e049bce51e | 360 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 361 | } |
AzureIoTClient | 4:57e049bce51e | 362 | else |
AzureIoTClient | 4:57e049bce51e | 363 | { |
AzureIoTClient | 4:57e049bce51e | 364 | size_t propertyCount = pn_data_get_map(propData)/2; |
AzureIoTClient | 4:57e049bce51e | 365 | if (propertyCount > 0) |
AzureIoTClient | 4:57e049bce51e | 366 | { |
AzureIoTClient | 4:57e049bce51e | 367 | if (pn_data_enter(propData) == false) |
AzureIoTClient | 4:57e049bce51e | 368 | { |
AzureIoTClient | 4:57e049bce51e | 369 | LogError("Failure pn_data_enter.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 370 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 371 | } |
AzureIoTClient | 4:57e049bce51e | 372 | else |
AzureIoTClient | 4:57e049bce51e | 373 | { |
AzureIoTClient | 4:57e049bce51e | 374 | // Get the Properties from the Message |
AzureIoTClient | 4:57e049bce51e | 375 | MAP_HANDLE properties = IoTHubMessage_Properties(ioTMessage); |
AzureIoTClient | 4:57e049bce51e | 376 | if (properties == NULL) |
AzureIoTClient | 4:57e049bce51e | 377 | { |
AzureIoTClient | 4:57e049bce51e | 378 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 379 | } |
AzureIoTClient | 4:57e049bce51e | 380 | else |
AzureIoTClient | 4:57e049bce51e | 381 | { |
AzureIoTClient | 4:57e049bce51e | 382 | for (index = 0; index < propertyCount; index++) |
AzureIoTClient | 4:57e049bce51e | 383 | { |
AzureIoTClient | 4:57e049bce51e | 384 | pn_bytes_t propValue; |
AzureIoTClient | 4:57e049bce51e | 385 | pn_bytes_t propName; |
AzureIoTClient | 4:57e049bce51e | 386 | if (getMapString(propData, &propValue) != 0) |
AzureIoTClient | 4:57e049bce51e | 387 | { |
AzureIoTClient | 4:57e049bce51e | 388 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 389 | break; |
AzureIoTClient | 4:57e049bce51e | 390 | } |
AzureIoTClient | 4:57e049bce51e | 391 | if (getMapString(propData, &propName) != 0) |
AzureIoTClient | 4:57e049bce51e | 392 | { |
AzureIoTClient | 4:57e049bce51e | 393 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 394 | break; |
AzureIoTClient | 4:57e049bce51e | 395 | } |
AzureIoTClient | 4:57e049bce51e | 396 | |
AzureIoTClient | 4:57e049bce51e | 397 | if (Map_AddOrUpdate(properties, propValue.start, propName.start) != MAP_OK) |
AzureIoTClient | 4:57e049bce51e | 398 | { |
AzureIoTClient | 4:57e049bce51e | 399 | LogError("Failure Map_AddOrUpdate.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 400 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 401 | break; |
AzureIoTClient | 4:57e049bce51e | 402 | } |
AzureIoTClient | 4:57e049bce51e | 403 | } |
AzureIoTClient | 4:57e049bce51e | 404 | if (index != propertyCount) |
AzureIoTClient | 4:57e049bce51e | 405 | { |
AzureIoTClient | 4:57e049bce51e | 406 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 407 | } |
AzureIoTClient | 4:57e049bce51e | 408 | else if (!pn_data_exit(propData) ) |
AzureIoTClient | 4:57e049bce51e | 409 | { |
AzureIoTClient | 4:57e049bce51e | 410 | LogError("Failure pn_data_exit.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 411 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 412 | } |
AzureIoTClient | 4:57e049bce51e | 413 | else |
AzureIoTClient | 4:57e049bce51e | 414 | { |
AzureIoTClient | 4:57e049bce51e | 415 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 416 | } |
AzureIoTClient | 4:57e049bce51e | 417 | } |
AzureIoTClient | 4:57e049bce51e | 418 | } |
AzureIoTClient | 4:57e049bce51e | 419 | } |
AzureIoTClient | 4:57e049bce51e | 420 | } |
AzureIoTClient | 4:57e049bce51e | 421 | return result; |
AzureIoTClient | 4:57e049bce51e | 422 | } |
AzureIoTClient | 4:57e049bce51e | 423 | |
AzureIoTClient | 4:57e049bce51e | 424 | static void putSecondListAfterHeadOfFirst(PDLIST_ENTRY first, PDLIST_ENTRY second) |
AzureIoTClient | 4:57e049bce51e | 425 | { |
AzureIoTClient | 4:57e049bce51e | 426 | DList_AppendTailList(first->Flink, second); |
AzureIoTClient | 4:57e049bce51e | 427 | DList_RemoveEntryList(second); |
AzureIoTClient | 4:57e049bce51e | 428 | DList_InitializeListHead(second); // Just to be tidy. |
AzureIoTClient | 4:57e049bce51e | 429 | } |
AzureIoTClient | 4:57e049bce51e | 430 | |
AzureIoTClient | 4:57e049bce51e | 431 | static bool stopMessenger(pn_messenger_t* messenger) |
AzureIoTClient | 4:57e049bce51e | 432 | { |
AzureIoTClient | 4:57e049bce51e | 433 | bool succeeded; |
AzureIoTClient | 4:57e049bce51e | 434 | int result; |
AzureIoTClient | 4:57e049bce51e | 435 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_021: [Invoke pn_messenger_stop on the messenger.]*/ |
AzureIoTClient | 4:57e049bce51e | 436 | result = pn_messenger_stop(messenger); |
AzureIoTClient | 4:57e049bce51e | 437 | if (result == 0) |
AzureIoTClient | 4:57e049bce51e | 438 | { |
AzureIoTClient | 4:57e049bce51e | 439 | succeeded = true; |
AzureIoTClient | 4:57e049bce51e | 440 | } |
AzureIoTClient | 4:57e049bce51e | 441 | else if (result != PN_INPROGRESS) |
AzureIoTClient | 4:57e049bce51e | 442 | { |
AzureIoTClient | 4:57e049bce51e | 443 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/ |
AzureIoTClient | 4:57e049bce51e | 444 | succeeded = false; |
AzureIoTClient | 4:57e049bce51e | 445 | } |
AzureIoTClient | 4:57e049bce51e | 446 | else |
AzureIoTClient | 4:57e049bce51e | 447 | { |
AzureIoTClient | 4:57e049bce51e | 448 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_024: [If pn_messenger_stop returns PN_INPROGRESS invoke pn_messenger_stopped a fixed number of time, seeking a return of true.]*/ |
AzureIoTClient | 4:57e049bce51e | 449 | size_t numberOfTrys; |
AzureIoTClient | 4:57e049bce51e | 450 | succeeded = false; |
AzureIoTClient | 4:57e049bce51e | 451 | for (numberOfTrys = NUMBER_OF_MESSENGER_STOP_TRIES; numberOfTrys > 0; numberOfTrys--) |
AzureIoTClient | 4:57e049bce51e | 452 | { |
AzureIoTClient | 4:57e049bce51e | 453 | pn_messenger_work(messenger,100); |
AzureIoTClient | 4:57e049bce51e | 454 | if (pn_messenger_stopped(messenger)) |
AzureIoTClient | 4:57e049bce51e | 455 | { |
AzureIoTClient | 4:57e049bce51e | 456 | succeeded = true; |
AzureIoTClient | 4:57e049bce51e | 457 | break; |
AzureIoTClient | 4:57e049bce51e | 458 | } |
AzureIoTClient | 4:57e049bce51e | 459 | } |
AzureIoTClient | 4:57e049bce51e | 460 | } |
AzureIoTClient | 4:57e049bce51e | 461 | return succeeded; |
AzureIoTClient | 4:57e049bce51e | 462 | } |
AzureIoTClient | 4:57e049bce51e | 463 | |
AzureIoTClient | 4:57e049bce51e | 464 | static void herdTheMessengers(PAMQP_TRANSPORT_STATE state, bool logMessengerStopFailures) |
AzureIoTClient | 4:57e049bce51e | 465 | { |
AzureIoTClient | 4:57e049bce51e | 466 | PDLIST_ENTRY currentMessengerLink = state->messengerCorral.Flink; |
AzureIoTClient | 4:57e049bce51e | 467 | |
AzureIoTClient | 4:57e049bce51e | 468 | // |
AzureIoTClient | 4:57e049bce51e | 469 | // We walk the list of messenger containers. We try to stop each one in turn. |
AzureIoTClient | 4:57e049bce51e | 470 | // |
AzureIoTClient | 4:57e049bce51e | 471 | // If we are able to stop the contained messenger, we will free the actual messenger. We will then free the messenger container. |
AzureIoTClient | 4:57e049bce51e | 472 | // |
AzureIoTClient | 4:57e049bce51e | 473 | while (currentMessengerLink != &state->messengerCorral) |
AzureIoTClient | 4:57e049bce51e | 474 | { |
AzureIoTClient | 4:57e049bce51e | 475 | PDLIST_ENTRY nextLink = currentMessengerLink->Flink; |
AzureIoTClient | 4:57e049bce51e | 476 | pn_messenger_t* oldMessenger = containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)->messengerThatDidNotStop; |
AzureIoTClient | 4:57e049bce51e | 477 | if (stopMessenger(oldMessenger)) |
AzureIoTClient | 4:57e049bce51e | 478 | { |
AzureIoTClient | 4:57e049bce51e | 479 | (void)(DList_RemoveEntryList(currentMessengerLink)); |
AzureIoTClient | 4:57e049bce51e | 480 | free(containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)); |
AzureIoTClient | 4:57e049bce51e | 481 | pn_messenger_free(oldMessenger); |
AzureIoTClient | 4:57e049bce51e | 482 | } |
AzureIoTClient | 4:57e049bce51e | 483 | else |
AzureIoTClient | 4:57e049bce51e | 484 | { |
AzureIoTClient | 4:57e049bce51e | 485 | if (logMessengerStopFailures == true) |
AzureIoTClient | 4:57e049bce51e | 486 | { |
AzureIoTClient | 4:57e049bce51e | 487 | LogError("Unable to free the messenger at address: %p\r\n", oldMessenger); |
AzureIoTClient | 4:57e049bce51e | 488 | } |
AzureIoTClient | 4:57e049bce51e | 489 | } |
AzureIoTClient | 4:57e049bce51e | 490 | currentMessengerLink = nextLink; |
AzureIoTClient | 4:57e049bce51e | 491 | } |
AzureIoTClient | 4:57e049bce51e | 492 | } |
AzureIoTClient | 4:57e049bce51e | 493 | |
AzureIoTClient | 4:57e049bce51e | 494 | static void disposeOfOldMessenger(PAMQP_TRANSPORT_STATE state) |
AzureIoTClient | 4:57e049bce51e | 495 | { |
AzureIoTClient | 4:57e049bce51e | 496 | if (state->messenger) |
AzureIoTClient | 4:57e049bce51e | 497 | { |
AzureIoTClient | 4:57e049bce51e | 498 | if (stopMessenger(state->messenger)) |
AzureIoTClient | 4:57e049bce51e | 499 | { |
AzureIoTClient | 4:57e049bce51e | 500 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_022: [If pn_messenger_stop returns 0 then invoke pn_messenger_free on the messenger.]*/ |
AzureIoTClient | 4:57e049bce51e | 501 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_025: [If pn_messenger_stopped returns true, then call pn_messenger_free.]*/ |
AzureIoTClient | 4:57e049bce51e | 502 | // |
AzureIoTClient | 4:57e049bce51e | 503 | // Yay. It successfully stopped. |
AzureIoTClient | 4:57e049bce51e | 504 | // |
AzureIoTClient | 4:57e049bce51e | 505 | pn_messenger_free(state->messenger); |
AzureIoTClient | 4:57e049bce51e | 506 | } |
AzureIoTClient | 4:57e049bce51e | 507 | else |
AzureIoTClient | 4:57e049bce51e | 508 | { |
AzureIoTClient | 4:57e049bce51e | 509 | // |
AzureIoTClient | 4:57e049bce51e | 510 | // If we couldn't destroy it, then put it on a list and try to do it later in our "spare" time. |
AzureIoTClient | 4:57e049bce51e | 511 | // |
AzureIoTClient | 4:57e049bce51e | 512 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/ |
AzureIoTClient | 4:57e049bce51e | 513 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_026: [If pn_messenger_stopped never returns true, then place the messenger on a list for attempting to stop later.]*/ |
AzureIoTClient | 4:57e049bce51e | 514 | PMESSENGER_CONTAINER newContainer = malloc(sizeof(MESSENGER_CONTAINER)); |
AzureIoTClient | 4:57e049bce51e | 515 | if (newContainer == NULL) |
AzureIoTClient | 4:57e049bce51e | 516 | { |
AzureIoTClient | 4:57e049bce51e | 517 | // |
AzureIoTClient | 4:57e049bce51e | 518 | // Darn! This is not good. These containers are not very big. If we can't allocate one, things are pretty bad. |
AzureIoTClient | 4:57e049bce51e | 519 | // However, it doesn't make sense to actually free this messenger. It hasn't been stopped and we can NOT be sure who |
AzureIoTClient | 4:57e049bce51e | 520 | // is referencing the memory that it has allocated. The safest thing to do in this very bad situation is to drop |
AzureIoTClient | 4:57e049bce51e | 521 | // it on the floor. It's a leak, but a leak is better than anything else. There is a pretty good likelyhood that |
AzureIoTClient | 4:57e049bce51e | 522 | // the app is exiting soon. The dropped memory will be cleaned up by exit. |
AzureIoTClient | 4:57e049bce51e | 523 | ; |
AzureIoTClient | 4:57e049bce51e | 524 | } |
AzureIoTClient | 4:57e049bce51e | 525 | else |
AzureIoTClient | 4:57e049bce51e | 526 | { |
AzureIoTClient | 4:57e049bce51e | 527 | newContainer->messengerThatDidNotStop = state->messenger; |
AzureIoTClient | 4:57e049bce51e | 528 | DList_InsertTailList(&state->messengerCorral, &newContainer->entry); |
AzureIoTClient | 4:57e049bce51e | 529 | } |
AzureIoTClient | 4:57e049bce51e | 530 | } |
AzureIoTClient | 4:57e049bce51e | 531 | state->messenger = NULL; |
AzureIoTClient | 4:57e049bce51e | 532 | } |
AzureIoTClient | 4:57e049bce51e | 533 | } |
AzureIoTClient | 4:57e049bce51e | 534 | |
AzureIoTClient | 4:57e049bce51e | 535 | static void clientTransportAMQP_Destroy(TRANSPORT_HANDLE handle) |
AzureIoTClient | 4:57e049bce51e | 536 | { |
AzureIoTClient | 4:57e049bce51e | 537 | PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 538 | |
AzureIoTClient | 4:57e049bce51e | 539 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_010: [clientTransportAMQP_Destroy shall do nothing if the handle is NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 540 | if (transportState) |
AzureIoTClient | 4:57e049bce51e | 541 | { |
AzureIoTClient | 4:57e049bce51e | 542 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_018: [clientTransportAMQP_Destroy shall free all the resources currently in use.]*/ |
AzureIoTClient | 4:57e049bce51e | 543 | // |
AzureIoTClient | 4:57e049bce51e | 544 | // Go through all of the active work items. Complete their associated messages. |
AzureIoTClient | 4:57e049bce51e | 545 | // |
AzureIoTClient | 4:57e049bce51e | 546 | while (!DList_IsListEmpty(&transportState->workInProgress)) |
AzureIoTClient | 4:57e049bce51e | 547 | { |
AzureIoTClient | 4:57e049bce51e | 548 | PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->workInProgress); |
AzureIoTClient | 4:57e049bce51e | 549 | PAMQP_WORK_ITEM currentItem = containingRecord(currentEntry, AMQP_WORK_ITEM, link); |
AzureIoTClient | 4:57e049bce51e | 550 | IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tItem->eventMessages, IOTHUB_BATCHSTATE_FAILED); |
AzureIoTClient | 4:57e049bce51e | 551 | DList_InsertTailList(&transportState->availableWorkItems, ¤tItem->link); |
AzureIoTClient | 4:57e049bce51e | 552 | } |
AzureIoTClient | 4:57e049bce51e | 553 | |
AzureIoTClient | 4:57e049bce51e | 554 | // |
AzureIoTClient | 4:57e049bce51e | 555 | // Get rid of work items that aren't being used, and that should be everything given the above loop! |
AzureIoTClient | 4:57e049bce51e | 556 | // |
AzureIoTClient | 4:57e049bce51e | 557 | while (!DList_IsListEmpty(&transportState->availableWorkItems)) |
AzureIoTClient | 4:57e049bce51e | 558 | { |
AzureIoTClient | 4:57e049bce51e | 559 | PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->availableWorkItems); |
AzureIoTClient | 4:57e049bce51e | 560 | free(containingRecord(currentEntry, AMQP_WORK_ITEM, link)); |
AzureIoTClient | 4:57e049bce51e | 561 | } |
AzureIoTClient | 4:57e049bce51e | 562 | |
AzureIoTClient | 4:57e049bce51e | 563 | disposeOfOldMessenger(transportState); |
AzureIoTClient | 4:57e049bce51e | 564 | |
AzureIoTClient | 4:57e049bce51e | 565 | // |
AzureIoTClient | 4:57e049bce51e | 566 | // herdTheMessengers will NOT free a messenger that couldn't be stopped. Therefore the following |
AzureIoTClient | 4:57e049bce51e | 567 | // function could result in a memory leak of old messengers. This is by FAR less of a problem then |
AzureIoTClient | 4:57e049bce51e | 568 | // having memory returned in to pool that something else might still be referring to. (To get all folksy |
AzureIoTClient | 4:57e049bce51e | 569 | // here, don't put a sick chicken back in the hen house.) |
AzureIoTClient | 4:57e049bce51e | 570 | // |
AzureIoTClient | 4:57e049bce51e | 571 | |
AzureIoTClient | 4:57e049bce51e | 572 | herdTheMessengers(transportState, true); |
AzureIoTClient | 4:57e049bce51e | 573 | if (transportState->message) |
AzureIoTClient | 4:57e049bce51e | 574 | { |
AzureIoTClient | 4:57e049bce51e | 575 | pn_message_free(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 576 | } |
AzureIoTClient | 4:57e049bce51e | 577 | STRING_delete(transportState->messageAddress); |
AzureIoTClient | 4:57e049bce51e | 578 | STRING_delete(transportState->eventAddress); |
AzureIoTClient | 4:57e049bce51e | 579 | STRING_delete(transportState->urledDeviceId); |
AzureIoTClient | 4:57e049bce51e | 580 | STRING_delete(transportState->devicesPortionPath); |
AzureIoTClient | 4:57e049bce51e | 581 | STRING_delete(transportState->cbsAddress); |
AzureIoTClient | 4:57e049bce51e | 582 | STRING_delete(transportState->deviceKey); |
AzureIoTClient | 4:57e049bce51e | 583 | STRING_delete(transportState->zeroLengthString); |
AzureIoTClient | 5:8d58d20699dd | 584 | if (transportState->trustedCertificates != NULL) |
AzureIoTClient | 5:8d58d20699dd | 585 | { |
AzureIoTClient | 5:8d58d20699dd | 586 | free(transportState->trustedCertificates); |
AzureIoTClient | 5:8d58d20699dd | 587 | } |
AzureIoTClient | 4:57e049bce51e | 588 | |
AzureIoTClient | 5:8d58d20699dd | 589 | free(transportState); |
AzureIoTClient | 4:57e049bce51e | 590 | } |
AzureIoTClient | 4:57e049bce51e | 591 | } |
AzureIoTClient | 4:57e049bce51e | 592 | |
AzureIoTClient | 4:57e049bce51e | 593 | static int putToken(PAMQP_TRANSPORT_STATE transportState, STRING_HANDLE token) |
AzureIoTClient | 4:57e049bce51e | 594 | { |
AzureIoTClient | 4:57e049bce51e | 595 | int result = 0; |
AzureIoTClient | 4:57e049bce51e | 596 | |
AzureIoTClient | 4:57e049bce51e | 597 | pn_data_t *properties; |
AzureIoTClient | 4:57e049bce51e | 598 | pn_atom_t messageId; |
AzureIoTClient | 4:57e049bce51e | 599 | |
AzureIoTClient | 4:57e049bce51e | 600 | messageId.u.as_ulong = transportState->sendTokenMessageId; |
AzureIoTClient | 4:57e049bce51e | 601 | messageId.type = PN_ULONG; |
AzureIoTClient | 4:57e049bce51e | 602 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_130: [putToken clears the single messages that is used for all communication via proton.]*/ |
AzureIoTClient | 4:57e049bce51e | 603 | pn_message_clear(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 604 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_131: [putToken obtains the properties of the message.]*/ |
AzureIoTClient | 4:57e049bce51e | 605 | properties = pn_message_properties(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 606 | if (properties == NULL) |
AzureIoTClient | 4:57e049bce51e | 607 | { |
AzureIoTClient | 4:57e049bce51e | 608 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_132: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 609 | LogError("unable to pn_message_properties\r\n"); |
AzureIoTClient | 4:57e049bce51e | 610 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 611 | } |
AzureIoTClient | 4:57e049bce51e | 612 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_133: [putToken constructs a proton map which describes the put token operation. If this construction fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 613 | else if (!((pn_data_put_map(properties) == 0) && |
AzureIoTClient | 4:57e049bce51e | 614 | (pn_data_enter(properties) == true) && |
AzureIoTClient | 4:57e049bce51e | 615 | (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_OPERATIONS_KEY), PROTON_MAP_OPERATIONS_KEY)) == 0) && // key |
AzureIoTClient | 4:57e049bce51e | 616 | (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_PUT_TOKEN_OPERATION), PROTON_MAP_PUT_TOKEN_OPERATION)) == 0) && // value |
AzureIoTClient | 4:57e049bce51e | 617 | (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TYPE_KEY), PROTON_MAP_TYPE_KEY)) == 0) && // key |
AzureIoTClient | 4:57e049bce51e | 618 | (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TOKEN_TYPE), PROTON_MAP_TOKEN_TYPE)) == 0) && // value |
AzureIoTClient | 4:57e049bce51e | 619 | (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_NAME_KEY), PROTON_MAP_NAME_KEY)) == 0) && // key |
AzureIoTClient | 4:57e049bce51e | 620 | (pn_data_put_string(properties, pn_bytes(STRING_length(transportState->devicesPortionPath), STRING_c_str(transportState->devicesPortionPath))) == 0) && // value |
AzureIoTClient | 4:57e049bce51e | 621 | (pn_data_exit(properties)) |
AzureIoTClient | 4:57e049bce51e | 622 | )) |
AzureIoTClient | 4:57e049bce51e | 623 | { |
AzureIoTClient | 4:57e049bce51e | 624 | LogError("unable to build CBS put token map\r\n"); |
AzureIoTClient | 4:57e049bce51e | 625 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 626 | } |
AzureIoTClient | 4:57e049bce51e | 627 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_134: [putToken sets the address (CBS endpoint) that this put token operation is targeted to.]*/ |
AzureIoTClient | 4:57e049bce51e | 628 | else if (pn_message_set_reply_to(transportState->message, CBS_REPLY_TO) != 0) |
AzureIoTClient | 4:57e049bce51e | 629 | { |
AzureIoTClient | 4:57e049bce51e | 630 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_135: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 631 | LogError("unable to pn_message_set_reply_to\r\n"); |
AzureIoTClient | 4:57e049bce51e | 632 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 633 | } |
AzureIoTClient | 4:57e049bce51e | 634 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_136: [putToken sets the reply to address for the put token operation.]*/ |
AzureIoTClient | 4:57e049bce51e | 635 | else if (pn_message_set_address(transportState->message, STRING_c_str(transportState->cbsAddress)) != 0) |
AzureIoTClient | 4:57e049bce51e | 636 | { |
AzureIoTClient | 4:57e049bce51e | 637 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_137: [If this fails then putToken fails.] */ |
AzureIoTClient | 4:57e049bce51e | 638 | LogError("unable to pn_message_set_address\r\n"); |
AzureIoTClient | 4:57e049bce51e | 639 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 640 | } |
AzureIoTClient | 4:57e049bce51e | 641 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_138: [putToken sets the message to not be inferred.]*/ |
AzureIoTClient | 4:57e049bce51e | 642 | else if (pn_message_set_inferred(transportState->message, false) != 0) |
AzureIoTClient | 4:57e049bce51e | 643 | { |
AzureIoTClient | 4:57e049bce51e | 644 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_139: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 645 | LogError("unable to pn_message_set_inferred\r\n"); |
AzureIoTClient | 4:57e049bce51e | 646 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 647 | } |
AzureIoTClient | 4:57e049bce51e | 648 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_140: [putToken sets the messageId of the message to the expiry of the token.]*/ |
AzureIoTClient | 4:57e049bce51e | 649 | else if (pn_message_set_id(transportState->message, messageId) != 0) |
AzureIoTClient | 4:57e049bce51e | 650 | { |
AzureIoTClient | 4:57e049bce51e | 651 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_141: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 652 | LogError("Unable to set the message id on the transfer of the token to the CBS\r\n"); |
AzureIoTClient | 4:57e049bce51e | 653 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 654 | } |
AzureIoTClient | 4:57e049bce51e | 655 | else |
AzureIoTClient | 4:57e049bce51e | 656 | { |
AzureIoTClient | 4:57e049bce51e | 657 | pn_data_t* body; |
AzureIoTClient | 4:57e049bce51e | 658 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_142: [putToken obtains the body of the message.]*/ |
AzureIoTClient | 4:57e049bce51e | 659 | body = pn_message_body(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 660 | if (body == NULL) |
AzureIoTClient | 4:57e049bce51e | 661 | { |
AzureIoTClient | 4:57e049bce51e | 662 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_143: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 663 | LogError("Unable to pn_message_body\r\n"); |
AzureIoTClient | 4:57e049bce51e | 664 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 665 | } |
AzureIoTClient | 4:57e049bce51e | 666 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_144: [putToken places the actual SAS token into the body of the message.]*/ |
AzureIoTClient | 4:57e049bce51e | 667 | else if (pn_data_put_string(body, pn_bytes(STRING_length(token), STRING_c_str(token))) != 0) |
AzureIoTClient | 4:57e049bce51e | 668 | { |
AzureIoTClient | 4:57e049bce51e | 669 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_145: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 670 | LogError("Unable to pn_data_put_string\r\n"); |
AzureIoTClient | 4:57e049bce51e | 671 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 672 | } |
AzureIoTClient | 4:57e049bce51e | 673 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_146: [putToken invokes pn_messenger_put.]*/ |
AzureIoTClient | 4:57e049bce51e | 674 | else if (pn_messenger_put(transportState->messenger, transportState->message) != 0) |
AzureIoTClient | 4:57e049bce51e | 675 | { |
AzureIoTClient | 4:57e049bce51e | 676 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_147: [If this fails then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 677 | LogError("Unable to pn_messenger_put\r\n"); |
AzureIoTClient | 4:57e049bce51e | 678 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 679 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 680 | } |
AzureIoTClient | 4:57e049bce51e | 681 | else |
AzureIoTClient | 4:57e049bce51e | 682 | { |
AzureIoTClient | 4:57e049bce51e | 683 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_148: [A proton tracker is obtained.]*/ |
AzureIoTClient | 4:57e049bce51e | 684 | pn_tracker_t tracker = pn_messenger_outgoing_tracker(transportState->messenger); |
AzureIoTClient | 4:57e049bce51e | 685 | time_t beginningOfWaitLoop = get_time(NULL); |
AzureIoTClient | 4:57e049bce51e | 686 | time_t currentTime; |
AzureIoTClient | 4:57e049bce51e | 687 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 688 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_149: [putToken will loop invoking pn_messenger_work waiting for the message to reach a terminal delivery state.]*/ |
AzureIoTClient | 4:57e049bce51e | 689 | do |
AzureIoTClient | 4:57e049bce51e | 690 | { |
AzureIoTClient | 4:57e049bce51e | 691 | int protonResult; |
AzureIoTClient | 4:57e049bce51e | 692 | if (checkForErrorsThenWork(transportState) == false) |
AzureIoTClient | 4:57e049bce51e | 693 | { |
AzureIoTClient | 4:57e049bce51e | 694 | break; |
AzureIoTClient | 4:57e049bce51e | 695 | } |
AzureIoTClient | 4:57e049bce51e | 696 | else |
AzureIoTClient | 4:57e049bce51e | 697 | { |
AzureIoTClient | 4:57e049bce51e | 698 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_150: [If a terminal delivery state is not reached then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 699 | if ((protonResult = pn_messenger_status(transportState->messenger, tracker)) != PN_STATUS_PENDING) |
AzureIoTClient | 4:57e049bce51e | 700 | { |
AzureIoTClient | 4:57e049bce51e | 701 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_151: [On reaching a terminal delivery state, if the message status is NOT accepted then putToken fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 702 | if (protonResult != PN_STATUS_ACCEPTED) |
AzureIoTClient | 4:57e049bce51e | 703 | { |
AzureIoTClient | 4:57e049bce51e | 704 | LogError("Error sending the token: %s\r\n", pn_error_text(pn_messenger_error(transportState->messenger))); |
AzureIoTClient | 4:57e049bce51e | 705 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 706 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 707 | } |
AzureIoTClient | 4:57e049bce51e | 708 | else |
AzureIoTClient | 4:57e049bce51e | 709 | { |
AzureIoTClient | 4:57e049bce51e | 710 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 711 | } |
AzureIoTClient | 4:57e049bce51e | 712 | break; |
AzureIoTClient | 4:57e049bce51e | 713 | } |
AzureIoTClient | 4:57e049bce51e | 714 | } |
AzureIoTClient | 4:57e049bce51e | 715 | currentTime = get_time(NULL); |
AzureIoTClient | 4:57e049bce51e | 716 | } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsRequestAcceptTime); |
AzureIoTClient | 4:57e049bce51e | 717 | |
AzureIoTClient | 4:57e049bce51e | 718 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_152: [The tracker is settled.]*/ |
AzureIoTClient | 4:57e049bce51e | 719 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 720 | if (result == 0) |
AzureIoTClient | 4:57e049bce51e | 721 | { |
AzureIoTClient | 4:57e049bce51e | 722 | // |
AzureIoTClient | 4:57e049bce51e | 723 | // Successfully sent the token. Wait around for a reply. |
AzureIoTClient | 4:57e049bce51e | 724 | // |
AzureIoTClient | 4:57e049bce51e | 725 | transportState->waitingForPutTokenReply = true; |
AzureIoTClient | 4:57e049bce51e | 726 | beginningOfWaitLoop = get_time(NULL); |
AzureIoTClient | 4:57e049bce51e | 727 | result = __LINE__; |
AzureIoTClient | 4:57e049bce51e | 728 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_153: [putToken will again loop waiting for a reply.]*/ |
AzureIoTClient | 4:57e049bce51e | 729 | do |
AzureIoTClient | 4:57e049bce51e | 730 | { |
AzureIoTClient | 4:57e049bce51e | 731 | processReceives(transportState); |
AzureIoTClient | 4:57e049bce51e | 732 | if (transportState->waitingForPutTokenReply == true) |
AzureIoTClient | 4:57e049bce51e | 733 | { |
AzureIoTClient | 4:57e049bce51e | 734 | if (checkForErrorsThenWork(transportState) == false) |
AzureIoTClient | 4:57e049bce51e | 735 | { |
AzureIoTClient | 4:57e049bce51e | 736 | break; |
AzureIoTClient | 4:57e049bce51e | 737 | } |
AzureIoTClient | 4:57e049bce51e | 738 | } |
AzureIoTClient | 4:57e049bce51e | 739 | else |
AzureIoTClient | 4:57e049bce51e | 740 | { |
AzureIoTClient | 4:57e049bce51e | 741 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 742 | break; |
AzureIoTClient | 4:57e049bce51e | 743 | } |
AzureIoTClient | 4:57e049bce51e | 744 | currentTime = get_time(NULL); |
AzureIoTClient | 4:57e049bce51e | 745 | } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsReplyTime); |
AzureIoTClient | 4:57e049bce51e | 746 | } |
AzureIoTClient | 4:57e049bce51e | 747 | else |
AzureIoTClient | 4:57e049bce51e | 748 | { |
AzureIoTClient | 4:57e049bce51e | 749 | LogError("An network error occured while waiting for a CBS reply.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 750 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 751 | } |
AzureIoTClient | 4:57e049bce51e | 752 | } |
AzureIoTClient | 4:57e049bce51e | 753 | } |
AzureIoTClient | 4:57e049bce51e | 754 | pn_message_clear(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 755 | return result; |
AzureIoTClient | 4:57e049bce51e | 756 | } |
AzureIoTClient | 4:57e049bce51e | 757 | |
AzureIoTClient | 4:57e049bce51e | 758 | static void renewIfNecessaryTheCBS(PAMQP_TRANSPORT_STATE transportState) |
AzureIoTClient | 4:57e049bce51e | 759 | { |
AzureIoTClient | 4:57e049bce51e | 760 | |
AzureIoTClient | 4:57e049bce51e | 761 | // |
AzureIoTClient | 4:57e049bce51e | 762 | // There is an expiry stored in the state. It has the last expiry value used to define a |
AzureIoTClient | 4:57e049bce51e | 763 | // sas token. If we are within 20 minutes OR we're PAST that expiration generate a new |
AzureIoTClient | 4:57e049bce51e | 764 | // expiry time and create a new sas token and "put" it to the cbs. |
AzureIoTClient | 4:57e049bce51e | 765 | // |
AzureIoTClient | 4:57e049bce51e | 766 | |
AzureIoTClient | 4:57e049bce51e | 767 | size_t secondsSinceEpoch; |
AzureIoTClient | 4:57e049bce51e | 768 | int differenceWithLastExpiry; |
AzureIoTClient | 4:57e049bce51e | 769 | |
AzureIoTClient | 4:57e049bce51e | 770 | transportState->putTokenWasSuccessful = false; |
AzureIoTClient | 4:57e049bce51e | 771 | // |
AzureIoTClient | 4:57e049bce51e | 772 | // Get the number of seconds since the epoch. |
AzureIoTClient | 4:57e049bce51e | 773 | // |
AzureIoTClient | 4:57e049bce51e | 774 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_163: [Invocation of renewIfNecessaryTheCBS will first obtain the current number of seconds since the epoch.*/ |
AzureIoTClient | 4:57e049bce51e | 775 | secondsSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE)+0); // adding zero because a compiler feels it's necessary. |
AzureIoTClient | 4:57e049bce51e | 776 | differenceWithLastExpiry = transportState->lastExpiryUsed - secondsSinceEpoch; |
AzureIoTClient | 4:57e049bce51e | 777 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_164: [If the difference between lastExpiryUsed and the current value is less than refresh seconds then renewIfNecessaryTheCBS will attempt to renew the SAS.]*/ |
AzureIoTClient | 4:57e049bce51e | 778 | if ((differenceWithLastExpiry <= 0) || |
AzureIoTClient | 4:57e049bce51e | 779 | (((size_t)differenceWithLastExpiry) < transportState->sasRefreshLine)) // Within refresh minutes (or past if negative) of the expiration |
AzureIoTClient | 4:57e049bce51e | 780 | { |
AzureIoTClient | 4:57e049bce51e | 781 | // |
AzureIoTClient | 4:57e049bce51e | 782 | // We already have the seconds since the epoch. Add an hour to it and generate the new SAS. |
AzureIoTClient | 4:57e049bce51e | 783 | // |
AzureIoTClient | 4:57e049bce51e | 784 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_165: [If SAS renewal is to occur then lifetime seconds is added to the current number of seconds.]*/ |
AzureIoTClient | 4:57e049bce51e | 785 | size_t possibleNewExpiry = secondsSinceEpoch + transportState->sasTokenLifetime; |
AzureIoTClient | 4:57e049bce51e | 786 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_169: [SASToken_Create is invoked.] */ |
AzureIoTClient | 4:57e049bce51e | 787 | STRING_HANDLE newSASToken = SASToken_Create(transportState->deviceKey, transportState->devicesPortionPath, transportState->zeroLengthString, possibleNewExpiry); |
AzureIoTClient | 4:57e049bce51e | 788 | if (newSASToken == NULL) |
AzureIoTClient | 4:57e049bce51e | 789 | { |
AzureIoTClient | 4:57e049bce51e | 790 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_170: [If SASToken_Create returns NULL then renewal is considered a failure and we move on to the next phase of DoWork.]*/ |
AzureIoTClient | 4:57e049bce51e | 791 | LogError("Could not generate a new SAS token for the CBS\r\n"); |
AzureIoTClient | 4:57e049bce51e | 792 | } |
AzureIoTClient | 4:57e049bce51e | 793 | else |
AzureIoTClient | 4:57e049bce51e | 794 | { |
AzureIoTClient | 4:57e049bce51e | 795 | // |
AzureIoTClient | 4:57e049bce51e | 796 | // We are going to save off in the transport state this POSSIBLE expiry value. Sending token and response processing code |
AzureIoTClient | 4:57e049bce51e | 797 | // will use it as the message id and correlation id for the request/response. This will work |
AzureIoTClient | 4:57e049bce51e | 798 | // because the code that sends the token will wait for AT LEAST one second for a valid response. If a valid response comes |
AzureIoTClient | 4:57e049bce51e | 799 | // back in that period of time then even if for some bizarre reason we need to renew again in less than a second, we know |
AzureIoTClient | 4:57e049bce51e | 800 | // there won't be a spurious response (because we just got the resonse!). |
AzureIoTClient | 4:57e049bce51e | 801 | // |
AzureIoTClient | 4:57e049bce51e | 802 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_166: [This value is saved in the state as sendTokenMessageId.]*/ |
AzureIoTClient | 4:57e049bce51e | 803 | transportState->sendTokenMessageId = possibleNewExpiry; |
AzureIoTClient | 4:57e049bce51e | 804 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_171: [renewIfNecessaryTheCBS will invoke a local static function putToken.]*/ |
AzureIoTClient | 4:57e049bce51e | 805 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_172: [If putToken fails then the renewal is considered a failure and we move on to the next phase of DoWork.]*/ |
AzureIoTClient | 4:57e049bce51e | 806 | if (putToken(transportState, newSASToken) == 0) |
AzureIoTClient | 4:57e049bce51e | 807 | { |
AzureIoTClient | 4:57e049bce51e | 808 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_173: [Note that putToken actually sets a state value which will indicate whether the CBS accepted the token and responded with success.]*/ |
AzureIoTClient | 4:57e049bce51e | 809 | if (transportState->putTokenWasSuccessful == true) |
AzureIoTClient | 4:57e049bce51e | 810 | { |
AzureIoTClient | 4:57e049bce51e | 811 | transportState->lastExpiryUsed = possibleNewExpiry; |
AzureIoTClient | 4:57e049bce51e | 812 | } |
AzureIoTClient | 4:57e049bce51e | 813 | } |
AzureIoTClient | 4:57e049bce51e | 814 | STRING_delete(newSASToken); |
AzureIoTClient | 4:57e049bce51e | 815 | } |
AzureIoTClient | 4:57e049bce51e | 816 | } |
AzureIoTClient | 4:57e049bce51e | 817 | } |
AzureIoTClient | 4:57e049bce51e | 818 | |
AzureIoTClient | 4:57e049bce51e | 819 | static bool protonMessengerInit(PAMQP_TRANSPORT_STATE amqpState) |
AzureIoTClient | 4:57e049bce51e | 820 | { |
AzureIoTClient | 4:57e049bce51e | 821 | if (!amqpState->messengerInitialized) |
AzureIoTClient | 4:57e049bce51e | 822 | { |
AzureIoTClient | 4:57e049bce51e | 823 | rollbackEvent(amqpState); |
AzureIoTClient | 4:57e049bce51e | 824 | disposeOfOldMessenger(amqpState); |
AzureIoTClient | 4:57e049bce51e | 825 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_027: [DoWork shall invoke pn_messenger to create a new messenger.]*/ |
AzureIoTClient | 4:57e049bce51e | 826 | if ((amqpState->messenger = pn_messenger(NULL)) == NULL) |
AzureIoTClient | 4:57e049bce51e | 827 | { |
AzureIoTClient | 4:57e049bce51e | 828 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 829 | LogError("The messenger is not able to be created.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 830 | } |
AzureIoTClient | 5:8d58d20699dd | 831 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */ |
AzureIoTClient | 5:8d58d20699dd | 832 | else if ((amqpState->trustedCertificates != NULL) && |
AzureIoTClient | 5:8d58d20699dd | 833 | (strlen(amqpState->trustedCertificates) > 0) && |
AzureIoTClient | 5:8d58d20699dd | 834 | (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0)) |
AzureIoTClient | 5:8d58d20699dd | 835 | { |
AzureIoTClient | 5:8d58d20699dd | 836 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */ |
AzureIoTClient | 5:8d58d20699dd | 837 | LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n"); |
AzureIoTClient | 5:8d58d20699dd | 838 | } |
AzureIoTClient | 5:8d58d20699dd | 839 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 840 | else if (pn_messenger_start(amqpState->messenger) != 0) |
AzureIoTClient | 4:57e049bce51e | 841 | { |
AzureIoTClient | 4:57e049bce51e | 842 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 843 | LogError("unable to pn_messenger_start\r\n"); |
AzureIoTClient | 4:57e049bce51e | 844 | } |
AzureIoTClient | 4:57e049bce51e | 845 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_031: [pn_messenger_set_blocking shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 846 | else if (pn_messenger_set_blocking(amqpState->messenger, false) != 0) |
AzureIoTClient | 4:57e049bce51e | 847 | { |
AzureIoTClient | 4:57e049bce51e | 848 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_032: [If pn_messenger_set_blocking returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 849 | LogError("unable to pn_messenger_set_blocking\r\n"); |
AzureIoTClient | 4:57e049bce51e | 850 | } |
AzureIoTClient | 4:57e049bce51e | 851 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_033: [pn_messenger_set_snd_settle_mode shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 852 | else if (pn_messenger_set_snd_settle_mode(amqpState->messenger, PN_SND_UNSETTLED) != 0) |
AzureIoTClient | 4:57e049bce51e | 853 | { |
AzureIoTClient | 4:57e049bce51e | 854 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_034: [If pn_messenger_set_snd_settle_mode returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 855 | LogError("unable to set the send settle mode\r\n"); |
AzureIoTClient | 4:57e049bce51e | 856 | } |
AzureIoTClient | 4:57e049bce51e | 857 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_035: [pn_messenger_set_rcv_settle_mode shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 858 | else if (pn_messenger_set_rcv_settle_mode(amqpState->messenger, PN_RCV_FIRST) != 0) |
AzureIoTClient | 4:57e049bce51e | 859 | { |
AzureIoTClient | 4:57e049bce51e | 860 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_036: [If pn_messenger_set_rcv_settle_mode returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 861 | LogError("unable to set the receive settle mode\r\n"); |
AzureIoTClient | 4:57e049bce51e | 862 | } |
AzureIoTClient | 4:57e049bce51e | 863 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_037: [pn_messenger_set_outgoing_window shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 864 | else if (pn_messenger_set_outgoing_window(amqpState->messenger, PROTON_OUTGOING_WINDOW_SIZE) != 0) |
AzureIoTClient | 4:57e049bce51e | 865 | { |
AzureIoTClient | 4:57e049bce51e | 866 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_038: [If pn_messenger_set_outgoing_window returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 867 | LogError("unable to pn_messenger_set_outgoing_window\r\n"); |
AzureIoTClient | 4:57e049bce51e | 868 | } |
AzureIoTClient | 4:57e049bce51e | 869 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_039: [pn_messenger_set_incoming_window shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 870 | else if (pn_messenger_set_incoming_window(amqpState->messenger, PROTON_INCOMING_WINDOW_SIZE) != 0) |
AzureIoTClient | 4:57e049bce51e | 871 | { |
AzureIoTClient | 4:57e049bce51e | 872 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_040: [If pn_messenger_set_incoming_window returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 873 | LogError("unable to pn_messenger_set_incoming_window\r\n"); |
AzureIoTClient | 4:57e049bce51e | 874 | } |
AzureIoTClient | 4:57e049bce51e | 875 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_156: [pn_messenger_subscribe will be invoked on the cbs address.]*/ |
AzureIoTClient | 4:57e049bce51e | 876 | else if ((amqpState->cbsSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->cbsAddress))) == NULL) |
AzureIoTClient | 4:57e049bce51e | 877 | { |
AzureIoTClient | 4:57e049bce51e | 878 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_157: [If the pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 879 | LogError("unable to create a subscription to the cbs address\r\n"); |
AzureIoTClient | 4:57e049bce51e | 880 | } |
AzureIoTClient | 4:57e049bce51e | 881 | else |
AzureIoTClient | 4:57e049bce51e | 882 | { |
AzureIoTClient | 4:57e049bce51e | 883 | //We've got the subscription. Now we attempt to connect to the cbs. |
AzureIoTClient | 4:57e049bce51e | 884 | /*Codes_During messenger initialization a state variable known as lastExpiryUsed will be set to zero.*/ |
AzureIoTClient | 4:57e049bce51e | 885 | amqpState->lastExpiryUsed = 0; |
AzureIoTClient | 4:57e049bce51e | 886 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_159: [renewIfNecessaryCBS will be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 887 | renewIfNecessaryTheCBS(amqpState); |
AzureIoTClient | 4:57e049bce51e | 888 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_160: [If after renewIfNecessaryCBS is invoked, the state variable putTokenWasSuccessful is false then the messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 889 | if (amqpState->putTokenWasSuccessful == true) |
AzureIoTClient | 4:57e049bce51e | 890 | { |
AzureIoTClient | 4:57e049bce51e | 891 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/ |
AzureIoTClient | 4:57e049bce51e | 892 | if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL) |
AzureIoTClient | 4:57e049bce51e | 893 | { |
AzureIoTClient | 4:57e049bce51e | 894 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 895 | LogError("unable to create a subscription to the message address\r\n"); |
AzureIoTClient | 4:57e049bce51e | 896 | } |
AzureIoTClient | 4:57e049bce51e | 897 | else |
AzureIoTClient | 4:57e049bce51e | 898 | { |
AzureIoTClient | 4:57e049bce51e | 899 | amqpState->messengerInitialized = true; |
AzureIoTClient | 4:57e049bce51e | 900 | } |
AzureIoTClient | 4:57e049bce51e | 901 | } |
AzureIoTClient | 4:57e049bce51e | 902 | } |
AzureIoTClient | 4:57e049bce51e | 903 | } |
AzureIoTClient | 4:57e049bce51e | 904 | return amqpState->messengerInitialized; |
AzureIoTClient | 4:57e049bce51e | 905 | } |
AzureIoTClient | 4:57e049bce51e | 906 | |
AzureIoTClient | 4:57e049bce51e | 907 | static amqp_batch_result prepareBatch(size_t maximumPayload, PDLIST_ENTRY waitingToSend, PDLIST_ENTRY messagesMakingUpBatch, size_t* payloadSize, const unsigned char** payload) |
AzureIoTClient | 4:57e049bce51e | 908 | { |
AzureIoTClient | 4:57e049bce51e | 909 | amqp_batch_result result; |
AzureIoTClient | 4:57e049bce51e | 910 | if (DList_IsListEmpty(waitingToSend)) |
AzureIoTClient | 4:57e049bce51e | 911 | { |
AzureIoTClient | 4:57e049bce51e | 912 | result = amqp_batch_nowork; |
AzureIoTClient | 4:57e049bce51e | 913 | } |
AzureIoTClient | 4:57e049bce51e | 914 | else |
AzureIoTClient | 4:57e049bce51e | 915 | { |
AzureIoTClient | 4:57e049bce51e | 916 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_074: [If there is a waitingToSend entry remove it from the list.]*/ |
AzureIoTClient | 4:57e049bce51e | 917 | IOTHUB_MESSAGE_LIST* currentMessage; |
AzureIoTClient | 4:57e049bce51e | 918 | IOTHUBMESSAGE_CONTENT_TYPE contentType; |
AzureIoTClient | 4:57e049bce51e | 919 | size_t messageLength; |
AzureIoTClient | 4:57e049bce51e | 920 | const unsigned char* messagePayload; |
AzureIoTClient | 4:57e049bce51e | 921 | DList_InitializeListHead(messagesMakingUpBatch); |
AzureIoTClient | 4:57e049bce51e | 922 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then DoWork will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/ |
AzureIoTClient | 4:57e049bce51e | 923 | DList_InsertTailList(messagesMakingUpBatch, DList_RemoveHeadList(waitingToSend)); |
AzureIoTClient | 4:57e049bce51e | 924 | currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry); |
AzureIoTClient | 4:57e049bce51e | 925 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_075: [Get its size and payload address by calling IoTHubMessage_GetByteArray.]*/ |
AzureIoTClient | 4:57e049bce51e | 926 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_008: [If the message type is IOTHUBMESSAGE_STRING then get the data by using IoTHubMesage_GetString and set the size to the length of the string.] */ |
AzureIoTClient | 4:57e049bce51e | 927 | contentType = IoTHubMessage_GetContentType(currentMessage->messageHandle); |
AzureIoTClient | 4:57e049bce51e | 928 | if (!( |
AzureIoTClient | 4:57e049bce51e | 929 | ((contentType == IOTHUBMESSAGE_BYTEARRAY) && (IoTHubMessage_GetByteArray(currentMessage->messageHandle, &messagePayload, &messageLength) == IOTHUB_MESSAGE_OK)) || |
AzureIoTClient | 4:57e049bce51e | 930 | ((contentType == IOTHUBMESSAGE_STRING) && ( |
AzureIoTClient | 4:57e049bce51e | 931 | messagePayload = (const unsigned char*)IoTHubMessage_GetString(currentMessage->messageHandle), |
AzureIoTClient | 4:57e049bce51e | 932 | (messageLength = (messagePayload == NULL) ? 0 : strlen((const char*)messagePayload)), |
AzureIoTClient | 4:57e049bce51e | 933 | messagePayload != NULL) |
AzureIoTClient | 4:57e049bce51e | 934 | ) |
AzureIoTClient | 4:57e049bce51e | 935 | )) |
AzureIoTClient | 4:57e049bce51e | 936 | { |
AzureIoTClient | 4:57e049bce51e | 937 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_076: [If that fails then fail that event.]*/ |
AzureIoTClient | 4:57e049bce51e | 938 | LogError("Failure in getting message content. Type had decimal value = %d\r\n", contentType); |
AzureIoTClient | 4:57e049bce51e | 939 | result = amqp_batch_error; |
AzureIoTClient | 4:57e049bce51e | 940 | } |
AzureIoTClient | 4:57e049bce51e | 941 | else if (messageLength > maximumPayload) |
AzureIoTClient | 4:57e049bce51e | 942 | { |
AzureIoTClient | 4:57e049bce51e | 943 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_077: [If the size of the payload is greater than the maximum (currently 256*1024) fail that event.]*/ |
AzureIoTClient | 4:57e049bce51e | 944 | LogError("Batch length is too large.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 945 | result = amqp_batch_error; |
AzureIoTClient | 4:57e049bce51e | 946 | } |
AzureIoTClient | 4:57e049bce51e | 947 | else |
AzureIoTClient | 4:57e049bce51e | 948 | { |
AzureIoTClient | 4:57e049bce51e | 949 | *payloadSize = messageLength; |
AzureIoTClient | 4:57e049bce51e | 950 | *payload = messagePayload; |
AzureIoTClient | 4:57e049bce51e | 951 | result = amqp_batch_nobatch; |
AzureIoTClient | 4:57e049bce51e | 952 | } |
AzureIoTClient | 4:57e049bce51e | 953 | } |
AzureIoTClient | 4:57e049bce51e | 954 | return result; |
AzureIoTClient | 4:57e049bce51e | 955 | } |
AzureIoTClient | 4:57e049bce51e | 956 | |
AzureIoTClient | 4:57e049bce51e | 957 | static bool prepareSimpleProtonMessage(pn_message_t* message, STRING_HANDLE endpoint, size_t payloadLength, const unsigned char* payload) |
AzureIoTClient | 4:57e049bce51e | 958 | { |
AzureIoTClient | 4:57e049bce51e | 959 | bool result = false; |
AzureIoTClient | 4:57e049bce51e | 960 | pn_data_t * body; |
AzureIoTClient | 4:57e049bce51e | 961 | |
AzureIoTClient | 4:57e049bce51e | 962 | pn_message_clear(message); |
AzureIoTClient | 4:57e049bce51e | 963 | if (pn_message_set_address(message, STRING_c_str(endpoint)) != 0) |
AzureIoTClient | 4:57e049bce51e | 964 | { |
AzureIoTClient | 4:57e049bce51e | 965 | LogError("Unable to set amqp address for proton message.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 966 | } |
AzureIoTClient | 4:57e049bce51e | 967 | else if (pn_message_set_inferred(message, true) != 0) |
AzureIoTClient | 4:57e049bce51e | 968 | { |
AzureIoTClient | 4:57e049bce51e | 969 | LogError("Unable to set inferred to true for proton message.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 970 | } |
AzureIoTClient | 4:57e049bce51e | 971 | else if ((body = pn_message_body(message)) == NULL) |
AzureIoTClient | 4:57e049bce51e | 972 | { |
AzureIoTClient | 4:57e049bce51e | 973 | LogError("Unable to set proton message body.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 974 | } |
AzureIoTClient | 4:57e049bce51e | 975 | else if (pn_data_put_binary(body, pn_bytes(payloadLength, (const char*)payload)) != 0) |
AzureIoTClient | 4:57e049bce51e | 976 | { |
AzureIoTClient | 4:57e049bce51e | 977 | LogError("Unable to set binary data for message body.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 978 | } |
AzureIoTClient | 4:57e049bce51e | 979 | else |
AzureIoTClient | 4:57e049bce51e | 980 | { |
AzureIoTClient | 4:57e049bce51e | 981 | result = true; |
AzureIoTClient | 4:57e049bce51e | 982 | } |
AzureIoTClient | 4:57e049bce51e | 983 | |
AzureIoTClient | 4:57e049bce51e | 984 | return result; |
AzureIoTClient | 4:57e049bce51e | 985 | } |
AzureIoTClient | 4:57e049bce51e | 986 | |
AzureIoTClient | 4:57e049bce51e | 987 | static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState) |
AzureIoTClient | 4:57e049bce51e | 988 | { |
AzureIoTClient | 4:57e049bce51e | 989 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_175: [The list of work in progress is tested to see if it is empty.]*/ |
AzureIoTClient | 4:57e049bce51e | 990 | while (!DList_IsListEmpty(&transportState->workInProgress)) |
AzureIoTClient | 4:57e049bce51e | 991 | { |
AzureIoTClient | 4:57e049bce51e | 992 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_176: [If it is not empty then the list is traversed.]*/ |
AzureIoTClient | 4:57e049bce51e | 993 | int settleResult; |
AzureIoTClient | 4:57e049bce51e | 994 | int protonResult; |
AzureIoTClient | 4:57e049bce51e | 995 | PDLIST_ENTRY newestEntry = transportState->workInProgress.Blink; |
AzureIoTClient | 4:57e049bce51e | 996 | PAMQP_WORK_ITEM currentWork = containingRecord(newestEntry, AMQP_WORK_ITEM, link); |
AzureIoTClient | 4:57e049bce51e | 997 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_179: [The status of the work item is acquired.]*/ |
AzureIoTClient | 4:57e049bce51e | 998 | protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker); |
AzureIoTClient | 4:57e049bce51e | 999 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_180: [The item is settled.]*/ |
AzureIoTClient | 4:57e049bce51e | 1000 | if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0) |
AzureIoTClient | 4:57e049bce51e | 1001 | { |
AzureIoTClient | 4:57e049bce51e | 1002 | LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult); |
AzureIoTClient | 4:57e049bce51e | 1003 | } |
AzureIoTClient | 4:57e049bce51e | 1004 | if ((protonResult == PN_STATUS_ACCEPTED) || |
AzureIoTClient | 4:57e049bce51e | 1005 | (protonResult == PN_STATUS_REJECTED) || |
AzureIoTClient | 4:57e049bce51e | 1006 | (protonResult == PN_STATUS_RELEASED)) |
AzureIoTClient | 4:57e049bce51e | 1007 | { |
AzureIoTClient | 4:57e049bce51e | 1008 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_182: [Items that have been settled by the service will have their IOTHUB_MESSAGE_LIST completed.]*/ |
AzureIoTClient | 4:57e049bce51e | 1009 | IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED); |
AzureIoTClient | 4:57e049bce51e | 1010 | } |
AzureIoTClient | 4:57e049bce51e | 1011 | else |
AzureIoTClient | 4:57e049bce51e | 1012 | { |
AzureIoTClient | 4:57e049bce51e | 1013 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_178: [Items that are not settled by the service will have their IOTHUB_MESSAGE_LIST removed from the work item and placed back on the waiting to send.]*/ |
AzureIoTClient | 4:57e049bce51e | 1014 | putSecondListAfterHeadOfFirst(transportState->waitingToSend->Flink, ¤tWork->eventMessages); |
AzureIoTClient | 4:57e049bce51e | 1015 | } |
AzureIoTClient | 4:57e049bce51e | 1016 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_181: [The work item is removed from the in progress list and placed on the available list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1017 | DList_RemoveEntryList(newestEntry); |
AzureIoTClient | 4:57e049bce51e | 1018 | DList_InsertTailList(&transportState->availableWorkItems, newestEntry); |
AzureIoTClient | 4:57e049bce51e | 1019 | } |
AzureIoTClient | 4:57e049bce51e | 1020 | } |
AzureIoTClient | 4:57e049bce51e | 1021 | static void reclaimEventResources(PAMQP_TRANSPORT_STATE transportState) |
AzureIoTClient | 4:57e049bce51e | 1022 | { |
AzureIoTClient | 4:57e049bce51e | 1023 | PDLIST_ENTRY currentListEntry; |
AzureIoTClient | 4:57e049bce51e | 1024 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_064: [Obtain the head of the workInProgress]*/ |
AzureIoTClient | 4:57e049bce51e | 1025 | currentListEntry = transportState->workInProgress.Flink; |
AzureIoTClient | 4:57e049bce51e | 1026 | while (currentListEntry != &transportState->workInProgress) |
AzureIoTClient | 4:57e049bce51e | 1027 | { |
AzureIoTClient | 4:57e049bce51e | 1028 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_065: [While head != listhead reclaimEventResources will do as follows.] */ |
AzureIoTClient | 4:57e049bce51e | 1029 | int protonResult; |
AzureIoTClient | 4:57e049bce51e | 1030 | PAMQP_WORK_ITEM currentWork = containingRecord(currentListEntry, AMQP_WORK_ITEM, link); |
AzureIoTClient | 4:57e049bce51e | 1031 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_066: [Check the current status via pn_messenger_status via the tracker stored in the AMQP_WORK_ITEM.]*/ |
AzureIoTClient | 4:57e049bce51e | 1032 | if ((protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker)) == PN_STATUS_PENDING) |
AzureIoTClient | 4:57e049bce51e | 1033 | { |
AzureIoTClient | 4:57e049bce51e | 1034 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_183: [if the amount of time that the work item has been pending exceeds a timeout stored in the work item, the messenger will be marked as not initialized.]*/ |
AzureIoTClient | 4:57e049bce51e | 1035 | size_t secondsSinceTheEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0); |
AzureIoTClient | 4:57e049bce51e | 1036 | if (currentWork->expiry < secondsSinceTheEpoch) |
AzureIoTClient | 4:57e049bce51e | 1037 | { |
AzureIoTClient | 4:57e049bce51e | 1038 | LogError("A event operation timed out.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1039 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 1040 | } |
AzureIoTClient | 4:57e049bce51e | 1041 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_067: [reclaimEventResources shall go onto the next work item if the proton status is PN_STATUS_PENDING.]*/ |
AzureIoTClient | 4:57e049bce51e | 1042 | currentListEntry = currentListEntry->Flink; |
AzureIoTClient | 4:57e049bce51e | 1043 | } |
AzureIoTClient | 4:57e049bce51e | 1044 | else |
AzureIoTClient | 4:57e049bce51e | 1045 | { |
AzureIoTClient | 4:57e049bce51e | 1046 | DLIST_ENTRY savedFromCurrentListEntry; |
AzureIoTClient | 4:57e049bce51e | 1047 | int settleResult; |
AzureIoTClient | 4:57e049bce51e | 1048 | // Yay! It's finished. Take it off the in progress list. |
AzureIoTClient | 4:57e049bce51e | 1049 | savedFromCurrentListEntry.Flink = currentListEntry->Flink; // We need to save this because it will be stomped on when we remove it from the work in progress. |
AzureIoTClient | 4:57e049bce51e | 1050 | |
AzureIoTClient | 4:57e049bce51e | 1051 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_068: [Otherwise reclaimEventResources will use the result of IOTHUB_BATCHSTATE_SUCCESS if the proton status was PN_STATUS_ACCEPTED.]*/ |
AzureIoTClient | 4:57e049bce51e | 1052 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_069: [Otherwise reclaimEventResources will use the status of IOTHUB_BATCHSTATE_FAILED if the proton status was NOT PN_STATUS_ACCEPTED.] */ |
AzureIoTClient | 4:57e049bce51e | 1053 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_070: [reclaimEventResources will invoke IotHubClient_SendBatchComplete with the IOTHUB_CLIENT_LL_HANDLE, supplied to reclaimEventResources, also pass the eventMessages stored in the AMQP_WORK_ITEM, and finally pass the above IOTHUB_BATCH status.]*/ |
AzureIoTClient | 4:57e049bce51e | 1054 | IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED); |
AzureIoTClient | 4:57e049bce51e | 1055 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_090: [reclaimEventResources will release the tracker by invoking pn_messenger_settle.]*/ |
AzureIoTClient | 4:57e049bce51e | 1056 | if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0) |
AzureIoTClient | 4:57e049bce51e | 1057 | { |
AzureIoTClient | 4:57e049bce51e | 1058 | LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult); |
AzureIoTClient | 4:57e049bce51e | 1059 | } |
AzureIoTClient | 4:57e049bce51e | 1060 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_071: [reclaimEventResources will take the current work item from the workInProgress list and insert it on the tail of the availableWorkItems list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1061 | DList_RemoveEntryList(currentListEntry); |
AzureIoTClient | 4:57e049bce51e | 1062 | DList_InsertTailList(&transportState->availableWorkItems, currentListEntry); |
AzureIoTClient | 4:57e049bce51e | 1063 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_072: [reclaimEventResources will continue to iterate over the workInProgress list until it returns to the list head.]*/ |
AzureIoTClient | 4:57e049bce51e | 1064 | currentListEntry = savedFromCurrentListEntry.Flink; |
AzureIoTClient | 4:57e049bce51e | 1065 | } |
AzureIoTClient | 4:57e049bce51e | 1066 | } |
AzureIoTClient | 4:57e049bce51e | 1067 | } |
AzureIoTClient | 4:57e049bce51e | 1068 | |
AzureIoTClient | 4:57e049bce51e | 1069 | static void sendEvent(PAMQP_TRANSPORT_STATE transportState) |
AzureIoTClient | 4:57e049bce51e | 1070 | { |
AzureIoTClient | 4:57e049bce51e | 1071 | while (!DList_IsListEmpty(&transportState->availableWorkItems)) |
AzureIoTClient | 4:57e049bce51e | 1072 | { |
AzureIoTClient | 4:57e049bce51e | 1073 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_073: [While the availableWorkItems is NOT empty loop.] */ |
AzureIoTClient | 4:57e049bce51e | 1074 | PAMQP_WORK_ITEM headOfAvailable = containingRecord(transportState->availableWorkItems.Flink, AMQP_WORK_ITEM, link); |
AzureIoTClient | 4:57e049bce51e | 1075 | size_t payloadSize; |
AzureIoTClient | 4:57e049bce51e | 1076 | const unsigned char* payloadAddress; |
AzureIoTClient | 4:57e049bce51e | 1077 | amqp_batch_result amqpBatchResult; |
AzureIoTClient | 4:57e049bce51e | 1078 | amqpBatchResult = prepareBatch(MAXIMUM_EVENT_LENGTH, transportState->waitingToSend, &headOfAvailable->eventMessages, &payloadSize, &payloadAddress); |
AzureIoTClient | 4:57e049bce51e | 1079 | if (amqpBatchResult == amqp_batch_nowork) |
AzureIoTClient | 4:57e049bce51e | 1080 | { |
AzureIoTClient | 4:57e049bce51e | 1081 | // no work to do. Stop trying. |
AzureIoTClient | 4:57e049bce51e | 1082 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/ |
AzureIoTClient | 4:57e049bce51e | 1083 | break; |
AzureIoTClient | 4:57e049bce51e | 1084 | } |
AzureIoTClient | 4:57e049bce51e | 1085 | else if (amqpBatchResult == amqp_batch_error) |
AzureIoTClient | 4:57e049bce51e | 1086 | { |
AzureIoTClient | 4:57e049bce51e | 1087 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_078: [If the event failed call invoke IotHubClient_SendBatchComplete as above utilizing the eventMessages list of the head of the availableWorkItems and a status of IOTHUB_BATCHSTATE_FAILED.]*/ |
AzureIoTClient | 4:57e049bce51e | 1088 | IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &headOfAvailable->eventMessages, IOTHUB_BATCHSTATE_FAILED); |
AzureIoTClient | 4:57e049bce51e | 1089 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_079: [Continue this send loop.]*/ |
AzureIoTClient | 4:57e049bce51e | 1090 | } |
AzureIoTClient | 4:57e049bce51e | 1091 | else |
AzureIoTClient | 4:57e049bce51e | 1092 | { |
AzureIoTClient | 4:57e049bce51e | 1093 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_080: [If the event did not fail then prepare a proton message using the following proton API: pn_message_set_address, pn_message_set_inferred, pn_message_body and pn_data_put_binary with the above saved size and address values.]*/ |
AzureIoTClient | 4:57e049bce51e | 1094 | if (prepareSimpleProtonMessage(transportState->message, transportState->eventAddress, payloadSize, payloadAddress) == false) |
AzureIoTClient | 4:57e049bce51e | 1095 | { |
AzureIoTClient | 4:57e049bce51e | 1096 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1097 | putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); |
AzureIoTClient | 4:57e049bce51e | 1098 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ |
AzureIoTClient | 4:57e049bce51e | 1099 | break; |
AzureIoTClient | 4:57e049bce51e | 1100 | } |
AzureIoTClient | 4:57e049bce51e | 1101 | else if (setProperties(&headOfAvailable->eventMessages, transportState->message) != 0) |
AzureIoTClient | 4:57e049bce51e | 1102 | { |
AzureIoTClient | 4:57e049bce51e | 1103 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1104 | putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); |
AzureIoTClient | 4:57e049bce51e | 1105 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ |
AzureIoTClient | 4:57e049bce51e | 1106 | break; |
AzureIoTClient | 4:57e049bce51e | 1107 | } |
AzureIoTClient | 5:8d58d20699dd | 1108 | else if (setSendMessageIds(&headOfAvailable->eventMessages, transportState->message)) |
AzureIoTClient | 5:8d58d20699dd | 1109 | { |
AzureIoTClient | 5:8d58d20699dd | 1110 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ |
AzureIoTClient | 5:8d58d20699dd | 1111 | putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); |
AzureIoTClient | 5:8d58d20699dd | 1112 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ |
AzureIoTClient | 5:8d58d20699dd | 1113 | break; |
AzureIoTClient | 5:8d58d20699dd | 1114 | } |
AzureIoTClient | 4:57e049bce51e | 1115 | else |
AzureIoTClient | 4:57e049bce51e | 1116 | { |
AzureIoTClient | 4:57e049bce51e | 1117 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/ |
AzureIoTClient | 4:57e049bce51e | 1118 | int protonResult; |
AzureIoTClient | 4:57e049bce51e | 1119 | protonResult = pn_messenger_put(transportState->messenger, transportState->message); |
AzureIoTClient | 4:57e049bce51e | 1120 | if (protonResult != 0) |
AzureIoTClient | 4:57e049bce51e | 1121 | { |
AzureIoTClient | 4:57e049bce51e | 1122 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_084: [If that fails, sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1123 | putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); |
AzureIoTClient | 4:57e049bce51e | 1124 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_085: [sendEvent will then break out of the send loop.]*/ |
AzureIoTClient | 4:57e049bce51e | 1125 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 1126 | break; |
AzureIoTClient | 4:57e049bce51e | 1127 | } |
AzureIoTClient | 4:57e049bce51e | 1128 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_184: [The work item will also contain a timeout that shall denote how long the transport will wait before initiating error recovery.]*/ |
AzureIoTClient | 4:57e049bce51e | 1129 | headOfAvailable->expiry = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + transportState->eventTimeout); |
AzureIoTClient | 4:57e049bce51e | 1130 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then sendEvent will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/ |
AzureIoTClient | 4:57e049bce51e | 1131 | headOfAvailable->tracker = pn_messenger_outgoing_tracker(transportState->messenger); |
AzureIoTClient | 4:57e049bce51e | 1132 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_087: [The head of the availableWorkItems will be removed and placed on the workInProgress list.]*/ |
AzureIoTClient | 4:57e049bce51e | 1133 | DList_InsertTailList(&transportState->workInProgress, DList_RemoveHeadList(&transportState->availableWorkItems)); |
AzureIoTClient | 4:57e049bce51e | 1134 | } |
AzureIoTClient | 4:57e049bce51e | 1135 | } |
AzureIoTClient | 4:57e049bce51e | 1136 | } |
AzureIoTClient | 4:57e049bce51e | 1137 | } |
AzureIoTClient | 4:57e049bce51e | 1138 | static void processCBSReply(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) |
AzureIoTClient | 4:57e049bce51e | 1139 | { |
AzureIoTClient | 4:57e049bce51e | 1140 | bool goodMessage = false; |
AzureIoTClient | 4:57e049bce51e | 1141 | pn_atom_t correlationId; |
AzureIoTClient | 4:57e049bce51e | 1142 | |
AzureIoTClient | 4:57e049bce51e | 1143 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_112: [The correlationId message property is obtained via a call to pn_message_get_correlation_id.]*/ |
AzureIoTClient | 4:57e049bce51e | 1144 | correlationId = pn_message_get_correlation_id(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 1145 | |
AzureIoTClient | 4:57e049bce51e | 1146 | // |
AzureIoTClient | 4:57e049bce51e | 1147 | // If it isn't the one we're looking for, simply accept it, because we don't want to see it and we can't do anything with it. |
AzureIoTClient | 4:57e049bce51e | 1148 | // |
AzureIoTClient | 4:57e049bce51e | 1149 | |
AzureIoTClient | 4:57e049bce51e | 1150 | if (correlationId.u.as_ulong != transportState->sendTokenMessageId) |
AzureIoTClient | 4:57e049bce51e | 1151 | { |
AzureIoTClient | 4:57e049bce51e | 1152 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_113: [The id is compared to the state variable sendTokenMessageId. If there is no match, an error is logged, the message is abandoned and processCBSReply returns.]*/ |
AzureIoTClient | 4:57e049bce51e | 1153 | LogError("An old reply from a CBS renewal drifed in. Moving on\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1154 | } |
AzureIoTClient | 4:57e049bce51e | 1155 | else |
AzureIoTClient | 4:57e049bce51e | 1156 | { |
AzureIoTClient | 4:57e049bce51e | 1157 | // |
AzureIoTClient | 4:57e049bce51e | 1158 | // This is the response we're looking for! |
AzureIoTClient | 4:57e049bce51e | 1159 | // |
AzureIoTClient | 4:57e049bce51e | 1160 | |
AzureIoTClient | 4:57e049bce51e | 1161 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_114: [Obtain the properties of the message by invoking pn_message_properties.]*/ |
AzureIoTClient | 4:57e049bce51e | 1162 | pn_data_t* propertyData = pn_message_properties(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 1163 | |
AzureIoTClient | 4:57e049bce51e | 1164 | transportState->waitingForPutTokenReply = false; |
AzureIoTClient | 4:57e049bce51e | 1165 | if (propertyData == NULL) |
AzureIoTClient | 4:57e049bce51e | 1166 | { |
AzureIoTClient | 4:57e049bce51e | 1167 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_115: [If NULL then abandon the message and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1168 | LogError("Invalid response back from the CBS - no application properties\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1169 | } |
AzureIoTClient | 4:57e049bce51e | 1170 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_116: [Advance to the first property of the message via pn_data_next.]*/ |
AzureIoTClient | 4:57e049bce51e | 1171 | else if (pn_data_next(propertyData) == false) |
AzureIoTClient | 4:57e049bce51e | 1172 | { |
AzureIoTClient | 4:57e049bce51e | 1173 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_117: [If this fails, abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1174 | LogError("Invalid response back from the CBS - application properties malformed\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1175 | } |
AzureIoTClient | 4:57e049bce51e | 1176 | else |
AzureIoTClient | 4:57e049bce51e | 1177 | { |
AzureIoTClient | 4:57e049bce51e | 1178 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_118: [Invoke pn_data_get_map to obtain the number of properties.]*/ |
AzureIoTClient | 4:57e049bce51e | 1179 | size_t numberOfProperties = pn_data_get_map(propertyData) / 2; |
AzureIoTClient | 4:57e049bce51e | 1180 | if (numberOfProperties < 1) // We MUST have at least the status property |
AzureIoTClient | 4:57e049bce51e | 1181 | { |
AzureIoTClient | 4:57e049bce51e | 1182 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_119: [If the number of properties is 0 then abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1183 | LogError("Invalid response back from the CBS - invalid number of properties: %zu\r\n", pn_data_get_map(propertyData)); |
AzureIoTClient | 4:57e049bce51e | 1184 | } |
AzureIoTClient | 4:57e049bce51e | 1185 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_120: [Enter the property map by invoking pn_data_enter.]*/ |
AzureIoTClient | 4:57e049bce51e | 1186 | else if (pn_data_enter(propertyData) == false) |
AzureIoTClient | 4:57e049bce51e | 1187 | { |
AzureIoTClient | 4:57e049bce51e | 1188 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_121: [If that fails then abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1189 | LogError("Invalid response back from the CBS - unable to access the sent properties\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1190 | } |
AzureIoTClient | 4:57e049bce51e | 1191 | else |
AzureIoTClient | 4:57e049bce51e | 1192 | { |
AzureIoTClient | 4:57e049bce51e | 1193 | pn_bytes_t propertyName; |
AzureIoTClient | 4:57e049bce51e | 1194 | int32_t resultStatus; |
AzureIoTClient | 4:57e049bce51e | 1195 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_122: [Get the property name.]*/ |
AzureIoTClient | 4:57e049bce51e | 1196 | if (getMapString(propertyData, &propertyName) != 0) |
AzureIoTClient | 4:57e049bce51e | 1197 | { |
AzureIoTClient | 4:57e049bce51e | 1198 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_123: [If that fails then abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1199 | LogError("Unable to acquire the status value from the cbs request\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1200 | } |
AzureIoTClient | 4:57e049bce51e | 1201 | else if (strcmp(propertyName.start, PROTON_MAP_STATUS_CODE) != 0) |
AzureIoTClient | 4:57e049bce51e | 1202 | { |
AzureIoTClient | 4:57e049bce51e | 1203 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_124: [If the property name is not "status-code" than abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1204 | LogError("Unknown application property returned: %s\r\n", propertyName.start); |
AzureIoTClient | 4:57e049bce51e | 1205 | } |
AzureIoTClient | 4:57e049bce51e | 1206 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_125: [Get the actual status code.]*/ |
AzureIoTClient | 4:57e049bce51e | 1207 | else if (getMapInt(propertyData, &resultStatus) != 0) |
AzureIoTClient | 4:57e049bce51e | 1208 | { |
AzureIoTClient | 4:57e049bce51e | 1209 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_126: [If that fails then abandon and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1210 | LogError("Unable to acquire the actual status of the CBS put token\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1211 | } |
AzureIoTClient | 4:57e049bce51e | 1212 | else if (resultStatus != 200) |
AzureIoTClient | 4:57e049bce51e | 1213 | { |
AzureIoTClient | 4:57e049bce51e | 1214 | pn_bytes_t statusDescription; |
AzureIoTClient | 4:57e049bce51e | 1215 | // |
AzureIoTClient | 4:57e049bce51e | 1216 | // Yeah, it's not what we want to hear but it's not as though anything was wrong with |
AzureIoTClient | 4:57e049bce51e | 1217 | // the message. |
AzureIoTClient | 4:57e049bce51e | 1218 | // |
AzureIoTClient | 4:57e049bce51e | 1219 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_127: [If the status code is not equal to 200 then attempt to acquire the explanation from the properties.]*/ |
AzureIoTClient | 4:57e049bce51e | 1220 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_128: [Accept the message and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1221 | goodMessage = true; |
AzureIoTClient | 4:57e049bce51e | 1222 | LogError("The CBS put token operation failed: %d\r\n", resultStatus); |
AzureIoTClient | 4:57e049bce51e | 1223 | if (numberOfProperties >= 2) |
AzureIoTClient | 4:57e049bce51e | 1224 | { |
AzureIoTClient | 4:57e049bce51e | 1225 | // |
AzureIoTClient | 4:57e049bce51e | 1226 | // Well we have an error. See if the next property is the status description. |
AzureIoTClient | 4:57e049bce51e | 1227 | // If it is, then log the value as further explanation |
AzureIoTClient | 4:57e049bce51e | 1228 | // |
AzureIoTClient | 4:57e049bce51e | 1229 | if (getMapString(propertyData, &propertyName) == 0) |
AzureIoTClient | 4:57e049bce51e | 1230 | { |
AzureIoTClient | 4:57e049bce51e | 1231 | if (strcmp(propertyName.start, PROTON_MAP_STATUS_DESCRIPTION) == 0) |
AzureIoTClient | 4:57e049bce51e | 1232 | { |
AzureIoTClient | 4:57e049bce51e | 1233 | if (getMapString(propertyData, &statusDescription) == 0) |
AzureIoTClient | 4:57e049bce51e | 1234 | { |
AzureIoTClient | 4:57e049bce51e | 1235 | LogError("The CBS put token operation failed due to: %s\r\n", statusDescription.start); |
AzureIoTClient | 4:57e049bce51e | 1236 | } |
AzureIoTClient | 4:57e049bce51e | 1237 | } |
AzureIoTClient | 4:57e049bce51e | 1238 | } |
AzureIoTClient | 4:57e049bce51e | 1239 | } |
AzureIoTClient | 4:57e049bce51e | 1240 | } |
AzureIoTClient | 4:57e049bce51e | 1241 | else |
AzureIoTClient | 4:57e049bce51e | 1242 | { |
AzureIoTClient | 4:57e049bce51e | 1243 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_129: [If the status code was 200 then set the state variable putTokenWasSuccessful, accept the message and return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1244 | goodMessage = true; |
AzureIoTClient | 4:57e049bce51e | 1245 | transportState->putTokenWasSuccessful = true; |
AzureIoTClient | 4:57e049bce51e | 1246 | } |
AzureIoTClient | 4:57e049bce51e | 1247 | } |
AzureIoTClient | 4:57e049bce51e | 1248 | } |
AzureIoTClient | 4:57e049bce51e | 1249 | } |
AzureIoTClient | 4:57e049bce51e | 1250 | if (goodMessage == true) |
AzureIoTClient | 4:57e049bce51e | 1251 | { |
AzureIoTClient | 4:57e049bce51e | 1252 | pn_messenger_accept(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1253 | } |
AzureIoTClient | 4:57e049bce51e | 1254 | else |
AzureIoTClient | 4:57e049bce51e | 1255 | { |
AzureIoTClient | 4:57e049bce51e | 1256 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1257 | } |
AzureIoTClient | 4:57e049bce51e | 1258 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1259 | } |
AzureIoTClient | 4:57e049bce51e | 1260 | |
AzureIoTClient | 4:57e049bce51e | 1261 | static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) |
AzureIoTClient | 4:57e049bce51e | 1262 | { |
AzureIoTClient | 4:57e049bce51e | 1263 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/ |
AzureIoTClient | 4:57e049bce51e | 1264 | if (transportState->DoWork_PullMessages == false) |
AzureIoTClient | 4:57e049bce51e | 1265 | { |
AzureIoTClient | 4:57e049bce51e | 1266 | // |
AzureIoTClient | 4:57e049bce51e | 1267 | // Nothing to do with the message. We reject it. In that way it can come back later. |
AzureIoTClient | 4:57e049bce51e | 1268 | // |
AzureIoTClient | 4:57e049bce51e | 1269 | pn_messenger_reject(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1270 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1271 | } |
AzureIoTClient | 4:57e049bce51e | 1272 | else |
AzureIoTClient | 4:57e049bce51e | 1273 | { |
AzureIoTClient | 4:57e049bce51e | 1274 | pn_data_t *body; |
AzureIoTClient | 4:57e049bce51e | 1275 | |
AzureIoTClient | 4:57e049bce51e | 1276 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/ |
AzureIoTClient | 4:57e049bce51e | 1277 | if ((body = pn_message_body(transportState->message)) == NULL) |
AzureIoTClient | 4:57e049bce51e | 1278 | { |
AzureIoTClient | 4:57e049bce51e | 1279 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/ |
AzureIoTClient | 4:57e049bce51e | 1280 | LogError("Failed to get the proton message body\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1281 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1282 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1283 | } |
AzureIoTClient | 4:57e049bce51e | 1284 | else |
AzureIoTClient | 4:57e049bce51e | 1285 | { |
AzureIoTClient | 4:57e049bce51e | 1286 | pn_bytes_t sizeAndData; |
AzureIoTClient | 4:57e049bce51e | 1287 | bool keepProcessingThisMessage = true; |
AzureIoTClient | 4:57e049bce51e | 1288 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/ |
AzureIoTClient | 4:57e049bce51e | 1289 | if (pn_data_next(body) == false) |
AzureIoTClient | 4:57e049bce51e | 1290 | { |
AzureIoTClient | 4:57e049bce51e | 1291 | // |
AzureIoTClient | 4:57e049bce51e | 1292 | // We have a null body. This is reasonable. It should also be pointed out that |
AzureIoTClient | 4:57e049bce51e | 1293 | // pn_data_next call shouldn't be thought of as checking for the existence of something. |
AzureIoTClient | 4:57e049bce51e | 1294 | // It should be thought of as actually MOVING through a tree structure in the message. |
AzureIoTClient | 4:57e049bce51e | 1295 | // |
AzureIoTClient | 4:57e049bce51e | 1296 | sizeAndData.size = 0; |
AzureIoTClient | 4:57e049bce51e | 1297 | sizeAndData.start = NULL; |
AzureIoTClient | 4:57e049bce51e | 1298 | } |
AzureIoTClient | 4:57e049bce51e | 1299 | else |
AzureIoTClient | 4:57e049bce51e | 1300 | { |
AzureIoTClient | 4:57e049bce51e | 1301 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/ |
AzureIoTClient | 4:57e049bce51e | 1302 | if (PN_BINARY != pn_data_type(body)) |
AzureIoTClient | 4:57e049bce51e | 1303 | { |
AzureIoTClient | 4:57e049bce51e | 1304 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/ |
AzureIoTClient | 4:57e049bce51e | 1305 | LogError("Message received via AMQP was not PN_BINARY\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1306 | sizeAndData.size = 0; // Compilers get upset. |
AzureIoTClient | 4:57e049bce51e | 1307 | sizeAndData.start = NULL; |
AzureIoTClient | 4:57e049bce51e | 1308 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1309 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1310 | keepProcessingThisMessage = false; |
AzureIoTClient | 4:57e049bce51e | 1311 | } |
AzureIoTClient | 4:57e049bce51e | 1312 | else |
AzureIoTClient | 4:57e049bce51e | 1313 | { |
AzureIoTClient | 4:57e049bce51e | 1314 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_057: [The length and a pointer to the payload will be acquired and saved by invoking pn_data_get_binary.]*/ |
AzureIoTClient | 4:57e049bce51e | 1315 | sizeAndData = pn_data_get_binary(body); |
AzureIoTClient | 4:57e049bce51e | 1316 | } |
AzureIoTClient | 4:57e049bce51e | 1317 | } |
AzureIoTClient | 4:57e049bce51e | 1318 | if (keepProcessingThisMessage == true) |
AzureIoTClient | 4:57e049bce51e | 1319 | { |
AzureIoTClient | 4:57e049bce51e | 1320 | IOTHUB_MESSAGE_HANDLE ioTMessage; |
AzureIoTClient | 4:57e049bce51e | 1321 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/ |
AzureIoTClient | 4:57e049bce51e | 1322 | if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL) |
AzureIoTClient | 4:57e049bce51e | 1323 | { |
AzureIoTClient | 4:57e049bce51e | 1324 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/ |
AzureIoTClient | 4:57e049bce51e | 1325 | LogError("Failed to set data for IoT hub message\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1326 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1327 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1328 | } |
AzureIoTClient | 4:57e049bce51e | 1329 | else if (cloneProperties(ioTMessage, transportState->message) != 0) |
AzureIoTClient | 4:57e049bce51e | 1330 | { |
AzureIoTClient | 5:8d58d20699dd | 1331 | LogError("Failed to clone properties for IoT hub message\r\n"); |
AzureIoTClient | 5:8d58d20699dd | 1332 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 5:8d58d20699dd | 1333 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 5:8d58d20699dd | 1334 | } |
AzureIoTClient | 5:8d58d20699dd | 1335 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_07_003: [If the pn_message_get_id value is not NULL then the value will be set by calling IotHubMessage_SetMessageId.] */ |
AzureIoTClient | 5:8d58d20699dd | 1336 | else if (setRecvMessageIds(ioTMessage, transportState->message) != 0) |
AzureIoTClient | 5:8d58d20699dd | 1337 | { |
AzureIoTClient | 5:8d58d20699dd | 1338 | LogError("Failed to set MessageId for IoT hub message\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1339 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1340 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1341 | } |
AzureIoTClient | 4:57e049bce51e | 1342 | else |
AzureIoTClient | 4:57e049bce51e | 1343 | { |
AzureIoTClient | 4:57e049bce51e | 1344 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/ |
AzureIoTClient | 4:57e049bce51e | 1345 | IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage); |
AzureIoTClient | 4:57e049bce51e | 1346 | |
AzureIoTClient | 4:57e049bce51e | 1347 | if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED) |
AzureIoTClient | 4:57e049bce51e | 1348 | { |
AzureIoTClient | 4:57e049bce51e | 1349 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */ |
AzureIoTClient | 4:57e049bce51e | 1350 | pn_messenger_accept(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1351 | } |
AzureIoTClient | 4:57e049bce51e | 1352 | else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED) |
AzureIoTClient | 4:57e049bce51e | 1353 | { |
AzureIoTClient | 4:57e049bce51e | 1354 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */ |
AzureIoTClient | 4:57e049bce51e | 1355 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1356 | } |
AzureIoTClient | 4:57e049bce51e | 1357 | else |
AzureIoTClient | 4:57e049bce51e | 1358 | { |
AzureIoTClient | 4:57e049bce51e | 1359 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/ |
AzureIoTClient | 4:57e049bce51e | 1360 | pn_messenger_reject(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1361 | } |
AzureIoTClient | 4:57e049bce51e | 1362 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1363 | } |
AzureIoTClient | 4:57e049bce51e | 1364 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/ |
AzureIoTClient | 4:57e049bce51e | 1365 | IoTHubMessage_Destroy(ioTMessage); |
AzureIoTClient | 4:57e049bce51e | 1366 | } |
AzureIoTClient | 4:57e049bce51e | 1367 | } |
AzureIoTClient | 4:57e049bce51e | 1368 | } |
AzureIoTClient | 4:57e049bce51e | 1369 | } |
AzureIoTClient | 4:57e049bce51e | 1370 | |
AzureIoTClient | 4:57e049bce51e | 1371 | static void processReceives(TRANSPORT_HANDLE handle) |
AzureIoTClient | 4:57e049bce51e | 1372 | { |
AzureIoTClient | 4:57e049bce51e | 1373 | PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 1374 | int receiveResult; |
AzureIoTClient | 4:57e049bce51e | 1375 | |
AzureIoTClient | 4:57e049bce51e | 1376 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/ |
AzureIoTClient | 4:57e049bce51e | 1377 | |
AzureIoTClient | 4:57e049bce51e | 1378 | if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false)) |
AzureIoTClient | 4:57e049bce51e | 1379 | { |
AzureIoTClient | 4:57e049bce51e | 1380 | if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0) |
AzureIoTClient | 4:57e049bce51e | 1381 | { |
AzureIoTClient | 4:57e049bce51e | 1382 | if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) |
AzureIoTClient | 4:57e049bce51e | 1383 | { |
AzureIoTClient | 4:57e049bce51e | 1384 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ |
AzureIoTClient | 4:57e049bce51e | 1385 | LogError("Error attempting to receive messages: %d\r\n", receiveResult); |
AzureIoTClient | 4:57e049bce51e | 1386 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ |
AzureIoTClient | 4:57e049bce51e | 1387 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 1388 | } |
AzureIoTClient | 4:57e049bce51e | 1389 | } |
AzureIoTClient | 4:57e049bce51e | 1390 | } |
AzureIoTClient | 4:57e049bce51e | 1391 | else |
AzureIoTClient | 4:57e049bce51e | 1392 | { |
AzureIoTClient | 4:57e049bce51e | 1393 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/ |
AzureIoTClient | 4:57e049bce51e | 1394 | if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0) |
AzureIoTClient | 4:57e049bce51e | 1395 | { |
AzureIoTClient | 4:57e049bce51e | 1396 | if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) |
AzureIoTClient | 4:57e049bce51e | 1397 | { |
AzureIoTClient | 4:57e049bce51e | 1398 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ |
AzureIoTClient | 4:57e049bce51e | 1399 | LogError("Error attempting to receive messages: %d\r\n", receiveResult); |
AzureIoTClient | 4:57e049bce51e | 1400 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ |
AzureIoTClient | 4:57e049bce51e | 1401 | transportState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 1402 | } |
AzureIoTClient | 4:57e049bce51e | 1403 | } |
AzureIoTClient | 4:57e049bce51e | 1404 | else |
AzureIoTClient | 4:57e049bce51e | 1405 | { |
AzureIoTClient | 4:57e049bce51e | 1406 | // |
AzureIoTClient | 4:57e049bce51e | 1407 | // If any are present we need to dispose of messages in the incoming queue. |
AzureIoTClient | 4:57e049bce51e | 1408 | // |
AzureIoTClient | 4:57e049bce51e | 1409 | size_t depthOfReceiveQueue; |
AzureIoTClient | 4:57e049bce51e | 1410 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/ |
AzureIoTClient | 4:57e049bce51e | 1411 | for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--) |
AzureIoTClient | 4:57e049bce51e | 1412 | { |
AzureIoTClient | 4:57e049bce51e | 1413 | int result; |
AzureIoTClient | 4:57e049bce51e | 1414 | pn_message_clear(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 1415 | // |
AzureIoTClient | 4:57e049bce51e | 1416 | // Ok something is there. First thing is to get it. |
AzureIoTClient | 4:57e049bce51e | 1417 | // |
AzureIoTClient | 4:57e049bce51e | 1418 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_047: [processReceives will retrieve a message with pn_messenger_get.]*/ |
AzureIoTClient | 4:57e049bce51e | 1419 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_048: [If pn_messenger_get fails we log and break the receive loop.]*/ |
AzureIoTClient | 4:57e049bce51e | 1420 | if ((result = pn_messenger_get(transportState->messenger, transportState->message)) != 0) |
AzureIoTClient | 4:57e049bce51e | 1421 | { |
AzureIoTClient | 4:57e049bce51e | 1422 | // |
AzureIoTClient | 4:57e049bce51e | 1423 | // Well that's pretty odd. We shouldn't have gotten here if the message queue was empty! |
AzureIoTClient | 4:57e049bce51e | 1424 | // |
AzureIoTClient | 4:57e049bce51e | 1425 | LogError("Message reception queue unexpectedly empty! Code: %d\r\n", result); |
AzureIoTClient | 4:57e049bce51e | 1426 | break; |
AzureIoTClient | 4:57e049bce51e | 1427 | } |
AzureIoTClient | 4:57e049bce51e | 1428 | else |
AzureIoTClient | 4:57e049bce51e | 1429 | { |
AzureIoTClient | 4:57e049bce51e | 1430 | // |
AzureIoTClient | 4:57e049bce51e | 1431 | // We'll need a tracker to dispose of the message. We need the subscription to determine who should process it. |
AzureIoTClient | 4:57e049bce51e | 1432 | // |
AzureIoTClient | 4:57e049bce51e | 1433 | pn_subscription_t* theMessageSource; |
AzureIoTClient | 4:57e049bce51e | 1434 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_049: [processReceives shall acquire a tracker by invoking pn_messenger_incoming_tracker.]*/ |
AzureIoTClient | 4:57e049bce51e | 1435 | pn_tracker_t tracker = pn_messenger_incoming_tracker(transportState->messenger); |
AzureIoTClient | 4:57e049bce51e | 1436 | |
AzureIoTClient | 4:57e049bce51e | 1437 | // |
AzureIoTClient | 4:57e049bce51e | 1438 | // Currently we can receive two types of messages. We can recieve status messages from the $CBS that indicates the success |
AzureIoTClient | 4:57e049bce51e | 1439 | // or failure of renewing the SAS for the device. |
AzureIoTClient | 4:57e049bce51e | 1440 | // |
AzureIoTClient | 4:57e049bce51e | 1441 | // Otherwise we can recieve a simple message. We can get a pointer to the subscription from the tracker. We can compare |
AzureIoTClient | 4:57e049bce51e | 1442 | // that pointer to the subscriptions that we stored when we initialized the messenger. We then dispatch appropriately. |
AzureIoTClient | 4:57e049bce51e | 1443 | // |
AzureIoTClient | 4:57e049bce51e | 1444 | |
AzureIoTClient | 4:57e049bce51e | 1445 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/ |
AzureIoTClient | 4:57e049bce51e | 1446 | theMessageSource = pn_messenger_incoming_subscription(transportState->messenger); |
AzureIoTClient | 4:57e049bce51e | 1447 | if (theMessageSource == transportState->messageSubscription) |
AzureIoTClient | 4:57e049bce51e | 1448 | { |
AzureIoTClient | 4:57e049bce51e | 1449 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/ |
AzureIoTClient | 4:57e049bce51e | 1450 | processMessage(transportState, tracker); |
AzureIoTClient | 4:57e049bce51e | 1451 | } |
AzureIoTClient | 4:57e049bce51e | 1452 | else if (theMessageSource == transportState->cbsSubscription) |
AzureIoTClient | 4:57e049bce51e | 1453 | { |
AzureIoTClient | 4:57e049bce51e | 1454 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/ |
AzureIoTClient | 4:57e049bce51e | 1455 | processCBSReply(transportState, tracker); |
AzureIoTClient | 4:57e049bce51e | 1456 | } |
AzureIoTClient | 4:57e049bce51e | 1457 | else |
AzureIoTClient | 4:57e049bce51e | 1458 | { |
AzureIoTClient | 4:57e049bce51e | 1459 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_109: [Otherwise, log an error and abandon the message so that it never returns.]*/ |
AzureIoTClient | 4:57e049bce51e | 1460 | LogError("Message received from an unknown sender\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1461 | pn_messenger_release(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1462 | pn_messenger_settle(transportState->messenger, tracker, 0); |
AzureIoTClient | 4:57e049bce51e | 1463 | } |
AzureIoTClient | 4:57e049bce51e | 1464 | } |
AzureIoTClient | 4:57e049bce51e | 1465 | } |
AzureIoTClient | 4:57e049bce51e | 1466 | pn_message_clear(transportState->message); |
AzureIoTClient | 4:57e049bce51e | 1467 | } |
AzureIoTClient | 4:57e049bce51e | 1468 | } |
AzureIoTClient | 4:57e049bce51e | 1469 | } |
AzureIoTClient | 4:57e049bce51e | 1470 | |
AzureIoTClient | 4:57e049bce51e | 1471 | static void clientTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) |
AzureIoTClient | 4:57e049bce51e | 1472 | { |
AzureIoTClient | 4:57e049bce51e | 1473 | PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 1474 | |
AzureIoTClient | 4:57e049bce51e | 1475 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_019: [clientTransportAMQP_Dowork shall do no work if handle is NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1476 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_020: [clientTransportAMQP_Dowork shall do no work if iotHubClientHandle is NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1477 | if (transportState && iotHubClientHandle) |
AzureIoTClient | 4:57e049bce51e | 1478 | { |
AzureIoTClient | 4:57e049bce51e | 1479 | transportState->savedClientHandle = iotHubClientHandle; |
AzureIoTClient | 4:57e049bce51e | 1480 | if (protonMessengerInit(transportState)) |
AzureIoTClient | 4:57e049bce51e | 1481 | { |
AzureIoTClient | 4:57e049bce51e | 1482 | renewIfNecessaryTheCBS(transportState); |
AzureIoTClient | 4:57e049bce51e | 1483 | if (transportState->messengerInitialized == true) reclaimEventResources(transportState); |
AzureIoTClient | 4:57e049bce51e | 1484 | if (transportState->messengerInitialized == true) sendEvent(transportState); |
AzureIoTClient | 4:57e049bce51e | 1485 | if (transportState->messengerInitialized == true) processReceives(transportState); |
AzureIoTClient | 4:57e049bce51e | 1486 | |
AzureIoTClient | 4:57e049bce51e | 1487 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_088: [pn_messenger_work shall be invoked following all the above actions.]*/ |
AzureIoTClient | 4:57e049bce51e | 1488 | if (transportState->messengerInitialized == true) checkForErrorsThenWork(transportState); |
AzureIoTClient | 4:57e049bce51e | 1489 | } |
AzureIoTClient | 4:57e049bce51e | 1490 | |
AzureIoTClient | 4:57e049bce51e | 1491 | // |
AzureIoTClient | 4:57e049bce51e | 1492 | // Try to get rid of any old messengers that we were NOT able to stop. |
AzureIoTClient | 4:57e049bce51e | 1493 | // |
AzureIoTClient | 4:57e049bce51e | 1494 | herdTheMessengers(transportState, false); |
AzureIoTClient | 4:57e049bce51e | 1495 | } |
AzureIoTClient | 4:57e049bce51e | 1496 | return; |
AzureIoTClient | 4:57e049bce51e | 1497 | } |
AzureIoTClient | 4:57e049bce51e | 1498 | |
AzureIoTClient | 4:57e049bce51e | 1499 | static bool createUrledDeviceId(PAMQP_TRANSPORT_STATE state, const IOTHUBTRANSPORT_CONFIG* config) |
AzureIoTClient | 4:57e049bce51e | 1500 | { |
AzureIoTClient | 4:57e049bce51e | 1501 | return ((state->urledDeviceId = URL_EncodeString(config->upperConfig->deviceId)) == NULL) ? (false) : (true); |
AzureIoTClient | 4:57e049bce51e | 1502 | } |
AzureIoTClient | 4:57e049bce51e | 1503 | static bool createDevicesPortionPath(PAMQP_TRANSPORT_STATE state, const char* host) |
AzureIoTClient | 4:57e049bce51e | 1504 | { |
AzureIoTClient | 4:57e049bce51e | 1505 | return (((state->devicesPortionPath = STRING_construct(host)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1506 | (STRING_concat(state->devicesPortionPath, "/devices/") != 0) || |
AzureIoTClient | 4:57e049bce51e | 1507 | (STRING_concat_with_STRING(state->devicesPortionPath, state->urledDeviceId) != 0)) ? (false) : (true); |
AzureIoTClient | 4:57e049bce51e | 1508 | } |
AzureIoTClient | 4:57e049bce51e | 1509 | static bool createEventAddress(PAMQP_TRANSPORT_STATE state) |
AzureIoTClient | 4:57e049bce51e | 1510 | { |
AzureIoTClient | 4:57e049bce51e | 1511 | return (((state->eventAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1512 | (STRING_concat_with_STRING(state->eventAddress, state->devicesPortionPath) != 0) || |
AzureIoTClient | 4:57e049bce51e | 1513 | (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true); |
AzureIoTClient | 4:57e049bce51e | 1514 | } |
AzureIoTClient | 4:57e049bce51e | 1515 | |
AzureIoTClient | 4:57e049bce51e | 1516 | static bool createMessageAddress(PAMQP_TRANSPORT_STATE state) |
AzureIoTClient | 4:57e049bce51e | 1517 | { |
AzureIoTClient | 4:57e049bce51e | 1518 | return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1519 | (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) || |
AzureIoTClient | 4:57e049bce51e | 1520 | (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true); |
AzureIoTClient | 4:57e049bce51e | 1521 | } |
AzureIoTClient | 4:57e049bce51e | 1522 | |
AzureIoTClient | 4:57e049bce51e | 1523 | static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host) |
AzureIoTClient | 4:57e049bce51e | 1524 | { |
AzureIoTClient | 4:57e049bce51e | 1525 | return (((state->cbsAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1526 | (STRING_concat(state->cbsAddress,host) != 0) || |
AzureIoTClient | 4:57e049bce51e | 1527 | (STRING_concat(state->cbsAddress, CBS_ENDPOINT) != 0)) ? (false) : (true); |
AzureIoTClient | 4:57e049bce51e | 1528 | } |
AzureIoTClient | 4:57e049bce51e | 1529 | |
AzureIoTClient | 4:57e049bce51e | 1530 | static STRING_HANDLE createHost(const IOTHUBTRANSPORT_CONFIG* config) |
AzureIoTClient | 4:57e049bce51e | 1531 | { |
AzureIoTClient | 4:57e049bce51e | 1532 | STRING_HANDLE result = NULL; |
AzureIoTClient | 4:57e049bce51e | 1533 | if (((result = STRING_construct(config->upperConfig->iotHubName)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1534 | (STRING_concat(result, ".") != 0) || |
AzureIoTClient | 4:57e049bce51e | 1535 | (STRING_concat(result, config->upperConfig->iotHubSuffix) != 0)) |
AzureIoTClient | 4:57e049bce51e | 1536 | { |
AzureIoTClient | 4:57e049bce51e | 1537 | if (result != NULL) |
AzureIoTClient | 4:57e049bce51e | 1538 | { |
AzureIoTClient | 4:57e049bce51e | 1539 | STRING_delete(result); |
AzureIoTClient | 4:57e049bce51e | 1540 | result = NULL; |
AzureIoTClient | 4:57e049bce51e | 1541 | } |
AzureIoTClient | 4:57e049bce51e | 1542 | } |
AzureIoTClient | 4:57e049bce51e | 1543 | return result; |
AzureIoTClient | 4:57e049bce51e | 1544 | } |
AzureIoTClient | 4:57e049bce51e | 1545 | |
AzureIoTClient | 4:57e049bce51e | 1546 | static TRANSPORT_HANDLE clientTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config) |
AzureIoTClient | 4:57e049bce51e | 1547 | { |
AzureIoTClient | 4:57e049bce51e | 1548 | PAMQP_TRANSPORT_STATE amqpState = NULL; |
AzureIoTClient | 4:57e049bce51e | 1549 | if (!config) |
AzureIoTClient | 4:57e049bce51e | 1550 | { |
AzureIoTClient | 4:57e049bce51e | 1551 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_001: [If parameter config is NULL then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1552 | LogError("IoTHub amqp client tranport null configuration parameter.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1553 | } |
AzureIoTClient | 4:57e049bce51e | 1554 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_002: [clientTransportAMQP_Create shall fail and return NULL if any other fields of the config structure are NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1555 | else if (config->upperConfig->protocol == NULL) |
AzureIoTClient | 4:57e049bce51e | 1556 | { |
AzureIoTClient | 4:57e049bce51e | 1557 | LogError("invalid configuration (NULL protocol detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1558 | } |
AzureIoTClient | 4:57e049bce51e | 1559 | else if (config->upperConfig->deviceId == NULL) |
AzureIoTClient | 4:57e049bce51e | 1560 | { |
AzureIoTClient | 4:57e049bce51e | 1561 | LogError("invalid configuration (NULL deviceId detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1562 | } |
AzureIoTClient | 4:57e049bce51e | 1563 | else if (config->upperConfig->deviceKey == NULL) |
AzureIoTClient | 4:57e049bce51e | 1564 | { |
AzureIoTClient | 4:57e049bce51e | 1565 | LogError("invalid configuration (NULL deviceKey detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1566 | } |
AzureIoTClient | 4:57e049bce51e | 1567 | else if (config->upperConfig->iotHubName == NULL) |
AzureIoTClient | 4:57e049bce51e | 1568 | { |
AzureIoTClient | 4:57e049bce51e | 1569 | LogError("invalid configuration (NULL iotHubName detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1570 | } |
AzureIoTClient | 4:57e049bce51e | 1571 | else if (config->upperConfig->iotHubSuffix == NULL) |
AzureIoTClient | 4:57e049bce51e | 1572 | { |
AzureIoTClient | 4:57e049bce51e | 1573 | LogError("invalid configuration (NULL iotHubSuffix detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1574 | } |
AzureIoTClient | 4:57e049bce51e | 1575 | else if (!config->waitingToSend) |
AzureIoTClient | 4:57e049bce51e | 1576 | { |
AzureIoTClient | 4:57e049bce51e | 1577 | LogError("invalid configuration (NULL waitingToSend list detected)\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1578 | } |
AzureIoTClient | 4:57e049bce51e | 1579 | else if (strlen(config->upperConfig->deviceId) > 128U) |
AzureIoTClient | 4:57e049bce51e | 1580 | { |
AzureIoTClient | 4:57e049bce51e | 1581 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_016: [clientTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.]*/ |
AzureIoTClient | 4:57e049bce51e | 1582 | LogError("deviceName is too long\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1583 | } |
AzureIoTClient | 4:57e049bce51e | 1584 | else if ((strlen(config->upperConfig->deviceId) == 0) || |
AzureIoTClient | 4:57e049bce51e | 1585 | (strlen(config->upperConfig->deviceKey) == 0) || |
AzureIoTClient | 4:57e049bce51e | 1586 | (strlen(config->upperConfig->iotHubName) == 0) || |
AzureIoTClient | 4:57e049bce51e | 1587 | (strlen(config->upperConfig->iotHubSuffix) == 0)) |
AzureIoTClient | 4:57e049bce51e | 1588 | { |
AzureIoTClient | 4:57e049bce51e | 1589 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_017: [clientTransportAMQP_Create shall fail and return NULL if any string config field is zero length.]*/ |
AzureIoTClient | 4:57e049bce51e | 1590 | LogError("zero length config parameter\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1591 | } |
AzureIoTClient | 4:57e049bce51e | 1592 | else |
AzureIoTClient | 4:57e049bce51e | 1593 | { |
AzureIoTClient | 4:57e049bce51e | 1594 | amqpState = malloc(sizeof(AMQP_TRANSPORT_STATE)); |
AzureIoTClient | 4:57e049bce51e | 1595 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 1596 | if (amqpState == NULL) |
AzureIoTClient | 4:57e049bce51e | 1597 | { |
AzureIoTClient | 4:57e049bce51e | 1598 | LogError("Could not allocate AMQP transport state\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1599 | } |
AzureIoTClient | 4:57e049bce51e | 1600 | else |
AzureIoTClient | 4:57e049bce51e | 1601 | { |
AzureIoTClient | 4:57e049bce51e | 1602 | amqpState->messageAddress = NULL; |
AzureIoTClient | 4:57e049bce51e | 1603 | amqpState->eventAddress = NULL; |
AzureIoTClient | 4:57e049bce51e | 1604 | amqpState->urledDeviceId = NULL; |
AzureIoTClient | 4:57e049bce51e | 1605 | amqpState->devicesPortionPath = NULL; |
AzureIoTClient | 4:57e049bce51e | 1606 | amqpState->cbsAddress = NULL; |
AzureIoTClient | 4:57e049bce51e | 1607 | amqpState->deviceKey = NULL; |
AzureIoTClient | 4:57e049bce51e | 1608 | amqpState->savedClientHandle = NULL; |
AzureIoTClient | 4:57e049bce51e | 1609 | amqpState->messenger = NULL; |
AzureIoTClient | 4:57e049bce51e | 1610 | amqpState->messageSubscription = NULL; |
AzureIoTClient | 4:57e049bce51e | 1611 | amqpState->cbsSubscription = NULL; |
AzureIoTClient | 4:57e049bce51e | 1612 | amqpState->zeroLengthString = NULL; |
AzureIoTClient | 4:57e049bce51e | 1613 | amqpState->messengerInitialized = false; |
AzureIoTClient | 4:57e049bce51e | 1614 | amqpState->waitingForPutTokenReply = false; |
AzureIoTClient | 4:57e049bce51e | 1615 | amqpState->cbsRequestAcceptTime = CBS_DEFAULT_REQUEST_ACCEPT_TIME; |
AzureIoTClient | 4:57e049bce51e | 1616 | amqpState->cbsReplyTime = CBS_DEFAULT_REPLY_TIME; |
AzureIoTClient | 4:57e049bce51e | 1617 | amqpState->sasTokenLifetime = SAS_TOKEN_DEFAULT_LIFETIME; |
AzureIoTClient | 4:57e049bce51e | 1618 | amqpState->sasRefreshLine = SAS_TOKEN_DEFAULT_REFRESH_LINE; |
AzureIoTClient | 4:57e049bce51e | 1619 | amqpState->eventTimeout = EVENT_TIMEOUT_DEFAULT; |
AzureIoTClient | 4:57e049bce51e | 1620 | amqpState->waitingToSend = config->waitingToSend; |
AzureIoTClient | 4:57e049bce51e | 1621 | DList_InitializeListHead(&amqpState->workInProgress); |
AzureIoTClient | 4:57e049bce51e | 1622 | DList_InitializeListHead(&amqpState->availableWorkItems); |
AzureIoTClient | 4:57e049bce51e | 1623 | DList_InitializeListHead(&amqpState->messengerCorral); |
AzureIoTClient | 4:57e049bce51e | 1624 | amqpState->DoWork_PullMessages = false; |
AzureIoTClient | 5:8d58d20699dd | 1625 | amqpState->trustedCertificates = NULL; |
AzureIoTClient | 4:57e049bce51e | 1626 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 1627 | if ((amqpState->message = pn_message()) == NULL) |
AzureIoTClient | 4:57e049bce51e | 1628 | { |
AzureIoTClient | 4:57e049bce51e | 1629 | LogError("Unable to preallocate the proton message!\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1630 | clientTransportAMQP_Destroy(amqpState); |
AzureIoTClient | 4:57e049bce51e | 1631 | amqpState = NULL; |
AzureIoTClient | 4:57e049bce51e | 1632 | } |
AzureIoTClient | 4:57e049bce51e | 1633 | else |
AzureIoTClient | 4:57e049bce51e | 1634 | { |
AzureIoTClient | 4:57e049bce51e | 1635 | size_t workItemsToPreallocate; |
AzureIoTClient | 4:57e049bce51e | 1636 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ |
AzureIoTClient | 4:57e049bce51e | 1637 | for (workItemsToPreallocate = PROTON_EVENT_OUTGOING_WINDOW_SIZE; workItemsToPreallocate > 0; workItemsToPreallocate--) |
AzureIoTClient | 4:57e049bce51e | 1638 | { |
AzureIoTClient | 4:57e049bce51e | 1639 | PAMQP_WORK_ITEM currentItem = malloc(sizeof(AMQP_WORK_ITEM)); |
AzureIoTClient | 4:57e049bce51e | 1640 | if (!currentItem) |
AzureIoTClient | 4:57e049bce51e | 1641 | { |
AzureIoTClient | 4:57e049bce51e | 1642 | // |
AzureIoTClient | 4:57e049bce51e | 1643 | // Not a particularly good sign. Fail the create. |
AzureIoTClient | 4:57e049bce51e | 1644 | // |
AzureIoTClient | 4:57e049bce51e | 1645 | LogError("Unable to preallocate work item!\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1646 | clientTransportAMQP_Destroy(amqpState); |
AzureIoTClient | 4:57e049bce51e | 1647 | amqpState = NULL; |
AzureIoTClient | 4:57e049bce51e | 1648 | break; |
AzureIoTClient | 4:57e049bce51e | 1649 | } |
AzureIoTClient | 4:57e049bce51e | 1650 | else |
AzureIoTClient | 4:57e049bce51e | 1651 | { |
AzureIoTClient | 4:57e049bce51e | 1652 | DList_InsertTailList(&amqpState->availableWorkItems, &currentItem->link); |
AzureIoTClient | 4:57e049bce51e | 1653 | } |
AzureIoTClient | 4:57e049bce51e | 1654 | } |
AzureIoTClient | 4:57e049bce51e | 1655 | if (amqpState != NULL) |
AzureIoTClient | 4:57e049bce51e | 1656 | { |
AzureIoTClient | 4:57e049bce51e | 1657 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_095: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as host, from the following pieces: "config->iotHubName + "." + config->iotHubSuffix.]*/ |
AzureIoTClient | 4:57e049bce51e | 1658 | STRING_HANDLE host = createHost(config); |
AzureIoTClient | 4:57e049bce51e | 1659 | if (host != NULL) |
AzureIoTClient | 4:57e049bce51e | 1660 | { |
AzureIoTClient | 4:57e049bce51e | 1661 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/ |
AzureIoTClient | 4:57e049bce51e | 1662 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/ |
AzureIoTClient | 4:57e049bce51e | 1663 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/ |
AzureIoTClient | 4:57e049bce51e | 1664 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/ |
AzureIoTClient | 4:57e049bce51e | 1665 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/ |
AzureIoTClient | 4:57e049bce51e | 1666 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/ |
AzureIoTClient | 4:57e049bce51e | 1667 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created. If this fails then clientTransportAMQP shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1668 | if ((!createUrledDeviceId(amqpState, config)) || |
AzureIoTClient | 4:57e049bce51e | 1669 | (!createDevicesPortionPath(amqpState, STRING_c_str(host))) || |
AzureIoTClient | 4:57e049bce51e | 1670 | (!createEventAddress(amqpState)) || |
AzureIoTClient | 4:57e049bce51e | 1671 | (!createMessageAddress(amqpState)) || |
AzureIoTClient | 4:57e049bce51e | 1672 | (!createCbsAddress(amqpState, STRING_c_str(host))) || |
AzureIoTClient | 4:57e049bce51e | 1673 | ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) || |
AzureIoTClient | 4:57e049bce51e | 1674 | ((amqpState->zeroLengthString = STRING_new()) == NULL)) |
AzureIoTClient | 4:57e049bce51e | 1675 | { |
AzureIoTClient | 4:57e049bce51e | 1676 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1677 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1678 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1679 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1680 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1681 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1682 | clientTransportAMQP_Destroy(amqpState); |
AzureIoTClient | 4:57e049bce51e | 1683 | amqpState = NULL; |
AzureIoTClient | 4:57e049bce51e | 1684 | } |
AzureIoTClient | 4:57e049bce51e | 1685 | STRING_delete(host); |
AzureIoTClient | 4:57e049bce51e | 1686 | } |
AzureIoTClient | 4:57e049bce51e | 1687 | else |
AzureIoTClient | 4:57e049bce51e | 1688 | { |
AzureIoTClient | 4:57e049bce51e | 1689 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_096: [If creating the schemeAndHost fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1690 | clientTransportAMQP_Destroy(amqpState); |
AzureIoTClient | 4:57e049bce51e | 1691 | amqpState = NULL; |
AzureIoTClient | 4:57e049bce51e | 1692 | } |
AzureIoTClient | 4:57e049bce51e | 1693 | } |
AzureIoTClient | 4:57e049bce51e | 1694 | } |
AzureIoTClient | 4:57e049bce51e | 1695 | } |
AzureIoTClient | 4:57e049bce51e | 1696 | } |
AzureIoTClient | 4:57e049bce51e | 1697 | return amqpState; |
AzureIoTClient | 4:57e049bce51e | 1698 | } |
AzureIoTClient | 4:57e049bce51e | 1699 | |
AzureIoTClient | 4:57e049bce51e | 1700 | static int clientTransportAMQP_Subscribe(TRANSPORT_HANDLE handle) |
AzureIoTClient | 4:57e049bce51e | 1701 | { |
AzureIoTClient | 4:57e049bce51e | 1702 | int result; |
AzureIoTClient | 4:57e049bce51e | 1703 | if (handle == NULL) |
AzureIoTClient | 4:57e049bce51e | 1704 | { |
AzureIoTClient | 4:57e049bce51e | 1705 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_013: [If the parameter handle is NULL then clientTransportAMQP_Subscribe shall fail and return a non-zero value.]*/ |
AzureIoTClient | 4:57e049bce51e | 1706 | LogError("Invalid handle to IoTHubClient amqp subscribe.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1707 | result = 1; |
AzureIoTClient | 4:57e049bce51e | 1708 | } |
AzureIoTClient | 4:57e049bce51e | 1709 | else |
AzureIoTClient | 4:57e049bce51e | 1710 | { |
AzureIoTClient | 4:57e049bce51e | 1711 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/ |
AzureIoTClient | 4:57e049bce51e | 1712 | PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 1713 | transportState->DoWork_PullMessages = true; |
AzureIoTClient | 4:57e049bce51e | 1714 | result = 0; |
AzureIoTClient | 4:57e049bce51e | 1715 | } |
AzureIoTClient | 4:57e049bce51e | 1716 | return result; |
AzureIoTClient | 4:57e049bce51e | 1717 | } |
AzureIoTClient | 4:57e049bce51e | 1718 | |
AzureIoTClient | 4:57e049bce51e | 1719 | static void clientTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle) |
AzureIoTClient | 4:57e049bce51e | 1720 | { |
AzureIoTClient | 4:57e049bce51e | 1721 | PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 1722 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/ |
AzureIoTClient | 4:57e049bce51e | 1723 | if (transportState) |
AzureIoTClient | 4:57e049bce51e | 1724 | { |
AzureIoTClient | 4:57e049bce51e | 1725 | transportState->DoWork_PullMessages = false; |
AzureIoTClient | 4:57e049bce51e | 1726 | } |
AzureIoTClient | 4:57e049bce51e | 1727 | } |
AzureIoTClient | 4:57e049bce51e | 1728 | |
AzureIoTClient | 4:57e049bce51e | 1729 | static IOTHUB_CLIENT_RESULT clientTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) |
AzureIoTClient | 4:57e049bce51e | 1730 | { |
AzureIoTClient | 4:57e049bce51e | 1731 | IOTHUB_CLIENT_RESULT result; |
AzureIoTClient | 4:57e049bce51e | 1732 | |
AzureIoTClient | 4:57e049bce51e | 1733 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_001: [clientTransportAMQP_GetSendStatus IoTHubTransportHttp_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter] */ |
AzureIoTClient | 4:57e049bce51e | 1734 | if (handle == NULL) |
AzureIoTClient | 4:57e049bce51e | 1735 | { |
AzureIoTClient | 4:57e049bce51e | 1736 | result = IOTHUB_CLIENT_INVALID_ARG; |
AzureIoTClient | 4:57e049bce51e | 1737 | LogError("Invalid handle to IoTHubClient AMQP transport instance.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1738 | } |
AzureIoTClient | 4:57e049bce51e | 1739 | else if (iotHubClientStatus == NULL) |
AzureIoTClient | 4:57e049bce51e | 1740 | { |
AzureIoTClient | 4:57e049bce51e | 1741 | result = IOTHUB_CLIENT_INVALID_ARG; |
AzureIoTClient | 4:57e049bce51e | 1742 | LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.\r\n"); |
AzureIoTClient | 4:57e049bce51e | 1743 | } |
AzureIoTClient | 4:57e049bce51e | 1744 | else |
AzureIoTClient | 4:57e049bce51e | 1745 | { |
AzureIoTClient | 4:57e049bce51e | 1746 | PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 4:57e049bce51e | 1747 | |
AzureIoTClient | 4:57e049bce51e | 1748 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_002: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent] */ |
AzureIoTClient | 4:57e049bce51e | 1749 | if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->workInProgress))) |
AzureIoTClient | 4:57e049bce51e | 1750 | { |
AzureIoTClient | 4:57e049bce51e | 1751 | *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY; |
AzureIoTClient | 4:57e049bce51e | 1752 | } |
AzureIoTClient | 4:57e049bce51e | 1753 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_003: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent] */ |
AzureIoTClient | 4:57e049bce51e | 1754 | else |
AzureIoTClient | 4:57e049bce51e | 1755 | { |
AzureIoTClient | 4:57e049bce51e | 1756 | *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE; |
AzureIoTClient | 4:57e049bce51e | 1757 | } |
AzureIoTClient | 4:57e049bce51e | 1758 | |
AzureIoTClient | 4:57e049bce51e | 1759 | result = IOTHUB_CLIENT_OK; |
AzureIoTClient | 4:57e049bce51e | 1760 | } |
AzureIoTClient | 4:57e049bce51e | 1761 | |
AzureIoTClient | 4:57e049bce51e | 1762 | return result; |
AzureIoTClient | 4:57e049bce51e | 1763 | } |
AzureIoTClient | 4:57e049bce51e | 1764 | |
AzureIoTClient | 4:57e049bce51e | 1765 | static IOTHUB_CLIENT_RESULT clientTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value) |
AzureIoTClient | 4:57e049bce51e | 1766 | { |
AzureIoTClient | 4:57e049bce51e | 1767 | IOTHUB_CLIENT_RESULT result; |
AzureIoTClient | 4:57e049bce51e | 1768 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_001: [If handle parameter is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ |
AzureIoTClient | 4:57e049bce51e | 1769 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ |
AzureIoTClient | 4:57e049bce51e | 1770 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ |
AzureIoTClient | 4:57e049bce51e | 1771 | |
AzureIoTClient | 5:8d58d20699dd | 1772 | if ( |
AzureIoTClient | 5:8d58d20699dd | 1773 | (handle == NULL) || |
AzureIoTClient | 5:8d58d20699dd | 1774 | (option == NULL) || |
AzureIoTClient | 5:8d58d20699dd | 1775 | (value == NULL) |
AzureIoTClient | 5:8d58d20699dd | 1776 | ) |
AzureIoTClient | 5:8d58d20699dd | 1777 | { |
AzureIoTClient | 5:8d58d20699dd | 1778 | result = IOTHUB_CLIENT_INVALID_ARG; |
AzureIoTClient | 5:8d58d20699dd | 1779 | LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n"); |
AzureIoTClient | 5:8d58d20699dd | 1780 | } |
AzureIoTClient | 5:8d58d20699dd | 1781 | else |
AzureIoTClient | 5:8d58d20699dd | 1782 | { |
AzureIoTClient | 5:8d58d20699dd | 1783 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */ |
AzureIoTClient | 5:8d58d20699dd | 1784 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */ |
AzureIoTClient | 5:8d58d20699dd | 1785 | if (strcmp("TrustedCerts", option) == 0) |
AzureIoTClient | 5:8d58d20699dd | 1786 | { |
AzureIoTClient | 5:8d58d20699dd | 1787 | PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; |
AzureIoTClient | 5:8d58d20699dd | 1788 | char* newTrsutedCertificates; |
AzureIoTClient | 4:57e049bce51e | 1789 | |
AzureIoTClient | 5:8d58d20699dd | 1790 | if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0) |
AzureIoTClient | 5:8d58d20699dd | 1791 | { |
AzureIoTClient | 5:8d58d20699dd | 1792 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ |
AzureIoTClient | 5:8d58d20699dd | 1793 | result = IOTHUB_CLIENT_ERROR; |
AzureIoTClient | 5:8d58d20699dd | 1794 | } |
AzureIoTClient | 5:8d58d20699dd | 1795 | else |
AzureIoTClient | 5:8d58d20699dd | 1796 | { |
AzureIoTClient | 5:8d58d20699dd | 1797 | /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */ |
AzureIoTClient | 5:8d58d20699dd | 1798 | if (handleData->trustedCertificates != NULL) |
AzureIoTClient | 5:8d58d20699dd | 1799 | { |
AzureIoTClient | 5:8d58d20699dd | 1800 | free(handleData->trustedCertificates); |
AzureIoTClient | 5:8d58d20699dd | 1801 | } |
AzureIoTClient | 4:57e049bce51e | 1802 | |
AzureIoTClient | 5:8d58d20699dd | 1803 | handleData->trustedCertificates = newTrsutedCertificates; |
AzureIoTClient | 5:8d58d20699dd | 1804 | result = IOTHUB_CLIENT_OK; |
AzureIoTClient | 5:8d58d20699dd | 1805 | } |
AzureIoTClient | 5:8d58d20699dd | 1806 | } |
AzureIoTClient | 5:8d58d20699dd | 1807 | else |
AzureIoTClient | 5:8d58d20699dd | 1808 | { |
AzureIoTClient | 5:8d58d20699dd | 1809 | /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/ |
AzureIoTClient | 5:8d58d20699dd | 1810 | result = IOTHUB_CLIENT_INVALID_ARG; |
AzureIoTClient | 5:8d58d20699dd | 1811 | } |
AzureIoTClient | 5:8d58d20699dd | 1812 | } |
AzureIoTClient | 4:57e049bce51e | 1813 | |
AzureIoTClient | 4:57e049bce51e | 1814 | return result; |
AzureIoTClient | 4:57e049bce51e | 1815 | } |
AzureIoTClient | 4:57e049bce51e | 1816 | |
AzureIoTClient | 4:57e049bce51e | 1817 | static TRANSPORT_PROVIDER myfunc = { |
AzureIoTClient | 4:57e049bce51e | 1818 | clientTransportAMQP_SetOption, |
AzureIoTClient | 4:57e049bce51e | 1819 | clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus |
AzureIoTClient | 4:57e049bce51e | 1820 | }; |
AzureIoTClient | 4:57e049bce51e | 1821 | |
AzureIoTClient | 4:57e049bce51e | 1822 | extern const void* AMQP_Protocol(void) |
AzureIoTClient | 4:57e049bce51e | 1823 | { |
AzureIoTClient | 4:57e049bce51e | 1824 | return &myfunc; |
AzureIoTClient | 4:57e049bce51e | 1825 | } |