A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_client.c
- Revision:
- 5:34779607059c
- Parent:
- 4:e7167dabd6e4
- Child:
- 6:d9c4702f91ca
--- a/mqtt_client.c Fri Jul 01 10:42:59 2016 -0700 +++ b/mqtt_client.c Fri Jul 29 15:58:49 2016 -0700 @@ -95,7 +95,7 @@ static void sendComplete(void* context, IO_SEND_RESULT send_result) { MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context; - if (mqttData != NULL && mqttData->fnOperationCallback != NULL) + if (mqttData != NULL && mqttData->fnOperationCallback != NULL && send_result == IO_SEND_OK) { if (mqttData->packetState == DISCONNECT_TYPE) { @@ -103,11 +103,22 @@ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx); // close the xio - (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx); + if (xio_close(mqttData->xioHandle, NULL, mqttData->ctx) != 0) + { + LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result."); + } mqttData->socketConnected = false; mqttData->clientConnected = false; } } + else + { + LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure"); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } } static const char* retrievePacketType(CONTROL_PACKET_TYPE packet) @@ -245,7 +256,7 @@ } else if (open_result == IO_OPEN_ERROR) { - (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); } } } @@ -368,47 +379,87 @@ { if (mqttData->fnMessageRecv != NULL) { - //uint8_t ctrlPacket = byteutil_readByte(&iterator); bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false; bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false; QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE; uint8_t* initialPos = iterator; char* topicName = byteutil_readUTF(&iterator, NULL); - uint16_t packetId = 0; - if (qosValue != DELIVER_AT_MOST_ONCE) + if (topicName == NULL) { - packetId = byteutil_read_uint16(&iterator); - } - size_t length = len - (iterator - initialPos); - - MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); - if (msgHandle == NULL) - { - LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create"); + LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name"); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } } else { - (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg); - (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg); - mqttData->fnMessageRecv(msgHandle, mqttData->ctx); + uint16_t packetId = 0; + if (qosValue != DELIVER_AT_MOST_ONCE) + { + packetId = byteutil_read_uint16(&iterator); + } + size_t length = len - (iterator - initialPos); - BUFFER_HANDLE pubRel = NULL; - if (qosValue == DELIVER_EXACTLY_ONCE) + MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length); + if (msgHandle == NULL) { - pubRel = mqtt_codec_publishReceived(packetId); + LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create"); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } } - else if (qosValue == DELIVER_AT_LEAST_ONCE) + else { - pubRel = mqtt_codec_publishAck(packetId); - } - if (pubRel != NULL) - { - (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); - BUFFER_delete(pubRel); + if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 || + mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0) + { + LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property"); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } + else + { + mqttData->fnMessageRecv(msgHandle, mqttData->ctx); + + BUFFER_HANDLE pubRel = NULL; + if (qosValue == DELIVER_EXACTLY_ONCE) + { + pubRel = mqtt_codec_publishReceived(packetId); + if (pubRel == NULL) + { + LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message."); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } + } + else if (qosValue == DELIVER_AT_LEAST_ONCE) + { + pubRel = mqtt_codec_publishAck(packetId); + if (pubRel == NULL) + { + LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message."); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } + } + if (pubRel != NULL) + { + (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel)); + BUFFER_delete(pubRel); + } + } + mqttmessage_destroy(msgHandle); } free(topicName); - mqttmessage_destroy(msgHandle); } } break; @@ -433,10 +484,26 @@ if (packet == PUBREC_TYPE) { pubRel = mqtt_codec_publishRelease(publish_ack.packetId); + if (pubRel == NULL) + { + LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message."); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } } else if (packet == PUBREL_TYPE) { pubRel = mqtt_codec_publishComplete(publish_ack.packetId); + if (pubRel == NULL) + { + LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message."); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } } if (pubRel != NULL) { @@ -466,9 +533,17 @@ suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator); remainLen--; } - (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx); + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx); free(suback.qosReturn); } + else + { + LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed."); + if (mqttData->fnOperationCallback) + { + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx); + } + } } break; } @@ -481,7 +556,7 @@ iterator += VARIABLE_HEADER_OFFSET; unsuback.packetId = byteutil_read_uint16(&iterator); - (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx); + mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx); } break; } @@ -583,6 +658,7 @@ /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/ if (handle == NULL || mqttOptions == NULL) { + LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions); result = __LINE__; } else