Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 2:1e6040e0c035
- Parent:
- 0:1b5f413bf328
--- a/iothubtransportamqp.c Tue Sep 15 23:52:53 2015 -0700 +++ b/iothubtransportamqp.c Tue Sep 22 20:36:39 2015 -0700 @@ -225,7 +225,7 @@ return result; } -/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On notifications the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */ +/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */ static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) { int result = 0; @@ -457,7 +457,7 @@ { pn_message_free(transportState->message); } - STRING_delete(transportState->notificationAddress); + STRING_delete(transportState->messageAddress); STRING_delete(transportState->eventAddress); STRING_delete(transportState->urledDeviceId); STRING_delete(transportState->devicesPortionPath); @@ -772,10 +772,10 @@ if (amqpState->putTokenWasSuccessful == true) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/ - if ((amqpState->notificationSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->notificationAddress))) == NULL) + if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to create a subscription to the notification address\r\n"); + LogError("unable to create a subscription to the message address\r\n"); } else { @@ -962,7 +962,7 @@ if (amqpBatchResult == amqp_batch_nowork) { // no work to do. Stop trying. - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the notification.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/ break; } else if (amqpBatchResult == amqp_batch_error) @@ -1134,10 +1134,10 @@ pn_messenger_settle(transportState->messenger, tracker, 0); } -static void processNotification(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) +static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullNotification is false then the message shall be rejected and processNotification shall return.]*/ - if (transportState->DoWork_PullNotifications == false) + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/ + if (transportState->DoWork_PullMessages == false) { // // Nothing to do with the message. We reject it. In that way it can come back later. @@ -1149,10 +1149,10 @@ { pn_data_t *body; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processNotification will acquire the body of the notification by invoking pn_message_body.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/ if ((body = pn_message_body(transportState->message)) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the notification will be abandoned.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/ LogError("Failed to get the proton message body\r\n"); pn_messenger_release(transportState->messenger, tracker, 0); pn_messenger_settle(transportState->messenger, tracker, 0); @@ -1160,8 +1160,8 @@ else { pn_bytes_t sizeAndData; - bool keepProcessingThisNotification = true; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processNotification will check for a null (note the lowercase) body by invoking pn_data_next.]*/ + bool keepProcessingThisMessage = true; + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/ if (pn_data_next(body) == false) { // @@ -1174,16 +1174,16 @@ } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processNotification will check that the body is of type PN_BINARY.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/ if (PN_BINARY != pn_data_type(body)) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processNotification will abandon the notification with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/ - LogError("Notification received via AMQP was not PN_BINARY\r\n"); + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/ + LogError("Message received via AMQP was not PN_BINARY\r\n"); sizeAndData.size = 0; // Compilers get upset. sizeAndData.start = NULL; pn_messenger_release(transportState->messenger, tracker, 0); pn_messenger_settle(transportState->messenger, tracker, 0); - keepProcessingThisNotification = false; + keepProcessingThisMessage = false; } else { @@ -1191,13 +1191,13 @@ sizeAndData = pn_data_get_binary(body); } } - if (keepProcessingThisNotification == true) + if (keepProcessingThisMessage == true) { IOTHUB_MESSAGE_HANDLE ioTMessage; /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/ if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processNotification will abandon the notification if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/ LogError("Failed to set data for IoT hub message\r\n"); pn_messenger_release(transportState->messenger, tracker, 0); pn_messenger_settle(transportState->messenger, tracker, 0); @@ -1210,27 +1210,27 @@ } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_NotificationCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/ - IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_NotificationCallback(transportState->savedClientHandle, ioTMessage); + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/ + IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage); if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_ACCEPTED value then the notification will be accepted.] */ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */ pn_messenger_accept(transportState->messenger, tracker, 0); } else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_ABANDONED value then the notification will be abandoned.] */ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */ pn_messenger_release(transportState->messenger, tracker, 0); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_REJECTED value then the notification will be rejected.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/ pn_messenger_reject(transportState->messenger, tracker, 0); } pn_messenger_settle(transportState->messenger, tracker, 0); } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processNotification will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/ IoTHubMessage_Destroy(ioTMessage); } } @@ -1242,16 +1242,16 @@ PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; int receiveResult; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullNotification is false then DoWork will cancel any preceding notifications by invoking pn_messenger_recv.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/ - if ((transportState->DoWork_PullNotifications == false) && (transportState->waitingForPutTokenReply == false)) + if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false)) { if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0) { if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ - LogError("Error attempting to receive notifications: %d\r\n", receiveResult); + LogError("Error attempting to receive messages: %d\r\n", receiveResult); /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ transportState->messengerInitialized = false; } @@ -1259,13 +1259,13 @@ } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullNotification or the waitingForPutTokenReply flag is true then processReceives will pull notifications with pn_messenger_recv.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/ if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0) { if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ - LogError("Error attempting to receive notifications: %d\r\n", receiveResult); + LogError("Error attempting to receive messages: %d\r\n", receiveResult); /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ transportState->messengerInitialized = false; } @@ -1276,7 +1276,7 @@ // If any are present we need to dispose of messages in the incoming queue. // size_t depthOfReceiveQueue; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of notifications received by invoking pn_messenger_incoming.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/ for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--) { int result; @@ -1307,16 +1307,16 @@ // Currently we can receive two types of messages. We can recieve status messages from the $CBS that indicates the success // or failure of renewing the SAS for the device. // - // Otherwise we can recieve a simple notification. We can get a pointer to the subscription from the tracker. We can compare + // Otherwise we can recieve a simple message. We can get a pointer to the subscription from the tracker. We can compare // that pointer to the subscriptions that we stored when we initialized the messenger. We then dispatch appropriately. // /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/ theMessageSource = pn_messenger_incoming_subscription(transportState->messenger); - if (theMessageSource == transportState->notificationSubscription) + if (theMessageSource == transportState->messageSubscription) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/ - processNotification(transportState, tracker); + processMessage(transportState, tracker); } else if (theMessageSource == transportState->cbsSubscription) { @@ -1382,11 +1382,11 @@ (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true); } -static bool createNotificationAddress(PAMQP_TRANSPORT_STATE state) +static bool createMessageAddress(PAMQP_TRANSPORT_STATE state) { - return (((state->notificationAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || - (STRING_concat_with_STRING(state->notificationAddress, state->devicesPortionPath) != 0) || - (STRING_concat(state->notificationAddress, NOTIFICATION_ENDPOINT) != 0)) ? (false) : (true); + return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || + (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) || + (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true); } static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host) @@ -1468,7 +1468,7 @@ } else { - amqpState->notificationAddress = NULL; + amqpState->messageAddress = NULL; amqpState->eventAddress = NULL; amqpState->urledDeviceId = NULL; amqpState->devicesPortionPath = NULL; @@ -1476,7 +1476,7 @@ amqpState->deviceKey = NULL; amqpState->savedClientHandle = NULL; amqpState->messenger = NULL; - amqpState->notificationSubscription = NULL; + amqpState->messageSubscription = NULL; amqpState->cbsSubscription = NULL; amqpState->zeroLengthString = NULL; amqpState->messengerInitialized = false; @@ -1490,7 +1490,7 @@ DList_InitializeListHead(&amqpState->workInProgress); DList_InitializeListHead(&amqpState->availableWorkItems); DList_InitializeListHead(&amqpState->messengerCorral); - amqpState->DoWork_PullNotifications = false; + amqpState->DoWork_PullMessages = false; amqpState->trustedCertificates = NULL; /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ if ((amqpState->message = pn_message()) == NULL) @@ -1530,14 +1530,14 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as notificationAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created. If this fails then clientTransportAMQP shall fail and return NULL.]*/ if ((!createUrledDeviceId(amqpState, config)) || (!createDevicesPortionPath(amqpState, STRING_c_str(host))) || (!createEventAddress(amqpState)) || - (!createNotificationAddress(amqpState)) || + (!createMessageAddress(amqpState)) || (!createCbsAddress(amqpState, STRING_c_str(host))) || ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) || ((amqpState->zeroLengthString = STRING_new()) == NULL)) @@ -1545,7 +1545,7 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the notificationAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ clientTransportAMQP_Destroy(amqpState); @@ -1579,7 +1579,7 @@ { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/ PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - transportState->DoWork_PullNotifications = true; + transportState->DoWork_PullMessages = true; result = 0; } return result; @@ -1591,7 +1591,7 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/ if (transportState) { - transportState->DoWork_PullNotifications = false; + transportState->DoWork_PullMessages = false; } } @@ -1688,7 +1688,7 @@ clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus }; -extern const void* IoTHubTransportAmqp_ProvideTransportInterface(void) +extern const void* AMQP_Protocol(void) { return &myfunc; }