DIYmall 0.96" Inch I2c IIC Serial 128x64 Oled LCD LED White Display Module
Dependencies: Adafruit_GFX SDFileSystem
Fork of ATT_AWS_IoT_demo by
Diff: AWS_openssl/aws_mqtt_embedded_client_lib/MQTTClient_C/src/MQTTClient.cpp
- Revision:
- 15:6f2798e45099
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/AWS_openssl/aws_mqtt_embedded_client_lib/MQTTClient_C/src/MQTTClient.cpp Thu Dec 01 18:05:38 2016 +0000 @@ -0,0 +1,1068 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTClient.h" +#include <string.h> +#include "aws_iot_log.h" + +static void MQTTForceDisconnect(Client *c); + +void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) { + md->topicName = aTopicName; + md->message = aMessage; + md->applicationHandler = applicationHandler; +} + +uint16_t getNextPacketId(Client *c) { + return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1)); +} + +MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) { + int32_t sentLen = 0; + uint32_t sent = 0, i; + + if(NULL == c || NULL == timer) { + ERROR("...MQTT_NULL_VALUE_ERROR"); + return MQTT_NULL_VALUE_ERROR; + } + + if(length >= c->bufSize) { + ERROR("...MQTTPACKET_BUFFER_TOO_SHORT"); + return MQTTPACKET_BUFFER_TOO_SHORT; + } + + while(sent < length && !expired(timer)) { + sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer)); + if(sentLen < 0) { + /* there was an error writing the data */ + ERROR("...there was an error writing the data"); + break; + } + sent = sent + (uint32_t)sentLen; + } + + if(sent == length) { + /* record the fact that we have successfully sent the packet */ + //countdown(&c->pingTimer, c->keepAliveInterval); + return SUCCESS; + } + + ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length); + return FAILURE; +} + +void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) { + if(NULL == destination || NULL == source) { + return; + } + destination->willFlag = source->willFlag; + destination->MQTTVersion = source->MQTTVersion; + destination->clientID.cstring = source->clientID.cstring; + destination->username.cstring = source->username.cstring; + destination->password.cstring = source->password.cstring; + destination->will.topicName.cstring = source->will.topicName.cstring; + destination->will.message.cstring = source->will.message.cstring; + destination->will.qos = source->will.qos; + destination->will.retained = source->will.retained; + destination->keepAliveInterval = source->keepAliveInterval; + destination->cleansession = source->cleansession; +} + +MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs, + unsigned char *buf, size_t bufSize, unsigned char *readbuf, + size_t readBufSize, uint8_t enableAutoReconnect, + networkInitHandler_t networkInitHandler, + TLSConnectParams *tlsConnectParams) { + uint32_t i; + MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; + + if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf + || NULL == networkInitHandler) { + return MQTT_NULL_VALUE_ERROR; + } + + for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + c->messageHandlers[i].topicFilter = NULL; + c->messageHandlers[i].fp = NULL; + c->messageHandlers[i].applicationHandler = NULL; + c->messageHandlers[i].qos = (QoS)0; + } + + c->commandTimeoutMs = commandTimeoutMs; + c->buf = buf; + c->bufSize = bufSize; + c->readbuf = readbuf; + c->readBufSize = readBufSize; + c->isConnected = 0; + c->isPingOutstanding = 0; + c->wasManuallyDisconnected = 0; + c->counterNetworkDisconnected = 0; + c->isAutoReconnectEnabled = enableAutoReconnect; + c->defaultMessageHandler = NULL; + c->disconnectHandler = NULL; + copyMQTTConnectData(&(c->options), &default_options); + + c->networkInitHandler = networkInitHandler; + c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort; + c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL; + c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation; + c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation; + c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation; + c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms; + c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag; + + InitTimer(&(c->pingTimer)); + InitTimer(&(c->reconnectDelayTimer)); + + return SUCCESS; +} + +MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) { + unsigned char i; + uint32_t multiplier = 1; + uint32_t len = 0; + const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; + + if(NULL == c || NULL == value) { + return MQTT_NULL_VALUE_ERROR; + } + + *value = 0; + + do { + if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { + /* bad data */ + return MQTTPACKET_READ_ERROR; + } + + if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) { + /* The value argument is the important value. len is just used temporarily + * and never used by the calling function for anything else */ + return FAILURE; + } + + *value += ((i & 127) * multiplier); + multiplier *= 128; + }while((i & 128) != 0); + + /* The value argument is the important value. len is just used temporarily + * and never used by the calling function for anything else */ + return SUCCESS; +} + +MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) { + MQTTHeader header = {0}; + uint32_t len = 0; + uint32_t rem_len = 0; + uint32_t total_bytes_read = 0; + uint32_t bytes_to_be_read = 0; + int32_t ret_val = 0; + MQTTReturnCode rc; + + if(NULL == c || NULL == timer) { + ERROR("readPacket() MQTT_NULL_VALUE_ERROR"); + return MQTT_NULL_VALUE_ERROR; + } + + /* 1. read the header byte. This has the packet type in it */ + if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) { + /* If a network disconnect has occurred it would have been caught by keepalive already. + * If nothing is found at this point means there was nothing to read. Not 100% correct, + * but the only way to be sure is to pass proper error codes from the network stack + * which the mbedtls/openssl implementations do not return */ + return MQTT_NOTHING_TO_READ; + } + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer)); + if(SUCCESS != rc) { + ERROR("readPacket() SUCCESS != rc"); + return rc; + } + + /* if the buffer is too short then the message will be dropped silently */ + if (rem_len >= c->readBufSize) { + bytes_to_be_read = c->readBufSize; + do { + ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer)); + if (ret_val > 0) { + total_bytes_read += ret_val; + if((rem_len - total_bytes_read) >= c->readBufSize){ + bytes_to_be_read = c->readBufSize; + } + else{ + bytes_to_be_read = rem_len - total_bytes_read; + } + } + } while (total_bytes_read < rem_len && ret_val > 0); + return MQTTPACKET_BUFFER_TOO_SHORT; + } + + /* put the original remaining length back into the buffer */ + len += MQTTPacket_encode(c->readbuf + 1, rem_len); + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) { + ERROR("readPacket() FAILURE"); + return FAILURE; + } + + header.byte = c->readbuf[0]; + *packet_type = header.bits.type; + + return SUCCESS; +} + +// assume topic filter and name is in correct format +// # can only be at end +// + and # can only be next to separator +char isTopicMatched(char *topicFilter, MQTTString *topicName) { + char *curf = NULL; + char *curn = NULL; + char *curn_end = NULL; + + if(NULL == topicFilter || NULL == topicName) { + return MQTT_NULL_VALUE_ERROR; + } + + curf = topicFilter; + curn = topicName->lenstring.data; + curn_end = curn + topicName->lenstring.len; + + while(*curf && (curn < curn_end)) { + if(*curn == '/' && *curf != '/') { + break; + } + if(*curf != '+' && *curf != '#' && *curf != *curn) { + break; + } + if(*curf == '+') { + /* skip until we meet the next separator, or end of string */ + char *nextpos = curn + 1; + while(nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } else if(*curf == '#') { + /* skip until end of string */ + curn = curn_end - 1; + } + + curf++; + curn++; + }; + + return (curn == curn_end) && (*curf == '\0'); +} + +MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) { + uint32_t i; + MessageData md; + + if(NULL == c || NULL == topicName || NULL == message) { + return MQTT_NULL_VALUE_ERROR; + } + + // we have to find the right message handler - indexed by topic + for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + if((c->messageHandlers[i].topicFilter != 0) + && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || + isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) { + if(c->messageHandlers[i].fp != NULL) { + NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler); + c->messageHandlers[i].fp(&md); + return SUCCESS; + } + } + } + + if(NULL != c->defaultMessageHandler) { + NewMessageData(&md, topicName, message, NULL); + c->defaultMessageHandler(&md); + return SUCCESS; + } + + /* Message handler not found for topic */ + return FAILURE; +} + +MQTTReturnCode handleDisconnect(Client *c) { + MQTTReturnCode rc; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + rc = MQTTDisconnect(c); + if(rc != SUCCESS){ + // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack + MQTTForceDisconnect(c); + } + + if(NULL != c->disconnectHandler) { + c->disconnectHandler(); + } + + /* Reset to 0 since this was not a manual disconnect */ + c->wasManuallyDisconnected = 0; + return MQTT_NETWORK_DISCONNECTED_ERROR; +} + +MQTTReturnCode MQTTAttemptReconnect(Client *c) { + MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + if(1 == c->isConnected) { + return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; + } + + /* Ignoring return code. failures expected if network is disconnected */ + rc = MQTTConnect(c, NULL); + + /* If still disconnected handle disconnect */ + if(0 == c->isConnected) { + return MQTT_ATTEMPTING_RECONNECT; + } + + rc = MQTTResubscribe(c); + if(SUCCESS != rc) { + return rc; + } + + return MQTT_NETWORK_RECONNECTED; +} + +MQTTReturnCode handleReconnect(Client *c) { + int8_t isPhysicalLayerConnected = 1; + MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + if(!expired(&(c->reconnectDelayTimer))) { + /* Timer has not expired. Not time to attempt reconnect yet. + * Return attempting reconnect */ + return MQTT_ATTEMPTING_RECONNECT; + } + + if(NULL != c->networkStack.isConnected) { + isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack)); + } + + if(isPhysicalLayerConnected) { + rc = MQTTAttemptReconnect(c); + if(MQTT_NETWORK_RECONNECTED == rc) { + return MQTT_NETWORK_RECONNECTED; + } + } + + c->currentReconnectWaitInterval *= 2; + + if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { + return MQTT_RECONNECT_TIMED_OUT; + } + countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); + return rc; +} + +MQTTReturnCode keepalive(Client *c) { + MQTTReturnCode rc = SUCCESS; + Timer timer; + uint32_t serialized_len = 0; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + if(0 == c->keepAliveInterval) { + return SUCCESS; + } + + if(!expired(&c->pingTimer)) { + return SUCCESS; + } + + if(c->isPingOutstanding) { + return handleDisconnect(c); + } + + /* there is no ping outstanding - send one */ + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len); + if(SUCCESS != rc) { + return rc; + } + + /* send the ping packet */ + rc = sendPacket(c, serialized_len, &timer); + if(SUCCESS != rc) { + //If sending a PING fails we can no longer determine if we are connected. In this case we decide we are disconnected and begin reconnection attempts + return handleDisconnect(c); + } + + c->isPingOutstanding = 1; + /* start a timer to wait for PINGRESP from server */ + countdown(&c->pingTimer, c->keepAliveInterval / 2); + + return SUCCESS; +} + +MQTTReturnCode handlePublish(Client *c, Timer *timer) { + MQTTString topicName; + MQTTMessage msg; + MQTTReturnCode rc; + uint32_t len = 0; + + rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained, + (uint16_t *)&msg.id, &topicName, + (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf, + c->readBufSize); + if(SUCCESS != rc) { + return rc; + } + + rc = deliverMessage(c, &topicName, &msg); + if(SUCCESS != rc) { + return rc; + } + + if(QOS0 == msg.qos) { + /* No further processing required for QOS0 */ + return SUCCESS; + } + + if(QOS1 == msg.qos) { + rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len); + } else { /* Message is not QOS0 or 1 means only option left is QOS2 */ + rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len); + } + + if(SUCCESS != rc) { + return rc; + } + + rc = sendPacket(c, len, timer); + if(SUCCESS != rc) { + return rc; + } + + return SUCCESS; +} + +MQTTReturnCode handlePubrec(Client *c, Timer *timer) { + uint16_t packet_id; + unsigned char dup, type; + MQTTReturnCode rc; + uint32_t len; + + rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + return rc; + } + + rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len); + if(SUCCESS != rc) { + return rc; + } + + /* send the PUBREL packet */ + rc = sendPacket(c, len, timer); + if(SUCCESS != rc) { + /* there was a problem */ + return rc; + } + + return SUCCESS; +} + +MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) { + MQTTReturnCode rc; + if(NULL == c || NULL == timer) { + ERROR("cycle() MQTT_NULL_VALUE_ERROR"); + return MQTT_NULL_VALUE_ERROR; + } + + /* read the socket, see what work is due */ + rc = readPacket(c, timer, packet_type); + if(MQTT_NOTHING_TO_READ == rc) { + /* Nothing to read, not a cycle failure */ + return SUCCESS; + } + if(SUCCESS != rc) { + ERROR("cycle() SUCCESS != rc"); + return rc; + } + + switch(*packet_type) { + case CONNACK: + case PUBACK: + case SUBACK: + case UNSUBACK: + break; + case PUBLISH: { + rc = handlePublish(c, timer); + break; + } + case PUBREC: { + rc = handlePubrec(c, timer); + break; + } + case PUBCOMP: + break; + case PINGRESP: { + c->isPingOutstanding = 0; + countdown(&c->pingTimer, c->keepAliveInterval); + break; + } + default: { + /* Either unknown packet type or Failure occurred + * Should not happen */ + ERROR("cycle() Either unknown packet type or Failure occurred"); + return MQTT_BUFFER_RX_MESSAGE_INVALID; + break; + } + } + + return rc; +} + +MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) { + MQTTReturnCode rc = SUCCESS; + Timer timer; + uint8_t packet_type; + + if(NULL == c) { + ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR"); + return MQTT_NULL_VALUE_ERROR; + } + + /* Check if network was manually disconnected */ + if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) { + ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED"); + return MQTT_NETWORK_MANUALLY_DISCONNECTED; + } + + /* Check if network is disconnected and auto-reconnect is not enabled */ + if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) { + ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR"); + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + InitTimer(&timer); + countdown_ms(&timer, timeout_ms); + + while(!expired(&timer)) { + if(0 == c->isConnected) { + if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { + rc = MQTT_RECONNECT_TIMED_OUT; + ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT"); + break; + } + + rc = handleReconnect(c); + /* Network reconnect attempted, check if yield timer expired before + * doing anything else */ + continue; + } + + rc = cycle(c, &timer, &packet_type); + if(SUCCESS != rc) { + ERROR("MQTTYield() SUCCESS != rc"); + break; + } + + rc = keepalive(c); + if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) { + c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL; + countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); + c->counterNetworkDisconnected++; + /* Depending on timer values, it is possible that yield timer has expired + * Set to rc to attempting reconnect to inform client that autoreconnect + * attempt has started */ + INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT"); + rc = MQTT_ATTEMPTING_RECONNECT; + } else if(SUCCESS != rc) { + ERROR("MQTTYield() SUCCESS != rc"); + break; + } + } + + return rc; +} + +/* only used in single-threaded mode where one command at a time is in process */ +MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) { + MQTTReturnCode rc = FAILURE; + uint8_t read_packet_type = 0, retry = 2; + + + + if(NULL == c || NULL == timer) { + ERROR("waitfor() MQTT_NULL_VALUE_ERROR"); + return MQTT_NULL_VALUE_ERROR; + } + + do { + if(expired(timer)) { + /* we timed out */ + ERROR("waitfor() timer expired"); + break; + + //countdown_ms(timer, 10); + } + + rc = cycle(c, timer, &read_packet_type); + }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type); + + if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) { + ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR"); + return FAILURE; + } + + /* Something failed or we didn't receive the expected packet, return error code */ + return rc; +} + +MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) { + Timer connect_timer; + MQTTReturnCode connack_rc = FAILURE; + char sessionPresent = 0; + uint32_t len = 0; + MQTTReturnCode rc = FAILURE; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + DEBUG("...connect_timer"); + InitTimer(&connect_timer); + countdown_ms(&connect_timer, c->commandTimeoutMs); + + if(c->isConnected) { + /* Don't send connect packet again if we are already connected */ + ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR"); + return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; + } + + if(NULL != options) { + /* override default options if new options were supplied */ + copyMQTTConnectData(&(c->options), options); + } + + DEBUG("...TLS Connect"); + c->networkInitHandler(&(c->networkStack)); + rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams); + if(0 != rc) { + /* TLS Connect failed, return error */ + ERROR("...TLS Connect failed, return error"); + return FAILURE; + } + + c->keepAliveInterval = c->options.keepAliveInterval; + rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len); + if(SUCCESS != rc || 0 >= len) { + ERROR("...MQTTSerialize_connect FAIL"); + return FAILURE; + } + + /* send the connect packet */ + rc = sendPacket(c, len, &connect_timer); + if(SUCCESS != rc) { + ERROR("...sendPacket FAIL"); + return rc; + } + + /* this will be a blocking call, wait for the CONNACK */ + rc = waitfor(c, CONNACK, &connect_timer); + if(SUCCESS != rc) { + ERROR("...waitfor FAIL"); + return rc; + } + + /* Received CONNACK, check the return code */ + rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + ERROR("...MQTTDeserialize_connack FAIL"); + return rc; + } + + if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) { + ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL"); + return connack_rc; + } + + DEBUG("...isConnected"); + c->isConnected = 1; + c->wasManuallyDisconnected = 0; + c->isPingOutstanding = 0; + countdown(&c->pingTimer, c->keepAliveInterval); + + return SUCCESS; +} + +/* Return MAX_MESSAGE_HANDLERS value if no free index is available */ +uint32_t GetFreeMessageHandlerIndex(Client *c) { + uint32_t itr; + for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) { + if(c->messageHandlers[itr].topicFilter == NULL) { + break; + } + } + + return itr; +} + +MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos, + messageHandler messageHandler, pApplicationHandler_t applicationHandler) { + MQTTReturnCode rc = FAILURE; + Timer timer; + uint32_t len = 0; + uint32_t indexOfFreeMessageHandler; + uint32_t count = 0; + QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; + uint16_t packetId; + MQTTString topic = MQTTString_initializer; + + if(NULL == c || NULL == topicFilter + || NULL == messageHandler || NULL == applicationHandler) { + ERROR("...MQTT_NULL_VALUE_ERROR FAIL"); + return MQTT_NULL_VALUE_ERROR; + } + + if(!c->isConnected) { + ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL"); + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + topic.cstring = (char *)topicFilter; + + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + + rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len); + if(SUCCESS != rc) { + ERROR("...MQTTSerialize_subscribe FAIL"); + return rc; + } + + indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c); + if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) { + ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL"); + return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR; + } + + /* send the subscribe packet */ + rc = sendPacket(c, len, &timer); + if(SUCCESS != rc) { + ERROR("...send the subscribe packet FAIL"); + return rc; + } + + /* wait for suback */ + rc = waitfor(c, SUBACK, &timer); + if(SUCCESS != rc) { + ERROR("...wait for suback FAIL"); + return rc; + } + + /* Granted QoS can be 0, 1 or 2 */ + rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + ERROR("...Granted QoS can be 0, 1 or 2"); + return rc; + } + + c->messageHandlers[indexOfFreeMessageHandler].topicFilter = + topicFilter; + c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler; + c->messageHandlers[indexOfFreeMessageHandler].applicationHandler = + applicationHandler; + c->messageHandlers[indexOfFreeMessageHandler].qos = qos; + + DEBUG("...MQTTSubscribe SUCCESS"); + return SUCCESS; +} + +MQTTReturnCode MQTTResubscribe(Client *c) { + MQTTReturnCode rc = FAILURE; + Timer timer; + uint32_t len = 0; + uint32_t count = 0; + QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; + uint16_t packetId; + uint32_t existingSubCount = 0; + uint32_t itr = 0; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + if(!c->isConnected) { + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + existingSubCount = GetFreeMessageHandlerIndex(c); + + for(itr = 0; itr < existingSubCount; itr++) { + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)c->messageHandlers[itr].topicFilter; + + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + + rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, + &topic, &(c->messageHandlers[itr].qos), &len); + if(SUCCESS != rc) { + return rc; + } + + /* send the subscribe packet */ + rc = sendPacket(c, len, &timer); + if(SUCCESS != rc) { + return rc; + } + + /* wait for suback */ + rc = waitfor(c, SUBACK, &timer); + if(SUCCESS != rc) { + return rc; + } + + /* Granted QoS can be 0, 1 or 2 */ + rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + return rc; + } + } + + return SUCCESS; +} + +MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) { + MQTTReturnCode rc = FAILURE; + Timer timer; + MQTTString topic = MQTTString_initializer; + uint32_t len = 0; + uint32_t i = 0; + uint16_t packet_id; + + if(NULL == c || NULL == topicFilter) { + return MQTT_NULL_VALUE_ERROR; + } + + topic.cstring = (char *)topicFilter; + + if(!c->isConnected) { + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + + rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len); + if(SUCCESS != rc) { + return rc; + } + + /* send the unsubscribe packet */ + rc = sendPacket(c, len, &timer); + if(SUCCESS != rc) { + return rc; + } + + rc = waitfor(c, UNSUBACK, &timer); + if(SUCCESS != rc) { + return rc; + } + + rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + return rc; + } + + /* Remove from message handler array */ + for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + if(c->messageHandlers[i].topicFilter != NULL && + (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) { + c->messageHandlers[i].topicFilter = NULL; + /* We don't want to break here, if the same topic is registered + * with 2 callbacks. Unlikely scenario */ + } + } + + return SUCCESS; +} + +MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) { + Timer timer; + MQTTString topic = MQTTString_initializer; + uint32_t len = 0; + uint8_t waitForAck = 0; + uint8_t packetType = PUBACK; + uint16_t packet_id; + unsigned char dup, type; + MQTTReturnCode rc = FAILURE; + + if(NULL == c || NULL == topicName || NULL == message) { + return MQTT_NULL_VALUE_ERROR; + } + + topic.cstring = (char *)topicName; + + if(!c->isConnected) { + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + + if(QOS1 == message->qos || QOS2 == message->qos) { + message->id = getNextPacketId(c); + waitForAck = 1; + if(QOS2 == message->qos) { + packetType = PUBCOMP; + } + } + + rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id, + topic, (unsigned char*)message->payload, message->payloadlen, &len); + if(SUCCESS != rc) { + return rc; + } + + /* send the publish packet */ + rc = sendPacket(c, len, &timer); + if(SUCCESS != rc) { + return rc; + } + + /* Wait for ack if QoS1 or QoS2 */ + if(1 == waitForAck) { + rc = waitfor(c, packetType, &timer); + if(SUCCESS != rc) { + return rc; + } + + rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); + if(SUCCESS != rc) { + return rc; + } + } + + return SUCCESS; +} +/** + * This is for the case when the sendPacket Fails. + */ +static void MQTTForceDisconnect(Client *c){ + c->isConnected = 0; + c->networkStack.disconnect(&(c->networkStack)); + c->networkStack.destroy(&(c->networkStack)); +} + +MQTTReturnCode MQTTDisconnect(Client *c) { + MQTTReturnCode rc = FAILURE; + /* We might wait for incomplete incoming publishes to complete */ + Timer timer; + uint32_t serialized_len = 0; + + if(NULL == c) { + return MQTT_NULL_VALUE_ERROR; + } + + if(0 == c->isConnected) { + /* Network is already disconnected. Do nothing */ + return MQTT_NETWORK_DISCONNECTED_ERROR; + } + + rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len); + if(SUCCESS != rc) { + return rc; + } + + InitTimer(&timer); + countdown_ms(&timer, c->commandTimeoutMs); + + /* send the disconnect packet */ + if(serialized_len > 0) { + rc = sendPacket(c, serialized_len, &timer); + if(SUCCESS != rc) { + return rc; + } + } + + /* Clean network stack */ + c->networkStack.disconnect(&(c->networkStack)); + rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack)); + if(0 != rc) { + /* TLS Destroy failed, return error */ + return FAILURE; + } + + c->isConnected = 0; + + /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */ + c->wasManuallyDisconnected = 1; + + return SUCCESS; +} + +uint8_t MQTTIsConnected(Client *c) { + if(NULL == c) { + return 0; + } + + return c->isConnected; +} + +uint8_t MQTTIsAutoReconnectEnabled(Client *c) { + if(NULL == c) { + return 0; + } + + return c->isAutoReconnectEnabled; +} + +MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) { + if(NULL == c || NULL == disconnectHandler) { + return MQTT_NULL_VALUE_ERROR; + } + + c->disconnectHandler = disconnectHandler; + return SUCCESS; +} + +MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) { + if(NULL == c) { + return FAILURE; + } + c->isAutoReconnectEnabled = value; + return SUCCESS; +} + +uint32_t MQTTGetNetworkDisconnectedCount(Client *c) { + return c->counterNetworkDisconnected; +} + +void MQTTResetNetworkDisconnectedCount(Client *c) { + c->counterNetworkDisconnected = 0; +} + +