Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 5:8d58d20699dd
- Parent:
- 4:57e049bce51e
- Child:
- 7:07bc440836b3
--- a/iothubtransportamqp.c Thu Oct 22 18:33:15 2015 -0700 +++ b/iothubtransportamqp.c Mon Nov 02 19:21:13 2015 -0800 @@ -104,6 +104,123 @@ } return result; } + +static int setRecvMessageIds(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) +{ + int result; + + // Function can not fail + pn_atom_t msgIdInfo = pn_message_get_id(msg); + if (msgIdInfo.type != PN_NULL) + { + if (msgIdInfo.type != PN_STRING) + { + LogError("Message Id Failure: Invalid message id Type %d\r\n", msgIdInfo.type); + result = __LINE__; + } + else + { + if (IoTHubMessage_SetMessageId(ioTMessage, msgIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) + { + result = __LINE__; + } + else + { + result = 0; + } + } + } + else + { + // NULL values mean it's not set + result = 0; + } + + if (result == 0) + { + pn_atom_t corrIdInfo = pn_message_get_correlation_id(msg); + if (corrIdInfo.type != PN_NULL) + { + if (corrIdInfo.type != PN_STRING) + { + LogError("Message Id Failure: Invalid correlation id Type %d\r\n", corrIdInfo.type); + result = __LINE__; + } + else + { + if (IoTHubMessage_SetCorrelationId(ioTMessage, corrIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) + { + result = __LINE__; + } + else + { + result = 0; + } + } + } + else + { + // NULL values mean it's not set + result = 0; + } + } + return result; +} + +static int setSendMessageIds(PDLIST_ENTRY messagesEntry, pn_message_t* msg) +{ + int result; + + IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesEntry->Flink, IOTHUB_MESSAGE_LIST, entry); + const char* messageId = IoTHubMessage_GetMessageId(currentMessage->messageHandle); + if (messageId != NULL) + { + pn_bytes_t dataBytes = pn_bytes(strlen(messageId), messageId); + pn_atom_t pnMsgId; + pnMsgId.type = PN_STRING; + pnMsgId.u.as_bytes = dataBytes; + if (pn_message_set_id(msg, pnMsgId) == 0) + { + result = 0; + } + else + { + LogError("Failure setting pn_message_set_id.\r\n"); + result = __LINE__; + } + } + else + { + result = 0; + } + + if (result == 0) + { + const char* correlationId = IoTHubMessage_GetCorrelationId(currentMessage->messageHandle); + if (correlationId != NULL) + { + pn_bytes_t dataBytes = pn_bytes(strlen(correlationId), correlationId); + pn_atom_t pnCorrelationId; + pnCorrelationId.type = PN_STRING; + pnCorrelationId.u.as_bytes = dataBytes; + if (pn_message_set_correlation_id(msg, pnCorrelationId) == 0) + { + result = 0; + } + else + { + LogError("Failure setting pn_message_set_correlation_id.\r\n"); + result = __LINE__; + } + } + else + { + result = 0; + } + } + return result; +} + /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */ static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg) { @@ -464,12 +581,12 @@ STRING_delete(transportState->cbsAddress); STRING_delete(transportState->deviceKey); STRING_delete(transportState->zeroLengthString); - if (transportState->trustedCertificates != NULL) - { - free(transportState->trustedCertificates); - } + if (transportState->trustedCertificates != NULL) + { + free(transportState->trustedCertificates); + } - free(transportState); + free(transportState); } } @@ -711,15 +828,15 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/ LogError("The messenger is not able to be created.\r\n"); } - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */ - else if ((amqpState->trustedCertificates != NULL) && - (strlen(amqpState->trustedCertificates) > 0) && - (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0)) - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */ - LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n"); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/ + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */ + else if ((amqpState->trustedCertificates != NULL) && + (strlen(amqpState->trustedCertificates) > 0) && + (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0)) + { + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */ + LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n"); + } + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/ else if (pn_messenger_start(amqpState->messenger) != 0) { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/ @@ -988,6 +1105,13 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ break; } + else if (setSendMessageIds(&headOfAvailable->eventMessages, transportState->message)) + { + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ + putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ + break; + } else { /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/ @@ -1204,7 +1328,14 @@ } else if (cloneProperties(ioTMessage, transportState->message) != 0) { - LogError("Failed to set data for IoT hub message\r\n"); + LogError("Failed to clone properties for IoT hub message\r\n"); + pn_messenger_release(transportState->messenger, tracker, 0); + pn_messenger_settle(transportState->messenger, tracker, 0); + } + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_07_003: [If the pn_message_get_id value is not NULL then the value will be set by calling IotHubMessage_SetMessageId.] */ + else if (setRecvMessageIds(ioTMessage, transportState->message) != 0) + { + LogError("Failed to set MessageId for IoT hub message\r\n"); pn_messenger_release(transportState->messenger, tracker, 0); pn_messenger_settle(transportState->messenger, tracker, 0); } @@ -1491,7 +1622,7 @@ DList_InitializeListHead(&amqpState->availableWorkItems); DList_InitializeListHead(&amqpState->messengerCorral); amqpState->DoWork_PullMessages = false; - amqpState->trustedCertificates = NULL; + amqpState->trustedCertificates = NULL; /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ if ((amqpState->message = pn_message()) == NULL) { @@ -1638,47 +1769,47 @@ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ - if ( - (handle == NULL) || - (option == NULL) || - (value == NULL) - ) - { - result = IOTHUB_CLIENT_INVALID_ARG; - LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n"); - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */ - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */ - if (strcmp("TrustedCerts", option) == 0) - { - PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; - char* newTrsutedCertificates; + if ( + (handle == NULL) || + (option == NULL) || + (value == NULL) + ) + { + result = IOTHUB_CLIENT_INVALID_ARG; + LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n"); + } + else + { + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */ + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */ + if (strcmp("TrustedCerts", option) == 0) + { + PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; + char* newTrsutedCertificates; - if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ - result = IOTHUB_CLIENT_ERROR; - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */ - if (handleData->trustedCertificates != NULL) - { - free(handleData->trustedCertificates); - } + if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0) + { + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ + result = IOTHUB_CLIENT_ERROR; + } + else + { + /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */ + if (handleData->trustedCertificates != NULL) + { + free(handleData->trustedCertificates); + } - handleData->trustedCertificates = newTrsutedCertificates; - result = IOTHUB_CLIENT_OK; - } - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/ - result = IOTHUB_CLIENT_INVALID_ARG; - } - } + handleData->trustedCertificates = newTrsutedCertificates; + result = IOTHUB_CLIENT_OK; + } + } + else + { + /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/ + result = IOTHUB_CLIENT_INVALID_ARG; + } + } return result; }