Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Committer:
AzureIoTClient
Date:
Tue Sep 22 20:36:39 2015 -0700
Revision:
2:1e6040e0c035
Parent:
0:1b5f413bf328
New release

Who changed what in which revision?

User | Revision | Line number | New contents of line
AzureIoTClient 0:1b5f413bf328 1 // Copyright (c) Microsoft. All rights reserved.
AzureIoTClient 0:1b5f413bf328 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
AzureIoTClient 0:1b5f413bf328 3
AzureIoTClient 0:1b5f413bf328 4 #include <stdlib.h>
AzureIoTClient 0:1b5f413bf328 5 #ifdef _CRTDBG_MAP_ALLOC
AzureIoTClient 0:1b5f413bf328 6 #include <crtdbg.h>
AzureIoTClient 0:1b5f413bf328 7 #endif
AzureIoTClient 0:1b5f413bf328 8 #include "gballoc.h"
AzureIoTClient 0:1b5f413bf328 9
AzureIoTClient 0:1b5f413bf328 10 #include "proton/message.h"
AzureIoTClient 0:1b5f413bf328 11 #include "proton/messenger.h"
AzureIoTClient 0:1b5f413bf328 12
AzureIoTClient 0:1b5f413bf328 13 #include "iot_logging.h"
AzureIoTClient 0:1b5f413bf328 14 #include "strings.h"
AzureIoTClient 0:1b5f413bf328 15 #include "urlencode.h"
AzureIoTClient 0:1b5f413bf328 16 #include "doublylinkedlist.h"
AzureIoTClient 0:1b5f413bf328 17 #include "crt_abstractions.h"
AzureIoTClient 0:1b5f413bf328 18 #include "sastoken.h"
AzureIoTClient 0:1b5f413bf328 19
AzureIoTClient 0:1b5f413bf328 20 #include "iothub_client_ll.h"
AzureIoTClient 0:1b5f413bf328 21 #include "iothub_client_private.h"
AzureIoTClient 0:1b5f413bf328 22 #include "iothubtransportamqp.h"
AzureIoTClient 0:1b5f413bf328 23 #include "iothub_client_amqp_internal.h"
AzureIoTClient 0:1b5f413bf328 24
AzureIoTClient 0:1b5f413bf328 25 static const size_t NUMBER_OF_MESSENGER_STOP_TRIES = PROTON_MESSENGER_STOP_TRIES;
AzureIoTClient 0:1b5f413bf328 26 static const size_t MAXIMUM_EVENT_LENGTH = PROTON_MAXIMUM_EVENT_LENGTH;
AzureIoTClient 0:1b5f413bf328 27
AzureIoTClient 0:1b5f413bf328 28
AzureIoTClient 0:1b5f413bf328 29 #define amqp_batch_result_values \
AzureIoTClient 0:1b5f413bf328 30 amqp_batch_nowork, \
AzureIoTClient 0:1b5f413bf328 31 amqp_batch_batch, \
AzureIoTClient 0:1b5f413bf328 32 amqp_batch_nobatch, \
AzureIoTClient 0:1b5f413bf328 33 amqp_batch_error \
AzureIoTClient 0:1b5f413bf328 34
AzureIoTClient 0:1b5f413bf328 35 DEFINE_ENUM(amqp_batch_result, amqp_batch_result_values);
AzureIoTClient 0:1b5f413bf328 36
AzureIoTClient 0:1b5f413bf328 37 static const int IO_PROCESSING_YIELD_IN_MILLISECONDS = PROTON_PROCESSING_YIELD_IN_MILLISECONDS;
AzureIoTClient 0:1b5f413bf328 38
AzureIoTClient 0:1b5f413bf328 39 static void processReceives(TRANSPORT_HANDLE handle);
AzureIoTClient 0:1b5f413bf328 40
AzureIoTClient 0:1b5f413bf328 41 typedef struct MESSENGER_CONTAINER_TAG
AzureIoTClient 0:1b5f413bf328 42 {
AzureIoTClient 0:1b5f413bf328 43 //
AzureIoTClient 0:1b5f413bf328 44 // This links all of these records onto the messageCorral in the transport state.
AzureIoTClient 0:1b5f413bf328 45 DLIST_ENTRY entry;
AzureIoTClient 0:1b5f413bf328 46
AzureIoTClient 0:1b5f413bf328 47 //
AzureIoTClient 0:1b5f413bf328 48 // This is a messenger that couldn't be stopped. (You have to admire its initiative.) We'll keep trying to stop it when
AzureIoTClient 0:1b5f413bf328 49 // we have a spare moment.
AzureIoTClient 0:1b5f413bf328 50 pn_messenger_t* messengerThatDidNotStop;
AzureIoTClient 0:1b5f413bf328 51 } MESSENGER_CONTAINER, *PMESSENGER_CONTAINER;
AzureIoTClient 0:1b5f413bf328 52
AzureIoTClient 0:1b5f413bf328 53 //
AzureIoTClient 0:1b5f413bf328 54 // Transport specific structure used to contain everything needed to send an event item.
AzureIoTClient 0:1b5f413bf328 55 //
AzureIoTClient 0:1b5f413bf328 56 typedef struct AMQP_WORK_ITEM_TAG
AzureIoTClient 0:1b5f413bf328 57 {
AzureIoTClient 0:1b5f413bf328 58 //
AzureIoTClient 0:1b5f413bf328 59 // At what time should we give up on sending this message.
AzureIoTClient 0:1b5f413bf328 60 size_t expiry;
AzureIoTClient 0:1b5f413bf328 61
AzureIoTClient 0:1b5f413bf328 62 //
AzureIoTClient 0:1b5f413bf328 63 // Used by the transport to determine if the IO operation is complete.
AzureIoTClient 0:1b5f413bf328 64 pn_tracker_t tracker;
AzureIoTClient 0:1b5f413bf328 65
AzureIoTClient 0:1b5f413bf328 66 //
AzureIoTClient 0:1b5f413bf328 67 // This acts as a listhead for all of the messages that make up one transfer frame.
AzureIoTClient 0:1b5f413bf328 68 // If no batching is being done then this list will have at most one item. If batching
AzureIoTClient 0:1b5f413bf328 69 // is being done then this could be lengthy!
AzureIoTClient 0:1b5f413bf328 70 DLIST_ENTRY eventMessages;
AzureIoTClient 0:1b5f413bf328 71
AzureIoTClient 0:1b5f413bf328 72 //
AzureIoTClient 0:1b5f413bf328 73 // This is used to hold this work item either on the work in progress list or on the available work item list.
AzureIoTClient 0:1b5f413bf328 74 DLIST_ENTRY link;
AzureIoTClient 0:1b5f413bf328 75 } AMQP_WORK_ITEM, *PAMQP_WORK_ITEM;
AzureIoTClient 0:1b5f413bf328 76
AzureIoTClient 0:1b5f413bf328 77 static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState);
AzureIoTClient 0:1b5f413bf328 78
AzureIoTClient 0:1b5f413bf328 79 static bool checkForErrorsThenWork(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 0:1b5f413bf328 80 {
AzureIoTClient 0:1b5f413bf328 81 pn_error_t* errorStruct;
AzureIoTClient 0:1b5f413bf328 82 bool result;
AzureIoTClient 0:1b5f413bf328 83 errorStruct = pn_messenger_error(transportState->messenger);
AzureIoTClient 0:1b5f413bf328 84 if (pn_error_code(errorStruct) != 0)
AzureIoTClient 0:1b5f413bf328 85 {
AzureIoTClient 0:1b5f413bf328 86 LogError("An error was detected in the networking manager: %d\r\n", pn_error_code(errorStruct));
AzureIoTClient 0:1b5f413bf328 87 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 88 result = false;
AzureIoTClient 0:1b5f413bf328 89 }
AzureIoTClient 0:1b5f413bf328 90 else
AzureIoTClient 0:1b5f413bf328 91 {
AzureIoTClient 0:1b5f413bf328 92 int protonResult;
AzureIoTClient 0:1b5f413bf328 93 protonResult = pn_messenger_work(transportState->messenger, IO_PROCESSING_YIELD_IN_MILLISECONDS);
AzureIoTClient 0:1b5f413bf328 94 if ((protonResult < 0) && (protonResult != PN_TIMEOUT))
AzureIoTClient 0:1b5f413bf328 95 {
AzureIoTClient 0:1b5f413bf328 96 LogError("A networking error occured. Proton error code: %d\r\n", protonResult);
AzureIoTClient 0:1b5f413bf328 97 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 98 result = false;
AzureIoTClient 0:1b5f413bf328 99 }
AzureIoTClient 0:1b5f413bf328 100 else
AzureIoTClient 0:1b5f413bf328 101 {
AzureIoTClient 0:1b5f413bf328 102 result = true;
AzureIoTClient 0:1b5f413bf328 103 }
AzureIoTClient 0:1b5f413bf328 104 }
AzureIoTClient 0:1b5f413bf328 105 return result;
AzureIoTClient 0:1b5f413bf328 106 }
AzureIoTClient 0:1b5f413bf328 107 /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */
AzureIoTClient 0:1b5f413bf328 108 static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg)
AzureIoTClient 0:1b5f413bf328 109 {
AzureIoTClient 0:1b5f413bf328 110 int result = 0;
AzureIoTClient 0:1b5f413bf328 111 size_t index;
AzureIoTClient 0:1b5f413bf328 112
AzureIoTClient 0:1b5f413bf328 113 pn_data_t* propData;
AzureIoTClient 0:1b5f413bf328 114 const char*const* keys;
AzureIoTClient 0:1b5f413bf328 115 const char*const* values;
AzureIoTClient 0:1b5f413bf328 116 size_t propertyCount = 0;
AzureIoTClient 0:1b5f413bf328 117
AzureIoTClient 0:1b5f413bf328 118 IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry);
AzureIoTClient 0:1b5f413bf328 119 MAP_HANDLE mapProperties = IoTHubMessage_Properties(currentMessage->messageHandle);
AzureIoTClient 0:1b5f413bf328 120 if (mapProperties == NULL)
AzureIoTClient 0:1b5f413bf328 121 {
AzureIoTClient 0:1b5f413bf328 122 LogError("Failure IoTHubMessage_Properties.\r\n");
AzureIoTClient 0:1b5f413bf328 123 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 124 }
AzureIoTClient 0:1b5f413bf328 125 else if (Map_GetInternals(mapProperties, &keys, &values, &propertyCount) != MAP_OK)
AzureIoTClient 0:1b5f413bf328 126 {
AzureIoTClient 0:1b5f413bf328 127 LogError("Failure retrieving Map_GetInternals.\r\n");
AzureIoTClient 0:1b5f413bf328 128 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 129 }
AzureIoTClient 0:1b5f413bf328 130 else if (propertyCount > 0)
AzureIoTClient 0:1b5f413bf328 131 {
AzureIoTClient 0:1b5f413bf328 132 if ( (propData = pn_message_properties(msg) ) == NULL)
AzureIoTClient 0:1b5f413bf328 133 {
AzureIoTClient 0:1b5f413bf328 134 LogError("Failure pn_message_properties.\r\n");
AzureIoTClient 0:1b5f413bf328 135 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 136 }
AzureIoTClient 0:1b5f413bf328 137 else if (pn_data_put_map(propData) != 0)
AzureIoTClient 0:1b5f413bf328 138 {
AzureIoTClient 0:1b5f413bf328 139 LogError("Failure pn_data_put_map.\r\n");
AzureIoTClient 0:1b5f413bf328 140 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 141 }
AzureIoTClient 0:1b5f413bf328 142 else if (pn_data_enter(propData) == false )
AzureIoTClient 0:1b5f413bf328 143 {
AzureIoTClient 0:1b5f413bf328 144 LogError("Failure pn_data_enter.\r\n");
AzureIoTClient 0:1b5f413bf328 145 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 146 }
AzureIoTClient 0:1b5f413bf328 147 else
AzureIoTClient 0:1b5f413bf328 148 {
AzureIoTClient 0:1b5f413bf328 149 for (index = 0; index < propertyCount; index++)
AzureIoTClient 0:1b5f413bf328 150 {
AzureIoTClient 0:1b5f413bf328 151 if (pn_data_put_symbol(propData, pn_bytes(strlen(keys[index]), keys[index])) != 0)
AzureIoTClient 0:1b5f413bf328 152 {
AzureIoTClient 0:1b5f413bf328 153 LogError("Failure pn_data_put_symbol.\r\n");
AzureIoTClient 0:1b5f413bf328 154 break;
AzureIoTClient 0:1b5f413bf328 155 }
AzureIoTClient 0:1b5f413bf328 156 else if (pn_data_put_string(propData, pn_bytes(strlen(values[index]), values[index])) != 0)
AzureIoTClient 0:1b5f413bf328 157 {
AzureIoTClient 0:1b5f413bf328 158 LogError("Failure pn_data_put_string.\r\n");
AzureIoTClient 0:1b5f413bf328 159 break;
AzureIoTClient 0:1b5f413bf328 160 }
AzureIoTClient 0:1b5f413bf328 161 }
AzureIoTClient 0:1b5f413bf328 162 if (index != propertyCount)
AzureIoTClient 0:1b5f413bf328 163 {
AzureIoTClient 0:1b5f413bf328 164 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 165 }
AzureIoTClient 0:1b5f413bf328 166 else if (!pn_data_exit(propData) )
AzureIoTClient 0:1b5f413bf328 167 {
AzureIoTClient 0:1b5f413bf328 168 LogError("Failure pn_data_exit.\r\n");
AzureIoTClient 0:1b5f413bf328 169 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 170 }
AzureIoTClient 0:1b5f413bf328 171 else
AzureIoTClient 0:1b5f413bf328 172 {
AzureIoTClient 0:1b5f413bf328 173 result = 0;
AzureIoTClient 0:1b5f413bf328 174 }
AzureIoTClient 0:1b5f413bf328 175 }
AzureIoTClient 0:1b5f413bf328 176 }
AzureIoTClient 0:1b5f413bf328 177 return result;
AzureIoTClient 0:1b5f413bf328 178 }
AzureIoTClient 0:1b5f413bf328 179
AzureIoTClient 0:1b5f413bf328 180 static int getMapString(pn_data_t* propData, pn_bytes_t* mapValue)
AzureIoTClient 0:1b5f413bf328 181 {
AzureIoTClient 0:1b5f413bf328 182 int result;
AzureIoTClient 0:1b5f413bf328 183 if (pn_data_next(propData) == true)
AzureIoTClient 0:1b5f413bf328 184 {
AzureIoTClient 0:1b5f413bf328 185 if (pn_data_type(propData) == PN_STRING)
AzureIoTClient 0:1b5f413bf328 186 {
AzureIoTClient 0:1b5f413bf328 187 *mapValue = pn_data_get_string(propData);
AzureIoTClient 0:1b5f413bf328 188 result = 0;
AzureIoTClient 0:1b5f413bf328 189 }
AzureIoTClient 0:1b5f413bf328 190 else
AzureIoTClient 0:1b5f413bf328 191 {
AzureIoTClient 0:1b5f413bf328 192 LogError("Failure pn_data_type.\r\n");
AzureIoTClient 0:1b5f413bf328 193 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 194 }
AzureIoTClient 0:1b5f413bf328 195 }
AzureIoTClient 0:1b5f413bf328 196 else
AzureIoTClient 0:1b5f413bf328 197 {
AzureIoTClient 0:1b5f413bf328 198 LogError("Failure pn_data_next.\r\n");
AzureIoTClient 0:1b5f413bf328 199 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 200 }
AzureIoTClient 0:1b5f413bf328 201 return result;
AzureIoTClient 0:1b5f413bf328 202 }
AzureIoTClient 0:1b5f413bf328 203
AzureIoTClient 0:1b5f413bf328 204 static int getMapInt(pn_data_t* propData, int32_t* mapValue)
AzureIoTClient 0:1b5f413bf328 205 {
AzureIoTClient 0:1b5f413bf328 206 int result;
AzureIoTClient 0:1b5f413bf328 207 if (pn_data_next(propData) == true)
AzureIoTClient 0:1b5f413bf328 208 {
AzureIoTClient 0:1b5f413bf328 209 if (pn_data_type(propData) == PN_INT)
AzureIoTClient 0:1b5f413bf328 210 {
AzureIoTClient 0:1b5f413bf328 211 *mapValue = pn_data_get_int(propData);
AzureIoTClient 0:1b5f413bf328 212 result = 0;
AzureIoTClient 0:1b5f413bf328 213 }
AzureIoTClient 0:1b5f413bf328 214 else
AzureIoTClient 0:1b5f413bf328 215 {
AzureIoTClient 0:1b5f413bf328 216 LogError("Failure pn_data_type.\r\n");
AzureIoTClient 0:1b5f413bf328 217 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 218 }
AzureIoTClient 0:1b5f413bf328 219 }
AzureIoTClient 0:1b5f413bf328 220 else
AzureIoTClient 0:1b5f413bf328 221 {
AzureIoTClient 0:1b5f413bf328 222 LogError("Failure pn_data_next.\r\n");
AzureIoTClient 0:1b5f413bf328 223 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 224 }
AzureIoTClient 0:1b5f413bf328 225 return result;
AzureIoTClient 0:1b5f413bf328 226 }
AzureIoTClient 0:1b5f413bf328 227
AzureIoTClient 2:1e6040e0c035 228 /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */
AzureIoTClient 0:1b5f413bf328 229 static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg)
AzureIoTClient 0:1b5f413bf328 230 {
AzureIoTClient 0:1b5f413bf328 231 int result = 0;
AzureIoTClient 0:1b5f413bf328 232 size_t index = 0;
AzureIoTClient 0:1b5f413bf328 233 pn_data_t* propData = pn_message_properties(msg);
AzureIoTClient 0:1b5f413bf328 234 if (propData == NULL)
AzureIoTClient 0:1b5f413bf328 235 {
AzureIoTClient 0:1b5f413bf328 236 LogError("Failure pn_message_properties.\r\n");
AzureIoTClient 0:1b5f413bf328 237 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 238 }
AzureIoTClient 0:1b5f413bf328 239 else if (pn_data_next(propData) == false )
AzureIoTClient 0:1b5f413bf328 240 {
AzureIoTClient 0:1b5f413bf328 241 // This should fail if there is no next sibling which means that there
AzureIoTClient 0:1b5f413bf328 242 // are no property and it should continue
AzureIoTClient 0:1b5f413bf328 243 result = 0;
AzureIoTClient 0:1b5f413bf328 244 }
AzureIoTClient 0:1b5f413bf328 245 else
AzureIoTClient 0:1b5f413bf328 246 {
AzureIoTClient 0:1b5f413bf328 247 size_t propertyCount = pn_data_get_map(propData)/2;
AzureIoTClient 0:1b5f413bf328 248 if (propertyCount > 0)
AzureIoTClient 0:1b5f413bf328 249 {
AzureIoTClient 0:1b5f413bf328 250 if (pn_data_enter(propData) == false)
AzureIoTClient 0:1b5f413bf328 251 {
AzureIoTClient 0:1b5f413bf328 252 LogError("Failure pn_data_enter.\r\n");
AzureIoTClient 0:1b5f413bf328 253 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 254 }
AzureIoTClient 0:1b5f413bf328 255 else
AzureIoTClient 0:1b5f413bf328 256 {
AzureIoTClient 0:1b5f413bf328 257 // Get the Properties from the Message
AzureIoTClient 0:1b5f413bf328 258 MAP_HANDLE properties = IoTHubMessage_Properties(ioTMessage);
AzureIoTClient 0:1b5f413bf328 259 if (properties == NULL)
AzureIoTClient 0:1b5f413bf328 260 {
AzureIoTClient 0:1b5f413bf328 261 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 262 }
AzureIoTClient 0:1b5f413bf328 263 else
AzureIoTClient 0:1b5f413bf328 264 {
AzureIoTClient 0:1b5f413bf328 265 for (index = 0; index < propertyCount; index++)
AzureIoTClient 0:1b5f413bf328 266 {
AzureIoTClient 0:1b5f413bf328 267 pn_bytes_t propValue;
AzureIoTClient 0:1b5f413bf328 268 pn_bytes_t propName;
AzureIoTClient 0:1b5f413bf328 269 if (getMapString(propData, &propValue) != 0)
AzureIoTClient 0:1b5f413bf328 270 {
AzureIoTClient 0:1b5f413bf328 271 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 272 break;
AzureIoTClient 0:1b5f413bf328 273 }
AzureIoTClient 0:1b5f413bf328 274 if (getMapString(propData, &propName) != 0)
AzureIoTClient 0:1b5f413bf328 275 {
AzureIoTClient 0:1b5f413bf328 276 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 277 break;
AzureIoTClient 0:1b5f413bf328 278 }
AzureIoTClient 0:1b5f413bf328 279
AzureIoTClient 0:1b5f413bf328 280 if (Map_AddOrUpdate(properties, propValue.start, propName.start) != MAP_OK)
AzureIoTClient 0:1b5f413bf328 281 {
AzureIoTClient 0:1b5f413bf328 282 LogError("Failure Map_AddOrUpdate.\r\n");
AzureIoTClient 0:1b5f413bf328 283 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 284 break;
AzureIoTClient 0:1b5f413bf328 285 }
AzureIoTClient 0:1b5f413bf328 286 }
AzureIoTClient 0:1b5f413bf328 287 if (index != propertyCount)
AzureIoTClient 0:1b5f413bf328 288 {
AzureIoTClient 0:1b5f413bf328 289 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 290 }
AzureIoTClient 0:1b5f413bf328 291 else if (!pn_data_exit(propData) )
AzureIoTClient 0:1b5f413bf328 292 {
AzureIoTClient 0:1b5f413bf328 293 LogError("Failure pn_data_exit.\r\n");
AzureIoTClient 0:1b5f413bf328 294 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 295 }
AzureIoTClient 0:1b5f413bf328 296 else
AzureIoTClient 0:1b5f413bf328 297 {
AzureIoTClient 0:1b5f413bf328 298 result = 0;
AzureIoTClient 0:1b5f413bf328 299 }
AzureIoTClient 0:1b5f413bf328 300 }
AzureIoTClient 0:1b5f413bf328 301 }
AzureIoTClient 0:1b5f413bf328 302 }
AzureIoTClient 0:1b5f413bf328 303 }
AzureIoTClient 0:1b5f413bf328 304 return result;
AzureIoTClient 0:1b5f413bf328 305 }
AzureIoTClient 0:1b5f413bf328 306
AzureIoTClient 0:1b5f413bf328 307 static void putSecondListAfterHeadOfFirst(PDLIST_ENTRY first, PDLIST_ENTRY second)
AzureIoTClient 0:1b5f413bf328 308 {
AzureIoTClient 0:1b5f413bf328 309 DList_AppendTailList(first->Flink, second);
AzureIoTClient 0:1b5f413bf328 310 DList_RemoveEntryList(second);
AzureIoTClient 0:1b5f413bf328 311 DList_InitializeListHead(second); // Just to be tidy.
AzureIoTClient 0:1b5f413bf328 312 }
AzureIoTClient 0:1b5f413bf328 313
AzureIoTClient 0:1b5f413bf328 314 static bool stopMessenger(pn_messenger_t* messenger)
AzureIoTClient 0:1b5f413bf328 315 {
AzureIoTClient 0:1b5f413bf328 316 bool succeeded;
AzureIoTClient 0:1b5f413bf328 317 int result;
AzureIoTClient 0:1b5f413bf328 318 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_021: [Invoke pn_messenger_stop on the messenger.]*/
AzureIoTClient 0:1b5f413bf328 319 result = pn_messenger_stop(messenger);
AzureIoTClient 0:1b5f413bf328 320 if (result == 0)
AzureIoTClient 0:1b5f413bf328 321 {
AzureIoTClient 0:1b5f413bf328 322 succeeded = true;
AzureIoTClient 0:1b5f413bf328 323 }
AzureIoTClient 0:1b5f413bf328 324 else if (result != PN_INPROGRESS)
AzureIoTClient 0:1b5f413bf328 325 {
AzureIoTClient 0:1b5f413bf328 326 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 0:1b5f413bf328 327 succeeded = false;
AzureIoTClient 0:1b5f413bf328 328 }
AzureIoTClient 0:1b5f413bf328 329 else
AzureIoTClient 0:1b5f413bf328 330 {
AzureIoTClient 0:1b5f413bf328 331 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_024: [If pn_messenger_stop returns PN_INPROGRESS invoke pn_messenger_stopped a fixed number of time, seeking a return of true.]*/
AzureIoTClient 0:1b5f413bf328 332 size_t numberOfTrys;
AzureIoTClient 0:1b5f413bf328 333 succeeded = false;
AzureIoTClient 0:1b5f413bf328 334 for (numberOfTrys = NUMBER_OF_MESSENGER_STOP_TRIES; numberOfTrys > 0; numberOfTrys--)
AzureIoTClient 0:1b5f413bf328 335 {
AzureIoTClient 0:1b5f413bf328 336 pn_messenger_work(messenger,100);
AzureIoTClient 0:1b5f413bf328 337 if (pn_messenger_stopped(messenger))
AzureIoTClient 0:1b5f413bf328 338 {
AzureIoTClient 0:1b5f413bf328 339 succeeded = true;
AzureIoTClient 0:1b5f413bf328 340 break;
AzureIoTClient 0:1b5f413bf328 341 }
AzureIoTClient 0:1b5f413bf328 342 }
AzureIoTClient 0:1b5f413bf328 343 }
AzureIoTClient 0:1b5f413bf328 344 return succeeded;
AzureIoTClient 0:1b5f413bf328 345 }
AzureIoTClient 0:1b5f413bf328 346
AzureIoTClient 0:1b5f413bf328 347 static void herdTheMessengers(PAMQP_TRANSPORT_STATE state, bool logMessengerStopFailures)
AzureIoTClient 0:1b5f413bf328 348 {
AzureIoTClient 0:1b5f413bf328 349 PDLIST_ENTRY currentMessengerLink = state->messengerCorral.Flink;
AzureIoTClient 0:1b5f413bf328 350
AzureIoTClient 0:1b5f413bf328 351 //
AzureIoTClient 0:1b5f413bf328 352 // We walk the list of messenger containers. We try to stop each one in turn.
AzureIoTClient 0:1b5f413bf328 353 //
AzureIoTClient 0:1b5f413bf328 354 // If we are able to stop the contained messenger, we will free the actual messenger. We will then free the messenger container.
AzureIoTClient 0:1b5f413bf328 355 //
AzureIoTClient 0:1b5f413bf328 356 while (currentMessengerLink != &state->messengerCorral)
AzureIoTClient 0:1b5f413bf328 357 {
AzureIoTClient 0:1b5f413bf328 358 PDLIST_ENTRY nextLink = currentMessengerLink->Flink;
AzureIoTClient 0:1b5f413bf328 359 pn_messenger_t* oldMessenger = containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)->messengerThatDidNotStop;
AzureIoTClient 0:1b5f413bf328 360 if (stopMessenger(oldMessenger))
AzureIoTClient 0:1b5f413bf328 361 {
AzureIoTClient 0:1b5f413bf328 362 (void)(DList_RemoveEntryList(currentMessengerLink));
AzureIoTClient 0:1b5f413bf328 363 free(containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry));
AzureIoTClient 0:1b5f413bf328 364 pn_messenger_free(oldMessenger);
AzureIoTClient 0:1b5f413bf328 365 }
AzureIoTClient 0:1b5f413bf328 366 else
AzureIoTClient 0:1b5f413bf328 367 {
AzureIoTClient 0:1b5f413bf328 368 if (logMessengerStopFailures == true)
AzureIoTClient 0:1b5f413bf328 369 {
AzureIoTClient 0:1b5f413bf328 370 LogError("Unable to free the messenger at address: %p\r\n", oldMessenger);
AzureIoTClient 0:1b5f413bf328 371 }
AzureIoTClient 0:1b5f413bf328 372 }
AzureIoTClient 0:1b5f413bf328 373 currentMessengerLink = nextLink;
AzureIoTClient 0:1b5f413bf328 374 }
AzureIoTClient 0:1b5f413bf328 375 }
AzureIoTClient 0:1b5f413bf328 376
AzureIoTClient 0:1b5f413bf328 377 static void disposeOfOldMessenger(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 0:1b5f413bf328 378 {
AzureIoTClient 0:1b5f413bf328 379 if (state->messenger)
AzureIoTClient 0:1b5f413bf328 380 {
AzureIoTClient 0:1b5f413bf328 381 if (stopMessenger(state->messenger))
AzureIoTClient 0:1b5f413bf328 382 {
AzureIoTClient 0:1b5f413bf328 383 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_022: [If pn_messenger_stop returns 0 then invoke pn_messenger_free on the messenger.]*/
AzureIoTClient 0:1b5f413bf328 384 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_025: [If pn_messenger_stopped returns true, then call pn_messenger_free.]*/
AzureIoTClient 0:1b5f413bf328 385 //
AzureIoTClient 0:1b5f413bf328 386 // Yay. It successfully stopped.
AzureIoTClient 0:1b5f413bf328 387 //
AzureIoTClient 0:1b5f413bf328 388 pn_messenger_free(state->messenger);
AzureIoTClient 0:1b5f413bf328 389 }
AzureIoTClient 0:1b5f413bf328 390 else
AzureIoTClient 0:1b5f413bf328 391 {
AzureIoTClient 0:1b5f413bf328 392 //
AzureIoTClient 0:1b5f413bf328 393 // If we couldn't destroy it, then put it on a list and try to do it later in our "spare" time.
AzureIoTClient 0:1b5f413bf328 394 //
AzureIoTClient 0:1b5f413bf328 395 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 0:1b5f413bf328 396 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_026: [If pn_messenger_stopped never returns true, then place the messenger on a list for attempting to stop later.]*/
AzureIoTClient 0:1b5f413bf328 397 PMESSENGER_CONTAINER newContainer = malloc(sizeof(MESSENGER_CONTAINER));
AzureIoTClient 0:1b5f413bf328 398 if (newContainer == NULL)
AzureIoTClient 0:1b5f413bf328 399 {
AzureIoTClient 0:1b5f413bf328 400 //
AzureIoTClient 0:1b5f413bf328 401 // Darn! This is not good. These containers are not very big. If we can't allocate one, things are pretty bad.
AzureIoTClient 0:1b5f413bf328 402 // However, it doesn't make sense to actually free this messenger. It hasn't been stopped and we can NOT be sure who
AzureIoTClient 0:1b5f413bf328 403 // is referencing the memory that it has allocated. The safest thing to do in this very bad situation is to drop
AzureIoTClient 0:1b5f413bf328 404 // it on the floor. It's a leak, but a leak is better than anything else. There is a pretty good likelyhood that
AzureIoTClient 0:1b5f413bf328 405 // the app is exiting soon. The dropped memory will be cleaned up by exit.
AzureIoTClient 0:1b5f413bf328 406 ;
AzureIoTClient 0:1b5f413bf328 407 }
AzureIoTClient 0:1b5f413bf328 408 else
AzureIoTClient 0:1b5f413bf328 409 {
AzureIoTClient 0:1b5f413bf328 410 newContainer->messengerThatDidNotStop = state->messenger;
AzureIoTClient 0:1b5f413bf328 411 DList_InsertTailList(&state->messengerCorral, &newContainer->entry);
AzureIoTClient 0:1b5f413bf328 412 }
AzureIoTClient 0:1b5f413bf328 413 }
AzureIoTClient 0:1b5f413bf328 414 state->messenger = NULL;
AzureIoTClient 0:1b5f413bf328 415 }
AzureIoTClient 0:1b5f413bf328 416 }
AzureIoTClient 0:1b5f413bf328 417
AzureIoTClient 0:1b5f413bf328 418 static void clientTransportAMQP_Destroy(TRANSPORT_HANDLE handle)
AzureIoTClient 0:1b5f413bf328 419 {
AzureIoTClient 0:1b5f413bf328 420 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 421
AzureIoTClient 0:1b5f413bf328 422 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_010: [clientTransportAMQP_Destroy shall do nothing if the handle is NULL.]*/
AzureIoTClient 0:1b5f413bf328 423 if (transportState)
AzureIoTClient 0:1b5f413bf328 424 {
AzureIoTClient 0:1b5f413bf328 425 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_018: [clientTransportAMQP_Destroy shall free all the resources currently in use.]*/
AzureIoTClient 0:1b5f413bf328 426 //
AzureIoTClient 0:1b5f413bf328 427 // Go through all of the active work items. Complete their associated messages.
AzureIoTClient 0:1b5f413bf328 428 //
AzureIoTClient 0:1b5f413bf328 429 while (!DList_IsListEmpty(&transportState->workInProgress))
AzureIoTClient 0:1b5f413bf328 430 {
AzureIoTClient 0:1b5f413bf328 431 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->workInProgress);
AzureIoTClient 0:1b5f413bf328 432 PAMQP_WORK_ITEM currentItem = containingRecord(currentEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 0:1b5f413bf328 433 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentItem->eventMessages, IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 0:1b5f413bf328 434 DList_InsertTailList(&transportState->availableWorkItems, &currentItem->link);
AzureIoTClient 0:1b5f413bf328 435 }
AzureIoTClient 0:1b5f413bf328 436
AzureIoTClient 0:1b5f413bf328 437 //
AzureIoTClient 0:1b5f413bf328 438 // Get rid of work items that aren't being used, and that should be everything given the above loop!
AzureIoTClient 0:1b5f413bf328 439 //
AzureIoTClient 0:1b5f413bf328 440 while (!DList_IsListEmpty(&transportState->availableWorkItems))
AzureIoTClient 0:1b5f413bf328 441 {
AzureIoTClient 0:1b5f413bf328 442 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->availableWorkItems);
AzureIoTClient 0:1b5f413bf328 443 free(containingRecord(currentEntry, AMQP_WORK_ITEM, link));
AzureIoTClient 0:1b5f413bf328 444 }
AzureIoTClient 0:1b5f413bf328 445
AzureIoTClient 0:1b5f413bf328 446 disposeOfOldMessenger(transportState);
AzureIoTClient 0:1b5f413bf328 447
AzureIoTClient 0:1b5f413bf328 448 //
AzureIoTClient 0:1b5f413bf328 449 // herdTheMessengers will NOT free a messenger that couldn't be stopped. Therefore the following
AzureIoTClient 0:1b5f413bf328 450 // function could result in a memory leak of old messengers. This is by FAR less of a problem then
AzureIoTClient 0:1b5f413bf328 451 // having memory returned in to pool that something else might still be referring to. (To get all folksy
AzureIoTClient 0:1b5f413bf328 452 // here, don't put a sick chicken back in the hen house.)
AzureIoTClient 0:1b5f413bf328 453 //
AzureIoTClient 0:1b5f413bf328 454
AzureIoTClient 0:1b5f413bf328 455 herdTheMessengers(transportState, true);
AzureIoTClient 0:1b5f413bf328 456 if (transportState->message)
AzureIoTClient 0:1b5f413bf328 457 {
AzureIoTClient 0:1b5f413bf328 458 pn_message_free(transportState->message);
AzureIoTClient 0:1b5f413bf328 459 }
AzureIoTClient 2:1e6040e0c035 460 STRING_delete(transportState->messageAddress);
AzureIoTClient 0:1b5f413bf328 461 STRING_delete(transportState->eventAddress);
AzureIoTClient 0:1b5f413bf328 462 STRING_delete(transportState->urledDeviceId);
AzureIoTClient 0:1b5f413bf328 463 STRING_delete(transportState->devicesPortionPath);
AzureIoTClient 0:1b5f413bf328 464 STRING_delete(transportState->cbsAddress);
AzureIoTClient 0:1b5f413bf328 465 STRING_delete(transportState->deviceKey);
AzureIoTClient 0:1b5f413bf328 466 STRING_delete(transportState->zeroLengthString);
AzureIoTClient 0:1b5f413bf328 467 if (transportState->trustedCertificates != NULL)
AzureIoTClient 0:1b5f413bf328 468 {
AzureIoTClient 0:1b5f413bf328 469 free(transportState->trustedCertificates);
AzureIoTClient 0:1b5f413bf328 470 }
AzureIoTClient 0:1b5f413bf328 471
AzureIoTClient 0:1b5f413bf328 472 free(transportState);
AzureIoTClient 0:1b5f413bf328 473 }
AzureIoTClient 0:1b5f413bf328 474 }
AzureIoTClient 0:1b5f413bf328 475
AzureIoTClient 0:1b5f413bf328 476 static int putToken(PAMQP_TRANSPORT_STATE transportState, STRING_HANDLE token)
AzureIoTClient 0:1b5f413bf328 477 {
AzureIoTClient 0:1b5f413bf328 478 int result = 0;
AzureIoTClient 0:1b5f413bf328 479
AzureIoTClient 0:1b5f413bf328 480 pn_data_t *properties;
AzureIoTClient 0:1b5f413bf328 481 pn_atom_t messageId;
AzureIoTClient 0:1b5f413bf328 482
AzureIoTClient 0:1b5f413bf328 483 messageId.u.as_ulong = transportState->sendTokenMessageId;
AzureIoTClient 0:1b5f413bf328 484 messageId.type = PN_ULONG;
AzureIoTClient 0:1b5f413bf328 485 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_130: [putToken clears the single messages that is used for all communication via proton.]*/
AzureIoTClient 0:1b5f413bf328 486 pn_message_clear(transportState->message);
AzureIoTClient 0:1b5f413bf328 487 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_131: [putToken obtains the properties of the message.]*/
AzureIoTClient 0:1b5f413bf328 488 properties = pn_message_properties(transportState->message);
AzureIoTClient 0:1b5f413bf328 489 if (properties == NULL)
AzureIoTClient 0:1b5f413bf328 490 {
AzureIoTClient 0:1b5f413bf328 491 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_132: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 492 LogError("unable to pn_message_properties\r\n");
AzureIoTClient 0:1b5f413bf328 493 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 494 }
AzureIoTClient 0:1b5f413bf328 495 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_133: [putToken constructs a proton map which describes the put token operation. If this construction fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 496 else if (!((pn_data_put_map(properties) == 0) &&
AzureIoTClient 0:1b5f413bf328 497 (pn_data_enter(properties) == true) &&
AzureIoTClient 0:1b5f413bf328 498 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_OPERATIONS_KEY), PROTON_MAP_OPERATIONS_KEY)) == 0) && // key
AzureIoTClient 0:1b5f413bf328 499 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_PUT_TOKEN_OPERATION), PROTON_MAP_PUT_TOKEN_OPERATION)) == 0) && // value
AzureIoTClient 0:1b5f413bf328 500 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TYPE_KEY), PROTON_MAP_TYPE_KEY)) == 0) && // key
AzureIoTClient 0:1b5f413bf328 501 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TOKEN_TYPE), PROTON_MAP_TOKEN_TYPE)) == 0) && // value
AzureIoTClient 0:1b5f413bf328 502 (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_NAME_KEY), PROTON_MAP_NAME_KEY)) == 0) && // key
AzureIoTClient 0:1b5f413bf328 503 (pn_data_put_string(properties, pn_bytes(STRING_length(transportState->devicesPortionPath), STRING_c_str(transportState->devicesPortionPath))) == 0) && // value
AzureIoTClient 0:1b5f413bf328 504 (pn_data_exit(properties))
AzureIoTClient 0:1b5f413bf328 505 ))
AzureIoTClient 0:1b5f413bf328 506 {
AzureIoTClient 0:1b5f413bf328 507 LogError("unable to build CBS put token map\r\n");
AzureIoTClient 0:1b5f413bf328 508 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 509 }
AzureIoTClient 0:1b5f413bf328 510 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_134: [putToken sets the address (CBS endpoint) that this put token operation is targeted to.]*/
AzureIoTClient 0:1b5f413bf328 511 else if (pn_message_set_reply_to(transportState->message, CBS_REPLY_TO) != 0)
AzureIoTClient 0:1b5f413bf328 512 {
AzureIoTClient 0:1b5f413bf328 513 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_135: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 514 LogError("unable to pn_message_set_reply_to\r\n");
AzureIoTClient 0:1b5f413bf328 515 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 516 }
AzureIoTClient 0:1b5f413bf328 517 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_136: [putToken sets the reply to address for the put token operation.]*/
AzureIoTClient 0:1b5f413bf328 518 else if (pn_message_set_address(transportState->message, STRING_c_str(transportState->cbsAddress)) != 0)
AzureIoTClient 0:1b5f413bf328 519 {
AzureIoTClient 0:1b5f413bf328 520 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_137: [If this fails then putToken fails.] */
AzureIoTClient 0:1b5f413bf328 521 LogError("unable to pn_message_set_address\r\n");
AzureIoTClient 0:1b5f413bf328 522 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 523 }
AzureIoTClient 0:1b5f413bf328 524 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_138: [putToken sets the message to not be inferred.]*/
AzureIoTClient 0:1b5f413bf328 525 else if (pn_message_set_inferred(transportState->message, false) != 0)
AzureIoTClient 0:1b5f413bf328 526 {
AzureIoTClient 0:1b5f413bf328 527 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_139: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 528 LogError("unable to pn_message_set_inferred\r\n");
AzureIoTClient 0:1b5f413bf328 529 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 530 }
AzureIoTClient 0:1b5f413bf328 531 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_140: [putToken sets the messageId of the message to the expiry of the token.]*/
AzureIoTClient 0:1b5f413bf328 532 else if (pn_message_set_id(transportState->message, messageId) != 0)
AzureIoTClient 0:1b5f413bf328 533 {
AzureIoTClient 0:1b5f413bf328 534 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_141: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 535 LogError("Unable to set the message id on the transfer of the token to the CBS\r\n");
AzureIoTClient 0:1b5f413bf328 536 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 537 }
AzureIoTClient 0:1b5f413bf328 538 else
AzureIoTClient 0:1b5f413bf328 539 {
AzureIoTClient 0:1b5f413bf328 540 pn_data_t* body;
AzureIoTClient 0:1b5f413bf328 541 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_142: [putToken obtains the body of the message.]*/
AzureIoTClient 0:1b5f413bf328 542 body = pn_message_body(transportState->message);
AzureIoTClient 0:1b5f413bf328 543 if (body == NULL)
AzureIoTClient 0:1b5f413bf328 544 {
AzureIoTClient 0:1b5f413bf328 545 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_143: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 546 LogError("Unable to pn_message_body\r\n");
AzureIoTClient 0:1b5f413bf328 547 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 548 }
AzureIoTClient 0:1b5f413bf328 549 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_144: [putToken places the actual SAS token into the body of the message.]*/
AzureIoTClient 0:1b5f413bf328 550 else if (pn_data_put_string(body, pn_bytes(STRING_length(token), STRING_c_str(token))) != 0)
AzureIoTClient 0:1b5f413bf328 551 {
AzureIoTClient 0:1b5f413bf328 552 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_145: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 553 LogError("Unable to pn_data_put_string\r\n");
AzureIoTClient 0:1b5f413bf328 554 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 555 }
AzureIoTClient 0:1b5f413bf328 556 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_146: [putToken invokes pn_messenger_put.]*/
AzureIoTClient 0:1b5f413bf328 557 else if (pn_messenger_put(transportState->messenger, transportState->message) != 0)
AzureIoTClient 0:1b5f413bf328 558 {
AzureIoTClient 0:1b5f413bf328 559 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_147: [If this fails then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 560 LogError("Unable to pn_messenger_put\r\n");
AzureIoTClient 0:1b5f413bf328 561 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 562 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 563 }
AzureIoTClient 0:1b5f413bf328 564 else
AzureIoTClient 0:1b5f413bf328 565 {
AzureIoTClient 0:1b5f413bf328 566 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_148: [A proton tracker is obtained.]*/
AzureIoTClient 0:1b5f413bf328 567 pn_tracker_t tracker = pn_messenger_outgoing_tracker(transportState->messenger);
AzureIoTClient 0:1b5f413bf328 568 time_t beginningOfWaitLoop = get_time(NULL);
AzureIoTClient 0:1b5f413bf328 569 time_t currentTime;
AzureIoTClient 0:1b5f413bf328 570 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 571 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_149: [putToken will loop invoking pn_messenger_work waiting for the message to reach a terminal delivery state.]*/
AzureIoTClient 0:1b5f413bf328 572 do
AzureIoTClient 0:1b5f413bf328 573 {
AzureIoTClient 0:1b5f413bf328 574 int protonResult;
AzureIoTClient 0:1b5f413bf328 575 if (checkForErrorsThenWork(transportState) == false)
AzureIoTClient 0:1b5f413bf328 576 {
AzureIoTClient 0:1b5f413bf328 577 break;
AzureIoTClient 0:1b5f413bf328 578 }
AzureIoTClient 0:1b5f413bf328 579 else
AzureIoTClient 0:1b5f413bf328 580 {
AzureIoTClient 0:1b5f413bf328 581 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_150: [If a terminal delivery state is not reached then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 582 if ((protonResult = pn_messenger_status(transportState->messenger, tracker)) != PN_STATUS_PENDING)
AzureIoTClient 0:1b5f413bf328 583 {
AzureIoTClient 0:1b5f413bf328 584 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_151: [On reaching a terminal delivery state, if the message status is NOT accepted then putToken fails.]*/
AzureIoTClient 0:1b5f413bf328 585 if (protonResult != PN_STATUS_ACCEPTED)
AzureIoTClient 0:1b5f413bf328 586 {
AzureIoTClient 0:1b5f413bf328 587 LogError("Error sending the token: %s\r\n", pn_error_text(pn_messenger_error(transportState->messenger)));
AzureIoTClient 0:1b5f413bf328 588 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 589 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 590 }
AzureIoTClient 0:1b5f413bf328 591 else
AzureIoTClient 0:1b5f413bf328 592 {
AzureIoTClient 0:1b5f413bf328 593 result = 0;
AzureIoTClient 0:1b5f413bf328 594 }
AzureIoTClient 0:1b5f413bf328 595 break;
AzureIoTClient 0:1b5f413bf328 596 }
AzureIoTClient 0:1b5f413bf328 597 }
AzureIoTClient 0:1b5f413bf328 598 currentTime = get_time(NULL);
AzureIoTClient 0:1b5f413bf328 599 } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsRequestAcceptTime);
AzureIoTClient 0:1b5f413bf328 600
AzureIoTClient 0:1b5f413bf328 601 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_152: [The tracker is settled.]*/
AzureIoTClient 0:1b5f413bf328 602 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 603 if (result == 0)
AzureIoTClient 0:1b5f413bf328 604 {
AzureIoTClient 0:1b5f413bf328 605 //
AzureIoTClient 0:1b5f413bf328 606 // Successfully sent the token. Wait around for a reply.
AzureIoTClient 0:1b5f413bf328 607 //
AzureIoTClient 0:1b5f413bf328 608 transportState->waitingForPutTokenReply = true;
AzureIoTClient 0:1b5f413bf328 609 beginningOfWaitLoop = get_time(NULL);
AzureIoTClient 0:1b5f413bf328 610 result = __LINE__;
AzureIoTClient 0:1b5f413bf328 611 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_153: [putToken will again loop waiting for a reply.]*/
AzureIoTClient 0:1b5f413bf328 612 do
AzureIoTClient 0:1b5f413bf328 613 {
AzureIoTClient 0:1b5f413bf328 614 processReceives(transportState);
AzureIoTClient 0:1b5f413bf328 615 if (transportState->waitingForPutTokenReply == true)
AzureIoTClient 0:1b5f413bf328 616 {
AzureIoTClient 0:1b5f413bf328 617 if (checkForErrorsThenWork(transportState) == false)
AzureIoTClient 0:1b5f413bf328 618 {
AzureIoTClient 0:1b5f413bf328 619 break;
AzureIoTClient 0:1b5f413bf328 620 }
AzureIoTClient 0:1b5f413bf328 621 }
AzureIoTClient 0:1b5f413bf328 622 else
AzureIoTClient 0:1b5f413bf328 623 {
AzureIoTClient 0:1b5f413bf328 624 result = 0;
AzureIoTClient 0:1b5f413bf328 625 break;
AzureIoTClient 0:1b5f413bf328 626 }
AzureIoTClient 0:1b5f413bf328 627 currentTime = get_time(NULL);
AzureIoTClient 0:1b5f413bf328 628 } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsReplyTime);
AzureIoTClient 0:1b5f413bf328 629 }
AzureIoTClient 0:1b5f413bf328 630 else
AzureIoTClient 0:1b5f413bf328 631 {
AzureIoTClient 0:1b5f413bf328 632 LogError("An network error occured while waiting for a CBS reply.\r\n");
AzureIoTClient 0:1b5f413bf328 633 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 634 }
AzureIoTClient 0:1b5f413bf328 635 }
AzureIoTClient 0:1b5f413bf328 636 }
AzureIoTClient 0:1b5f413bf328 637 pn_message_clear(transportState->message);
AzureIoTClient 0:1b5f413bf328 638 return result;
AzureIoTClient 0:1b5f413bf328 639 }
AzureIoTClient 0:1b5f413bf328 640
AzureIoTClient 0:1b5f413bf328 641 static void renewIfNecessaryTheCBS(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 0:1b5f413bf328 642 {
AzureIoTClient 0:1b5f413bf328 643
AzureIoTClient 0:1b5f413bf328 644 //
AzureIoTClient 0:1b5f413bf328 645 // There is an expiry stored in the state. It has the last expiry value used to define a
AzureIoTClient 0:1b5f413bf328 646 // sas token. If we are within 20 minutes OR we're PAST that expiration generate a new
AzureIoTClient 0:1b5f413bf328 647 // expiry time and create a new sas token and "put" it to the cbs.
AzureIoTClient 0:1b5f413bf328 648 //
AzureIoTClient 0:1b5f413bf328 649
AzureIoTClient 0:1b5f413bf328 650 size_t secondsSinceEpoch;
AzureIoTClient 0:1b5f413bf328 651 int differenceWithLastExpiry;
AzureIoTClient 0:1b5f413bf328 652
AzureIoTClient 0:1b5f413bf328 653 transportState->putTokenWasSuccessful = false;
AzureIoTClient 0:1b5f413bf328 654 //
AzureIoTClient 0:1b5f413bf328 655 // Get the number of seconds since the epoch.
AzureIoTClient 0:1b5f413bf328 656 //
AzureIoTClient 0:1b5f413bf328 657 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_163: [Invocation of renewIfNecessaryTheCBS will first obtain the current number of seconds since the epoch.*/
AzureIoTClient 0:1b5f413bf328 658 secondsSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE)+0); // adding zero because a compiler feels it's necessary.
AzureIoTClient 0:1b5f413bf328 659 differenceWithLastExpiry = transportState->lastExpiryUsed - secondsSinceEpoch;
AzureIoTClient 0:1b5f413bf328 660 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_164: [If the difference between lastExpiryUsed and the current value is less than refresh seconds then renewIfNecessaryTheCBS will attempt to renew the SAS.]*/
AzureIoTClient 0:1b5f413bf328 661 if ((differenceWithLastExpiry <= 0) ||
AzureIoTClient 0:1b5f413bf328 662 (((size_t)differenceWithLastExpiry) < transportState->sasRefreshLine)) // Within refresh minutes (or past if negative) of the expiration
AzureIoTClient 0:1b5f413bf328 663 {
AzureIoTClient 0:1b5f413bf328 664 //
AzureIoTClient 0:1b5f413bf328 665 // We already have the seconds since the epoch. Add an hour to it and generate the new SAS.
AzureIoTClient 0:1b5f413bf328 666 //
AzureIoTClient 0:1b5f413bf328 667 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_165: [If SAS renewal is to occur then lifetime seconds is added to the current number of seconds.]*/
AzureIoTClient 0:1b5f413bf328 668 size_t possibleNewExpiry = secondsSinceEpoch + transportState->sasTokenLifetime;
AzureIoTClient 0:1b5f413bf328 669 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_169: [SASToken_Create is invoked.] */
AzureIoTClient 0:1b5f413bf328 670 STRING_HANDLE newSASToken = SASToken_Create(transportState->deviceKey, transportState->devicesPortionPath, transportState->zeroLengthString, possibleNewExpiry);
AzureIoTClient 0:1b5f413bf328 671 if (newSASToken == NULL)
AzureIoTClient 0:1b5f413bf328 672 {
AzureIoTClient 0:1b5f413bf328 673 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_170: [If SASToken_Create returns NULL then renewal is considered a failure and we move on to the next phase of DoWork.]*/
AzureIoTClient 0:1b5f413bf328 674 LogError("Could not generate a new SAS token for the CBS\r\n");
AzureIoTClient 0:1b5f413bf328 675 }
AzureIoTClient 0:1b5f413bf328 676 else
AzureIoTClient 0:1b5f413bf328 677 {
AzureIoTClient 0:1b5f413bf328 678 //
AzureIoTClient 0:1b5f413bf328 679 // We are going to save off in the transport state this POSSIBLE expiry value. Sending token and response processing code
AzureIoTClient 0:1b5f413bf328 680 // will use it as the message id and correlation id for the request/response. This will work
AzureIoTClient 0:1b5f413bf328 681 // because the code that sends the token will wait for AT LEAST one second for a valid response. If a valid response comes
AzureIoTClient 0:1b5f413bf328 682 // back in that period of time then even if for some bizarre reason we need to renew again in less than a second, we know
AzureIoTClient 0:1b5f413bf328 683 // there won't be a spurious response (because we just got the resonse!).
AzureIoTClient 0:1b5f413bf328 684 //
AzureIoTClient 0:1b5f413bf328 685 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_166: [This value is saved in the state as sendTokenMessageId.]*/
AzureIoTClient 0:1b5f413bf328 686 transportState->sendTokenMessageId = possibleNewExpiry;
AzureIoTClient 0:1b5f413bf328 687 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_171: [renewIfNecessaryTheCBS will invoke a local static function putToken.]*/
AzureIoTClient 0:1b5f413bf328 688 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_172: [If putToken fails then the renewal is considered a failure and we move on to the next phase of DoWork.]*/
AzureIoTClient 0:1b5f413bf328 689 if (putToken(transportState, newSASToken) == 0)
AzureIoTClient 0:1b5f413bf328 690 {
AzureIoTClient 0:1b5f413bf328 691 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_173: [Note that putToken actually sets a state value which will indicate whether the CBS accepted the token and responded with success.]*/
AzureIoTClient 0:1b5f413bf328 692 if (transportState->putTokenWasSuccessful == true)
AzureIoTClient 0:1b5f413bf328 693 {
AzureIoTClient 0:1b5f413bf328 694 transportState->lastExpiryUsed = possibleNewExpiry;
AzureIoTClient 0:1b5f413bf328 695 }
AzureIoTClient 0:1b5f413bf328 696 }
AzureIoTClient 0:1b5f413bf328 697 STRING_delete(newSASToken);
AzureIoTClient 0:1b5f413bf328 698 }
AzureIoTClient 0:1b5f413bf328 699 }
AzureIoTClient 0:1b5f413bf328 700 }
AzureIoTClient 0:1b5f413bf328 701
AzureIoTClient 0:1b5f413bf328 702 static bool protonMessengerInit(PAMQP_TRANSPORT_STATE amqpState)
AzureIoTClient 0:1b5f413bf328 703 {
AzureIoTClient 0:1b5f413bf328 704 if (!amqpState->messengerInitialized)
AzureIoTClient 0:1b5f413bf328 705 {
AzureIoTClient 0:1b5f413bf328 706 rollbackEvent(amqpState);
AzureIoTClient 0:1b5f413bf328 707 disposeOfOldMessenger(amqpState);
AzureIoTClient 0:1b5f413bf328 708 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_027: [DoWork shall invoke pn_messenger to create a new messenger.]*/
AzureIoTClient 0:1b5f413bf328 709 if ((amqpState->messenger = pn_messenger(NULL)) == NULL)
AzureIoTClient 0:1b5f413bf328 710 {
AzureIoTClient 0:1b5f413bf328 711 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 712 LogError("The messenger is not able to be created.\r\n");
AzureIoTClient 0:1b5f413bf328 713 }
AzureIoTClient 0:1b5f413bf328 714 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */
AzureIoTClient 0:1b5f413bf328 715 else if ((amqpState->trustedCertificates != NULL) &&
AzureIoTClient 0:1b5f413bf328 716 (strlen(amqpState->trustedCertificates) > 0) &&
AzureIoTClient 0:1b5f413bf328 717 (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0))
AzureIoTClient 0:1b5f413bf328 718 {
AzureIoTClient 0:1b5f413bf328 719 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */
AzureIoTClient 0:1b5f413bf328 720 LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n");
AzureIoTClient 0:1b5f413bf328 721 }
AzureIoTClient 0:1b5f413bf328 722 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 723 else if (pn_messenger_start(amqpState->messenger) != 0)
AzureIoTClient 0:1b5f413bf328 724 {
AzureIoTClient 0:1b5f413bf328 725 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 726 LogError("unable to pn_messenger_start\r\n");
AzureIoTClient 0:1b5f413bf328 727 }
AzureIoTClient 0:1b5f413bf328 728 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_031: [pn_messenger_set_blocking shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 729 else if (pn_messenger_set_blocking(amqpState->messenger, false) != 0)
AzureIoTClient 0:1b5f413bf328 730 {
AzureIoTClient 0:1b5f413bf328 731 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_032: [If pn_messenger_set_blocking returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 732 LogError("unable to pn_messenger_set_blocking\r\n");
AzureIoTClient 0:1b5f413bf328 733 }
AzureIoTClient 0:1b5f413bf328 734 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_033: [pn_messenger_set_snd_settle_mode shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 735 else if (pn_messenger_set_snd_settle_mode(amqpState->messenger, PN_SND_UNSETTLED) != 0)
AzureIoTClient 0:1b5f413bf328 736 {
AzureIoTClient 0:1b5f413bf328 737 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_034: [If pn_messenger_set_snd_settle_mode returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 738 LogError("unable to set the send settle mode\r\n");
AzureIoTClient 0:1b5f413bf328 739 }
AzureIoTClient 0:1b5f413bf328 740 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_035: [pn_messenger_set_rcv_settle_mode shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 741 else if (pn_messenger_set_rcv_settle_mode(amqpState->messenger, PN_RCV_FIRST) != 0)
AzureIoTClient 0:1b5f413bf328 742 {
AzureIoTClient 0:1b5f413bf328 743 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_036: [If pn_messenger_set_rcv_settle_mode returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 744 LogError("unable to set the receive settle mode\r\n");
AzureIoTClient 0:1b5f413bf328 745 }
AzureIoTClient 0:1b5f413bf328 746 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_037: [pn_messenger_set_outgoing_window shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 747 else if (pn_messenger_set_outgoing_window(amqpState->messenger, PROTON_OUTGOING_WINDOW_SIZE) != 0)
AzureIoTClient 0:1b5f413bf328 748 {
AzureIoTClient 0:1b5f413bf328 749 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_038: [If pn_messenger_set_outgoing_window returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 750 LogError("unable to pn_messenger_set_outgoing_window\r\n");
AzureIoTClient 0:1b5f413bf328 751 }
AzureIoTClient 0:1b5f413bf328 752 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_039: [pn_messenger_set_incoming_window shall be invoked.]*/
AzureIoTClient 0:1b5f413bf328 753 else if (pn_messenger_set_incoming_window(amqpState->messenger, PROTON_INCOMING_WINDOW_SIZE) != 0)
AzureIoTClient 0:1b5f413bf328 754 {
AzureIoTClient 0:1b5f413bf328 755 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_040: [If pn_messenger_set_incoming_window returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 756 LogError("unable to pn_messenger_set_incoming_window\r\n");
AzureIoTClient 0:1b5f413bf328 757 }
AzureIoTClient 0:1b5f413bf328 758 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_156: [pn_messenger_subscribe will be invoked on the cbs address.]*/
AzureIoTClient 0:1b5f413bf328 759 else if ((amqpState->cbsSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->cbsAddress))) == NULL)
AzureIoTClient 0:1b5f413bf328 760 {
AzureIoTClient 0:1b5f413bf328 761 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_157: [If the pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 762 LogError("unable to create a subscription to the cbs address\r\n");
AzureIoTClient 0:1b5f413bf328 763 }
AzureIoTClient 0:1b5f413bf328 764 else
AzureIoTClient 0:1b5f413bf328 765 {
AzureIoTClient 0:1b5f413bf328 766 //We've got the subscription. Now we attempt to connect to the cbs.
AzureIoTClient 0:1b5f413bf328 767 /*Codes_During messenger initialization a state variable known as lastExpiryUsed will be set to zero.*/
AzureIoTClient 0:1b5f413bf328 768 amqpState->lastExpiryUsed = 0;
AzureIoTClient 0:1b5f413bf328 769 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_159: [renewIfNecessaryCBS will be invoked.]*/
AzureIoTClient 0:1b5f413bf328 770 renewIfNecessaryTheCBS(amqpState);
AzureIoTClient 0:1b5f413bf328 771 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_160: [If after renewIfNecessaryCBS is invoked, the state variable putTokenWasSuccessful is false then the messenger initialization fails.]*/
AzureIoTClient 0:1b5f413bf328 772 if (amqpState->putTokenWasSuccessful == true)
AzureIoTClient 0:1b5f413bf328 773 {
AzureIoTClient 0:1b5f413bf328 774 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/
AzureIoTClient 2:1e6040e0c035 775 if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL)
AzureIoTClient 0:1b5f413bf328 776 {
AzureIoTClient 0:1b5f413bf328 777 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/
AzureIoTClient 2:1e6040e0c035 778 LogError("unable to create a subscription to the message address\r\n");
AzureIoTClient 0:1b5f413bf328 779 }
AzureIoTClient 0:1b5f413bf328 780 else
AzureIoTClient 0:1b5f413bf328 781 {
AzureIoTClient 0:1b5f413bf328 782 amqpState->messengerInitialized = true;
AzureIoTClient 0:1b5f413bf328 783 }
AzureIoTClient 0:1b5f413bf328 784 }
AzureIoTClient 0:1b5f413bf328 785 }
AzureIoTClient 0:1b5f413bf328 786 }
AzureIoTClient 0:1b5f413bf328 787 return amqpState->messengerInitialized;
AzureIoTClient 0:1b5f413bf328 788 }
AzureIoTClient 0:1b5f413bf328 789
AzureIoTClient 0:1b5f413bf328 790 static amqp_batch_result prepareBatch(size_t maximumPayload, PDLIST_ENTRY waitingToSend, PDLIST_ENTRY messagesMakingUpBatch, size_t* payloadSize, const unsigned char** payload)
AzureIoTClient 0:1b5f413bf328 791 {
AzureIoTClient 0:1b5f413bf328 792 amqp_batch_result result;
AzureIoTClient 0:1b5f413bf328 793 if (DList_IsListEmpty(waitingToSend))
AzureIoTClient 0:1b5f413bf328 794 {
AzureIoTClient 0:1b5f413bf328 795 result = amqp_batch_nowork;
AzureIoTClient 0:1b5f413bf328 796 }
AzureIoTClient 0:1b5f413bf328 797 else
AzureIoTClient 0:1b5f413bf328 798 {
AzureIoTClient 0:1b5f413bf328 799 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_074: [If there is a waitingToSend entry remove it from the list.]*/
AzureIoTClient 0:1b5f413bf328 800 IOTHUB_MESSAGE_LIST* currentMessage;
AzureIoTClient 0:1b5f413bf328 801 IOTHUBMESSAGE_CONTENT_TYPE contentType;
AzureIoTClient 0:1b5f413bf328 802 size_t messageLength;
AzureIoTClient 0:1b5f413bf328 803 const unsigned char* messagePayload;
AzureIoTClient 0:1b5f413bf328 804 DList_InitializeListHead(messagesMakingUpBatch);
AzureIoTClient 0:1b5f413bf328 805 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then DoWork will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/
AzureIoTClient 0:1b5f413bf328 806 DList_InsertTailList(messagesMakingUpBatch, DList_RemoveHeadList(waitingToSend));
AzureIoTClient 0:1b5f413bf328 807 currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry);
AzureIoTClient 0:1b5f413bf328 808 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_075: [Get its size and payload address by calling IoTHubMessage_GetByteArray.]*/
AzureIoTClient 0:1b5f413bf328 809 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_008: [If the message type is IOTHUBMESSAGE_STRING then get the data by using IoTHubMesage_GetString and set the size to the length of the string.] */
AzureIoTClient 0:1b5f413bf328 810 contentType = IoTHubMessage_GetContentType(currentMessage->messageHandle);
AzureIoTClient 0:1b5f413bf328 811 if (!(
AzureIoTClient 0:1b5f413bf328 812 ((contentType == IOTHUBMESSAGE_BYTEARRAY) && (IoTHubMessage_GetByteArray(currentMessage->messageHandle, &messagePayload, &messageLength) == IOTHUB_MESSAGE_OK)) ||
AzureIoTClient 0:1b5f413bf328 813 ((contentType == IOTHUBMESSAGE_STRING) && (
AzureIoTClient 0:1b5f413bf328 814 messagePayload = (const unsigned char*)IoTHubMessage_GetString(currentMessage->messageHandle),
AzureIoTClient 0:1b5f413bf328 815 (messageLength = (messagePayload == NULL) ? 0 : strlen((const char*)messagePayload)),
AzureIoTClient 0:1b5f413bf328 816 messagePayload != NULL)
AzureIoTClient 0:1b5f413bf328 817 )
AzureIoTClient 0:1b5f413bf328 818 ))
AzureIoTClient 0:1b5f413bf328 819 {
AzureIoTClient 0:1b5f413bf328 820 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_076: [If that fails then fail that event.]*/
AzureIoTClient 0:1b5f413bf328 821 LogError("Failure in getting message content. Type had decimal value = %d\r\n", contentType);
AzureIoTClient 0:1b5f413bf328 822 result = amqp_batch_error;
AzureIoTClient 0:1b5f413bf328 823 }
AzureIoTClient 0:1b5f413bf328 824 else if (messageLength > maximumPayload)
AzureIoTClient 0:1b5f413bf328 825 {
AzureIoTClient 0:1b5f413bf328 826 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_077: [If the size of the payload is greater than the maximum (currently 256*1024) fail that event.]*/
AzureIoTClient 0:1b5f413bf328 827 LogError("Batch length is too large.\r\n");
AzureIoTClient 0:1b5f413bf328 828 result = amqp_batch_error;
AzureIoTClient 0:1b5f413bf328 829 }
AzureIoTClient 0:1b5f413bf328 830 else
AzureIoTClient 0:1b5f413bf328 831 {
AzureIoTClient 0:1b5f413bf328 832 *payloadSize = messageLength;
AzureIoTClient 0:1b5f413bf328 833 *payload = messagePayload;
AzureIoTClient 0:1b5f413bf328 834 result = amqp_batch_nobatch;
AzureIoTClient 0:1b5f413bf328 835 }
AzureIoTClient 0:1b5f413bf328 836 }
AzureIoTClient 0:1b5f413bf328 837 return result;
AzureIoTClient 0:1b5f413bf328 838 }
AzureIoTClient 0:1b5f413bf328 839
AzureIoTClient 0:1b5f413bf328 840 static bool prepareSimpleProtonMessage(pn_message_t* message, STRING_HANDLE endpoint, size_t payloadLength, const unsigned char* payload)
AzureIoTClient 0:1b5f413bf328 841 {
AzureIoTClient 0:1b5f413bf328 842 bool result = false;
AzureIoTClient 0:1b5f413bf328 843 pn_data_t * body;
AzureIoTClient 0:1b5f413bf328 844
AzureIoTClient 0:1b5f413bf328 845 pn_message_clear(message);
AzureIoTClient 0:1b5f413bf328 846 if (pn_message_set_address(message, STRING_c_str(endpoint)) != 0)
AzureIoTClient 0:1b5f413bf328 847 {
AzureIoTClient 0:1b5f413bf328 848 LogError("Unable to set amqp address for proton message.\r\n");
AzureIoTClient 0:1b5f413bf328 849 }
AzureIoTClient 0:1b5f413bf328 850 else if (pn_message_set_inferred(message, true) != 0)
AzureIoTClient 0:1b5f413bf328 851 {
AzureIoTClient 0:1b5f413bf328 852 LogError("Unable to set inferred to true for proton message.\r\n");
AzureIoTClient 0:1b5f413bf328 853 }
AzureIoTClient 0:1b5f413bf328 854 else if ((body = pn_message_body(message)) == NULL)
AzureIoTClient 0:1b5f413bf328 855 {
AzureIoTClient 0:1b5f413bf328 856 LogError("Unable to set proton message body.\r\n");
AzureIoTClient 0:1b5f413bf328 857 }
AzureIoTClient 0:1b5f413bf328 858 else if (pn_data_put_binary(body, pn_bytes(payloadLength, (const char*)payload)) != 0)
AzureIoTClient 0:1b5f413bf328 859 {
AzureIoTClient 0:1b5f413bf328 860 LogError("Unable to set binary data for message body.\r\n");
AzureIoTClient 0:1b5f413bf328 861 }
AzureIoTClient 0:1b5f413bf328 862 else
AzureIoTClient 0:1b5f413bf328 863 {
AzureIoTClient 0:1b5f413bf328 864 result = true;
AzureIoTClient 0:1b5f413bf328 865 }
AzureIoTClient 0:1b5f413bf328 866
AzureIoTClient 0:1b5f413bf328 867 return result;
AzureIoTClient 0:1b5f413bf328 868 }
AzureIoTClient 0:1b5f413bf328 869
AzureIoTClient 0:1b5f413bf328 870 static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 0:1b5f413bf328 871 {
AzureIoTClient 0:1b5f413bf328 872 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_175: [The list of work in progress is tested to see if it is empty.]*/
AzureIoTClient 0:1b5f413bf328 873 while (!DList_IsListEmpty(&transportState->workInProgress))
AzureIoTClient 0:1b5f413bf328 874 {
AzureIoTClient 0:1b5f413bf328 875 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_176: [If it is not empty then the list is traversed.]*/
AzureIoTClient 0:1b5f413bf328 876 int settleResult;
AzureIoTClient 0:1b5f413bf328 877 int protonResult;
AzureIoTClient 0:1b5f413bf328 878 PDLIST_ENTRY newestEntry = transportState->workInProgress.Blink;
AzureIoTClient 0:1b5f413bf328 879 PAMQP_WORK_ITEM currentWork = containingRecord(newestEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 0:1b5f413bf328 880 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_179: [The status of the work item is acquired.]*/
AzureIoTClient 0:1b5f413bf328 881 protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker);
AzureIoTClient 0:1b5f413bf328 882 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_180: [The item is settled.]*/
AzureIoTClient 0:1b5f413bf328 883 if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0)
AzureIoTClient 0:1b5f413bf328 884 {
AzureIoTClient 0:1b5f413bf328 885 LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult);
AzureIoTClient 0:1b5f413bf328 886 }
AzureIoTClient 0:1b5f413bf328 887 if ((protonResult == PN_STATUS_ACCEPTED) ||
AzureIoTClient 0:1b5f413bf328 888 (protonResult == PN_STATUS_REJECTED) ||
AzureIoTClient 0:1b5f413bf328 889 (protonResult == PN_STATUS_RELEASED))
AzureIoTClient 0:1b5f413bf328 890 {
AzureIoTClient 0:1b5f413bf328 891 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_182: [Items that have been settled by the service will have their IOTHUB_MESSAGE_LIST completed.]*/
AzureIoTClient 0:1b5f413bf328 892 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 0:1b5f413bf328 893 }
AzureIoTClient 0:1b5f413bf328 894 else
AzureIoTClient 0:1b5f413bf328 895 {
AzureIoTClient 0:1b5f413bf328 896 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_178: [Items that are not settled by the service will have their IOTHUB_MESSAGE_LIST removed from the work item and placed back on the waiting to send.]*/
AzureIoTClient 0:1b5f413bf328 897 putSecondListAfterHeadOfFirst(transportState->waitingToSend->Flink, &currentWork->eventMessages);
AzureIoTClient 0:1b5f413bf328 898 }
AzureIoTClient 0:1b5f413bf328 899 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_181: [The work item is removed from the in progress list and placed on the available list.]*/
AzureIoTClient 0:1b5f413bf328 900 DList_RemoveEntryList(newestEntry);
AzureIoTClient 0:1b5f413bf328 901 DList_InsertTailList(&transportState->availableWorkItems, newestEntry);
AzureIoTClient 0:1b5f413bf328 902 }
AzureIoTClient 0:1b5f413bf328 903 }
AzureIoTClient 0:1b5f413bf328 904 static void reclaimEventResources(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 0:1b5f413bf328 905 {
AzureIoTClient 0:1b5f413bf328 906 PDLIST_ENTRY currentListEntry;
AzureIoTClient 0:1b5f413bf328 907 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_064: [Obtain the head of the workInProgress]*/
AzureIoTClient 0:1b5f413bf328 908 currentListEntry = transportState->workInProgress.Flink;
AzureIoTClient 0:1b5f413bf328 909 while (currentListEntry != &transportState->workInProgress)
AzureIoTClient 0:1b5f413bf328 910 {
AzureIoTClient 0:1b5f413bf328 911 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_065: [While head != listhead reclaimEventResources will do as follows.] */
AzureIoTClient 0:1b5f413bf328 912 int protonResult;
AzureIoTClient 0:1b5f413bf328 913 PAMQP_WORK_ITEM currentWork = containingRecord(currentListEntry, AMQP_WORK_ITEM, link);
AzureIoTClient 0:1b5f413bf328 914 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_066: [Check the current status via pn_messenger_status via the tracker stored in the AMQP_WORK_ITEM.]*/
AzureIoTClient 0:1b5f413bf328 915 if ((protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker)) == PN_STATUS_PENDING)
AzureIoTClient 0:1b5f413bf328 916 {
AzureIoTClient 0:1b5f413bf328 917 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_183: [if the amount of time that the work item has been pending exceeds a timeout stored in the work item, the messenger will be marked as not initialized.]*/
AzureIoTClient 0:1b5f413bf328 918 size_t secondsSinceTheEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0);
AzureIoTClient 0:1b5f413bf328 919 if (currentWork->expiry < secondsSinceTheEpoch)
AzureIoTClient 0:1b5f413bf328 920 {
AzureIoTClient 0:1b5f413bf328 921 LogError("A event operation timed out.\r\n");
AzureIoTClient 0:1b5f413bf328 922 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 923 }
AzureIoTClient 0:1b5f413bf328 924 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_067: [reclaimEventResources shall go onto the next work item if the proton status is PN_STATUS_PENDING.]*/
AzureIoTClient 0:1b5f413bf328 925 currentListEntry = currentListEntry->Flink;
AzureIoTClient 0:1b5f413bf328 926 }
AzureIoTClient 0:1b5f413bf328 927 else
AzureIoTClient 0:1b5f413bf328 928 {
AzureIoTClient 0:1b5f413bf328 929 DLIST_ENTRY savedFromCurrentListEntry;
AzureIoTClient 0:1b5f413bf328 930 int settleResult;
AzureIoTClient 0:1b5f413bf328 931 // Yay! It's finished. Take it off the in progress list.
AzureIoTClient 0:1b5f413bf328 932 savedFromCurrentListEntry.Flink = currentListEntry->Flink; // We need to save this because it will be stomped on when we remove it from the work in progress.
AzureIoTClient 0:1b5f413bf328 933
AzureIoTClient 0:1b5f413bf328 934 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_068: [Otherwise reclaimEventResources will use the result of IOTHUB_BATCHSTATE_SUCCESS if the proton status was PN_STATUS_ACCEPTED.]*/
AzureIoTClient 0:1b5f413bf328 935 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_069: [Otherwise reclaimEventResources will use the status of IOTHUB_BATCHSTATE_FAILED if the proton status was NOT PN_STATUS_ACCEPTED.] */
AzureIoTClient 0:1b5f413bf328 936 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_070: [reclaimEventResources will invoke IotHubClient_SendBatchComplete with the IOTHUB_CLIENT_LL_HANDLE, supplied to reclaimEventResources, also pass the eventMessages stored in the AMQP_WORK_ITEM, and finally pass the above IOTHUB_BATCH status.]*/
AzureIoTClient 0:1b5f413bf328 937 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &currentWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 0:1b5f413bf328 938 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_090: [reclaimEventResources will release the tracker by invoking pn_messenger_settle.]*/
AzureIoTClient 0:1b5f413bf328 939 if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0)
AzureIoTClient 0:1b5f413bf328 940 {
AzureIoTClient 0:1b5f413bf328 941 LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult);
AzureIoTClient 0:1b5f413bf328 942 }
AzureIoTClient 0:1b5f413bf328 943 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_071: [reclaimEventResources will take the current work item from the workInProgress list and insert it on the tail of the availableWorkItems list.]*/
AzureIoTClient 0:1b5f413bf328 944 DList_RemoveEntryList(currentListEntry);
AzureIoTClient 0:1b5f413bf328 945 DList_InsertTailList(&transportState->availableWorkItems, currentListEntry);
AzureIoTClient 0:1b5f413bf328 946 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_072: [reclaimEventResources will continue to iterate over the workInProgress list until it returns to the list head.]*/
AzureIoTClient 0:1b5f413bf328 947 currentListEntry = savedFromCurrentListEntry.Flink;
AzureIoTClient 0:1b5f413bf328 948 }
AzureIoTClient 0:1b5f413bf328 949 }
AzureIoTClient 0:1b5f413bf328 950 }
AzureIoTClient 0:1b5f413bf328 951
AzureIoTClient 0:1b5f413bf328 952 static void sendEvent(PAMQP_TRANSPORT_STATE transportState)
AzureIoTClient 0:1b5f413bf328 953 {
AzureIoTClient 0:1b5f413bf328 954 while (!DList_IsListEmpty(&transportState->availableWorkItems))
AzureIoTClient 0:1b5f413bf328 955 {
AzureIoTClient 0:1b5f413bf328 956 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_073: [While the availableWorkItems is NOT empty loop.] */
AzureIoTClient 0:1b5f413bf328 957 PAMQP_WORK_ITEM headOfAvailable = containingRecord(transportState->availableWorkItems.Flink, AMQP_WORK_ITEM, link);
AzureIoTClient 0:1b5f413bf328 958 size_t payloadSize;
AzureIoTClient 0:1b5f413bf328 959 const unsigned char* payloadAddress;
AzureIoTClient 0:1b5f413bf328 960 amqp_batch_result amqpBatchResult;
AzureIoTClient 0:1b5f413bf328 961 amqpBatchResult = prepareBatch(MAXIMUM_EVENT_LENGTH, transportState->waitingToSend, &headOfAvailable->eventMessages, &payloadSize, &payloadAddress);
AzureIoTClient 0:1b5f413bf328 962 if (amqpBatchResult == amqp_batch_nowork)
AzureIoTClient 0:1b5f413bf328 963 {
AzureIoTClient 0:1b5f413bf328 964 // no work to do. Stop trying.
AzureIoTClient 2:1e6040e0c035 965 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/
AzureIoTClient 0:1b5f413bf328 966 break;
AzureIoTClient 0:1b5f413bf328 967 }
AzureIoTClient 0:1b5f413bf328 968 else if (amqpBatchResult == amqp_batch_error)
AzureIoTClient 0:1b5f413bf328 969 {
AzureIoTClient 0:1b5f413bf328 970 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_078: [If the event failed call invoke IotHubClient_SendBatchComplete as above utilizing the eventMessages list of the head of the availableWorkItems and a status of IOTHUB_BATCHSTATE_FAILED.]*/
AzureIoTClient 0:1b5f413bf328 971 IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &headOfAvailable->eventMessages, IOTHUB_BATCHSTATE_FAILED);
AzureIoTClient 0:1b5f413bf328 972 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_079: [Continue this send loop.]*/
AzureIoTClient 0:1b5f413bf328 973 }
AzureIoTClient 0:1b5f413bf328 974 else
AzureIoTClient 0:1b5f413bf328 975 {
AzureIoTClient 0:1b5f413bf328 976 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_080: [If the event did not fail then prepare a proton message using the following proton API: pn_message_set_address, pn_message_set_inferred, pn_message_body and pn_data_put_binary with the above saved size and address values.]*/
AzureIoTClient 0:1b5f413bf328 977 if (prepareSimpleProtonMessage(transportState->message, transportState->eventAddress, payloadSize, payloadAddress) == false)
AzureIoTClient 0:1b5f413bf328 978 {
AzureIoTClient 0:1b5f413bf328 979 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 0:1b5f413bf328 980 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 0:1b5f413bf328 981 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 0:1b5f413bf328 982 break;
AzureIoTClient 0:1b5f413bf328 983 }
AzureIoTClient 0:1b5f413bf328 984 else if (setProperties(&headOfAvailable->eventMessages, transportState->message) != 0)
AzureIoTClient 0:1b5f413bf328 985 {
AzureIoTClient 0:1b5f413bf328 986 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 0:1b5f413bf328 987 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 0:1b5f413bf328 988 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 0:1b5f413bf328 989 break;
AzureIoTClient 0:1b5f413bf328 990 }
AzureIoTClient 0:1b5f413bf328 991 else
AzureIoTClient 0:1b5f413bf328 992 {
AzureIoTClient 0:1b5f413bf328 993 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/
AzureIoTClient 0:1b5f413bf328 994 int protonResult;
AzureIoTClient 0:1b5f413bf328 995 protonResult = pn_messenger_put(transportState->messenger, transportState->message);
AzureIoTClient 0:1b5f413bf328 996 if (protonResult != 0)
AzureIoTClient 0:1b5f413bf328 997 {
AzureIoTClient 0:1b5f413bf328 998 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_084: [If that fails, sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
AzureIoTClient 0:1b5f413bf328 999 putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
AzureIoTClient 0:1b5f413bf328 1000 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_085: [sendEvent will then break out of the send loop.]*/
AzureIoTClient 0:1b5f413bf328 1001 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 1002 break;
AzureIoTClient 0:1b5f413bf328 1003 }
AzureIoTClient 0:1b5f413bf328 1004 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_184: [The work item will also contain a timeout that shall denote how long the transport will wait before initiating error recovery.]*/
AzureIoTClient 0:1b5f413bf328 1005 headOfAvailable->expiry = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + transportState->eventTimeout);
AzureIoTClient 0:1b5f413bf328 1006 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then sendEvent will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/
AzureIoTClient 0:1b5f413bf328 1007 headOfAvailable->tracker = pn_messenger_outgoing_tracker(transportState->messenger);
AzureIoTClient 0:1b5f413bf328 1008 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_087: [The head of the availableWorkItems will be removed and placed on the workInProgress list.]*/
AzureIoTClient 0:1b5f413bf328 1009 DList_InsertTailList(&transportState->workInProgress, DList_RemoveHeadList(&transportState->availableWorkItems));
AzureIoTClient 0:1b5f413bf328 1010 }
AzureIoTClient 0:1b5f413bf328 1011 }
AzureIoTClient 0:1b5f413bf328 1012 }
AzureIoTClient 0:1b5f413bf328 1013 }
AzureIoTClient 0:1b5f413bf328 1014 static void processCBSReply(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
AzureIoTClient 0:1b5f413bf328 1015 {
AzureIoTClient 0:1b5f413bf328 1016 bool goodMessage = false;
AzureIoTClient 0:1b5f413bf328 1017 pn_atom_t correlationId;
AzureIoTClient 0:1b5f413bf328 1018
AzureIoTClient 0:1b5f413bf328 1019 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_112: [The correlationId message property is obtained via a call to pn_message_get_correlation_id.]*/
AzureIoTClient 0:1b5f413bf328 1020 correlationId = pn_message_get_correlation_id(transportState->message);
AzureIoTClient 0:1b5f413bf328 1021
AzureIoTClient 0:1b5f413bf328 1022 //
AzureIoTClient 0:1b5f413bf328 1023 // If it isn't the one we're looking for, simply accept it, because we don't want to see it and we can't do anything with it.
AzureIoTClient 0:1b5f413bf328 1024 //
AzureIoTClient 0:1b5f413bf328 1025
AzureIoTClient 0:1b5f413bf328 1026 if (correlationId.u.as_ulong != transportState->sendTokenMessageId)
AzureIoTClient 0:1b5f413bf328 1027 {
AzureIoTClient 0:1b5f413bf328 1028 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_113: [The id is compared to the state variable sendTokenMessageId. If there is no match, an error is logged, the message is abandoned and processCBSReply returns.]*/
AzureIoTClient 0:1b5f413bf328 1029 LogError("An old reply from a CBS renewal drifed in. Moving on\r\n");
AzureIoTClient 0:1b5f413bf328 1030 }
AzureIoTClient 0:1b5f413bf328 1031 else
AzureIoTClient 0:1b5f413bf328 1032 {
AzureIoTClient 0:1b5f413bf328 1033 //
AzureIoTClient 0:1b5f413bf328 1034 // This is the response we're looking for!
AzureIoTClient 0:1b5f413bf328 1035 //
AzureIoTClient 0:1b5f413bf328 1036
AzureIoTClient 0:1b5f413bf328 1037 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_114: [Obtain the properties of the message by invoking pn_message_properties.]*/
AzureIoTClient 0:1b5f413bf328 1038 pn_data_t* propertyData = pn_message_properties(transportState->message);
AzureIoTClient 0:1b5f413bf328 1039
AzureIoTClient 0:1b5f413bf328 1040 transportState->waitingForPutTokenReply = false;
AzureIoTClient 0:1b5f413bf328 1041 if (propertyData == NULL)
AzureIoTClient 0:1b5f413bf328 1042 {
AzureIoTClient 0:1b5f413bf328 1043 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_115: [If NULL then abandon the message and return.]*/
AzureIoTClient 0:1b5f413bf328 1044 LogError("Invalid response back from the CBS - no application properties\r\n");
AzureIoTClient 0:1b5f413bf328 1045 }
AzureIoTClient 0:1b5f413bf328 1046 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_116: [Advance to the first property of the message via pn_data_next.]*/
AzureIoTClient 0:1b5f413bf328 1047 else if (pn_data_next(propertyData) == false)
AzureIoTClient 0:1b5f413bf328 1048 {
AzureIoTClient 0:1b5f413bf328 1049 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_117: [If this fails, abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1050 LogError("Invalid response back from the CBS - application properties malformed\r\n");
AzureIoTClient 0:1b5f413bf328 1051 }
AzureIoTClient 0:1b5f413bf328 1052 else
AzureIoTClient 0:1b5f413bf328 1053 {
AzureIoTClient 0:1b5f413bf328 1054 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_118: [Invoke pn_data_get_map to obtain the number of properties.]*/
AzureIoTClient 0:1b5f413bf328 1055 size_t numberOfProperties = pn_data_get_map(propertyData) / 2;
AzureIoTClient 0:1b5f413bf328 1056 if (numberOfProperties < 1) // We MUST have at least the status property
AzureIoTClient 0:1b5f413bf328 1057 {
AzureIoTClient 0:1b5f413bf328 1058 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_119: [If the number of properties is 0 then abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1059 LogError("Invalid response back from the CBS - invalid number of properties: %zu\r\n", pn_data_get_map(propertyData));
AzureIoTClient 0:1b5f413bf328 1060 }
AzureIoTClient 0:1b5f413bf328 1061 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_120: [Enter the property map by invoking pn_data_enter.]*/
AzureIoTClient 0:1b5f413bf328 1062 else if (pn_data_enter(propertyData) == false)
AzureIoTClient 0:1b5f413bf328 1063 {
AzureIoTClient 0:1b5f413bf328 1064 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_121: [If that fails then abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1065 LogError("Invalid response back from the CBS - unable to access the sent properties\r\n");
AzureIoTClient 0:1b5f413bf328 1066 }
AzureIoTClient 0:1b5f413bf328 1067 else
AzureIoTClient 0:1b5f413bf328 1068 {
AzureIoTClient 0:1b5f413bf328 1069 pn_bytes_t propertyName;
AzureIoTClient 0:1b5f413bf328 1070 int32_t resultStatus;
AzureIoTClient 0:1b5f413bf328 1071 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_122: [Get the property name.]*/
AzureIoTClient 0:1b5f413bf328 1072 if (getMapString(propertyData, &propertyName) != 0)
AzureIoTClient 0:1b5f413bf328 1073 {
AzureIoTClient 0:1b5f413bf328 1074 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_123: [If that fails then abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1075 LogError("Unable to acquire the status value from the cbs request\r\n");
AzureIoTClient 0:1b5f413bf328 1076 }
AzureIoTClient 0:1b5f413bf328 1077 else if (strcmp(propertyName.start, PROTON_MAP_STATUS_CODE) != 0)
AzureIoTClient 0:1b5f413bf328 1078 {
AzureIoTClient 0:1b5f413bf328 1079 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_124: [If the property name is not "status-code" than abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1080 LogError("Unknown application property returned: %s\r\n", propertyName.start);
AzureIoTClient 0:1b5f413bf328 1081 }
AzureIoTClient 0:1b5f413bf328 1082 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_125: [Get the actual status code.]*/
AzureIoTClient 0:1b5f413bf328 1083 else if (getMapInt(propertyData, &resultStatus) != 0)
AzureIoTClient 0:1b5f413bf328 1084 {
AzureIoTClient 0:1b5f413bf328 1085 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_126: [If that fails then abandon and return.]*/
AzureIoTClient 0:1b5f413bf328 1086 LogError("Unable to acquire the actual status of the CBS put token\r\n");
AzureIoTClient 0:1b5f413bf328 1087 }
AzureIoTClient 0:1b5f413bf328 1088 else if (resultStatus != 200)
AzureIoTClient 0:1b5f413bf328 1089 {
AzureIoTClient 0:1b5f413bf328 1090 pn_bytes_t statusDescription;
AzureIoTClient 0:1b5f413bf328 1091 //
AzureIoTClient 0:1b5f413bf328 1092 // Yeah, it's not what we want to hear but it's not as though anything was wrong with
AzureIoTClient 0:1b5f413bf328 1093 // the message.
AzureIoTClient 0:1b5f413bf328 1094 //
AzureIoTClient 0:1b5f413bf328 1095 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_127: [If the status code is not equal to 200 then attempt to acquire the explanation from the properties.]*/
AzureIoTClient 0:1b5f413bf328 1096 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_128: [Accept the message and return.]*/
AzureIoTClient 0:1b5f413bf328 1097 goodMessage = true;
AzureIoTClient 0:1b5f413bf328 1098 LogError("The CBS put token operation failed: %d\r\n", resultStatus);
AzureIoTClient 0:1b5f413bf328 1099 if (numberOfProperties >= 2)
AzureIoTClient 0:1b5f413bf328 1100 {
AzureIoTClient 0:1b5f413bf328 1101 //
AzureIoTClient 0:1b5f413bf328 1102 // Well we have an error. See if the next property is the status description.
AzureIoTClient 0:1b5f413bf328 1103 // If it is, then log the value as further explanation
AzureIoTClient 0:1b5f413bf328 1104 //
AzureIoTClient 0:1b5f413bf328 1105 if (getMapString(propertyData, &propertyName) == 0)
AzureIoTClient 0:1b5f413bf328 1106 {
AzureIoTClient 0:1b5f413bf328 1107 if (strcmp(propertyName.start, PROTON_MAP_STATUS_DESCRIPTION) == 0)
AzureIoTClient 0:1b5f413bf328 1108 {
AzureIoTClient 0:1b5f413bf328 1109 if (getMapString(propertyData, &statusDescription) == 0)
AzureIoTClient 0:1b5f413bf328 1110 {
AzureIoTClient 0:1b5f413bf328 1111 LogError("The CBS put token operation failed due to: %s\r\n", statusDescription.start);
AzureIoTClient 0:1b5f413bf328 1112 }
AzureIoTClient 0:1b5f413bf328 1113 }
AzureIoTClient 0:1b5f413bf328 1114 }
AzureIoTClient 0:1b5f413bf328 1115 }
AzureIoTClient 0:1b5f413bf328 1116 }
AzureIoTClient 0:1b5f413bf328 1117 else
AzureIoTClient 0:1b5f413bf328 1118 {
AzureIoTClient 0:1b5f413bf328 1119 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_129: [If the status code was 200 then set the state variable putTokenWasSuccessful, accept the message and return.]*/
AzureIoTClient 0:1b5f413bf328 1120 goodMessage = true;
AzureIoTClient 0:1b5f413bf328 1121 transportState->putTokenWasSuccessful = true;
AzureIoTClient 0:1b5f413bf328 1122 }
AzureIoTClient 0:1b5f413bf328 1123 }
AzureIoTClient 0:1b5f413bf328 1124 }
AzureIoTClient 0:1b5f413bf328 1125 }
AzureIoTClient 0:1b5f413bf328 1126 if (goodMessage == true)
AzureIoTClient 0:1b5f413bf328 1127 {
AzureIoTClient 0:1b5f413bf328 1128 pn_messenger_accept(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1129 }
AzureIoTClient 0:1b5f413bf328 1130 else
AzureIoTClient 0:1b5f413bf328 1131 {
AzureIoTClient 0:1b5f413bf328 1132 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1133 }
AzureIoTClient 0:1b5f413bf328 1134 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1135 }
AzureIoTClient 0:1b5f413bf328 1136
AzureIoTClient 2:1e6040e0c035 1137 static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
AzureIoTClient 0:1b5f413bf328 1138 {
AzureIoTClient 2:1e6040e0c035 1139 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/
AzureIoTClient 2:1e6040e0c035 1140 if (transportState->DoWork_PullMessages == false)
AzureIoTClient 0:1b5f413bf328 1141 {
AzureIoTClient 0:1b5f413bf328 1142 //
AzureIoTClient 0:1b5f413bf328 1143 // Nothing to do with the message. We reject it. In that way it can come back later.
AzureIoTClient 0:1b5f413bf328 1144 //
AzureIoTClient 0:1b5f413bf328 1145 pn_messenger_reject(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1146 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1147 }
AzureIoTClient 0:1b5f413bf328 1148 else
AzureIoTClient 0:1b5f413bf328 1149 {
AzureIoTClient 0:1b5f413bf328 1150 pn_data_t *body;
AzureIoTClient 0:1b5f413bf328 1151
AzureIoTClient 2:1e6040e0c035 1152 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/
AzureIoTClient 0:1b5f413bf328 1153 if ((body = pn_message_body(transportState->message)) == NULL)
AzureIoTClient 0:1b5f413bf328 1154 {
AzureIoTClient 2:1e6040e0c035 1155 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/
AzureIoTClient 0:1b5f413bf328 1156 LogError("Failed to get the proton message body\r\n");
AzureIoTClient 0:1b5f413bf328 1157 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1158 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1159 }
AzureIoTClient 0:1b5f413bf328 1160 else
AzureIoTClient 0:1b5f413bf328 1161 {
AzureIoTClient 0:1b5f413bf328 1162 pn_bytes_t sizeAndData;
AzureIoTClient 2:1e6040e0c035 1163 bool keepProcessingThisMessage = true;
AzureIoTClient 2:1e6040e0c035 1164 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/
AzureIoTClient 0:1b5f413bf328 1165 if (pn_data_next(body) == false)
AzureIoTClient 0:1b5f413bf328 1166 {
AzureIoTClient 0:1b5f413bf328 1167 //
AzureIoTClient 0:1b5f413bf328 1168 // We have a null body. This is reasonable. It should also be pointed out that
AzureIoTClient 0:1b5f413bf328 1169 // pn_data_next call shouldn't be thought of as checking for the existence of something.
AzureIoTClient 0:1b5f413bf328 1170 // It should be thought of as actually MOVING through a tree structure in the message.
AzureIoTClient 0:1b5f413bf328 1171 //
AzureIoTClient 0:1b5f413bf328 1172 sizeAndData.size = 0;
AzureIoTClient 0:1b5f413bf328 1173 sizeAndData.start = NULL;
AzureIoTClient 0:1b5f413bf328 1174 }
AzureIoTClient 0:1b5f413bf328 1175 else
AzureIoTClient 0:1b5f413bf328 1176 {
AzureIoTClient 2:1e6040e0c035 1177 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/
AzureIoTClient 0:1b5f413bf328 1178 if (PN_BINARY != pn_data_type(body))
AzureIoTClient 0:1b5f413bf328 1179 {
AzureIoTClient 2:1e6040e0c035 1180 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/
AzureIoTClient 2:1e6040e0c035 1181 LogError("Message received via AMQP was not PN_BINARY\r\n");
AzureIoTClient 0:1b5f413bf328 1182 sizeAndData.size = 0; // Compilers get upset.
AzureIoTClient 0:1b5f413bf328 1183 sizeAndData.start = NULL;
AzureIoTClient 0:1b5f413bf328 1184 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1185 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 2:1e6040e0c035 1186 keepProcessingThisMessage = false;
AzureIoTClient 0:1b5f413bf328 1187 }
AzureIoTClient 0:1b5f413bf328 1188 else
AzureIoTClient 0:1b5f413bf328 1189 {
AzureIoTClient 0:1b5f413bf328 1190 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_057: [The length and a pointer to the payload will be acquired and saved by invoking pn_data_get_binary.]*/
AzureIoTClient 0:1b5f413bf328 1191 sizeAndData = pn_data_get_binary(body);
AzureIoTClient 0:1b5f413bf328 1192 }
AzureIoTClient 0:1b5f413bf328 1193 }
AzureIoTClient 2:1e6040e0c035 1194 if (keepProcessingThisMessage == true)
AzureIoTClient 0:1b5f413bf328 1195 {
AzureIoTClient 0:1b5f413bf328 1196 IOTHUB_MESSAGE_HANDLE ioTMessage;
AzureIoTClient 0:1b5f413bf328 1197 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/
AzureIoTClient 0:1b5f413bf328 1198 if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL)
AzureIoTClient 0:1b5f413bf328 1199 {
AzureIoTClient 2:1e6040e0c035 1200 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/
AzureIoTClient 0:1b5f413bf328 1201 LogError("Failed to set data for IoT hub message\r\n");
AzureIoTClient 0:1b5f413bf328 1202 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1203 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1204 }
AzureIoTClient 0:1b5f413bf328 1205 else if (cloneProperties(ioTMessage, transportState->message) != 0)
AzureIoTClient 0:1b5f413bf328 1206 {
AzureIoTClient 0:1b5f413bf328 1207 LogError("Failed to set data for IoT hub message\r\n");
AzureIoTClient 0:1b5f413bf328 1208 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1209 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1210 }
AzureIoTClient 0:1b5f413bf328 1211 else
AzureIoTClient 0:1b5f413bf328 1212 {
AzureIoTClient 2:1e6040e0c035 1213 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/
AzureIoTClient 2:1e6040e0c035 1214 IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage);
AzureIoTClient 0:1b5f413bf328 1215
AzureIoTClient 0:1b5f413bf328 1216 if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED)
AzureIoTClient 0:1b5f413bf328 1217 {
AzureIoTClient 2:1e6040e0c035 1218 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */
AzureIoTClient 0:1b5f413bf328 1219 pn_messenger_accept(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1220 }
AzureIoTClient 0:1b5f413bf328 1221 else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED)
AzureIoTClient 0:1b5f413bf328 1222 {
AzureIoTClient 2:1e6040e0c035 1223 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */
AzureIoTClient 0:1b5f413bf328 1224 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1225 }
AzureIoTClient 0:1b5f413bf328 1226 else
AzureIoTClient 0:1b5f413bf328 1227 {
AzureIoTClient 2:1e6040e0c035 1228 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/
AzureIoTClient 0:1b5f413bf328 1229 pn_messenger_reject(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1230 }
AzureIoTClient 0:1b5f413bf328 1231 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1232 }
AzureIoTClient 2:1e6040e0c035 1233 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/
AzureIoTClient 0:1b5f413bf328 1234 IoTHubMessage_Destroy(ioTMessage);
AzureIoTClient 0:1b5f413bf328 1235 }
AzureIoTClient 0:1b5f413bf328 1236 }
AzureIoTClient 0:1b5f413bf328 1237 }
AzureIoTClient 0:1b5f413bf328 1238 }
AzureIoTClient 0:1b5f413bf328 1239
AzureIoTClient 0:1b5f413bf328 1240 static void processReceives(TRANSPORT_HANDLE handle)
AzureIoTClient 0:1b5f413bf328 1241 {
AzureIoTClient 0:1b5f413bf328 1242 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 1243 int receiveResult;
AzureIoTClient 0:1b5f413bf328 1244
AzureIoTClient 2:1e6040e0c035 1245 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/
AzureIoTClient 0:1b5f413bf328 1246
AzureIoTClient 2:1e6040e0c035 1247 if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false))
AzureIoTClient 0:1b5f413bf328 1248 {
AzureIoTClient 0:1b5f413bf328 1249 if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0)
AzureIoTClient 0:1b5f413bf328 1250 {
AzureIoTClient 0:1b5f413bf328 1251 if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
AzureIoTClient 0:1b5f413bf328 1252 {
AzureIoTClient 0:1b5f413bf328 1253 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
AzureIoTClient 2:1e6040e0c035 1254 LogError("Error attempting to receive messages: %d\r\n", receiveResult);
AzureIoTClient 0:1b5f413bf328 1255 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
AzureIoTClient 0:1b5f413bf328 1256 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 1257 }
AzureIoTClient 0:1b5f413bf328 1258 }
AzureIoTClient 0:1b5f413bf328 1259 }
AzureIoTClient 0:1b5f413bf328 1260 else
AzureIoTClient 0:1b5f413bf328 1261 {
AzureIoTClient 2:1e6040e0c035 1262 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/
AzureIoTClient 0:1b5f413bf328 1263 if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0)
AzureIoTClient 0:1b5f413bf328 1264 {
AzureIoTClient 0:1b5f413bf328 1265 if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
AzureIoTClient 0:1b5f413bf328 1266 {
AzureIoTClient 0:1b5f413bf328 1267 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
AzureIoTClient 2:1e6040e0c035 1268 LogError("Error attempting to receive messages: %d\r\n", receiveResult);
AzureIoTClient 0:1b5f413bf328 1269 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
AzureIoTClient 0:1b5f413bf328 1270 transportState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 1271 }
AzureIoTClient 0:1b5f413bf328 1272 }
AzureIoTClient 0:1b5f413bf328 1273 else
AzureIoTClient 0:1b5f413bf328 1274 {
AzureIoTClient 0:1b5f413bf328 1275 //
AzureIoTClient 0:1b5f413bf328 1276 // If any are present we need to dispose of messages in the incoming queue.
AzureIoTClient 0:1b5f413bf328 1277 //
AzureIoTClient 0:1b5f413bf328 1278 size_t depthOfReceiveQueue;
AzureIoTClient 2:1e6040e0c035 1279 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/
AzureIoTClient 0:1b5f413bf328 1280 for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--)
AzureIoTClient 0:1b5f413bf328 1281 {
AzureIoTClient 0:1b5f413bf328 1282 int result;
AzureIoTClient 0:1b5f413bf328 1283 pn_message_clear(transportState->message);
AzureIoTClient 0:1b5f413bf328 1284 //
AzureIoTClient 0:1b5f413bf328 1285 // Ok something is there. First thing is to get it.
AzureIoTClient 0:1b5f413bf328 1286 //
AzureIoTClient 0:1b5f413bf328 1287 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_047: [processReceives will retrieve a message with pn_messenger_get.]*/
AzureIoTClient 0:1b5f413bf328 1288 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_048: [If pn_messenger_get fails we log and break the receive loop.]*/
AzureIoTClient 0:1b5f413bf328 1289 if ((result = pn_messenger_get(transportState->messenger, transportState->message)) != 0)
AzureIoTClient 0:1b5f413bf328 1290 {
AzureIoTClient 0:1b5f413bf328 1291 //
AzureIoTClient 0:1b5f413bf328 1292 // Well that's pretty odd. We shouldn't have gotten here if the message queue was empty!
AzureIoTClient 0:1b5f413bf328 1293 //
AzureIoTClient 0:1b5f413bf328 1294 LogError("Message reception queue unexpectedly empty! Code: %d\r\n", result);
AzureIoTClient 0:1b5f413bf328 1295 break;
AzureIoTClient 0:1b5f413bf328 1296 }
AzureIoTClient 0:1b5f413bf328 1297 else
AzureIoTClient 0:1b5f413bf328 1298 {
AzureIoTClient 0:1b5f413bf328 1299 //
AzureIoTClient 0:1b5f413bf328 1300 // We'll need a tracker to dispose of the message. We need the subscription to determine who should process it.
AzureIoTClient 0:1b5f413bf328 1301 //
AzureIoTClient 0:1b5f413bf328 1302 pn_subscription_t* theMessageSource;
AzureIoTClient 0:1b5f413bf328 1303 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_049: [processReceives shall acquire a tracker by invoking pn_messenger_incoming_tracker.]*/
AzureIoTClient 0:1b5f413bf328 1304 pn_tracker_t tracker = pn_messenger_incoming_tracker(transportState->messenger);
AzureIoTClient 0:1b5f413bf328 1305
AzureIoTClient 0:1b5f413bf328 1306 //
AzureIoTClient 0:1b5f413bf328 1307 // Currently we can receive two types of messages. We can recieve status messages from the $CBS that indicates the success
AzureIoTClient 0:1b5f413bf328 1308 // or failure of renewing the SAS for the device.
AzureIoTClient 0:1b5f413bf328 1309 //
AzureIoTClient 2:1e6040e0c035 1310 // Otherwise we can recieve a simple message. We can get a pointer to the subscription from the tracker. We can compare
AzureIoTClient 0:1b5f413bf328 1311 // that pointer to the subscriptions that we stored when we initialized the messenger. We then dispatch appropriately.
AzureIoTClient 0:1b5f413bf328 1312 //
AzureIoTClient 0:1b5f413bf328 1313
AzureIoTClient 0:1b5f413bf328 1314 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/
AzureIoTClient 0:1b5f413bf328 1315 theMessageSource = pn_messenger_incoming_subscription(transportState->messenger);
AzureIoTClient 2:1e6040e0c035 1316 if (theMessageSource == transportState->messageSubscription)
AzureIoTClient 0:1b5f413bf328 1317 {
AzureIoTClient 0:1b5f413bf328 1318 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/
AzureIoTClient 2:1e6040e0c035 1319 processMessage(transportState, tracker);
AzureIoTClient 0:1b5f413bf328 1320 }
AzureIoTClient 0:1b5f413bf328 1321 else if (theMessageSource == transportState->cbsSubscription)
AzureIoTClient 0:1b5f413bf328 1322 {
AzureIoTClient 0:1b5f413bf328 1323 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/
AzureIoTClient 0:1b5f413bf328 1324 processCBSReply(transportState, tracker);
AzureIoTClient 0:1b5f413bf328 1325 }
AzureIoTClient 0:1b5f413bf328 1326 else
AzureIoTClient 0:1b5f413bf328 1327 {
AzureIoTClient 0:1b5f413bf328 1328 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_109: [Otherwise, log an error and abandon the message so that it never returns.]*/
AzureIoTClient 0:1b5f413bf328 1329 LogError("Message received from an unknown sender\r\n");
AzureIoTClient 0:1b5f413bf328 1330 pn_messenger_release(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1331 pn_messenger_settle(transportState->messenger, tracker, 0);
AzureIoTClient 0:1b5f413bf328 1332 }
AzureIoTClient 0:1b5f413bf328 1333 }
AzureIoTClient 0:1b5f413bf328 1334 }
AzureIoTClient 0:1b5f413bf328 1335 pn_message_clear(transportState->message);
AzureIoTClient 0:1b5f413bf328 1336 }
AzureIoTClient 0:1b5f413bf328 1337 }
AzureIoTClient 0:1b5f413bf328 1338 }
AzureIoTClient 0:1b5f413bf328 1339
AzureIoTClient 0:1b5f413bf328 1340 static void clientTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle)
AzureIoTClient 0:1b5f413bf328 1341 {
AzureIoTClient 0:1b5f413bf328 1342 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 1343
AzureIoTClient 0:1b5f413bf328 1344 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_019: [clientTransportAMQP_Dowork shall do no work if handle is NULL.]*/
AzureIoTClient 0:1b5f413bf328 1345 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_020: [clientTransportAMQP_Dowork shall do no work if iotHubClientHandle is NULL.]*/
AzureIoTClient 0:1b5f413bf328 1346 if (transportState && iotHubClientHandle)
AzureIoTClient 0:1b5f413bf328 1347 {
AzureIoTClient 0:1b5f413bf328 1348 transportState->savedClientHandle = iotHubClientHandle;
AzureIoTClient 0:1b5f413bf328 1349 if (protonMessengerInit(transportState))
AzureIoTClient 0:1b5f413bf328 1350 {
AzureIoTClient 0:1b5f413bf328 1351 renewIfNecessaryTheCBS(transportState);
AzureIoTClient 0:1b5f413bf328 1352 if (transportState->messengerInitialized == true) reclaimEventResources(transportState);
AzureIoTClient 0:1b5f413bf328 1353 if (transportState->messengerInitialized == true) sendEvent(transportState);
AzureIoTClient 0:1b5f413bf328 1354 if (transportState->messengerInitialized == true) processReceives(transportState);
AzureIoTClient 0:1b5f413bf328 1355
AzureIoTClient 0:1b5f413bf328 1356 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_088: [pn_messenger_work shall be invoked following all the above actions.]*/
AzureIoTClient 0:1b5f413bf328 1357 if (transportState->messengerInitialized == true) checkForErrorsThenWork(transportState);
AzureIoTClient 0:1b5f413bf328 1358 }
AzureIoTClient 0:1b5f413bf328 1359
AzureIoTClient 0:1b5f413bf328 1360 //
AzureIoTClient 0:1b5f413bf328 1361 // Try to get rid of any old messengers that we were NOT able to stop.
AzureIoTClient 0:1b5f413bf328 1362 //
AzureIoTClient 0:1b5f413bf328 1363 herdTheMessengers(transportState, false);
AzureIoTClient 0:1b5f413bf328 1364 }
AzureIoTClient 0:1b5f413bf328 1365 return;
AzureIoTClient 0:1b5f413bf328 1366 }
AzureIoTClient 0:1b5f413bf328 1367
AzureIoTClient 0:1b5f413bf328 1368 static bool createUrledDeviceId(PAMQP_TRANSPORT_STATE state, const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 0:1b5f413bf328 1369 {
AzureIoTClient 0:1b5f413bf328 1370 return ((state->urledDeviceId = URL_EncodeString(config->upperConfig->deviceId)) == NULL) ? (false) : (true);
AzureIoTClient 0:1b5f413bf328 1371 }
AzureIoTClient 0:1b5f413bf328 1372 static bool createDevicesPortionPath(PAMQP_TRANSPORT_STATE state, const char* host)
AzureIoTClient 0:1b5f413bf328 1373 {
AzureIoTClient 0:1b5f413bf328 1374 return (((state->devicesPortionPath = STRING_construct(host)) == NULL) ||
AzureIoTClient 0:1b5f413bf328 1375 (STRING_concat(state->devicesPortionPath, "/devices/") != 0) ||
AzureIoTClient 0:1b5f413bf328 1376 (STRING_concat_with_STRING(state->devicesPortionPath, state->urledDeviceId) != 0)) ? (false) : (true);
AzureIoTClient 0:1b5f413bf328 1377 }
AzureIoTClient 0:1b5f413bf328 1378 static bool createEventAddress(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 0:1b5f413bf328 1379 {
AzureIoTClient 0:1b5f413bf328 1380 return (((state->eventAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 0:1b5f413bf328 1381 (STRING_concat_with_STRING(state->eventAddress, state->devicesPortionPath) != 0) ||
AzureIoTClient 0:1b5f413bf328 1382 (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 0:1b5f413bf328 1383 }
AzureIoTClient 0:1b5f413bf328 1384
AzureIoTClient 2:1e6040e0c035 1385 static bool createMessageAddress(PAMQP_TRANSPORT_STATE state)
AzureIoTClient 0:1b5f413bf328 1386 {
AzureIoTClient 2:1e6040e0c035 1387 return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 2:1e6040e0c035 1388 (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) ||
AzureIoTClient 2:1e6040e0c035 1389 (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 0:1b5f413bf328 1390 }
AzureIoTClient 0:1b5f413bf328 1391
AzureIoTClient 0:1b5f413bf328 1392 static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host)
AzureIoTClient 0:1b5f413bf328 1393 {
AzureIoTClient 0:1b5f413bf328 1394 return (((state->cbsAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
AzureIoTClient 0:1b5f413bf328 1395 (STRING_concat(state->cbsAddress,host) != 0) ||
AzureIoTClient 0:1b5f413bf328 1396 (STRING_concat(state->cbsAddress, CBS_ENDPOINT) != 0)) ? (false) : (true);
AzureIoTClient 0:1b5f413bf328 1397 }
AzureIoTClient 0:1b5f413bf328 1398
AzureIoTClient 0:1b5f413bf328 1399 static STRING_HANDLE createHost(const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 0:1b5f413bf328 1400 {
AzureIoTClient 0:1b5f413bf328 1401 STRING_HANDLE result = NULL;
AzureIoTClient 0:1b5f413bf328 1402 if (((result = STRING_construct(config->upperConfig->iotHubName)) == NULL) ||
AzureIoTClient 0:1b5f413bf328 1403 (STRING_concat(result, ".") != 0) ||
AzureIoTClient 0:1b5f413bf328 1404 (STRING_concat(result, config->upperConfig->iotHubSuffix) != 0))
AzureIoTClient 0:1b5f413bf328 1405 {
AzureIoTClient 0:1b5f413bf328 1406 if (result != NULL)
AzureIoTClient 0:1b5f413bf328 1407 {
AzureIoTClient 0:1b5f413bf328 1408 STRING_delete(result);
AzureIoTClient 0:1b5f413bf328 1409 result = NULL;
AzureIoTClient 0:1b5f413bf328 1410 }
AzureIoTClient 0:1b5f413bf328 1411 }
AzureIoTClient 0:1b5f413bf328 1412 return result;
AzureIoTClient 0:1b5f413bf328 1413 }
AzureIoTClient 0:1b5f413bf328 1414
AzureIoTClient 0:1b5f413bf328 1415 static TRANSPORT_HANDLE clientTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config)
AzureIoTClient 0:1b5f413bf328 1416 {
AzureIoTClient 0:1b5f413bf328 1417 PAMQP_TRANSPORT_STATE amqpState = NULL;
AzureIoTClient 0:1b5f413bf328 1418 if (!config)
AzureIoTClient 0:1b5f413bf328 1419 {
AzureIoTClient 0:1b5f413bf328 1420 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_001: [If parameter config is NULL then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1421 LogError("IoTHub amqp client tranport null configuration parameter.\r\n");
AzureIoTClient 0:1b5f413bf328 1422 }
AzureIoTClient 0:1b5f413bf328 1423 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_002: [clientTransportAMQP_Create shall fail and return NULL if any other fields of the config structure are NULL.]*/
AzureIoTClient 0:1b5f413bf328 1424 else if (config->upperConfig->protocol == NULL)
AzureIoTClient 0:1b5f413bf328 1425 {
AzureIoTClient 0:1b5f413bf328 1426 LogError("invalid configuration (NULL protocol detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1427 }
AzureIoTClient 0:1b5f413bf328 1428 else if (config->upperConfig->deviceId == NULL)
AzureIoTClient 0:1b5f413bf328 1429 {
AzureIoTClient 0:1b5f413bf328 1430 LogError("invalid configuration (NULL deviceId detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1431 }
AzureIoTClient 0:1b5f413bf328 1432 else if (config->upperConfig->deviceKey == NULL)
AzureIoTClient 0:1b5f413bf328 1433 {
AzureIoTClient 0:1b5f413bf328 1434 LogError("invalid configuration (NULL deviceKey detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1435 }
AzureIoTClient 0:1b5f413bf328 1436 else if (config->upperConfig->iotHubName == NULL)
AzureIoTClient 0:1b5f413bf328 1437 {
AzureIoTClient 0:1b5f413bf328 1438 LogError("invalid configuration (NULL iotHubName detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1439 }
AzureIoTClient 0:1b5f413bf328 1440 else if (config->upperConfig->iotHubSuffix == NULL)
AzureIoTClient 0:1b5f413bf328 1441 {
AzureIoTClient 0:1b5f413bf328 1442 LogError("invalid configuration (NULL iotHubSuffix detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1443 }
AzureIoTClient 0:1b5f413bf328 1444 else if (!config->waitingToSend)
AzureIoTClient 0:1b5f413bf328 1445 {
AzureIoTClient 0:1b5f413bf328 1446 LogError("invalid configuration (NULL waitingToSend list detected)\r\n");
AzureIoTClient 0:1b5f413bf328 1447 }
AzureIoTClient 0:1b5f413bf328 1448 else if (strlen(config->upperConfig->deviceId) > 128U)
AzureIoTClient 0:1b5f413bf328 1449 {
AzureIoTClient 0:1b5f413bf328 1450 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_016: [clientTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.]*/
AzureIoTClient 0:1b5f413bf328 1451 LogError("deviceName is too long\r\n");
AzureIoTClient 0:1b5f413bf328 1452 }
AzureIoTClient 0:1b5f413bf328 1453 else if ((strlen(config->upperConfig->deviceId) == 0) ||
AzureIoTClient 0:1b5f413bf328 1454 (strlen(config->upperConfig->deviceKey) == 0) ||
AzureIoTClient 0:1b5f413bf328 1455 (strlen(config->upperConfig->iotHubName) == 0) ||
AzureIoTClient 0:1b5f413bf328 1456 (strlen(config->upperConfig->iotHubSuffix) == 0))
AzureIoTClient 0:1b5f413bf328 1457 {
AzureIoTClient 0:1b5f413bf328 1458 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_017: [clientTransportAMQP_Create shall fail and return NULL if any string config field is zero length.]*/
AzureIoTClient 0:1b5f413bf328 1459 LogError("zero length config parameter\r\n");
AzureIoTClient 0:1b5f413bf328 1460 }
AzureIoTClient 0:1b5f413bf328 1461 else
AzureIoTClient 0:1b5f413bf328 1462 {
AzureIoTClient 0:1b5f413bf328 1463 amqpState = malloc(sizeof(AMQP_TRANSPORT_STATE));
AzureIoTClient 0:1b5f413bf328 1464 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 0:1b5f413bf328 1465 if (amqpState == NULL)
AzureIoTClient 0:1b5f413bf328 1466 {
AzureIoTClient 0:1b5f413bf328 1467 LogError("Could not allocate AMQP transport state\r\n");
AzureIoTClient 0:1b5f413bf328 1468 }
AzureIoTClient 0:1b5f413bf328 1469 else
AzureIoTClient 0:1b5f413bf328 1470 {
AzureIoTClient 2:1e6040e0c035 1471 amqpState->messageAddress = NULL;
AzureIoTClient 0:1b5f413bf328 1472 amqpState->eventAddress = NULL;
AzureIoTClient 0:1b5f413bf328 1473 amqpState->urledDeviceId = NULL;
AzureIoTClient 0:1b5f413bf328 1474 amqpState->devicesPortionPath = NULL;
AzureIoTClient 0:1b5f413bf328 1475 amqpState->cbsAddress = NULL;
AzureIoTClient 0:1b5f413bf328 1476 amqpState->deviceKey = NULL;
AzureIoTClient 0:1b5f413bf328 1477 amqpState->savedClientHandle = NULL;
AzureIoTClient 0:1b5f413bf328 1478 amqpState->messenger = NULL;
AzureIoTClient 2:1e6040e0c035 1479 amqpState->messageSubscription = NULL;
AzureIoTClient 0:1b5f413bf328 1480 amqpState->cbsSubscription = NULL;
AzureIoTClient 0:1b5f413bf328 1481 amqpState->zeroLengthString = NULL;
AzureIoTClient 0:1b5f413bf328 1482 amqpState->messengerInitialized = false;
AzureIoTClient 0:1b5f413bf328 1483 amqpState->waitingForPutTokenReply = false;
AzureIoTClient 0:1b5f413bf328 1484 amqpState->cbsRequestAcceptTime = CBS_DEFAULT_REQUEST_ACCEPT_TIME;
AzureIoTClient 0:1b5f413bf328 1485 amqpState->cbsReplyTime = CBS_DEFAULT_REPLY_TIME;
AzureIoTClient 0:1b5f413bf328 1486 amqpState->sasTokenLifetime = SAS_TOKEN_DEFAULT_LIFETIME;
AzureIoTClient 0:1b5f413bf328 1487 amqpState->sasRefreshLine = SAS_TOKEN_DEFAULT_REFRESH_LINE;
AzureIoTClient 0:1b5f413bf328 1488 amqpState->eventTimeout = EVENT_TIMEOUT_DEFAULT;
AzureIoTClient 0:1b5f413bf328 1489 amqpState->waitingToSend = config->waitingToSend;
AzureIoTClient 0:1b5f413bf328 1490 DList_InitializeListHead(&amqpState->workInProgress);
AzureIoTClient 0:1b5f413bf328 1491 DList_InitializeListHead(&amqpState->availableWorkItems);
AzureIoTClient 0:1b5f413bf328 1492 DList_InitializeListHead(&amqpState->messengerCorral);
AzureIoTClient 2:1e6040e0c035 1493 amqpState->DoWork_PullMessages = false;
AzureIoTClient 0:1b5f413bf328 1494 amqpState->trustedCertificates = NULL;
AzureIoTClient 0:1b5f413bf328 1495 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 0:1b5f413bf328 1496 if ((amqpState->message = pn_message()) == NULL)
AzureIoTClient 0:1b5f413bf328 1497 {
AzureIoTClient 0:1b5f413bf328 1498 LogError("Unable to preallocate the proton message!\r\n");
AzureIoTClient 0:1b5f413bf328 1499 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 0:1b5f413bf328 1500 amqpState = NULL;
AzureIoTClient 0:1b5f413bf328 1501 }
AzureIoTClient 0:1b5f413bf328 1502 else
AzureIoTClient 0:1b5f413bf328 1503 {
AzureIoTClient 0:1b5f413bf328 1504 size_t workItemsToPreallocate;
AzureIoTClient 0:1b5f413bf328 1505 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
AzureIoTClient 0:1b5f413bf328 1506 for (workItemsToPreallocate = PROTON_EVENT_OUTGOING_WINDOW_SIZE; workItemsToPreallocate > 0; workItemsToPreallocate--)
AzureIoTClient 0:1b5f413bf328 1507 {
AzureIoTClient 0:1b5f413bf328 1508 PAMQP_WORK_ITEM currentItem = malloc(sizeof(AMQP_WORK_ITEM));
AzureIoTClient 0:1b5f413bf328 1509 if (!currentItem)
AzureIoTClient 0:1b5f413bf328 1510 {
AzureIoTClient 0:1b5f413bf328 1511 //
AzureIoTClient 0:1b5f413bf328 1512 // Not a particularly good sign. Fail the create.
AzureIoTClient 0:1b5f413bf328 1513 //
AzureIoTClient 0:1b5f413bf328 1514 LogError("Unable to preallocate work item!\r\n");
AzureIoTClient 0:1b5f413bf328 1515 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 0:1b5f413bf328 1516 amqpState = NULL;
AzureIoTClient 0:1b5f413bf328 1517 break;
AzureIoTClient 0:1b5f413bf328 1518 }
AzureIoTClient 0:1b5f413bf328 1519 else
AzureIoTClient 0:1b5f413bf328 1520 {
AzureIoTClient 0:1b5f413bf328 1521 DList_InsertTailList(&amqpState->availableWorkItems, &currentItem->link);
AzureIoTClient 0:1b5f413bf328 1522 }
AzureIoTClient 0:1b5f413bf328 1523 }
AzureIoTClient 0:1b5f413bf328 1524 if (amqpState != NULL)
AzureIoTClient 0:1b5f413bf328 1525 {
AzureIoTClient 0:1b5f413bf328 1526 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_095: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as host, from the following pieces: "config->iotHubName + "." + config->iotHubSuffix.]*/
AzureIoTClient 0:1b5f413bf328 1527 STRING_HANDLE host = createHost(config);
AzureIoTClient 0:1b5f413bf328 1528 if (host != NULL)
AzureIoTClient 0:1b5f413bf328 1529 {
AzureIoTClient 0:1b5f413bf328 1530 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/
AzureIoTClient 0:1b5f413bf328 1531 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/
AzureIoTClient 0:1b5f413bf328 1532 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/
AzureIoTClient 2:1e6040e0c035 1533 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/
AzureIoTClient 0:1b5f413bf328 1534 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/
AzureIoTClient 0:1b5f413bf328 1535 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/
AzureIoTClient 0:1b5f413bf328 1536 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created. If this fails then clientTransportAMQP shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1537 if ((!createUrledDeviceId(amqpState, config)) ||
AzureIoTClient 0:1b5f413bf328 1538 (!createDevicesPortionPath(amqpState, STRING_c_str(host))) ||
AzureIoTClient 0:1b5f413bf328 1539 (!createEventAddress(amqpState)) ||
AzureIoTClient 2:1e6040e0c035 1540 (!createMessageAddress(amqpState)) ||
AzureIoTClient 0:1b5f413bf328 1541 (!createCbsAddress(amqpState, STRING_c_str(host))) ||
AzureIoTClient 0:1b5f413bf328 1542 ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) ||
AzureIoTClient 0:1b5f413bf328 1543 ((amqpState->zeroLengthString = STRING_new()) == NULL))
AzureIoTClient 0:1b5f413bf328 1544 {
AzureIoTClient 0:1b5f413bf328 1545 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1546 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1547 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 2:1e6040e0c035 1548 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1549 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1550 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1551 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 0:1b5f413bf328 1552 amqpState = NULL;
AzureIoTClient 0:1b5f413bf328 1553 }
AzureIoTClient 0:1b5f413bf328 1554 STRING_delete(host);
AzureIoTClient 0:1b5f413bf328 1555 }
AzureIoTClient 0:1b5f413bf328 1556 else
AzureIoTClient 0:1b5f413bf328 1557 {
AzureIoTClient 0:1b5f413bf328 1558 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_096: [If creating the schemeAndHost fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
AzureIoTClient 0:1b5f413bf328 1559 clientTransportAMQP_Destroy(amqpState);
AzureIoTClient 0:1b5f413bf328 1560 amqpState = NULL;
AzureIoTClient 0:1b5f413bf328 1561 }
AzureIoTClient 0:1b5f413bf328 1562 }
AzureIoTClient 0:1b5f413bf328 1563 }
AzureIoTClient 0:1b5f413bf328 1564 }
AzureIoTClient 0:1b5f413bf328 1565 }
AzureIoTClient 0:1b5f413bf328 1566 return amqpState;
AzureIoTClient 0:1b5f413bf328 1567 }
AzureIoTClient 0:1b5f413bf328 1568
AzureIoTClient 0:1b5f413bf328 1569 static int clientTransportAMQP_Subscribe(TRANSPORT_HANDLE handle)
AzureIoTClient 0:1b5f413bf328 1570 {
AzureIoTClient 0:1b5f413bf328 1571 int result;
AzureIoTClient 0:1b5f413bf328 1572 if (handle == NULL)
AzureIoTClient 0:1b5f413bf328 1573 {
AzureIoTClient 0:1b5f413bf328 1574 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_013: [If the parameter handle is NULL then clientTransportAMQP_Subscribe shall fail and return a non-zero value.]*/
AzureIoTClient 0:1b5f413bf328 1575 LogError("Invalid handle to IoTHubClient amqp subscribe.\r\n");
AzureIoTClient 0:1b5f413bf328 1576 result = 1;
AzureIoTClient 0:1b5f413bf328 1577 }
AzureIoTClient 0:1b5f413bf328 1578 else
AzureIoTClient 0:1b5f413bf328 1579 {
AzureIoTClient 0:1b5f413bf328 1580 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/
AzureIoTClient 0:1b5f413bf328 1581 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 2:1e6040e0c035 1582 transportState->DoWork_PullMessages = true;
AzureIoTClient 0:1b5f413bf328 1583 result = 0;
AzureIoTClient 0:1b5f413bf328 1584 }
AzureIoTClient 0:1b5f413bf328 1585 return result;
AzureIoTClient 0:1b5f413bf328 1586 }
AzureIoTClient 0:1b5f413bf328 1587
AzureIoTClient 0:1b5f413bf328 1588 static void clientTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle)
AzureIoTClient 0:1b5f413bf328 1589 {
AzureIoTClient 0:1b5f413bf328 1590 PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 1591 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/
AzureIoTClient 0:1b5f413bf328 1592 if (transportState)
AzureIoTClient 0:1b5f413bf328 1593 {
AzureIoTClient 2:1e6040e0c035 1594 transportState->DoWork_PullMessages = false;
AzureIoTClient 0:1b5f413bf328 1595 }
AzureIoTClient 0:1b5f413bf328 1596 }
AzureIoTClient 0:1b5f413bf328 1597
AzureIoTClient 0:1b5f413bf328 1598 static IOTHUB_CLIENT_RESULT clientTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
AzureIoTClient 0:1b5f413bf328 1599 {
AzureIoTClient 0:1b5f413bf328 1600 IOTHUB_CLIENT_RESULT result;
AzureIoTClient 0:1b5f413bf328 1601
AzureIoTClient 0:1b5f413bf328 1602 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_001: [clientTransportAMQP_GetSendStatus IoTHubTransportHttp_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter] */
AzureIoTClient 0:1b5f413bf328 1603 if (handle == NULL)
AzureIoTClient 0:1b5f413bf328 1604 {
AzureIoTClient 0:1b5f413bf328 1605 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 0:1b5f413bf328 1606 LogError("Invalid handle to IoTHubClient AMQP transport instance.\r\n");
AzureIoTClient 0:1b5f413bf328 1607 }
AzureIoTClient 0:1b5f413bf328 1608 else if (iotHubClientStatus == NULL)
AzureIoTClient 0:1b5f413bf328 1609 {
AzureIoTClient 0:1b5f413bf328 1610 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 0:1b5f413bf328 1611 LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.\r\n");
AzureIoTClient 0:1b5f413bf328 1612 }
AzureIoTClient 0:1b5f413bf328 1613 else
AzureIoTClient 0:1b5f413bf328 1614 {
AzureIoTClient 0:1b5f413bf328 1615 PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 1616
AzureIoTClient 0:1b5f413bf328 1617 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_002: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent] */
AzureIoTClient 0:1b5f413bf328 1618 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->workInProgress)))
AzureIoTClient 0:1b5f413bf328 1619 {
AzureIoTClient 0:1b5f413bf328 1620 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
AzureIoTClient 0:1b5f413bf328 1621 }
AzureIoTClient 0:1b5f413bf328 1622 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_003: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent] */
AzureIoTClient 0:1b5f413bf328 1623 else
AzureIoTClient 0:1b5f413bf328 1624 {
AzureIoTClient 0:1b5f413bf328 1625 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
AzureIoTClient 0:1b5f413bf328 1626 }
AzureIoTClient 0:1b5f413bf328 1627
AzureIoTClient 0:1b5f413bf328 1628 result = IOTHUB_CLIENT_OK;
AzureIoTClient 0:1b5f413bf328 1629 }
AzureIoTClient 0:1b5f413bf328 1630
AzureIoTClient 0:1b5f413bf328 1631 return result;
AzureIoTClient 0:1b5f413bf328 1632 }
AzureIoTClient 0:1b5f413bf328 1633
AzureIoTClient 0:1b5f413bf328 1634 static IOTHUB_CLIENT_RESULT clientTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value)
AzureIoTClient 0:1b5f413bf328 1635 {
AzureIoTClient 0:1b5f413bf328 1636 IOTHUB_CLIENT_RESULT result;
AzureIoTClient 0:1b5f413bf328 1637 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_001: [If handle parameter is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 0:1b5f413bf328 1638 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 0:1b5f413bf328 1639 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
AzureIoTClient 0:1b5f413bf328 1640
AzureIoTClient 0:1b5f413bf328 1641 if (
AzureIoTClient 0:1b5f413bf328 1642 (handle == NULL) ||
AzureIoTClient 0:1b5f413bf328 1643 (option == NULL) ||
AzureIoTClient 0:1b5f413bf328 1644 (value == NULL)
AzureIoTClient 0:1b5f413bf328 1645 )
AzureIoTClient 0:1b5f413bf328 1646 {
AzureIoTClient 0:1b5f413bf328 1647 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 0:1b5f413bf328 1648 LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n");
AzureIoTClient 0:1b5f413bf328 1649 }
AzureIoTClient 0:1b5f413bf328 1650 else
AzureIoTClient 0:1b5f413bf328 1651 {
AzureIoTClient 0:1b5f413bf328 1652 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */
AzureIoTClient 0:1b5f413bf328 1653 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */
AzureIoTClient 0:1b5f413bf328 1654 if (strcmp("TrustedCerts", option) == 0)
AzureIoTClient 0:1b5f413bf328 1655 {
AzureIoTClient 0:1b5f413bf328 1656 PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
AzureIoTClient 0:1b5f413bf328 1657 char* newTrsutedCertificates;
AzureIoTClient 0:1b5f413bf328 1658
AzureIoTClient 0:1b5f413bf328 1659 if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0)
AzureIoTClient 0:1b5f413bf328 1660 {
AzureIoTClient 0:1b5f413bf328 1661 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */
AzureIoTClient 0:1b5f413bf328 1662 result = IOTHUB_CLIENT_ERROR;
AzureIoTClient 0:1b5f413bf328 1663 }
AzureIoTClient 0:1b5f413bf328 1664 else
AzureIoTClient 0:1b5f413bf328 1665 {
AzureIoTClient 0:1b5f413bf328 1666 /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */
AzureIoTClient 0:1b5f413bf328 1667 if (handleData->trustedCertificates != NULL)
AzureIoTClient 0:1b5f413bf328 1668 {
AzureIoTClient 0:1b5f413bf328 1669 free(handleData->trustedCertificates);
AzureIoTClient 0:1b5f413bf328 1670 }
AzureIoTClient 0:1b5f413bf328 1671
AzureIoTClient 0:1b5f413bf328 1672 handleData->trustedCertificates = newTrsutedCertificates;
AzureIoTClient 0:1b5f413bf328 1673 result = IOTHUB_CLIENT_OK;
AzureIoTClient 0:1b5f413bf328 1674 }
AzureIoTClient 0:1b5f413bf328 1675 }
AzureIoTClient 0:1b5f413bf328 1676 else
AzureIoTClient 0:1b5f413bf328 1677 {
AzureIoTClient 0:1b5f413bf328 1678 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/
AzureIoTClient 0:1b5f413bf328 1679 result = IOTHUB_CLIENT_INVALID_ARG;
AzureIoTClient 0:1b5f413bf328 1680 }
AzureIoTClient 0:1b5f413bf328 1681 }
AzureIoTClient 0:1b5f413bf328 1682
AzureIoTClient 0:1b5f413bf328 1683 return result;
AzureIoTClient 0:1b5f413bf328 1684 }
AzureIoTClient 0:1b5f413bf328 1685
AzureIoTClient 0:1b5f413bf328 1686 static TRANSPORT_PROVIDER myfunc = {
AzureIoTClient 0:1b5f413bf328 1687 clientTransportAMQP_SetOption,
AzureIoTClient 0:1b5f413bf328 1688 clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus
AzureIoTClient 0:1b5f413bf328 1689 };
AzureIoTClient 0:1b5f413bf328 1690
AzureIoTClient 2:1e6040e0c035 1691 extern const void* AMQP_Protocol(void)
AzureIoTClient 0:1b5f413bf328 1692 {
AzureIoTClient 0:1b5f413bf328 1693 return &myfunc;
AzureIoTClient 0:1b5f413bf328 1694 }