DIYmall 0.96" Inch I2c IIC Serial 128x64 Oled LCD LED White Display Module
Dependencies: Adafruit_GFX SDFileSystem
Fork of ATT_AWS_IoT_demo by
AWS_openssl/aws_mqtt_embedded_client_lib/MQTTClient_C/src/MQTTClient.cpp
- Committer:
- afmiee
- Date:
- 2018-10-09
- Revision:
- 28:4650c541b029
- Parent:
- 15:6f2798e45099
File content as of revision 28:4650c541b029:
/******************************************************************************* * Copyright (c) 2014 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ #include "MQTTClient.h" #include <string.h> #include "aws_iot_log.h" static void MQTTForceDisconnect(Client *c); void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) { md->topicName = aTopicName; md->message = aMessage; md->applicationHandler = applicationHandler; } uint16_t getNextPacketId(Client *c) { return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1)); } MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) { int32_t sentLen = 0; uint32_t sent = 0, i; if(NULL == c || NULL == timer) { ERROR("...MQTT_NULL_VALUE_ERROR"); return MQTT_NULL_VALUE_ERROR; } if(length >= c->bufSize) { ERROR("...MQTTPACKET_BUFFER_TOO_SHORT"); return MQTTPACKET_BUFFER_TOO_SHORT; } while(sent < length && !expired(timer)) { sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer)); if(sentLen < 0) { /* there was an error writing the data */ ERROR("...there was an error writing the data"); break; } sent = sent + (uint32_t)sentLen; } if(sent == length) { /* record the fact that we have successfully sent the packet */ //countdown(&c->pingTimer, c->keepAliveInterval); return SUCCESS; } ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length); return FAILURE; } void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) { if(NULL == destination || NULL == source) { return; } destination->willFlag = source->willFlag; destination->MQTTVersion = source->MQTTVersion; destination->clientID.cstring = source->clientID.cstring; destination->username.cstring = source->username.cstring; destination->password.cstring = source->password.cstring; destination->will.topicName.cstring = source->will.topicName.cstring; destination->will.message.cstring = source->will.message.cstring; destination->will.qos = source->will.qos; destination->will.retained = source->will.retained; destination->keepAliveInterval = source->keepAliveInterval; destination->cleansession = source->cleansession; } MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs, unsigned char *buf, size_t bufSize, unsigned char *readbuf, size_t readBufSize, uint8_t enableAutoReconnect, networkInitHandler_t networkInitHandler, TLSConnectParams *tlsConnectParams) { uint32_t i; MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf || NULL == networkInitHandler) { return MQTT_NULL_VALUE_ERROR; } for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { c->messageHandlers[i].topicFilter = NULL; c->messageHandlers[i].fp = NULL; c->messageHandlers[i].applicationHandler = NULL; c->messageHandlers[i].qos = (QoS)0; } c->commandTimeoutMs = commandTimeoutMs; c->buf = buf; c->bufSize = bufSize; c->readbuf = readbuf; c->readBufSize = readBufSize; c->isConnected = 0; c->isPingOutstanding = 0; c->wasManuallyDisconnected = 0; c->counterNetworkDisconnected = 0; c->isAutoReconnectEnabled = enableAutoReconnect; c->defaultMessageHandler = NULL; c->disconnectHandler = NULL; copyMQTTConnectData(&(c->options), &default_options); c->networkInitHandler = networkInitHandler; c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort; c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL; c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation; c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation; c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation; c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms; c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag; InitTimer(&(c->pingTimer)); InitTimer(&(c->reconnectDelayTimer)); return SUCCESS; } MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) { unsigned char i; uint32_t multiplier = 1; uint32_t len = 0; const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; if(NULL == c || NULL == value) { return MQTT_NULL_VALUE_ERROR; } *value = 0; do { if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { /* bad data */ return MQTTPACKET_READ_ERROR; } if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) { /* The value argument is the important value. len is just used temporarily * and never used by the calling function for anything else */ return FAILURE; } *value += ((i & 127) * multiplier); multiplier *= 128; }while((i & 128) != 0); /* The value argument is the important value. len is just used temporarily * and never used by the calling function for anything else */ return SUCCESS; } MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) { MQTTHeader header = {0}; uint32_t len = 0; uint32_t rem_len = 0; uint32_t total_bytes_read = 0; uint32_t bytes_to_be_read = 0; int32_t ret_val = 0; MQTTReturnCode rc; if(NULL == c || NULL == timer) { ERROR("readPacket() MQTT_NULL_VALUE_ERROR"); return MQTT_NULL_VALUE_ERROR; } /* 1. read the header byte. This has the packet type in it */ if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) { /* If a network disconnect has occurred it would have been caught by keepalive already. * If nothing is found at this point means there was nothing to read. Not 100% correct, * but the only way to be sure is to pass proper error codes from the network stack * which the mbedtls/openssl implementations do not return */ return MQTT_NOTHING_TO_READ; } len = 1; /* 2. read the remaining length. This is variable in itself */ rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer)); if(SUCCESS != rc) { ERROR("readPacket() SUCCESS != rc"); return rc; } /* if the buffer is too short then the message will be dropped silently */ if (rem_len >= c->readBufSize) { bytes_to_be_read = c->readBufSize; do { ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer)); if (ret_val > 0) { total_bytes_read += ret_val; if((rem_len - total_bytes_read) >= c->readBufSize){ bytes_to_be_read = c->readBufSize; } else{ bytes_to_be_read = rem_len - total_bytes_read; } } } while (total_bytes_read < rem_len && ret_val > 0); return MQTTPACKET_BUFFER_TOO_SHORT; } /* put the original remaining length back into the buffer */ len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* 3. read the rest of the buffer using a callback to supply the rest of the data */ if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) { ERROR("readPacket() FAILURE"); return FAILURE; } header.byte = c->readbuf[0]; *packet_type = header.bits.type; return SUCCESS; } // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator char isTopicMatched(char *topicFilter, MQTTString *topicName) { char *curf = NULL; char *curn = NULL; char *curn_end = NULL; if(NULL == topicFilter || NULL == topicName) { return MQTT_NULL_VALUE_ERROR; } curf = topicFilter; curn = topicName->lenstring.data; curn_end = curn + topicName->lenstring.len; while(*curf && (curn < curn_end)) { if(*curn == '/' && *curf != '/') { break; } if(*curf != '+' && *curf != '#' && *curf != *curn) { break; } if(*curf == '+') { /* skip until we meet the next separator, or end of string */ char *nextpos = curn + 1; while(nextpos < curn_end && *nextpos != '/') nextpos = ++curn + 1; } else if(*curf == '#') { /* skip until end of string */ curn = curn_end - 1; } curf++; curn++; }; return (curn == curn_end) && (*curf == '\0'); } MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) { uint32_t i; MessageData md; if(NULL == c || NULL == topicName || NULL == message) { return MQTT_NULL_VALUE_ERROR; } // we have to find the right message handler - indexed by topic for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if((c->messageHandlers[i].topicFilter != 0) && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) { if(c->messageHandlers[i].fp != NULL) { NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler); c->messageHandlers[i].fp(&md); return SUCCESS; } } } if(NULL != c->defaultMessageHandler) { NewMessageData(&md, topicName, message, NULL); c->defaultMessageHandler(&md); return SUCCESS; } /* Message handler not found for topic */ return FAILURE; } MQTTReturnCode handleDisconnect(Client *c) { MQTTReturnCode rc; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } rc = MQTTDisconnect(c); if(rc != SUCCESS){ // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack MQTTForceDisconnect(c); } if(NULL != c->disconnectHandler) { c->disconnectHandler(); } /* Reset to 0 since this was not a manual disconnect */ c->wasManuallyDisconnected = 0; return MQTT_NETWORK_DISCONNECTED_ERROR; } MQTTReturnCode MQTTAttemptReconnect(Client *c) { MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } if(1 == c->isConnected) { return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; } /* Ignoring return code. failures expected if network is disconnected */ rc = MQTTConnect(c, NULL); /* If still disconnected handle disconnect */ if(0 == c->isConnected) { return MQTT_ATTEMPTING_RECONNECT; } rc = MQTTResubscribe(c); if(SUCCESS != rc) { return rc; } return MQTT_NETWORK_RECONNECTED; } MQTTReturnCode handleReconnect(Client *c) { int8_t isPhysicalLayerConnected = 1; MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } if(!expired(&(c->reconnectDelayTimer))) { /* Timer has not expired. Not time to attempt reconnect yet. * Return attempting reconnect */ return MQTT_ATTEMPTING_RECONNECT; } if(NULL != c->networkStack.isConnected) { isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack)); } if(isPhysicalLayerConnected) { rc = MQTTAttemptReconnect(c); if(MQTT_NETWORK_RECONNECTED == rc) { return MQTT_NETWORK_RECONNECTED; } } c->currentReconnectWaitInterval *= 2; if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { return MQTT_RECONNECT_TIMED_OUT; } countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); return rc; } MQTTReturnCode keepalive(Client *c) { MQTTReturnCode rc = SUCCESS; Timer timer; uint32_t serialized_len = 0; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } if(0 == c->keepAliveInterval) { return SUCCESS; } if(!expired(&c->pingTimer)) { return SUCCESS; } if(c->isPingOutstanding) { return handleDisconnect(c); } /* there is no ping outstanding - send one */ InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len); if(SUCCESS != rc) { return rc; } /* send the ping packet */ rc = sendPacket(c, serialized_len, &timer); if(SUCCESS != rc) { //If sending a PING fails we can no longer determine if we are connected. In this case we decide we are disconnected and begin reconnection attempts return handleDisconnect(c); } c->isPingOutstanding = 1; /* start a timer to wait for PINGRESP from server */ countdown(&c->pingTimer, c->keepAliveInterval / 2); return SUCCESS; } MQTTReturnCode handlePublish(Client *c, Timer *timer) { MQTTString topicName; MQTTMessage msg; MQTTReturnCode rc; uint32_t len = 0; rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained, (uint16_t *)&msg.id, &topicName, (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf, c->readBufSize); if(SUCCESS != rc) { return rc; } rc = deliverMessage(c, &topicName, &msg); if(SUCCESS != rc) { return rc; } if(QOS0 == msg.qos) { /* No further processing required for QOS0 */ return SUCCESS; } if(QOS1 == msg.qos) { rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len); } else { /* Message is not QOS0 or 1 means only option left is QOS2 */ rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len); } if(SUCCESS != rc) { return rc; } rc = sendPacket(c, len, timer); if(SUCCESS != rc) { return rc; } return SUCCESS; } MQTTReturnCode handlePubrec(Client *c, Timer *timer) { uint16_t packet_id; unsigned char dup, type; MQTTReturnCode rc; uint32_t len; rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); if(SUCCESS != rc) { return rc; } rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len); if(SUCCESS != rc) { return rc; } /* send the PUBREL packet */ rc = sendPacket(c, len, timer); if(SUCCESS != rc) { /* there was a problem */ return rc; } return SUCCESS; } MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) { MQTTReturnCode rc; if(NULL == c || NULL == timer) { ERROR("cycle() MQTT_NULL_VALUE_ERROR"); return MQTT_NULL_VALUE_ERROR; } /* read the socket, see what work is due */ rc = readPacket(c, timer, packet_type); if(MQTT_NOTHING_TO_READ == rc) { /* Nothing to read, not a cycle failure */ return SUCCESS; } if(SUCCESS != rc) { ERROR("cycle() SUCCESS != rc"); return rc; } switch(*packet_type) { case CONNACK: case PUBACK: case SUBACK: case UNSUBACK: break; case PUBLISH: { rc = handlePublish(c, timer); break; } case PUBREC: { rc = handlePubrec(c, timer); break; } case PUBCOMP: break; case PINGRESP: { c->isPingOutstanding = 0; countdown(&c->pingTimer, c->keepAliveInterval); break; } default: { /* Either unknown packet type or Failure occurred * Should not happen */ ERROR("cycle() Either unknown packet type or Failure occurred"); return MQTT_BUFFER_RX_MESSAGE_INVALID; break; } } return rc; } MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) { MQTTReturnCode rc = SUCCESS; Timer timer; uint8_t packet_type; if(NULL == c) { ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR"); return MQTT_NULL_VALUE_ERROR; } /* Check if network was manually disconnected */ if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) { ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED"); return MQTT_NETWORK_MANUALLY_DISCONNECTED; } /* Check if network is disconnected and auto-reconnect is not enabled */ if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) { ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR"); return MQTT_NETWORK_DISCONNECTED_ERROR; } InitTimer(&timer); countdown_ms(&timer, timeout_ms); while(!expired(&timer)) { if(0 == c->isConnected) { if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { rc = MQTT_RECONNECT_TIMED_OUT; ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT"); break; } rc = handleReconnect(c); /* Network reconnect attempted, check if yield timer expired before * doing anything else */ continue; } rc = cycle(c, &timer, &packet_type); if(SUCCESS != rc) { ERROR("MQTTYield() SUCCESS != rc"); break; } rc = keepalive(c); if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) { c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL; countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); c->counterNetworkDisconnected++; /* Depending on timer values, it is possible that yield timer has expired * Set to rc to attempting reconnect to inform client that autoreconnect * attempt has started */ INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT"); rc = MQTT_ATTEMPTING_RECONNECT; } else if(SUCCESS != rc) { ERROR("MQTTYield() SUCCESS != rc"); break; } } return rc; } /* only used in single-threaded mode where one command at a time is in process */ MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) { MQTTReturnCode rc = FAILURE; uint8_t read_packet_type = 0, retry = 2; if(NULL == c || NULL == timer) { ERROR("waitfor() MQTT_NULL_VALUE_ERROR"); return MQTT_NULL_VALUE_ERROR; } do { if(expired(timer)) { /* we timed out */ ERROR("waitfor() timer expired"); break; //countdown_ms(timer, 10); } rc = cycle(c, timer, &read_packet_type); }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type); if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) { ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR"); return FAILURE; } /* Something failed or we didn't receive the expected packet, return error code */ return rc; } MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) { Timer connect_timer; MQTTReturnCode connack_rc = FAILURE; char sessionPresent = 0; uint32_t len = 0; MQTTReturnCode rc = FAILURE; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } DEBUG("...connect_timer"); InitTimer(&connect_timer); countdown_ms(&connect_timer, c->commandTimeoutMs); if(c->isConnected) { /* Don't send connect packet again if we are already connected */ ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR"); return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; } if(NULL != options) { /* override default options if new options were supplied */ copyMQTTConnectData(&(c->options), options); } DEBUG("...TLS Connect"); c->networkInitHandler(&(c->networkStack)); rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams); if(0 != rc) { /* TLS Connect failed, return error */ ERROR("...TLS Connect failed, return error"); return FAILURE; } c->keepAliveInterval = c->options.keepAliveInterval; rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len); if(SUCCESS != rc || 0 >= len) { ERROR("...MQTTSerialize_connect FAIL"); return FAILURE; } /* send the connect packet */ rc = sendPacket(c, len, &connect_timer); if(SUCCESS != rc) { ERROR("...sendPacket FAIL"); return rc; } /* this will be a blocking call, wait for the CONNACK */ rc = waitfor(c, CONNACK, &connect_timer); if(SUCCESS != rc) { ERROR("...waitfor FAIL"); return rc; } /* Received CONNACK, check the return code */ rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize); if(SUCCESS != rc) { ERROR("...MQTTDeserialize_connack FAIL"); return rc; } if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) { ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL"); return connack_rc; } DEBUG("...isConnected"); c->isConnected = 1; c->wasManuallyDisconnected = 0; c->isPingOutstanding = 0; countdown(&c->pingTimer, c->keepAliveInterval); return SUCCESS; } /* Return MAX_MESSAGE_HANDLERS value if no free index is available */ uint32_t GetFreeMessageHandlerIndex(Client *c) { uint32_t itr; for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) { if(c->messageHandlers[itr].topicFilter == NULL) { break; } } return itr; } MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos, messageHandler messageHandler, pApplicationHandler_t applicationHandler) { MQTTReturnCode rc = FAILURE; Timer timer; uint32_t len = 0; uint32_t indexOfFreeMessageHandler; uint32_t count = 0; QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; uint16_t packetId; MQTTString topic = MQTTString_initializer; if(NULL == c || NULL == topicFilter || NULL == messageHandler || NULL == applicationHandler) { ERROR("...MQTT_NULL_VALUE_ERROR FAIL"); return MQTT_NULL_VALUE_ERROR; } if(!c->isConnected) { ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL"); return MQTT_NETWORK_DISCONNECTED_ERROR; } topic.cstring = (char *)topicFilter; InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len); if(SUCCESS != rc) { ERROR("...MQTTSerialize_subscribe FAIL"); return rc; } indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c); if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) { ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL"); return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR; } /* send the subscribe packet */ rc = sendPacket(c, len, &timer); if(SUCCESS != rc) { ERROR("...send the subscribe packet FAIL"); return rc; } /* wait for suback */ rc = waitfor(c, SUBACK, &timer); if(SUCCESS != rc) { ERROR("...wait for suback FAIL"); return rc; } /* Granted QoS can be 0, 1 or 2 */ rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); if(SUCCESS != rc) { ERROR("...Granted QoS can be 0, 1 or 2"); return rc; } c->messageHandlers[indexOfFreeMessageHandler].topicFilter = topicFilter; c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler; c->messageHandlers[indexOfFreeMessageHandler].applicationHandler = applicationHandler; c->messageHandlers[indexOfFreeMessageHandler].qos = qos; DEBUG("...MQTTSubscribe SUCCESS"); return SUCCESS; } MQTTReturnCode MQTTResubscribe(Client *c) { MQTTReturnCode rc = FAILURE; Timer timer; uint32_t len = 0; uint32_t count = 0; QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; uint16_t packetId; uint32_t existingSubCount = 0; uint32_t itr = 0; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } if(!c->isConnected) { return MQTT_NETWORK_DISCONNECTED_ERROR; } existingSubCount = GetFreeMessageHandlerIndex(c); for(itr = 0; itr < existingSubCount; itr++) { MQTTString topic = MQTTString_initializer; topic.cstring = (char *)c->messageHandlers[itr].topicFilter; InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &(c->messageHandlers[itr].qos), &len); if(SUCCESS != rc) { return rc; } /* send the subscribe packet */ rc = sendPacket(c, len, &timer); if(SUCCESS != rc) { return rc; } /* wait for suback */ rc = waitfor(c, SUBACK, &timer); if(SUCCESS != rc) { return rc; } /* Granted QoS can be 0, 1 or 2 */ rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); if(SUCCESS != rc) { return rc; } } return SUCCESS; } MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) { MQTTReturnCode rc = FAILURE; Timer timer; MQTTString topic = MQTTString_initializer; uint32_t len = 0; uint32_t i = 0; uint16_t packet_id; if(NULL == c || NULL == topicFilter) { return MQTT_NULL_VALUE_ERROR; } topic.cstring = (char *)topicFilter; if(!c->isConnected) { return MQTT_NETWORK_DISCONNECTED_ERROR; } InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len); if(SUCCESS != rc) { return rc; } /* send the unsubscribe packet */ rc = sendPacket(c, len, &timer); if(SUCCESS != rc) { return rc; } rc = waitfor(c, UNSUBACK, &timer); if(SUCCESS != rc) { return rc; } rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize); if(SUCCESS != rc) { return rc; } /* Remove from message handler array */ for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if(c->messageHandlers[i].topicFilter != NULL && (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) { c->messageHandlers[i].topicFilter = NULL; /* We don't want to break here, if the same topic is registered * with 2 callbacks. Unlikely scenario */ } } return SUCCESS; } MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) { Timer timer; MQTTString topic = MQTTString_initializer; uint32_t len = 0; uint8_t waitForAck = 0; uint8_t packetType = PUBACK; uint16_t packet_id; unsigned char dup, type; MQTTReturnCode rc = FAILURE; if(NULL == c || NULL == topicName || NULL == message) { return MQTT_NULL_VALUE_ERROR; } topic.cstring = (char *)topicName; if(!c->isConnected) { return MQTT_NETWORK_DISCONNECTED_ERROR; } InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); if(QOS1 == message->qos || QOS2 == message->qos) { message->id = getNextPacketId(c); waitForAck = 1; if(QOS2 == message->qos) { packetType = PUBCOMP; } } rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id, topic, (unsigned char*)message->payload, message->payloadlen, &len); if(SUCCESS != rc) { return rc; } /* send the publish packet */ rc = sendPacket(c, len, &timer); if(SUCCESS != rc) { return rc; } /* Wait for ack if QoS1 or QoS2 */ if(1 == waitForAck) { rc = waitfor(c, packetType, &timer); if(SUCCESS != rc) { return rc; } rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); if(SUCCESS != rc) { return rc; } } return SUCCESS; } /** * This is for the case when the sendPacket Fails. */ static void MQTTForceDisconnect(Client *c){ c->isConnected = 0; c->networkStack.disconnect(&(c->networkStack)); c->networkStack.destroy(&(c->networkStack)); } MQTTReturnCode MQTTDisconnect(Client *c) { MQTTReturnCode rc = FAILURE; /* We might wait for incomplete incoming publishes to complete */ Timer timer; uint32_t serialized_len = 0; if(NULL == c) { return MQTT_NULL_VALUE_ERROR; } if(0 == c->isConnected) { /* Network is already disconnected. Do nothing */ return MQTT_NETWORK_DISCONNECTED_ERROR; } rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len); if(SUCCESS != rc) { return rc; } InitTimer(&timer); countdown_ms(&timer, c->commandTimeoutMs); /* send the disconnect packet */ if(serialized_len > 0) { rc = sendPacket(c, serialized_len, &timer); if(SUCCESS != rc) { return rc; } } /* Clean network stack */ c->networkStack.disconnect(&(c->networkStack)); rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack)); if(0 != rc) { /* TLS Destroy failed, return error */ return FAILURE; } c->isConnected = 0; /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */ c->wasManuallyDisconnected = 1; return SUCCESS; } uint8_t MQTTIsConnected(Client *c) { if(NULL == c) { return 0; } return c->isConnected; } uint8_t MQTTIsAutoReconnectEnabled(Client *c) { if(NULL == c) { return 0; } return c->isAutoReconnectEnabled; } MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) { if(NULL == c || NULL == disconnectHandler) { return MQTT_NULL_VALUE_ERROR; } c->disconnectHandler = disconnectHandler; return SUCCESS; } MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) { if(NULL == c) { return FAILURE; } c->isAutoReconnectEnabled = value; return SUCCESS; } uint32_t MQTTGetNetworkDisconnectedCount(Client *c) { return c->counterNetworkDisconnected; } void MQTTResetNetworkDisconnectedCount(Client *c) { c->counterNetworkDisconnected = 0; }