A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_client.c
- Revision:
- 9:37d14c31ff6e
- Parent:
- 8:83bb166aba73
- Child:
- 10:2ab268507775
--- a/mqtt_client.c Thu Oct 20 17:07:55 2016 -0700 +++ b/mqtt_client.c Wed Nov 16 21:38:15 2016 -0800 @@ -9,6 +9,7 @@ #include "azure_c_shared_utility/xlogging.h" #include "azure_c_shared_utility/strings.h" #include "azure_c_shared_utility/agenttime.h" +#include "azure_c_shared_utility/threadapi.h" #include "azure_umqtt_c/mqtt_client.h" #include "azure_umqtt_c/mqtt_codec.h" @@ -23,6 +24,7 @@ #define CONNECT_PACKET_MASK 0xf0 #define TIME_MAX_BUFFER 16 #define DEFAULT_MAX_PING_RESPONSE_TIME 80 // % of time to send pings +#define MAX_CLOSE_RETRIES 10 static const char* FORMAT_HEX_CHAR = "0x%02x "; static const char* TRUE_CONST = "true"; @@ -40,6 +42,8 @@ ON_MQTT_OPERATION_CALLBACK fnOperationCallback; ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv; void* ctx; + ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack; + void* errorCBCtx; QOS_VALUE qosValue; uint16_t keepAliveInterval; MQTT_CLIENT_OPTIONS mqttOptions; @@ -51,6 +55,37 @@ uint16_t maxPingRespTime; } MQTT_CLIENT; +static void on_connection_closed(void* context) +{ + size_t* close_complete = (size_t*)context; + *close_complete = 1; +} + +static void close_connection(MQTT_CLIENT* mqtt_client) +{ + size_t close_complete = 0; + (void)xio_close(mqtt_client->xioHandle, on_connection_closed, &close_complete); + + size_t counter = 0; + do + { + xio_dowork(mqtt_client->xioHandle); + counter++; + ThreadAPI_Sleep(10); + } while (close_complete == 0 && counter < MAX_CLOSE_RETRIES); + mqtt_client->socketConnected = false; + mqtt_client->clientConnected = false; +} + +static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type) +{ + if (mqtt_client->fnOnErrorCallBack) + { + mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx); + } + close_connection(mqtt_client); +} + static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client) { STRING_HANDLE trace_log; @@ -127,29 +162,30 @@ static void sendComplete(void* context, IO_SEND_RESULT send_result) { MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; - if (mqtt_client != NULL && mqtt_client->fnOperationCallback != NULL && send_result == IO_SEND_OK) + if (mqtt_client != NULL) { - if (mqtt_client->packetState == DISCONNECT_TYPE) + if (send_result == IO_SEND_OK) { - /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/ - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx); - - // close the xio - if (xio_close(mqtt_client->xioHandle, NULL, mqtt_client->ctx) != 0) + if (mqtt_client->packetState == DISCONNECT_TYPE) { - LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result."); + /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/ + if (mqtt_client->fnOperationCallback != NULL) + { + mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx); + } + // close the xio + close_connection(mqtt_client); } - mqtt_client->socketConnected = false; - mqtt_client->clientConnected = false; + } + else if (send_result == IO_SEND_ERROR) + { + LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result); + set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR); } } else { - LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure with NULL mqtt_client"); } } @@ -331,10 +367,11 @@ } else { - if (mqtt_client->fnOperationCallback) + if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack) { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); + mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx); } + close_connection(mqtt_client); } } else @@ -350,10 +387,7 @@ { if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0) { - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR); } } else @@ -367,11 +401,9 @@ MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context; if (mqtt_client != NULL && mqtt_client->fnOperationCallback) { - /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/ - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - mqtt_client->socketConnected = false; + /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/ /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */ - (void)xio_close(mqtt_client->xioHandle, NULL, NULL); + set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR); } else { @@ -506,10 +538,7 @@ if (topicName == NULL) { LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name"); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR); if (trace_log != NULL) { STRING_delete(trace_log); @@ -537,10 +566,7 @@ if (msgHandle == NULL) { LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create"); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); if (trace_log != NULL) { STRING_delete(trace_log); } @@ -551,10 +577,7 @@ mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0) { LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property"); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); if (trace_log != NULL) { STRING_delete(trace_log); } @@ -577,10 +600,7 @@ if (pubRel == NULL) { LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message."); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); } } else if (qosValue == DELIVER_AT_LEAST_ONCE) @@ -589,10 +609,7 @@ if (pubRel == NULL) { LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message."); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); } } if (pubRel != NULL) @@ -642,10 +659,7 @@ if (pubRel == NULL) { LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message."); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); } } else if (packet == PUBREL_TYPE) @@ -654,10 +668,7 @@ if (pubRel == NULL) { LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message."); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); } } if (pubRel != NULL) @@ -711,10 +722,7 @@ else { LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed."); - if (mqtt_client->fnOperationCallback) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx); - } + set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR); } } break; @@ -757,7 +765,7 @@ } } -MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx) +MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx) { MQTT_CLIENT* result; /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/ @@ -780,8 +788,10 @@ result->packetState = UNKNOWN_TYPE; result->packetSendTimeMs = 0; result->fnOperationCallback = opCallback; + result->ctx = opCallbackCtx; result->fnMessageRecv = msgRecv; - result->ctx = callbackCtx; + result->fnOnErrorCallBack = onErrorCallBack; + result->errorCBCtx = errorCBCtx; result->qosValue = DELIVER_AT_MOST_ONCE; result->keepAliveInterval = 0; result->packetTickCntr = tickcounter_create(); @@ -1099,16 +1109,11 @@ } else { - /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Operation Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */ + /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */ if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime) { // We haven't gotten a ping response in the alloted time - if (mqtt_client->fnOperationCallback != NULL) - { - mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE, NULL, mqtt_client->ctx); - } - mqtt_client->socketConnected = false; - mqtt_client->clientConnected = false; + set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE); mqtt_client->timeSincePing = 0; mqtt_client->packetSendTimeMs = 0; mqtt_client->packetState = UNKNOWN_TYPE;