A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_client.c
- Revision:
- 3:9b4e7158ca0d
- Parent:
- 1:8dba42ff9701
- Child:
- 4:e7167dabd6e4
--- a/mqtt_client.c Mon May 09 14:37:25 2016 -0700 +++ b/mqtt_client.c Fri Jun 17 17:03:06 2016 -0700 @@ -19,6 +19,7 @@ #define DUPLICATE_FLAG_MASK 0x8 #define CONNECT_PACKET_MASK 0xf0 #define TIME_MAX_BUFFER 16 +#define DEFAULT_MAX_PING_RESPONSE_TIME 90 static const char* FORMAT_HEX_CHAR = "0x%02x "; @@ -40,6 +41,8 @@ bool socketConnected; bool logTrace; bool rawBytesTrace; + uint64_t timeSincePing; + uint16_t maxPingRespTime; } MQTT_CLIENT; static uint16_t byteutil_read_uint16(uint8_t** buffer) @@ -162,17 +165,27 @@ static void logIncomingMsgTrace(MQTT_CLIENT* clientData, CONTROL_PACKET_TYPE packet, int flags, const uint8_t* data, size_t length) { - if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace) + if (clientData != NULL && clientData->logTrace) { - char tmBuffer[TIME_MAX_BUFFER]; - getLogTime(tmBuffer, TIME_MAX_BUFFER); + if (data != NULL && length > 0) + { + char tmBuffer[TIME_MAX_BUFFER]; + getLogTime(tmBuffer, TIME_MAX_BUFFER); - LOG(clientData->logFunc, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length); - for (size_t index = 0; index < length; index++) + LOG(clientData->logFunc, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length); + for (size_t index = 0; index < length; index++) + { + LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, data[index]); + } + LOG(clientData->logFunc, LOG_LINE, ""); + } + else if (packet == PINGRESP_TYPE) { - LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, data[index]); + char tmBuffer[TIME_MAX_BUFFER]; + getLogTime(tmBuffer, TIME_MAX_BUFFER); + + LOG(clientData->logFunc, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length); } - LOG(clientData->logFunc, LOG_LINE, ""); } } @@ -317,161 +330,162 @@ static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData) { MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; - if (mqttData != NULL && headerData != NULL) + if ((mqttData != NULL && headerData != NULL) || packet == PINGRESP_TYPE) { size_t len = BUFFER_length(headerData); uint8_t* iterator = BUFFER_u_char(headerData); logIncomingMsgTrace(mqttData, packet, flags, iterator, len); - if (iterator != NULL && len > 0) + if ((iterator != NULL && len > 0) || packet == PINGRESP_TYPE) { switch (packet) { - case CONNACK_TYPE: - { - if (mqttData->fnOperationCallback != NULL) + case CONNACK_TYPE: { - /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/ - CONNECT_ACK connack = { 0 }; - connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false; - connack.returnCode = byteutil_readByte(&iterator); - - mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx); + if (mqttData->fnOperationCallback != NULL) + { + /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/ + CONNECT_ACK connack = { 0 }; + connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false; + connack.returnCode = byteutil_readByte(&iterator); - if (connack.returnCode == CONNECTION_ACCEPTED) - { - mqttData->clientConnected = true; + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx); + + if (connack.returnCode == CONNECTION_ACCEPTED) + { + mqttData->clientConnected = true; + } } + break; } - break; - } - case PUBLISH_TYPE: - { - if (mqttData->fnMessageRecv != NULL) + case PUBLISH_TYPE: { - //uint8_t ctrlPacket = byteutil_readByte(&iterator); - bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false; - bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false; - QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE; + if (mqttData->fnMessageRecv != NULL) + { + //uint8_t ctrlPacket = byteutil_readByte(&iterator); + bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false; + bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false; + QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE; + + uint8_t* initialPos = iterator; + char* topicName = byteutil_readUTF(&iterator, NULL); + uint16_t packetId = 0; + if (qosValue != DELIVER_AT_MOST_ONCE) + { + packetId = byteutil_read_uint16(&iterator); + } + size_t length = len - (iterator - initialPos); - uint8_t* initialPos = iterator; - char* topicName = byteutil_readUTF(&iterator, NULL); - uint16_t packetId = 0; - if (qosValue != DELIVER_AT_MOST_ONCE) - { - packetId = byteutil_read_uint16(&iterator); + MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); + if (msgHandle == NULL) + { + LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create"); + } + else + { + (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg); + (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg); + mqttData->fnMessageRecv(msgHandle, mqttData->ctx); + + BUFFER_HANDLE pubRel = NULL; + if (qosValue == DELIVER_EXACTLY_ONCE) + { + pubRel = mqtt_codec_publishReceived(packetId); + } + else if (qosValue == DELIVER_AT_LEAST_ONCE) + { + pubRel = mqtt_codec_publishAck(packetId); + } + if (pubRel != NULL) + { + (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); + BUFFER_delete(pubRel); + } + free(topicName); + mqttmessage_destroy(msgHandle); + } } - size_t length = len - (iterator - initialPos); - - MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); - if (msgHandle == NULL) + break; + } + case PUBACK_TYPE: + case PUBREC_TYPE: + case PUBREL_TYPE: + case PUBCOMP_TYPE: + { + if (mqttData->fnOperationCallback) { - LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create"); - } - else - { - (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg); - (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg); - mqttData->fnMessageRecv(msgHandle, mqttData->ctx); + /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/ + MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK : + (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV : + (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP; + + PUBLISH_ACK publish_ack = { 0 }; + publish_ack.packetId = byteutil_read_uint16(&iterator); BUFFER_HANDLE pubRel = NULL; - if (qosValue == DELIVER_EXACTLY_ONCE) + mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx); + if (packet == PUBREC_TYPE) { - pubRel = mqtt_codec_publishReceived(packetId); + pubRel = mqtt_codec_publishRelease(publish_ack.packetId); } - else if (qosValue == DELIVER_AT_LEAST_ONCE) + else if (packet == PUBREL_TYPE) { - pubRel = mqtt_codec_publishAck(packetId); + pubRel = mqtt_codec_publishComplete(publish_ack.packetId); } if (pubRel != NULL) { (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); BUFFER_delete(pubRel); } - free(topicName); - mqttmessage_destroy(msgHandle); } - } - break; - } - case PUBACK_TYPE: - case PUBREC_TYPE: - case PUBREL_TYPE: - case PUBCOMP_TYPE: - { - if (mqttData->fnOperationCallback) - { - /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/ - MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK : - (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV : - (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP; - - PUBLISH_ACK publish_ack = { 0 }; - publish_ack.packetId = byteutil_read_uint16(&iterator); - - BUFFER_HANDLE pubRel = NULL; - mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx); - if (packet == PUBREC_TYPE) - { - pubRel = mqtt_codec_publishRelease(publish_ack.packetId); - } - else if (packet == PUBREL_TYPE) - { - pubRel = mqtt_codec_publishComplete(publish_ack.packetId); - } - if (pubRel != NULL) - { - (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); - BUFFER_delete(pubRel); - } + break; } - break; - } - case SUBACK_TYPE: - { - if (mqttData->fnOperationCallback) + case SUBACK_TYPE: { - /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/ - SUBSCRIBE_ACK suback = { 0 }; + if (mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/ + SUBSCRIBE_ACK suback = { 0 }; + + size_t remainLen = len; + suback.packetId = byteutil_read_uint16(&iterator); + remainLen -= 2; - size_t remainLen = len; - suback.packetId = byteutil_read_uint16(&iterator); - remainLen -= 2; - - // Allocate the remaining len - suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen); - if (suback.qosReturn != NULL) - { - while (remainLen > 0) + // Allocate the remaining len + suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen); + if (suback.qosReturn != NULL) { - suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator); - remainLen--; + while (remainLen > 0) + { + suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator); + remainLen--; + } + (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx); + free(suback.qosReturn); } - (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx); - free(suback.qosReturn); } + break; } - break; - } - case UNSUBACK_TYPE: - { - if (mqttData->fnOperationCallback) + case UNSUBACK_TYPE: { - /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/ - UNSUBSCRIBE_ACK unsuback = { 0 }; - iterator += VARIABLE_HEADER_OFFSET; - unsuback.packetId = byteutil_read_uint16(&iterator); + if (mqttData->fnOperationCallback) + { + /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/ + UNSUBSCRIBE_ACK unsuback = { 0 }; + iterator += VARIABLE_HEADER_OFFSET; + unsuback.packetId = byteutil_read_uint16(&iterator); - (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx); + (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx); + } + break; } - break; - } - case PINGRESP_TYPE: - // Ping responses do not get forwarded - break; - default: - break; + case PINGRESP_TYPE: + mqttData->timeSincePing = 0; + // Ping responses do not get forwarded + break; + default: + break; } } } @@ -515,6 +529,8 @@ result->clientConnected = false; result->logTrace = false; result->rawBytesTrace = false; + result->timeSincePing = 0; + result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME; if (result->packetTickCntr == NULL) { /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/ @@ -580,6 +596,7 @@ mqttData->packetState = UNKNOWN_TYPE; mqttData->qosValue = mqttOptions->qualityOfServiceValue; mqttData->keepAliveInterval = mqttOptions->keepAliveInterval; + mqttData->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2; if (cloneMqttOptions(mqttData, mqttOptions) != 0) { LOG(mqttData->logFunc, LOG_LINE, "Error: Clone Mqtt Options failed"); @@ -790,7 +807,21 @@ } else { - if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval) + /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Operation Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */ + if (mqttData->timeSincePing > 0 && ((current_ms - mqttData->timeSincePing)/1000) > mqttData->maxPingRespTime) + { + // We haven't gotten a ping response in the alloted time + if (mqttData->fnOperationCallback != NULL) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_NO_PING_RESPONSE, NULL, mqttData->ctx); + } + mqttData->socketConnected = false; + mqttData->clientConnected = false; + mqttData->timeSincePing = 0; + mqttData->packetSendTimeMs = 0; + mqttData->packetState = UNKNOWN_TYPE; + } + else if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval) { /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/ BUFFER_HANDLE pingPacket = mqtt_codec_ping(); @@ -798,6 +829,7 @@ { (void)sendPacketItem(mqttData, BUFFER_u_char(pingPacket), BUFFER_length(pingPacket)); BUFFER_delete(pingPacket); + (void)tickcounter_get_current_ms(mqttData->packetTickCntr, &mqttData->timeSincePing); } } }