DIYmall 0.96" Inch I2c IIC Serial 128x64 Oled LCD LED White Display Module

Dependencies:   Adafruit_GFX SDFileSystem

Fork of ATT_AWS_IoT_demo by AT&T IoT

Committer:
afmiee
Date:
Tue Oct 09 20:57:34 2018 +0000
Revision:
28:4650c541b029
Parent:
15:6f2798e45099
DIYmall 0.96" Inch I2c IIC Serial 128x64 Oled LCD LED White Display Module

Who changed what in which revision?

User | Revision | Line number | New contents of line
ampembeng 15:6f2798e45099 1 /*******************************************************************************
ampembeng 15:6f2798e45099 2 * Copyright (c) 2014 IBM Corp.
ampembeng 15:6f2798e45099 3 *
ampembeng 15:6f2798e45099 4 * All rights reserved. This program and the accompanying materials
ampembeng 15:6f2798e45099 5 * are made available under the terms of the Eclipse Public License v1.0
ampembeng 15:6f2798e45099 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
ampembeng 15:6f2798e45099 7 *
ampembeng 15:6f2798e45099 8 * The Eclipse Public License is available at
ampembeng 15:6f2798e45099 9 * http://www.eclipse.org/legal/epl-v10.html
ampembeng 15:6f2798e45099 10 * and the Eclipse Distribution License is available at
ampembeng 15:6f2798e45099 11 * http://www.eclipse.org/org/documents/edl-v10.php.
ampembeng 15:6f2798e45099 12 *
ampembeng 15:6f2798e45099 13 * Contributors:
ampembeng 15:6f2798e45099 14 * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
ampembeng 15:6f2798e45099 15 *******************************************************************************/
ampembeng 15:6f2798e45099 16
ampembeng 15:6f2798e45099 17 #include "MQTTClient.h"
ampembeng 15:6f2798e45099 18 #include <string.h>
ampembeng 15:6f2798e45099 19 #include "aws_iot_log.h"
ampembeng 15:6f2798e45099 20
ampembeng 15:6f2798e45099 21 static void MQTTForceDisconnect(Client *c);
ampembeng 15:6f2798e45099 22
ampembeng 15:6f2798e45099 23 void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) {
ampembeng 15:6f2798e45099 24 md->topicName = aTopicName;
ampembeng 15:6f2798e45099 25 md->message = aMessage;
ampembeng 15:6f2798e45099 26 md->applicationHandler = applicationHandler;
ampembeng 15:6f2798e45099 27 }
ampembeng 15:6f2798e45099 28
ampembeng 15:6f2798e45099 29 uint16_t getNextPacketId(Client *c) {
ampembeng 15:6f2798e45099 30 return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1));
ampembeng 15:6f2798e45099 31 }
ampembeng 15:6f2798e45099 32
ampembeng 15:6f2798e45099 33 MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) {
ampembeng 15:6f2798e45099 34 int32_t sentLen = 0;
ampembeng 15:6f2798e45099 35 uint32_t sent = 0, i;
ampembeng 15:6f2798e45099 36
ampembeng 15:6f2798e45099 37 if(NULL == c || NULL == timer) {
ampembeng 15:6f2798e45099 38 ERROR("...MQTT_NULL_VALUE_ERROR");
ampembeng 15:6f2798e45099 39 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 40 }
ampembeng 15:6f2798e45099 41
ampembeng 15:6f2798e45099 42 if(length >= c->bufSize) {
ampembeng 15:6f2798e45099 43 ERROR("...MQTTPACKET_BUFFER_TOO_SHORT");
ampembeng 15:6f2798e45099 44 return MQTTPACKET_BUFFER_TOO_SHORT;
ampembeng 15:6f2798e45099 45 }
ampembeng 15:6f2798e45099 46
ampembeng 15:6f2798e45099 47 while(sent < length && !expired(timer)) {
ampembeng 15:6f2798e45099 48 sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer));
ampembeng 15:6f2798e45099 49 if(sentLen < 0) {
ampembeng 15:6f2798e45099 50 /* there was an error writing the data */
ampembeng 15:6f2798e45099 51 ERROR("...there was an error writing the data");
ampembeng 15:6f2798e45099 52 break;
ampembeng 15:6f2798e45099 53 }
ampembeng 15:6f2798e45099 54 sent = sent + (uint32_t)sentLen;
ampembeng 15:6f2798e45099 55 }
ampembeng 15:6f2798e45099 56
ampembeng 15:6f2798e45099 57 if(sent == length) {
ampembeng 15:6f2798e45099 58 /* record the fact that we have successfully sent the packet */
ampembeng 15:6f2798e45099 59 //countdown(&c->pingTimer, c->keepAliveInterval);
ampembeng 15:6f2798e45099 60 return SUCCESS;
ampembeng 15:6f2798e45099 61 }
ampembeng 15:6f2798e45099 62
ampembeng 15:6f2798e45099 63 ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length);
ampembeng 15:6f2798e45099 64 return FAILURE;
ampembeng 15:6f2798e45099 65 }
ampembeng 15:6f2798e45099 66
ampembeng 15:6f2798e45099 67 void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) {
ampembeng 15:6f2798e45099 68 if(NULL == destination || NULL == source) {
ampembeng 15:6f2798e45099 69 return;
ampembeng 15:6f2798e45099 70 }
ampembeng 15:6f2798e45099 71 destination->willFlag = source->willFlag;
ampembeng 15:6f2798e45099 72 destination->MQTTVersion = source->MQTTVersion;
ampembeng 15:6f2798e45099 73 destination->clientID.cstring = source->clientID.cstring;
ampembeng 15:6f2798e45099 74 destination->username.cstring = source->username.cstring;
ampembeng 15:6f2798e45099 75 destination->password.cstring = source->password.cstring;
ampembeng 15:6f2798e45099 76 destination->will.topicName.cstring = source->will.topicName.cstring;
ampembeng 15:6f2798e45099 77 destination->will.message.cstring = source->will.message.cstring;
ampembeng 15:6f2798e45099 78 destination->will.qos = source->will.qos;
ampembeng 15:6f2798e45099 79 destination->will.retained = source->will.retained;
ampembeng 15:6f2798e45099 80 destination->keepAliveInterval = source->keepAliveInterval;
ampembeng 15:6f2798e45099 81 destination->cleansession = source->cleansession;
ampembeng 15:6f2798e45099 82 }
ampembeng 15:6f2798e45099 83
ampembeng 15:6f2798e45099 84 MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs,
ampembeng 15:6f2798e45099 85 unsigned char *buf, size_t bufSize, unsigned char *readbuf,
ampembeng 15:6f2798e45099 86 size_t readBufSize, uint8_t enableAutoReconnect,
ampembeng 15:6f2798e45099 87 networkInitHandler_t networkInitHandler,
ampembeng 15:6f2798e45099 88 TLSConnectParams *tlsConnectParams) {
ampembeng 15:6f2798e45099 89 uint32_t i;
ampembeng 15:6f2798e45099 90 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
ampembeng 15:6f2798e45099 91
ampembeng 15:6f2798e45099 92 if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf
ampembeng 15:6f2798e45099 93 || NULL == networkInitHandler) {
ampembeng 15:6f2798e45099 94 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 95 }
ampembeng 15:6f2798e45099 96
ampembeng 15:6f2798e45099 97 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
ampembeng 15:6f2798e45099 98 c->messageHandlers[i].topicFilter = NULL;
ampembeng 15:6f2798e45099 99 c->messageHandlers[i].fp = NULL;
ampembeng 15:6f2798e45099 100 c->messageHandlers[i].applicationHandler = NULL;
ampembeng 15:6f2798e45099 101 c->messageHandlers[i].qos = (QoS)0;
ampembeng 15:6f2798e45099 102 }
ampembeng 15:6f2798e45099 103
ampembeng 15:6f2798e45099 104 c->commandTimeoutMs = commandTimeoutMs;
ampembeng 15:6f2798e45099 105 c->buf = buf;
ampembeng 15:6f2798e45099 106 c->bufSize = bufSize;
ampembeng 15:6f2798e45099 107 c->readbuf = readbuf;
ampembeng 15:6f2798e45099 108 c->readBufSize = readBufSize;
ampembeng 15:6f2798e45099 109 c->isConnected = 0;
ampembeng 15:6f2798e45099 110 c->isPingOutstanding = 0;
ampembeng 15:6f2798e45099 111 c->wasManuallyDisconnected = 0;
ampembeng 15:6f2798e45099 112 c->counterNetworkDisconnected = 0;
ampembeng 15:6f2798e45099 113 c->isAutoReconnectEnabled = enableAutoReconnect;
ampembeng 15:6f2798e45099 114 c->defaultMessageHandler = NULL;
ampembeng 15:6f2798e45099 115 c->disconnectHandler = NULL;
ampembeng 15:6f2798e45099 116 copyMQTTConnectData(&(c->options), &default_options);
ampembeng 15:6f2798e45099 117
ampembeng 15:6f2798e45099 118 c->networkInitHandler = networkInitHandler;
ampembeng 15:6f2798e45099 119 c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort;
ampembeng 15:6f2798e45099 120 c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL;
ampembeng 15:6f2798e45099 121 c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation;
ampembeng 15:6f2798e45099 122 c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation;
ampembeng 15:6f2798e45099 123 c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation;
ampembeng 15:6f2798e45099 124 c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms;
ampembeng 15:6f2798e45099 125 c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag;
ampembeng 15:6f2798e45099 126
ampembeng 15:6f2798e45099 127 InitTimer(&(c->pingTimer));
ampembeng 15:6f2798e45099 128 InitTimer(&(c->reconnectDelayTimer));
ampembeng 15:6f2798e45099 129
ampembeng 15:6f2798e45099 130 return SUCCESS;
ampembeng 15:6f2798e45099 131 }
ampembeng 15:6f2798e45099 132
ampembeng 15:6f2798e45099 133 MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) {
ampembeng 15:6f2798e45099 134 unsigned char i;
ampembeng 15:6f2798e45099 135 uint32_t multiplier = 1;
ampembeng 15:6f2798e45099 136 uint32_t len = 0;
ampembeng 15:6f2798e45099 137 const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
ampembeng 15:6f2798e45099 138
ampembeng 15:6f2798e45099 139 if(NULL == c || NULL == value) {
ampembeng 15:6f2798e45099 140 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 141 }
ampembeng 15:6f2798e45099 142
ampembeng 15:6f2798e45099 143 *value = 0;
ampembeng 15:6f2798e45099 144
ampembeng 15:6f2798e45099 145 do {
ampembeng 15:6f2798e45099 146 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
ampembeng 15:6f2798e45099 147 /* bad data */
ampembeng 15:6f2798e45099 148 return MQTTPACKET_READ_ERROR;
ampembeng 15:6f2798e45099 149 }
ampembeng 15:6f2798e45099 150
ampembeng 15:6f2798e45099 151 if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) {
ampembeng 15:6f2798e45099 152 /* The value argument is the important value. len is just used temporarily
ampembeng 15:6f2798e45099 153 * and never used by the calling function for anything else */
ampembeng 15:6f2798e45099 154 return FAILURE;
ampembeng 15:6f2798e45099 155 }
ampembeng 15:6f2798e45099 156
ampembeng 15:6f2798e45099 157 *value += ((i & 127) * multiplier);
ampembeng 15:6f2798e45099 158 multiplier *= 128;
ampembeng 15:6f2798e45099 159 }while((i & 128) != 0);
ampembeng 15:6f2798e45099 160
ampembeng 15:6f2798e45099 161 /* The value argument is the important value. len is just used temporarily
ampembeng 15:6f2798e45099 162 * and never used by the calling function for anything else */
ampembeng 15:6f2798e45099 163 return SUCCESS;
ampembeng 15:6f2798e45099 164 }
ampembeng 15:6f2798e45099 165
ampembeng 15:6f2798e45099 166 MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) {
ampembeng 15:6f2798e45099 167 MQTTHeader header = {0};
ampembeng 15:6f2798e45099 168 uint32_t len = 0;
ampembeng 15:6f2798e45099 169 uint32_t rem_len = 0;
ampembeng 15:6f2798e45099 170 uint32_t total_bytes_read = 0;
ampembeng 15:6f2798e45099 171 uint32_t bytes_to_be_read = 0;
ampembeng 15:6f2798e45099 172 int32_t ret_val = 0;
ampembeng 15:6f2798e45099 173 MQTTReturnCode rc;
ampembeng 15:6f2798e45099 174
ampembeng 15:6f2798e45099 175 if(NULL == c || NULL == timer) {
ampembeng 15:6f2798e45099 176 ERROR("readPacket() MQTT_NULL_VALUE_ERROR");
ampembeng 15:6f2798e45099 177 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 178 }
ampembeng 15:6f2798e45099 179
ampembeng 15:6f2798e45099 180 /* 1. read the header byte. This has the packet type in it */
ampembeng 15:6f2798e45099 181 if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) {
ampembeng 15:6f2798e45099 182 /* If a network disconnect has occurred it would have been caught by keepalive already.
ampembeng 15:6f2798e45099 183 * If nothing is found at this point means there was nothing to read. Not 100% correct,
ampembeng 15:6f2798e45099 184 * but the only way to be sure is to pass proper error codes from the network stack
ampembeng 15:6f2798e45099 185 * which the mbedtls/openssl implementations do not return */
ampembeng 15:6f2798e45099 186 return MQTT_NOTHING_TO_READ;
ampembeng 15:6f2798e45099 187 }
ampembeng 15:6f2798e45099 188
ampembeng 15:6f2798e45099 189 len = 1;
ampembeng 15:6f2798e45099 190 /* 2. read the remaining length. This is variable in itself */
ampembeng 15:6f2798e45099 191 rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer));
ampembeng 15:6f2798e45099 192 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 193 ERROR("readPacket() SUCCESS != rc");
ampembeng 15:6f2798e45099 194 return rc;
ampembeng 15:6f2798e45099 195 }
ampembeng 15:6f2798e45099 196
ampembeng 15:6f2798e45099 197 /* if the buffer is too short then the message will be dropped silently */
ampembeng 15:6f2798e45099 198 if (rem_len >= c->readBufSize) {
ampembeng 15:6f2798e45099 199 bytes_to_be_read = c->readBufSize;
ampembeng 15:6f2798e45099 200 do {
ampembeng 15:6f2798e45099 201 ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer));
ampembeng 15:6f2798e45099 202 if (ret_val > 0) {
ampembeng 15:6f2798e45099 203 total_bytes_read += ret_val;
ampembeng 15:6f2798e45099 204 if((rem_len - total_bytes_read) >= c->readBufSize){
ampembeng 15:6f2798e45099 205 bytes_to_be_read = c->readBufSize;
ampembeng 15:6f2798e45099 206 }
ampembeng 15:6f2798e45099 207 else{
ampembeng 15:6f2798e45099 208 bytes_to_be_read = rem_len - total_bytes_read;
ampembeng 15:6f2798e45099 209 }
ampembeng 15:6f2798e45099 210 }
ampembeng 15:6f2798e45099 211 } while (total_bytes_read < rem_len && ret_val > 0);
ampembeng 15:6f2798e45099 212 return MQTTPACKET_BUFFER_TOO_SHORT;
ampembeng 15:6f2798e45099 213 }
ampembeng 15:6f2798e45099 214
ampembeng 15:6f2798e45099 215 /* put the original remaining length back into the buffer */
ampembeng 15:6f2798e45099 216 len += MQTTPacket_encode(c->readbuf + 1, rem_len);
ampembeng 15:6f2798e45099 217
ampembeng 15:6f2798e45099 218 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
ampembeng 15:6f2798e45099 219 if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) {
ampembeng 15:6f2798e45099 220 ERROR("readPacket() FAILURE");
ampembeng 15:6f2798e45099 221 return FAILURE;
ampembeng 15:6f2798e45099 222 }
ampembeng 15:6f2798e45099 223
ampembeng 15:6f2798e45099 224 header.byte = c->readbuf[0];
ampembeng 15:6f2798e45099 225 *packet_type = header.bits.type;
ampembeng 15:6f2798e45099 226
ampembeng 15:6f2798e45099 227 return SUCCESS;
ampembeng 15:6f2798e45099 228 }
ampembeng 15:6f2798e45099 229
ampembeng 15:6f2798e45099 230 // assume topic filter and name is in correct format
ampembeng 15:6f2798e45099 231 // # can only be at end
ampembeng 15:6f2798e45099 232 // + and # can only be next to separator
ampembeng 15:6f2798e45099 233 char isTopicMatched(char *topicFilter, MQTTString *topicName) {
ampembeng 15:6f2798e45099 234 char *curf = NULL;
ampembeng 15:6f2798e45099 235 char *curn = NULL;
ampembeng 15:6f2798e45099 236 char *curn_end = NULL;
ampembeng 15:6f2798e45099 237
ampembeng 15:6f2798e45099 238 if(NULL == topicFilter || NULL == topicName) {
ampembeng 15:6f2798e45099 239 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 240 }
ampembeng 15:6f2798e45099 241
ampembeng 15:6f2798e45099 242 curf = topicFilter;
ampembeng 15:6f2798e45099 243 curn = topicName->lenstring.data;
ampembeng 15:6f2798e45099 244 curn_end = curn + topicName->lenstring.len;
ampembeng 15:6f2798e45099 245
ampembeng 15:6f2798e45099 246 while(*curf && (curn < curn_end)) {
ampembeng 15:6f2798e45099 247 if(*curn == '/' && *curf != '/') {
ampembeng 15:6f2798e45099 248 break;
ampembeng 15:6f2798e45099 249 }
ampembeng 15:6f2798e45099 250 if(*curf != '+' && *curf != '#' && *curf != *curn) {
ampembeng 15:6f2798e45099 251 break;
ampembeng 15:6f2798e45099 252 }
ampembeng 15:6f2798e45099 253 if(*curf == '+') {
ampembeng 15:6f2798e45099 254 /* skip until we meet the next separator, or end of string */
ampembeng 15:6f2798e45099 255 char *nextpos = curn + 1;
ampembeng 15:6f2798e45099 256 while(nextpos < curn_end && *nextpos != '/')
ampembeng 15:6f2798e45099 257 nextpos = ++curn + 1;
ampembeng 15:6f2798e45099 258 } else if(*curf == '#') {
ampembeng 15:6f2798e45099 259 /* skip until end of string */
ampembeng 15:6f2798e45099 260 curn = curn_end - 1;
ampembeng 15:6f2798e45099 261 }
ampembeng 15:6f2798e45099 262
ampembeng 15:6f2798e45099 263 curf++;
ampembeng 15:6f2798e45099 264 curn++;
ampembeng 15:6f2798e45099 265 };
ampembeng 15:6f2798e45099 266
ampembeng 15:6f2798e45099 267 return (curn == curn_end) && (*curf == '\0');
ampembeng 15:6f2798e45099 268 }
ampembeng 15:6f2798e45099 269
ampembeng 15:6f2798e45099 270 MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) {
ampembeng 15:6f2798e45099 271 uint32_t i;
ampembeng 15:6f2798e45099 272 MessageData md;
ampembeng 15:6f2798e45099 273
ampembeng 15:6f2798e45099 274 if(NULL == c || NULL == topicName || NULL == message) {
ampembeng 15:6f2798e45099 275 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 276 }
ampembeng 15:6f2798e45099 277
ampembeng 15:6f2798e45099 278 // we have to find the right message handler - indexed by topic
ampembeng 15:6f2798e45099 279 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
ampembeng 15:6f2798e45099 280 if((c->messageHandlers[i].topicFilter != 0)
ampembeng 15:6f2798e45099 281 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
ampembeng 15:6f2798e45099 282 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) {
ampembeng 15:6f2798e45099 283 if(c->messageHandlers[i].fp != NULL) {
ampembeng 15:6f2798e45099 284 NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler);
ampembeng 15:6f2798e45099 285 c->messageHandlers[i].fp(&md);
ampembeng 15:6f2798e45099 286 return SUCCESS;
ampembeng 15:6f2798e45099 287 }
ampembeng 15:6f2798e45099 288 }
ampembeng 15:6f2798e45099 289 }
ampembeng 15:6f2798e45099 290
ampembeng 15:6f2798e45099 291 if(NULL != c->defaultMessageHandler) {
ampembeng 15:6f2798e45099 292 NewMessageData(&md, topicName, message, NULL);
ampembeng 15:6f2798e45099 293 c->defaultMessageHandler(&md);
ampembeng 15:6f2798e45099 294 return SUCCESS;
ampembeng 15:6f2798e45099 295 }
ampembeng 15:6f2798e45099 296
ampembeng 15:6f2798e45099 297 /* Message handler not found for topic */
ampembeng 15:6f2798e45099 298 return FAILURE;
ampembeng 15:6f2798e45099 299 }
ampembeng 15:6f2798e45099 300
ampembeng 15:6f2798e45099 301 MQTTReturnCode handleDisconnect(Client *c) {
ampembeng 15:6f2798e45099 302 MQTTReturnCode rc;
ampembeng 15:6f2798e45099 303
ampembeng 15:6f2798e45099 304 if(NULL == c) {
ampembeng 15:6f2798e45099 305 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 306 }
ampembeng 15:6f2798e45099 307
ampembeng 15:6f2798e45099 308 rc = MQTTDisconnect(c);
ampembeng 15:6f2798e45099 309 if(rc != SUCCESS){
ampembeng 15:6f2798e45099 310 // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack
ampembeng 15:6f2798e45099 311 MQTTForceDisconnect(c);
ampembeng 15:6f2798e45099 312 }
ampembeng 15:6f2798e45099 313
ampembeng 15:6f2798e45099 314 if(NULL != c->disconnectHandler) {
ampembeng 15:6f2798e45099 315 c->disconnectHandler();
ampembeng 15:6f2798e45099 316 }
ampembeng 15:6f2798e45099 317
ampembeng 15:6f2798e45099 318 /* Reset to 0 since this was not a manual disconnect */
ampembeng 15:6f2798e45099 319 c->wasManuallyDisconnected = 0;
ampembeng 15:6f2798e45099 320 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 321 }
ampembeng 15:6f2798e45099 322
ampembeng 15:6f2798e45099 323 MQTTReturnCode MQTTAttemptReconnect(Client *c) {
ampembeng 15:6f2798e45099 324 MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT;
ampembeng 15:6f2798e45099 325
ampembeng 15:6f2798e45099 326 if(NULL == c) {
ampembeng 15:6f2798e45099 327 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 328 }
ampembeng 15:6f2798e45099 329
ampembeng 15:6f2798e45099 330 if(1 == c->isConnected) {
ampembeng 15:6f2798e45099 331 return MQTT_NETWORK_ALREADY_CONNECTED_ERROR;
ampembeng 15:6f2798e45099 332 }
ampembeng 15:6f2798e45099 333
ampembeng 15:6f2798e45099 334 /* Ignoring return code. failures expected if network is disconnected */
ampembeng 15:6f2798e45099 335 rc = MQTTConnect(c, NULL);
ampembeng 15:6f2798e45099 336
ampembeng 15:6f2798e45099 337 /* If still disconnected handle disconnect */
ampembeng 15:6f2798e45099 338 if(0 == c->isConnected) {
ampembeng 15:6f2798e45099 339 return MQTT_ATTEMPTING_RECONNECT;
ampembeng 15:6f2798e45099 340 }
ampembeng 15:6f2798e45099 341
ampembeng 15:6f2798e45099 342 rc = MQTTResubscribe(c);
ampembeng 15:6f2798e45099 343 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 344 return rc;
ampembeng 15:6f2798e45099 345 }
ampembeng 15:6f2798e45099 346
ampembeng 15:6f2798e45099 347 return MQTT_NETWORK_RECONNECTED;
ampembeng 15:6f2798e45099 348 }
ampembeng 15:6f2798e45099 349
ampembeng 15:6f2798e45099 350 MQTTReturnCode handleReconnect(Client *c) {
ampembeng 15:6f2798e45099 351 int8_t isPhysicalLayerConnected = 1;
ampembeng 15:6f2798e45099 352 MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED;
ampembeng 15:6f2798e45099 353
ampembeng 15:6f2798e45099 354 if(NULL == c) {
ampembeng 15:6f2798e45099 355 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 356 }
ampembeng 15:6f2798e45099 357
ampembeng 15:6f2798e45099 358 if(!expired(&(c->reconnectDelayTimer))) {
ampembeng 15:6f2798e45099 359 /* Timer has not expired. Not time to attempt reconnect yet.
ampembeng 15:6f2798e45099 360 * Return attempting reconnect */
ampembeng 15:6f2798e45099 361 return MQTT_ATTEMPTING_RECONNECT;
ampembeng 15:6f2798e45099 362 }
ampembeng 15:6f2798e45099 363
ampembeng 15:6f2798e45099 364 if(NULL != c->networkStack.isConnected) {
ampembeng 15:6f2798e45099 365 isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack));
ampembeng 15:6f2798e45099 366 }
ampembeng 15:6f2798e45099 367
ampembeng 15:6f2798e45099 368 if(isPhysicalLayerConnected) {
ampembeng 15:6f2798e45099 369 rc = MQTTAttemptReconnect(c);
ampembeng 15:6f2798e45099 370 if(MQTT_NETWORK_RECONNECTED == rc) {
ampembeng 15:6f2798e45099 371 return MQTT_NETWORK_RECONNECTED;
ampembeng 15:6f2798e45099 372 }
ampembeng 15:6f2798e45099 373 }
ampembeng 15:6f2798e45099 374
ampembeng 15:6f2798e45099 375 c->currentReconnectWaitInterval *= 2;
ampembeng 15:6f2798e45099 376
ampembeng 15:6f2798e45099 377 if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) {
ampembeng 15:6f2798e45099 378 return MQTT_RECONNECT_TIMED_OUT;
ampembeng 15:6f2798e45099 379 }
ampembeng 15:6f2798e45099 380 countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval);
ampembeng 15:6f2798e45099 381 return rc;
ampembeng 15:6f2798e45099 382 }
ampembeng 15:6f2798e45099 383
ampembeng 15:6f2798e45099 384 MQTTReturnCode keepalive(Client *c) {
ampembeng 15:6f2798e45099 385 MQTTReturnCode rc = SUCCESS;
ampembeng 15:6f2798e45099 386 Timer timer;
ampembeng 15:6f2798e45099 387 uint32_t serialized_len = 0;
ampembeng 15:6f2798e45099 388
ampembeng 15:6f2798e45099 389 if(NULL == c) {
ampembeng 15:6f2798e45099 390 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 391 }
ampembeng 15:6f2798e45099 392
ampembeng 15:6f2798e45099 393 if(0 == c->keepAliveInterval) {
ampembeng 15:6f2798e45099 394 return SUCCESS;
ampembeng 15:6f2798e45099 395 }
ampembeng 15:6f2798e45099 396
ampembeng 15:6f2798e45099 397 if(!expired(&c->pingTimer)) {
ampembeng 15:6f2798e45099 398 return SUCCESS;
ampembeng 15:6f2798e45099 399 }
ampembeng 15:6f2798e45099 400
ampembeng 15:6f2798e45099 401 if(c->isPingOutstanding) {
ampembeng 15:6f2798e45099 402 return handleDisconnect(c);
ampembeng 15:6f2798e45099 403 }
ampembeng 15:6f2798e45099 404
ampembeng 15:6f2798e45099 405 /* there is no ping outstanding - send one */
ampembeng 15:6f2798e45099 406 InitTimer(&timer);
ampembeng 15:6f2798e45099 407 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 408 rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len);
ampembeng 15:6f2798e45099 409 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 410 return rc;
ampembeng 15:6f2798e45099 411 }
ampembeng 15:6f2798e45099 412
ampembeng 15:6f2798e45099 413 /* send the ping packet */
ampembeng 15:6f2798e45099 414 rc = sendPacket(c, serialized_len, &timer);
ampembeng 15:6f2798e45099 415 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 416 //If sending a PING fails we can no longer determine if we are connected. In this case we decide we are disconnected and begin reconnection attempts
ampembeng 15:6f2798e45099 417 return handleDisconnect(c);
ampembeng 15:6f2798e45099 418 }
ampembeng 15:6f2798e45099 419
ampembeng 15:6f2798e45099 420 c->isPingOutstanding = 1;
ampembeng 15:6f2798e45099 421 /* start a timer to wait for PINGRESP from server */
ampembeng 15:6f2798e45099 422 countdown(&c->pingTimer, c->keepAliveInterval / 2);
ampembeng 15:6f2798e45099 423
ampembeng 15:6f2798e45099 424 return SUCCESS;
ampembeng 15:6f2798e45099 425 }
ampembeng 15:6f2798e45099 426
ampembeng 15:6f2798e45099 427 MQTTReturnCode handlePublish(Client *c, Timer *timer) {
ampembeng 15:6f2798e45099 428 MQTTString topicName;
ampembeng 15:6f2798e45099 429 MQTTMessage msg;
ampembeng 15:6f2798e45099 430 MQTTReturnCode rc;
ampembeng 15:6f2798e45099 431 uint32_t len = 0;
ampembeng 15:6f2798e45099 432
ampembeng 15:6f2798e45099 433 rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained,
ampembeng 15:6f2798e45099 434 (uint16_t *)&msg.id, &topicName,
ampembeng 15:6f2798e45099 435 (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf,
ampembeng 15:6f2798e45099 436 c->readBufSize);
ampembeng 15:6f2798e45099 437 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 438 return rc;
ampembeng 15:6f2798e45099 439 }
ampembeng 15:6f2798e45099 440
ampembeng 15:6f2798e45099 441 rc = deliverMessage(c, &topicName, &msg);
ampembeng 15:6f2798e45099 442 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 443 return rc;
ampembeng 15:6f2798e45099 444 }
ampembeng 15:6f2798e45099 445
ampembeng 15:6f2798e45099 446 if(QOS0 == msg.qos) {
ampembeng 15:6f2798e45099 447 /* No further processing required for QOS0 */
ampembeng 15:6f2798e45099 448 return SUCCESS;
ampembeng 15:6f2798e45099 449 }
ampembeng 15:6f2798e45099 450
ampembeng 15:6f2798e45099 451 if(QOS1 == msg.qos) {
ampembeng 15:6f2798e45099 452 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len);
ampembeng 15:6f2798e45099 453 } else { /* Message is not QOS0 or 1 means only option left is QOS2 */
ampembeng 15:6f2798e45099 454 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len);
ampembeng 15:6f2798e45099 455 }
ampembeng 15:6f2798e45099 456
ampembeng 15:6f2798e45099 457 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 458 return rc;
ampembeng 15:6f2798e45099 459 }
ampembeng 15:6f2798e45099 460
ampembeng 15:6f2798e45099 461 rc = sendPacket(c, len, timer);
ampembeng 15:6f2798e45099 462 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 463 return rc;
ampembeng 15:6f2798e45099 464 }
ampembeng 15:6f2798e45099 465
ampembeng 15:6f2798e45099 466 return SUCCESS;
ampembeng 15:6f2798e45099 467 }
ampembeng 15:6f2798e45099 468
ampembeng 15:6f2798e45099 469 MQTTReturnCode handlePubrec(Client *c, Timer *timer) {
ampembeng 15:6f2798e45099 470 uint16_t packet_id;
ampembeng 15:6f2798e45099 471 unsigned char dup, type;
ampembeng 15:6f2798e45099 472 MQTTReturnCode rc;
ampembeng 15:6f2798e45099 473 uint32_t len;
ampembeng 15:6f2798e45099 474
ampembeng 15:6f2798e45099 475 rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 476 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 477 return rc;
ampembeng 15:6f2798e45099 478 }
ampembeng 15:6f2798e45099 479
ampembeng 15:6f2798e45099 480 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len);
ampembeng 15:6f2798e45099 481 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 482 return rc;
ampembeng 15:6f2798e45099 483 }
ampembeng 15:6f2798e45099 484
ampembeng 15:6f2798e45099 485 /* send the PUBREL packet */
ampembeng 15:6f2798e45099 486 rc = sendPacket(c, len, timer);
ampembeng 15:6f2798e45099 487 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 488 /* there was a problem */
ampembeng 15:6f2798e45099 489 return rc;
ampembeng 15:6f2798e45099 490 }
ampembeng 15:6f2798e45099 491
ampembeng 15:6f2798e45099 492 return SUCCESS;
ampembeng 15:6f2798e45099 493 }
ampembeng 15:6f2798e45099 494
ampembeng 15:6f2798e45099 495 MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) {
ampembeng 15:6f2798e45099 496 MQTTReturnCode rc;
ampembeng 15:6f2798e45099 497 if(NULL == c || NULL == timer) {
ampembeng 15:6f2798e45099 498 ERROR("cycle() MQTT_NULL_VALUE_ERROR");
ampembeng 15:6f2798e45099 499 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 500 }
ampembeng 15:6f2798e45099 501
ampembeng 15:6f2798e45099 502 /* read the socket, see what work is due */
ampembeng 15:6f2798e45099 503 rc = readPacket(c, timer, packet_type);
ampembeng 15:6f2798e45099 504 if(MQTT_NOTHING_TO_READ == rc) {
ampembeng 15:6f2798e45099 505 /* Nothing to read, not a cycle failure */
ampembeng 15:6f2798e45099 506 return SUCCESS;
ampembeng 15:6f2798e45099 507 }
ampembeng 15:6f2798e45099 508 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 509 ERROR("cycle() SUCCESS != rc");
ampembeng 15:6f2798e45099 510 return rc;
ampembeng 15:6f2798e45099 511 }
ampembeng 15:6f2798e45099 512
ampembeng 15:6f2798e45099 513 switch(*packet_type) {
ampembeng 15:6f2798e45099 514 case CONNACK:
ampembeng 15:6f2798e45099 515 case PUBACK:
ampembeng 15:6f2798e45099 516 case SUBACK:
ampembeng 15:6f2798e45099 517 case UNSUBACK:
ampembeng 15:6f2798e45099 518 break;
ampembeng 15:6f2798e45099 519 case PUBLISH: {
ampembeng 15:6f2798e45099 520 rc = handlePublish(c, timer);
ampembeng 15:6f2798e45099 521 break;
ampembeng 15:6f2798e45099 522 }
ampembeng 15:6f2798e45099 523 case PUBREC: {
ampembeng 15:6f2798e45099 524 rc = handlePubrec(c, timer);
ampembeng 15:6f2798e45099 525 break;
ampembeng 15:6f2798e45099 526 }
ampembeng 15:6f2798e45099 527 case PUBCOMP:
ampembeng 15:6f2798e45099 528 break;
ampembeng 15:6f2798e45099 529 case PINGRESP: {
ampembeng 15:6f2798e45099 530 c->isPingOutstanding = 0;
ampembeng 15:6f2798e45099 531 countdown(&c->pingTimer, c->keepAliveInterval);
ampembeng 15:6f2798e45099 532 break;
ampembeng 15:6f2798e45099 533 }
ampembeng 15:6f2798e45099 534 default: {
ampembeng 15:6f2798e45099 535 /* Either unknown packet type or Failure occurred
ampembeng 15:6f2798e45099 536 * Should not happen */
ampembeng 15:6f2798e45099 537 ERROR("cycle() Either unknown packet type or Failure occurred");
ampembeng 15:6f2798e45099 538 return MQTT_BUFFER_RX_MESSAGE_INVALID;
ampembeng 15:6f2798e45099 539 break;
ampembeng 15:6f2798e45099 540 }
ampembeng 15:6f2798e45099 541 }
ampembeng 15:6f2798e45099 542
ampembeng 15:6f2798e45099 543 return rc;
ampembeng 15:6f2798e45099 544 }
ampembeng 15:6f2798e45099 545
ampembeng 15:6f2798e45099 546 MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) {
ampembeng 15:6f2798e45099 547 MQTTReturnCode rc = SUCCESS;
ampembeng 15:6f2798e45099 548 Timer timer;
ampembeng 15:6f2798e45099 549 uint8_t packet_type;
ampembeng 15:6f2798e45099 550
ampembeng 15:6f2798e45099 551 if(NULL == c) {
ampembeng 15:6f2798e45099 552 ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR");
ampembeng 15:6f2798e45099 553 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 554 }
ampembeng 15:6f2798e45099 555
ampembeng 15:6f2798e45099 556 /* Check if network was manually disconnected */
ampembeng 15:6f2798e45099 557 if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) {
ampembeng 15:6f2798e45099 558 ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED");
ampembeng 15:6f2798e45099 559 return MQTT_NETWORK_MANUALLY_DISCONNECTED;
ampembeng 15:6f2798e45099 560 }
ampembeng 15:6f2798e45099 561
ampembeng 15:6f2798e45099 562 /* Check if network is disconnected and auto-reconnect is not enabled */
ampembeng 15:6f2798e45099 563 if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) {
ampembeng 15:6f2798e45099 564 ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR");
ampembeng 15:6f2798e45099 565 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 566 }
ampembeng 15:6f2798e45099 567
ampembeng 15:6f2798e45099 568 InitTimer(&timer);
ampembeng 15:6f2798e45099 569 countdown_ms(&timer, timeout_ms);
ampembeng 15:6f2798e45099 570
ampembeng 15:6f2798e45099 571 while(!expired(&timer)) {
ampembeng 15:6f2798e45099 572 if(0 == c->isConnected) {
ampembeng 15:6f2798e45099 573 if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) {
ampembeng 15:6f2798e45099 574 rc = MQTT_RECONNECT_TIMED_OUT;
ampembeng 15:6f2798e45099 575 ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT");
ampembeng 15:6f2798e45099 576 break;
ampembeng 15:6f2798e45099 577 }
ampembeng 15:6f2798e45099 578
ampembeng 15:6f2798e45099 579 rc = handleReconnect(c);
ampembeng 15:6f2798e45099 580 /* Network reconnect attempted, check if yield timer expired before
ampembeng 15:6f2798e45099 581 * doing anything else */
ampembeng 15:6f2798e45099 582 continue;
ampembeng 15:6f2798e45099 583 }
ampembeng 15:6f2798e45099 584
ampembeng 15:6f2798e45099 585 rc = cycle(c, &timer, &packet_type);
ampembeng 15:6f2798e45099 586 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 587 ERROR("MQTTYield() SUCCESS != rc");
ampembeng 15:6f2798e45099 588 break;
ampembeng 15:6f2798e45099 589 }
ampembeng 15:6f2798e45099 590
ampembeng 15:6f2798e45099 591 rc = keepalive(c);
ampembeng 15:6f2798e45099 592 if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) {
ampembeng 15:6f2798e45099 593 c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL;
ampembeng 15:6f2798e45099 594 countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval);
ampembeng 15:6f2798e45099 595 c->counterNetworkDisconnected++;
ampembeng 15:6f2798e45099 596 /* Depending on timer values, it is possible that yield timer has expired
ampembeng 15:6f2798e45099 597 * Set to rc to attempting reconnect to inform client that autoreconnect
ampembeng 15:6f2798e45099 598 * attempt has started */
ampembeng 15:6f2798e45099 599 INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT");
ampembeng 15:6f2798e45099 600 rc = MQTT_ATTEMPTING_RECONNECT;
ampembeng 15:6f2798e45099 601 } else if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 602 ERROR("MQTTYield() SUCCESS != rc");
ampembeng 15:6f2798e45099 603 break;
ampembeng 15:6f2798e45099 604 }
ampembeng 15:6f2798e45099 605 }
ampembeng 15:6f2798e45099 606
ampembeng 15:6f2798e45099 607 return rc;
ampembeng 15:6f2798e45099 608 }
ampembeng 15:6f2798e45099 609
ampembeng 15:6f2798e45099 610 /* only used in single-threaded mode where one command at a time is in process */
ampembeng 15:6f2798e45099 611 MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) {
ampembeng 15:6f2798e45099 612 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 613 uint8_t read_packet_type = 0, retry = 2;
ampembeng 15:6f2798e45099 614
ampembeng 15:6f2798e45099 615
ampembeng 15:6f2798e45099 616
ampembeng 15:6f2798e45099 617 if(NULL == c || NULL == timer) {
ampembeng 15:6f2798e45099 618 ERROR("waitfor() MQTT_NULL_VALUE_ERROR");
ampembeng 15:6f2798e45099 619 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 620 }
ampembeng 15:6f2798e45099 621
ampembeng 15:6f2798e45099 622 do {
ampembeng 15:6f2798e45099 623 if(expired(timer)) {
ampembeng 15:6f2798e45099 624 /* we timed out */
ampembeng 15:6f2798e45099 625 ERROR("waitfor() timer expired");
ampembeng 15:6f2798e45099 626 break;
ampembeng 15:6f2798e45099 627
ampembeng 15:6f2798e45099 628 //countdown_ms(timer, 10);
ampembeng 15:6f2798e45099 629 }
ampembeng 15:6f2798e45099 630
ampembeng 15:6f2798e45099 631 rc = cycle(c, timer, &read_packet_type);
ampembeng 15:6f2798e45099 632 }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type);
ampembeng 15:6f2798e45099 633
ampembeng 15:6f2798e45099 634 if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) {
ampembeng 15:6f2798e45099 635 ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR");
ampembeng 15:6f2798e45099 636 return FAILURE;
ampembeng 15:6f2798e45099 637 }
ampembeng 15:6f2798e45099 638
ampembeng 15:6f2798e45099 639 /* Something failed or we didn't receive the expected packet, return error code */
ampembeng 15:6f2798e45099 640 return rc;
ampembeng 15:6f2798e45099 641 }
ampembeng 15:6f2798e45099 642
ampembeng 15:6f2798e45099 643 MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) {
ampembeng 15:6f2798e45099 644 Timer connect_timer;
ampembeng 15:6f2798e45099 645 MQTTReturnCode connack_rc = FAILURE;
ampembeng 15:6f2798e45099 646 char sessionPresent = 0;
ampembeng 15:6f2798e45099 647 uint32_t len = 0;
ampembeng 15:6f2798e45099 648 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 649
ampembeng 15:6f2798e45099 650 if(NULL == c) {
ampembeng 15:6f2798e45099 651 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 652 }
ampembeng 15:6f2798e45099 653
ampembeng 15:6f2798e45099 654 DEBUG("...connect_timer");
ampembeng 15:6f2798e45099 655 InitTimer(&connect_timer);
ampembeng 15:6f2798e45099 656 countdown_ms(&connect_timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 657
ampembeng 15:6f2798e45099 658 if(c->isConnected) {
ampembeng 15:6f2798e45099 659 /* Don't send connect packet again if we are already connected */
ampembeng 15:6f2798e45099 660 ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR");
ampembeng 15:6f2798e45099 661 return MQTT_NETWORK_ALREADY_CONNECTED_ERROR;
ampembeng 15:6f2798e45099 662 }
ampembeng 15:6f2798e45099 663
ampembeng 15:6f2798e45099 664 if(NULL != options) {
ampembeng 15:6f2798e45099 665 /* override default options if new options were supplied */
ampembeng 15:6f2798e45099 666 copyMQTTConnectData(&(c->options), options);
ampembeng 15:6f2798e45099 667 }
ampembeng 15:6f2798e45099 668
ampembeng 15:6f2798e45099 669 DEBUG("...TLS Connect");
ampembeng 15:6f2798e45099 670 c->networkInitHandler(&(c->networkStack));
ampembeng 15:6f2798e45099 671 rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams);
ampembeng 15:6f2798e45099 672 if(0 != rc) {
ampembeng 15:6f2798e45099 673 /* TLS Connect failed, return error */
ampembeng 15:6f2798e45099 674 ERROR("...TLS Connect failed, return error");
ampembeng 15:6f2798e45099 675 return FAILURE;
ampembeng 15:6f2798e45099 676 }
ampembeng 15:6f2798e45099 677
ampembeng 15:6f2798e45099 678 c->keepAliveInterval = c->options.keepAliveInterval;
ampembeng 15:6f2798e45099 679 rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len);
ampembeng 15:6f2798e45099 680 if(SUCCESS != rc || 0 >= len) {
ampembeng 15:6f2798e45099 681 ERROR("...MQTTSerialize_connect FAIL");
ampembeng 15:6f2798e45099 682 return FAILURE;
ampembeng 15:6f2798e45099 683 }
ampembeng 15:6f2798e45099 684
ampembeng 15:6f2798e45099 685 /* send the connect packet */
ampembeng 15:6f2798e45099 686 rc = sendPacket(c, len, &connect_timer);
ampembeng 15:6f2798e45099 687 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 688 ERROR("...sendPacket FAIL");
ampembeng 15:6f2798e45099 689 return rc;
ampembeng 15:6f2798e45099 690 }
ampembeng 15:6f2798e45099 691
ampembeng 15:6f2798e45099 692 /* this will be a blocking call, wait for the CONNACK */
ampembeng 15:6f2798e45099 693 rc = waitfor(c, CONNACK, &connect_timer);
ampembeng 15:6f2798e45099 694 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 695 ERROR("...waitfor FAIL");
ampembeng 15:6f2798e45099 696 return rc;
ampembeng 15:6f2798e45099 697 }
ampembeng 15:6f2798e45099 698
ampembeng 15:6f2798e45099 699 /* Received CONNACK, check the return code */
ampembeng 15:6f2798e45099 700 rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 701 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 702 ERROR("...MQTTDeserialize_connack FAIL");
ampembeng 15:6f2798e45099 703 return rc;
ampembeng 15:6f2798e45099 704 }
ampembeng 15:6f2798e45099 705
ampembeng 15:6f2798e45099 706 if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) {
ampembeng 15:6f2798e45099 707 ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL");
ampembeng 15:6f2798e45099 708 return connack_rc;
ampembeng 15:6f2798e45099 709 }
ampembeng 15:6f2798e45099 710
ampembeng 15:6f2798e45099 711 DEBUG("...isConnected");
ampembeng 15:6f2798e45099 712 c->isConnected = 1;
ampembeng 15:6f2798e45099 713 c->wasManuallyDisconnected = 0;
ampembeng 15:6f2798e45099 714 c->isPingOutstanding = 0;
ampembeng 15:6f2798e45099 715 countdown(&c->pingTimer, c->keepAliveInterval);
ampembeng 15:6f2798e45099 716
ampembeng 15:6f2798e45099 717 return SUCCESS;
ampembeng 15:6f2798e45099 718 }
ampembeng 15:6f2798e45099 719
ampembeng 15:6f2798e45099 720 /* Return MAX_MESSAGE_HANDLERS value if no free index is available */
ampembeng 15:6f2798e45099 721 uint32_t GetFreeMessageHandlerIndex(Client *c) {
ampembeng 15:6f2798e45099 722 uint32_t itr;
ampembeng 15:6f2798e45099 723 for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) {
ampembeng 15:6f2798e45099 724 if(c->messageHandlers[itr].topicFilter == NULL) {
ampembeng 15:6f2798e45099 725 break;
ampembeng 15:6f2798e45099 726 }
ampembeng 15:6f2798e45099 727 }
ampembeng 15:6f2798e45099 728
ampembeng 15:6f2798e45099 729 return itr;
ampembeng 15:6f2798e45099 730 }
ampembeng 15:6f2798e45099 731
ampembeng 15:6f2798e45099 732 MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos,
ampembeng 15:6f2798e45099 733 messageHandler messageHandler, pApplicationHandler_t applicationHandler) {
ampembeng 15:6f2798e45099 734 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 735 Timer timer;
ampembeng 15:6f2798e45099 736 uint32_t len = 0;
ampembeng 15:6f2798e45099 737 uint32_t indexOfFreeMessageHandler;
ampembeng 15:6f2798e45099 738 uint32_t count = 0;
ampembeng 15:6f2798e45099 739 QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
ampembeng 15:6f2798e45099 740 uint16_t packetId;
ampembeng 15:6f2798e45099 741 MQTTString topic = MQTTString_initializer;
ampembeng 15:6f2798e45099 742
ampembeng 15:6f2798e45099 743 if(NULL == c || NULL == topicFilter
ampembeng 15:6f2798e45099 744 || NULL == messageHandler || NULL == applicationHandler) {
ampembeng 15:6f2798e45099 745 ERROR("...MQTT_NULL_VALUE_ERROR FAIL");
ampembeng 15:6f2798e45099 746 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 747 }
ampembeng 15:6f2798e45099 748
ampembeng 15:6f2798e45099 749 if(!c->isConnected) {
ampembeng 15:6f2798e45099 750 ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL");
ampembeng 15:6f2798e45099 751 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 752 }
ampembeng 15:6f2798e45099 753
ampembeng 15:6f2798e45099 754 topic.cstring = (char *)topicFilter;
ampembeng 15:6f2798e45099 755
ampembeng 15:6f2798e45099 756 InitTimer(&timer);
ampembeng 15:6f2798e45099 757 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 758
ampembeng 15:6f2798e45099 759 rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len);
ampembeng 15:6f2798e45099 760 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 761 ERROR("...MQTTSerialize_subscribe FAIL");
ampembeng 15:6f2798e45099 762 return rc;
ampembeng 15:6f2798e45099 763 }
ampembeng 15:6f2798e45099 764
ampembeng 15:6f2798e45099 765 indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c);
ampembeng 15:6f2798e45099 766 if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) {
ampembeng 15:6f2798e45099 767 ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL");
ampembeng 15:6f2798e45099 768 return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR;
ampembeng 15:6f2798e45099 769 }
ampembeng 15:6f2798e45099 770
ampembeng 15:6f2798e45099 771 /* send the subscribe packet */
ampembeng 15:6f2798e45099 772 rc = sendPacket(c, len, &timer);
ampembeng 15:6f2798e45099 773 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 774 ERROR("...send the subscribe packet FAIL");
ampembeng 15:6f2798e45099 775 return rc;
ampembeng 15:6f2798e45099 776 }
ampembeng 15:6f2798e45099 777
ampembeng 15:6f2798e45099 778 /* wait for suback */
ampembeng 15:6f2798e45099 779 rc = waitfor(c, SUBACK, &timer);
ampembeng 15:6f2798e45099 780 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 781 ERROR("...wait for suback FAIL");
ampembeng 15:6f2798e45099 782 return rc;
ampembeng 15:6f2798e45099 783 }
ampembeng 15:6f2798e45099 784
ampembeng 15:6f2798e45099 785 /* Granted QoS can be 0, 1 or 2 */
ampembeng 15:6f2798e45099 786 rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 787 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 788 ERROR("...Granted QoS can be 0, 1 or 2");
ampembeng 15:6f2798e45099 789 return rc;
ampembeng 15:6f2798e45099 790 }
ampembeng 15:6f2798e45099 791
ampembeng 15:6f2798e45099 792 c->messageHandlers[indexOfFreeMessageHandler].topicFilter =
ampembeng 15:6f2798e45099 793 topicFilter;
ampembeng 15:6f2798e45099 794 c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler;
ampembeng 15:6f2798e45099 795 c->messageHandlers[indexOfFreeMessageHandler].applicationHandler =
ampembeng 15:6f2798e45099 796 applicationHandler;
ampembeng 15:6f2798e45099 797 c->messageHandlers[indexOfFreeMessageHandler].qos = qos;
ampembeng 15:6f2798e45099 798
ampembeng 15:6f2798e45099 799 DEBUG("...MQTTSubscribe SUCCESS");
ampembeng 15:6f2798e45099 800 return SUCCESS;
ampembeng 15:6f2798e45099 801 }
ampembeng 15:6f2798e45099 802
ampembeng 15:6f2798e45099 803 MQTTReturnCode MQTTResubscribe(Client *c) {
ampembeng 15:6f2798e45099 804 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 805 Timer timer;
ampembeng 15:6f2798e45099 806 uint32_t len = 0;
ampembeng 15:6f2798e45099 807 uint32_t count = 0;
ampembeng 15:6f2798e45099 808 QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
ampembeng 15:6f2798e45099 809 uint16_t packetId;
ampembeng 15:6f2798e45099 810 uint32_t existingSubCount = 0;
ampembeng 15:6f2798e45099 811 uint32_t itr = 0;
ampembeng 15:6f2798e45099 812
ampembeng 15:6f2798e45099 813 if(NULL == c) {
ampembeng 15:6f2798e45099 814 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 815 }
ampembeng 15:6f2798e45099 816
ampembeng 15:6f2798e45099 817 if(!c->isConnected) {
ampembeng 15:6f2798e45099 818 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 819 }
ampembeng 15:6f2798e45099 820
ampembeng 15:6f2798e45099 821 existingSubCount = GetFreeMessageHandlerIndex(c);
ampembeng 15:6f2798e45099 822
ampembeng 15:6f2798e45099 823 for(itr = 0; itr < existingSubCount; itr++) {
ampembeng 15:6f2798e45099 824 MQTTString topic = MQTTString_initializer;
ampembeng 15:6f2798e45099 825 topic.cstring = (char *)c->messageHandlers[itr].topicFilter;
ampembeng 15:6f2798e45099 826
ampembeng 15:6f2798e45099 827 InitTimer(&timer);
ampembeng 15:6f2798e45099 828 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 829
ampembeng 15:6f2798e45099 830 rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1,
ampembeng 15:6f2798e45099 831 &topic, &(c->messageHandlers[itr].qos), &len);
ampembeng 15:6f2798e45099 832 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 833 return rc;
ampembeng 15:6f2798e45099 834 }
ampembeng 15:6f2798e45099 835
ampembeng 15:6f2798e45099 836 /* send the subscribe packet */
ampembeng 15:6f2798e45099 837 rc = sendPacket(c, len, &timer);
ampembeng 15:6f2798e45099 838 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 839 return rc;
ampembeng 15:6f2798e45099 840 }
ampembeng 15:6f2798e45099 841
ampembeng 15:6f2798e45099 842 /* wait for suback */
ampembeng 15:6f2798e45099 843 rc = waitfor(c, SUBACK, &timer);
ampembeng 15:6f2798e45099 844 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 845 return rc;
ampembeng 15:6f2798e45099 846 }
ampembeng 15:6f2798e45099 847
ampembeng 15:6f2798e45099 848 /* Granted QoS can be 0, 1 or 2 */
ampembeng 15:6f2798e45099 849 rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 850 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 851 return rc;
ampembeng 15:6f2798e45099 852 }
ampembeng 15:6f2798e45099 853 }
ampembeng 15:6f2798e45099 854
ampembeng 15:6f2798e45099 855 return SUCCESS;
ampembeng 15:6f2798e45099 856 }
ampembeng 15:6f2798e45099 857
ampembeng 15:6f2798e45099 858 MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) {
ampembeng 15:6f2798e45099 859 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 860 Timer timer;
ampembeng 15:6f2798e45099 861 MQTTString topic = MQTTString_initializer;
ampembeng 15:6f2798e45099 862 uint32_t len = 0;
ampembeng 15:6f2798e45099 863 uint32_t i = 0;
ampembeng 15:6f2798e45099 864 uint16_t packet_id;
ampembeng 15:6f2798e45099 865
ampembeng 15:6f2798e45099 866 if(NULL == c || NULL == topicFilter) {
ampembeng 15:6f2798e45099 867 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 868 }
ampembeng 15:6f2798e45099 869
ampembeng 15:6f2798e45099 870 topic.cstring = (char *)topicFilter;
ampembeng 15:6f2798e45099 871
ampembeng 15:6f2798e45099 872 if(!c->isConnected) {
ampembeng 15:6f2798e45099 873 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 874 }
ampembeng 15:6f2798e45099 875
ampembeng 15:6f2798e45099 876 InitTimer(&timer);
ampembeng 15:6f2798e45099 877 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 878
ampembeng 15:6f2798e45099 879 rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len);
ampembeng 15:6f2798e45099 880 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 881 return rc;
ampembeng 15:6f2798e45099 882 }
ampembeng 15:6f2798e45099 883
ampembeng 15:6f2798e45099 884 /* send the unsubscribe packet */
ampembeng 15:6f2798e45099 885 rc = sendPacket(c, len, &timer);
ampembeng 15:6f2798e45099 886 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 887 return rc;
ampembeng 15:6f2798e45099 888 }
ampembeng 15:6f2798e45099 889
ampembeng 15:6f2798e45099 890 rc = waitfor(c, UNSUBACK, &timer);
ampembeng 15:6f2798e45099 891 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 892 return rc;
ampembeng 15:6f2798e45099 893 }
ampembeng 15:6f2798e45099 894
ampembeng 15:6f2798e45099 895 rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 896 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 897 return rc;
ampembeng 15:6f2798e45099 898 }
ampembeng 15:6f2798e45099 899
ampembeng 15:6f2798e45099 900 /* Remove from message handler array */
ampembeng 15:6f2798e45099 901 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
ampembeng 15:6f2798e45099 902 if(c->messageHandlers[i].topicFilter != NULL &&
ampembeng 15:6f2798e45099 903 (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) {
ampembeng 15:6f2798e45099 904 c->messageHandlers[i].topicFilter = NULL;
ampembeng 15:6f2798e45099 905 /* We don't want to break here, if the same topic is registered
ampembeng 15:6f2798e45099 906 * with 2 callbacks. Unlikely scenario */
ampembeng 15:6f2798e45099 907 }
ampembeng 15:6f2798e45099 908 }
ampembeng 15:6f2798e45099 909
ampembeng 15:6f2798e45099 910 return SUCCESS;
ampembeng 15:6f2798e45099 911 }
ampembeng 15:6f2798e45099 912
ampembeng 15:6f2798e45099 913 MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) {
ampembeng 15:6f2798e45099 914 Timer timer;
ampembeng 15:6f2798e45099 915 MQTTString topic = MQTTString_initializer;
ampembeng 15:6f2798e45099 916 uint32_t len = 0;
ampembeng 15:6f2798e45099 917 uint8_t waitForAck = 0;
ampembeng 15:6f2798e45099 918 uint8_t packetType = PUBACK;
ampembeng 15:6f2798e45099 919 uint16_t packet_id;
ampembeng 15:6f2798e45099 920 unsigned char dup, type;
ampembeng 15:6f2798e45099 921 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 922
ampembeng 15:6f2798e45099 923 if(NULL == c || NULL == topicName || NULL == message) {
ampembeng 15:6f2798e45099 924 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 925 }
ampembeng 15:6f2798e45099 926
ampembeng 15:6f2798e45099 927 topic.cstring = (char *)topicName;
ampembeng 15:6f2798e45099 928
ampembeng 15:6f2798e45099 929 if(!c->isConnected) {
ampembeng 15:6f2798e45099 930 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 931 }
ampembeng 15:6f2798e45099 932
ampembeng 15:6f2798e45099 933 InitTimer(&timer);
ampembeng 15:6f2798e45099 934 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 935
ampembeng 15:6f2798e45099 936 if(QOS1 == message->qos || QOS2 == message->qos) {
ampembeng 15:6f2798e45099 937 message->id = getNextPacketId(c);
ampembeng 15:6f2798e45099 938 waitForAck = 1;
ampembeng 15:6f2798e45099 939 if(QOS2 == message->qos) {
ampembeng 15:6f2798e45099 940 packetType = PUBCOMP;
ampembeng 15:6f2798e45099 941 }
ampembeng 15:6f2798e45099 942 }
ampembeng 15:6f2798e45099 943
ampembeng 15:6f2798e45099 944 rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id,
ampembeng 15:6f2798e45099 945 topic, (unsigned char*)message->payload, message->payloadlen, &len);
ampembeng 15:6f2798e45099 946 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 947 return rc;
ampembeng 15:6f2798e45099 948 }
ampembeng 15:6f2798e45099 949
ampembeng 15:6f2798e45099 950 /* send the publish packet */
ampembeng 15:6f2798e45099 951 rc = sendPacket(c, len, &timer);
ampembeng 15:6f2798e45099 952 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 953 return rc;
ampembeng 15:6f2798e45099 954 }
ampembeng 15:6f2798e45099 955
ampembeng 15:6f2798e45099 956 /* Wait for ack if QoS1 or QoS2 */
ampembeng 15:6f2798e45099 957 if(1 == waitForAck) {
ampembeng 15:6f2798e45099 958 rc = waitfor(c, packetType, &timer);
ampembeng 15:6f2798e45099 959 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 960 return rc;
ampembeng 15:6f2798e45099 961 }
ampembeng 15:6f2798e45099 962
ampembeng 15:6f2798e45099 963 rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize);
ampembeng 15:6f2798e45099 964 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 965 return rc;
ampembeng 15:6f2798e45099 966 }
ampembeng 15:6f2798e45099 967 }
ampembeng 15:6f2798e45099 968
ampembeng 15:6f2798e45099 969 return SUCCESS;
ampembeng 15:6f2798e45099 970 }
ampembeng 15:6f2798e45099 971 /**
ampembeng 15:6f2798e45099 972 * This is for the case when the sendPacket Fails.
ampembeng 15:6f2798e45099 973 */
ampembeng 15:6f2798e45099 974 static void MQTTForceDisconnect(Client *c){
ampembeng 15:6f2798e45099 975 c->isConnected = 0;
ampembeng 15:6f2798e45099 976 c->networkStack.disconnect(&(c->networkStack));
ampembeng 15:6f2798e45099 977 c->networkStack.destroy(&(c->networkStack));
ampembeng 15:6f2798e45099 978 }
ampembeng 15:6f2798e45099 979
ampembeng 15:6f2798e45099 980 MQTTReturnCode MQTTDisconnect(Client *c) {
ampembeng 15:6f2798e45099 981 MQTTReturnCode rc = FAILURE;
ampembeng 15:6f2798e45099 982 /* We might wait for incomplete incoming publishes to complete */
ampembeng 15:6f2798e45099 983 Timer timer;
ampembeng 15:6f2798e45099 984 uint32_t serialized_len = 0;
ampembeng 15:6f2798e45099 985
ampembeng 15:6f2798e45099 986 if(NULL == c) {
ampembeng 15:6f2798e45099 987 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 988 }
ampembeng 15:6f2798e45099 989
ampembeng 15:6f2798e45099 990 if(0 == c->isConnected) {
ampembeng 15:6f2798e45099 991 /* Network is already disconnected. Do nothing */
ampembeng 15:6f2798e45099 992 return MQTT_NETWORK_DISCONNECTED_ERROR;
ampembeng 15:6f2798e45099 993 }
ampembeng 15:6f2798e45099 994
ampembeng 15:6f2798e45099 995 rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len);
ampembeng 15:6f2798e45099 996 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 997 return rc;
ampembeng 15:6f2798e45099 998 }
ampembeng 15:6f2798e45099 999
ampembeng 15:6f2798e45099 1000 InitTimer(&timer);
ampembeng 15:6f2798e45099 1001 countdown_ms(&timer, c->commandTimeoutMs);
ampembeng 15:6f2798e45099 1002
ampembeng 15:6f2798e45099 1003 /* send the disconnect packet */
ampembeng 15:6f2798e45099 1004 if(serialized_len > 0) {
ampembeng 15:6f2798e45099 1005 rc = sendPacket(c, serialized_len, &timer);
ampembeng 15:6f2798e45099 1006 if(SUCCESS != rc) {
ampembeng 15:6f2798e45099 1007 return rc;
ampembeng 15:6f2798e45099 1008 }
ampembeng 15:6f2798e45099 1009 }
ampembeng 15:6f2798e45099 1010
ampembeng 15:6f2798e45099 1011 /* Clean network stack */
ampembeng 15:6f2798e45099 1012 c->networkStack.disconnect(&(c->networkStack));
ampembeng 15:6f2798e45099 1013 rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack));
ampembeng 15:6f2798e45099 1014 if(0 != rc) {
ampembeng 15:6f2798e45099 1015 /* TLS Destroy failed, return error */
ampembeng 15:6f2798e45099 1016 return FAILURE;
ampembeng 15:6f2798e45099 1017 }
ampembeng 15:6f2798e45099 1018
ampembeng 15:6f2798e45099 1019 c->isConnected = 0;
ampembeng 15:6f2798e45099 1020
ampembeng 15:6f2798e45099 1021 /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */
ampembeng 15:6f2798e45099 1022 c->wasManuallyDisconnected = 1;
ampembeng 15:6f2798e45099 1023
ampembeng 15:6f2798e45099 1024 return SUCCESS;
ampembeng 15:6f2798e45099 1025 }
ampembeng 15:6f2798e45099 1026
ampembeng 15:6f2798e45099 1027 uint8_t MQTTIsConnected(Client *c) {
ampembeng 15:6f2798e45099 1028 if(NULL == c) {
ampembeng 15:6f2798e45099 1029 return 0;
ampembeng 15:6f2798e45099 1030 }
ampembeng 15:6f2798e45099 1031
ampembeng 15:6f2798e45099 1032 return c->isConnected;
ampembeng 15:6f2798e45099 1033 }
ampembeng 15:6f2798e45099 1034
ampembeng 15:6f2798e45099 1035 uint8_t MQTTIsAutoReconnectEnabled(Client *c) {
ampembeng 15:6f2798e45099 1036 if(NULL == c) {
ampembeng 15:6f2798e45099 1037 return 0;
ampembeng 15:6f2798e45099 1038 }
ampembeng 15:6f2798e45099 1039
ampembeng 15:6f2798e45099 1040 return c->isAutoReconnectEnabled;
ampembeng 15:6f2798e45099 1041 }
ampembeng 15:6f2798e45099 1042
ampembeng 15:6f2798e45099 1043 MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) {
ampembeng 15:6f2798e45099 1044 if(NULL == c || NULL == disconnectHandler) {
ampembeng 15:6f2798e45099 1045 return MQTT_NULL_VALUE_ERROR;
ampembeng 15:6f2798e45099 1046 }
ampembeng 15:6f2798e45099 1047
ampembeng 15:6f2798e45099 1048 c->disconnectHandler = disconnectHandler;
ampembeng 15:6f2798e45099 1049 return SUCCESS;
ampembeng 15:6f2798e45099 1050 }
ampembeng 15:6f2798e45099 1051
ampembeng 15:6f2798e45099 1052 MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) {
ampembeng 15:6f2798e45099 1053 if(NULL == c) {
ampembeng 15:6f2798e45099 1054 return FAILURE;
ampembeng 15:6f2798e45099 1055 }
ampembeng 15:6f2798e45099 1056 c->isAutoReconnectEnabled = value;
ampembeng 15:6f2798e45099 1057 return SUCCESS;
ampembeng 15:6f2798e45099 1058 }
ampembeng 15:6f2798e45099 1059
ampembeng 15:6f2798e45099 1060 uint32_t MQTTGetNetworkDisconnectedCount(Client *c) {
ampembeng 15:6f2798e45099 1061 return c->counterNetworkDisconnected;
ampembeng 15:6f2798e45099 1062 }
ampembeng 15:6f2798e45099 1063
ampembeng 15:6f2798e45099 1064 void MQTTResetNetworkDisconnectedCount(Client *c) {
ampembeng 15:6f2798e45099 1065 c->counterNetworkDisconnected = 0;
ampembeng 15:6f2798e45099 1066 }
ampembeng 15:6f2798e45099 1067
ampembeng 15:6f2798e45099 1068