Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Revision 5:34779607059c, committed 2016-07-29
- Comitter:
- AzureIoTClient
- Date:
- Fri Jul 29 15:58:49 2016 -0700
- Parent:
- 4:e7167dabd6e4
- Child:
- 6:d9c4702f91ca
- Commit message:
- 1.0.10
Changed in this revision
--- a/azure_umqtt_c/mqttconst.h Fri Jul 01 10:42:59 2016 -0700
+++ b/azure_umqtt_c/mqttconst.h Fri Jul 29 15:58:49 2016 -0700
@@ -56,7 +56,7 @@
char* willMessage;
char* username;
char* password;
- int keepAliveInterval;
+ uint16_t keepAliveInterval;
bool messageRetain;
bool useCleanSession;
QOS_VALUE qualityOfServiceValue;
--- a/mqtt_client.c Fri Jul 01 10:42:59 2016 -0700
+++ b/mqtt_client.c Fri Jul 29 15:58:49 2016 -0700
@@ -95,7 +95,7 @@
static void sendComplete(void* context, IO_SEND_RESULT send_result)
{
MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
- if (mqttData != NULL && mqttData->fnOperationCallback != NULL)
+ if (mqttData != NULL && mqttData->fnOperationCallback != NULL && send_result == IO_SEND_OK)
{
if (mqttData->packetState == DISCONNECT_TYPE)
{
@@ -103,11 +103,22 @@
mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx);
// close the xio
- (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx);
+ if (xio_close(mqttData->xioHandle, NULL, mqttData->ctx) != 0)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result.");
+ }
mqttData->socketConnected = false;
mqttData->clientConnected = false;
}
}
+ else
+ {
+ LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
}
static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
@@ -245,7 +256,7 @@
}
else if (open_result == IO_OPEN_ERROR)
{
- (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
}
}
}
@@ -368,47 +379,87 @@
{
if (mqttData->fnMessageRecv != NULL)
{
- //uint8_t ctrlPacket = byteutil_readByte(&iterator);
bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
uint8_t* initialPos = iterator;
char* topicName = byteutil_readUTF(&iterator, NULL);
- uint16_t packetId = 0;
- if (qosValue != DELIVER_AT_MOST_ONCE)
+ if (topicName == NULL)
{
- packetId = byteutil_read_uint16(&iterator);
- }
- size_t length = len - (iterator - initialPos);
-
- MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
- if (msgHandle == NULL)
- {
- LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+ LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
}
else
{
- (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
- (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
- mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+ uint16_t packetId = 0;
+ if (qosValue != DELIVER_AT_MOST_ONCE)
+ {
+ packetId = byteutil_read_uint16(&iterator);
+ }
+ size_t length = len - (iterator - initialPos);
- BUFFER_HANDLE pubRel = NULL;
- if (qosValue == DELIVER_EXACTLY_ONCE)
+ MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
+ if (msgHandle == NULL)
{
- pubRel = mqtt_codec_publishReceived(packetId);
+ LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
}
- else if (qosValue == DELIVER_AT_LEAST_ONCE)
+ else
{
- pubRel = mqtt_codec_publishAck(packetId);
- }
- if (pubRel != NULL)
- {
- (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
- BUFFER_delete(pubRel);
+ if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
+ mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
+ else
+ {
+ mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+
+ BUFFER_HANDLE pubRel = NULL;
+ if (qosValue == DELIVER_EXACTLY_ONCE)
+ {
+ pubRel = mqtt_codec_publishReceived(packetId);
+ if (pubRel == NULL)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
+ }
+ else if (qosValue == DELIVER_AT_LEAST_ONCE)
+ {
+ pubRel = mqtt_codec_publishAck(packetId);
+ if (pubRel == NULL)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
+ }
+ if (pubRel != NULL)
+ {
+ (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+ BUFFER_delete(pubRel);
+ }
+ }
+ mqttmessage_destroy(msgHandle);
}
free(topicName);
- mqttmessage_destroy(msgHandle);
}
}
break;
@@ -433,10 +484,26 @@
if (packet == PUBREC_TYPE)
{
pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
+ if (pubRel == NULL)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
}
else if (packet == PUBREL_TYPE)
{
pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
+ if (pubRel == NULL)
+ {
+ LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
}
if (pubRel != NULL)
{
@@ -466,9 +533,17 @@
suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
remainLen--;
}
- (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
free(suback.qosReturn);
}
+ else
+ {
+ LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
+ if (mqttData->fnOperationCallback)
+ {
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+ }
+ }
}
break;
}
@@ -481,7 +556,7 @@
iterator += VARIABLE_HEADER_OFFSET;
unsuback.packetId = byteutil_read_uint16(&iterator);
- (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+ mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
}
break;
}
@@ -583,6 +658,7 @@
/*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
if (handle == NULL || mqttOptions == NULL)
{
+ LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions);
result = __LINE__;
}
else
--- a/mqtt_codec.c Fri Jul 01 10:42:59 2016 -0700
+++ b/mqtt_codec.c Fri Jul 29 15:58:49 2016 -0700
@@ -269,7 +269,7 @@
return result;
}
-static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, int flags, uint16_t packetId)
+static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
{
BUFFER_HANDLE result = BUFFER_new();
if (result != NULL)
@@ -289,7 +289,7 @@
}
else
{
- *iterator = type | flags;
+ *iterator = (uint8_t)type | flags;
iterator++;
*iterator = 0x2;
iterator++;
@@ -339,7 +339,7 @@
else
{
uint8_t* iterator = BUFFER_u_char(fixedHeader);
- *iterator = packetType | flags;
+ *iterator = (uint8_t)packetType | flags;
iterator++;
(void)memcpy(iterator, remainSize, index);
@@ -400,7 +400,7 @@
{
result = __LINE__;
}
- else if (usernameLen > 0 && passwordLen == 0)
+ else if (usernameLen == 0 && passwordLen > 0)
{
result = __LINE__;
}
@@ -440,8 +440,12 @@
}
if (usernameLen > 0)
{
- packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG | PASSWORD_FLAG;
+ packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
+ }
+ if (passwordLen > 0)
+ {
+ packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
}
// TODO: Get the rest of the flags
@@ -915,7 +919,6 @@
}
else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
{
- size_t payloadLen = 0;
if (codec_Data->headerData == NULL)
{
codec_Data->codecState = CODEC_STATE_PAYLOAD;
