Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 14:4dc2b011be33
- Parent:
- 13:606465879c57
- Child:
- 15:81c95c94c8d7
--- a/iothubtransport_mqtt_common.c Wed Dec 14 15:59:37 2016 -0800 +++ b/iothubtransport_mqtt_common.c Sun Jan 08 11:11:42 2017 -0800 @@ -49,6 +49,8 @@ #define ERROR_TIME_FOR_RETRY_SECS 5 #define WAIT_TIME_SECS (ERROR_TIME_FOR_RETRY_SECS - 1) +#define TOUPPER(c) (((c>='a') && (c<='z'))?c-'a'+'A':c) + static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin"; static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods"; @@ -66,7 +68,7 @@ static const char* PROPERTY_SEPARATOR = "&"; static const char* REPORTED_PROPERTIES_TOPIC = "$iothub/twin/PATCH/properties/reported/?$rid=%"PRIu16; static const char* GET_PROPERTIES_TOPIC = "$iothub/twin/GET/?$rid=%"PRIu16; -static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%"PRIu16; +static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%s"; static const char* REQUEST_ID_PROPERTY = "?$rid="; @@ -231,6 +233,11 @@ DLIST_ENTRY entry; } MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST; +typedef struct DEVICE_METHOD_INFO_TAG +{ + STRING_HANDLE request_id; +} DEVICE_METHOD_INFO; + static int RetryPolicy_Exponential_BackOff_With_Jitter(bool *permit, size_t* delay, void* retryContextCallback) { int result; @@ -558,7 +565,7 @@ } } -static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, uint16_t* request_id) +static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id) { int result; STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic); @@ -566,7 +573,6 @@ { LogError("Failed creating token from device twin topic."); result = __LINE__; - *request_id = 0; } else { @@ -575,7 +581,6 @@ { LogError("Failed allocating new string ."); result = __LINE__; - *request_id = 0; } else { @@ -588,8 +593,8 @@ { if (STRING_concat_with_STRING(method_name, token_value) != 0) { + LogError("Failed STRING_concat_with_STRING."); result = __LINE__; - *request_id = 0; break; } } @@ -600,8 +605,15 @@ const char* request_id_value = STRING_c_str(token_value); if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0) { - *request_id = (uint16_t)atol(request_id_value+request_id_length); - result = 0; + if (STRING_concat(request_id, request_id_value+request_id_length) != 0) + { + LogError("Failed STRING_concat failed."); + result = __LINE__; + } + else + { + result = 0; + } break; } } @@ -710,7 +722,7 @@ { break; } - else if (toupper(TOPIC_DEVICE_TWIN_PREFIX[index]) != toupper(topic_resp[index])) + else if (TOUPPER(TOPIC_DEVICE_TWIN_PREFIX[index]) != TOUPPER(topic_resp[index])) { search_device_twin = false; } @@ -728,7 +740,7 @@ { break; } - else if (toupper(TOPIC_DEVICE_METHOD_PREFIX[index]) != toupper(topic_resp[index])) + else if (TOUPPER(TOPIC_DEVICE_METHOD_PREFIX[index]) != TOUPPER(topic_resp[index])) { search_device_method = false; } @@ -835,12 +847,12 @@ return result; } -static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, uint16_t request_id, BUFFER_HANDLE response) +static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size) { int result; uint16_t packet_id = get_next_packet_id(transport_data); - STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, request_id); + STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id) ); if (msg_topic == NULL) { LogError("Failed constructing message topic."); @@ -848,7 +860,7 @@ } else { - MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, BUFFER_u_char(response), BUFFER_length(response)); + MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, response, response_size); if (mqtt_get_msg == NULL) { LogError("Failed constructing mqtt message."); @@ -1088,8 +1100,10 @@ static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx) { + /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */ if (msgHandle != NULL && callbackCtx != NULL) { + /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */ const char* topic_resp = mqttmessage_getTopicName(msgHandle); if (topic_resp == NULL) { @@ -1129,10 +1143,12 @@ (void)DList_RemoveEntryList(dev_twin_item); if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) { + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClient_LL_RetrievePropertyComplete... ] */ IoTHubClient_LL_RetrievePropertyComplete(transportData->llClientHandle, DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length); } else { + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClient_LL_ReportedStateComplete ] */ IoTHubClient_LL_ReportedStateComplete(transportData->llClientHandle, msg_entry->iothub_msg_id, status_code); } free(msg_entry); @@ -1145,7 +1161,6 @@ } else if (type == IOTHUB_TYPE_DEVICE_METHODS) { - uint16_t request_id; STRING_HANDLE method_name = STRING_new(); if (method_name == NULL) { @@ -1153,24 +1168,36 @@ } else { - BUFFER_HANDLE result_buffer; - if (retrieve_device_method_rid_info(topic_resp, method_name, &request_id) != 0) + DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO) ); + if (dev_method_info == NULL) { - LogError("Failure: retrieve device topic info"); - } - else if ((result_buffer = BUFFER_new()) == NULL) - { - LogError("Failure: retrieve device topic info"); + LogError("Failure: allocating DEVICE_METHOD_INFO object"); } else { - const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); - int status_code = IoTHubClient_LL_DeviceMethodComplete(transportData->llClientHandle, STRING_c_str(method_name), payload->message, payload->length, result_buffer); - if (publish_device_method_message(transportData, status_code, request_id, result_buffer) != 0) + dev_method_info->request_id = STRING_new(); + if (dev_method_info->request_id == NULL) + { + LogError("Failure constructing request_id string"); + free(dev_method_info); + } + else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0) { - LogError("Failure: publishing device method response"); + LogError("Failure: retrieve device topic info"); + STRING_delete(dev_method_info->request_id); + free(dev_method_info); } - BUFFER_delete(result_buffer); + else + { + /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClient_LL_DeviceMethodComplete. ] */ + const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); + if (IoTHubClient_LL_DeviceMethodComplete(transportData->llClientHandle, STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info) != 0) + { + LogError("Failure: IoTHubClient_LL_DeviceMethodComplete"); + STRING_delete(dev_method_info->request_id); + free(dev_method_info); + } + } } STRING_delete(method_name); } @@ -1192,6 +1219,7 @@ } else { + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClient_LL_MessageCallback. ] */ if (IoTHubClient_LL_MessageCallback(transportData->llClientHandle, IoTHubMessage) != IOTHUBMESSAGE_ACCEPTED) { LogError("Event not accepted by our client."); @@ -2156,6 +2184,44 @@ } } +int IoTHubTransport_MQTT_Common_DeviceMethod_Response(IOTHUB_DEVICE_HANDLE handle, METHOD_HANDLE methodId, const unsigned char* response, size_t respSize, int status) +{ + int result; + MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle; + if (transport_data != NULL) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_042: [ IoTHubTransport_MQTT_Common_DeviceMethod_Response shall publish an mqtt message for the device method response. ] */ + DEVICE_METHOD_INFO* dev_method_info = (DEVICE_METHOD_INFO*)methodId; + if (dev_method_info == NULL) + { + LogError("Failure: DEVICE_METHOD_INFO was NULL"); + result = __LINE__; + } + else + { + if (publish_device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */ + LogError("Failure: publishing device method response"); + result = __LINE__; + } + else + { + result = 0; + } + STRING_delete(dev_method_info->request_id); + free(dev_method_info); + } + } + else + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */ + result = __LINE__; + LogError("Failure: invalid IOTHUB_DEVICE_HANDLE parameter specified"); + } + return result; +} + int IoTHubTransport_MQTT_Common_Subscribe(IOTHUB_DEVICE_HANDLE handle) { int result; @@ -2577,9 +2643,9 @@ return result; } -// Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ IoTHubTransport_MQTT_Common_Unregister shall return. ] void IoTHubTransport_MQTT_Common_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle) { + // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ If deviceHandle is NULL `IoTHubTransport_MQTT_Common_Unregister` shall do nothing. ] if (deviceHandle != NULL) { MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)deviceHandle;