DIYmall 0.96" Inch I2c IIC Serial 128x64 Oled LCD LED White Display Module
Dependencies: Adafruit_GFX SDFileSystem
Fork of ATT_AWS_IoT_demo by
AWS_openssl/aws_mqtt_embedded_client_lib/MQTTClient_C/src/MQTTClient.cpp@15:6f2798e45099, 2016-12-01 (annotated)
- Committer:
- ampembeng
- Date:
- Thu Dec 01 18:05:38 2016 +0000
- Revision:
- 15:6f2798e45099
Initial commit. Demo works with both the FRDM wired Ethernet and the Avnet Shield wireless modem.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
ampembeng | 15:6f2798e45099 | 1 | /******************************************************************************* |
ampembeng | 15:6f2798e45099 | 2 | * Copyright (c) 2014 IBM Corp. |
ampembeng | 15:6f2798e45099 | 3 | * |
ampembeng | 15:6f2798e45099 | 4 | * All rights reserved. This program and the accompanying materials |
ampembeng | 15:6f2798e45099 | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
ampembeng | 15:6f2798e45099 | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
ampembeng | 15:6f2798e45099 | 7 | * |
ampembeng | 15:6f2798e45099 | 8 | * The Eclipse Public License is available at |
ampembeng | 15:6f2798e45099 | 9 | * http://www.eclipse.org/legal/epl-v10.html |
ampembeng | 15:6f2798e45099 | 10 | * and the Eclipse Distribution License is available at |
ampembeng | 15:6f2798e45099 | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
ampembeng | 15:6f2798e45099 | 12 | * |
ampembeng | 15:6f2798e45099 | 13 | * Contributors: |
ampembeng | 15:6f2798e45099 | 14 | * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation |
ampembeng | 15:6f2798e45099 | 15 | *******************************************************************************/ |
ampembeng | 15:6f2798e45099 | 16 | |
ampembeng | 15:6f2798e45099 | 17 | #include "MQTTClient.h" |
ampembeng | 15:6f2798e45099 | 18 | #include <string.h> |
ampembeng | 15:6f2798e45099 | 19 | #include "aws_iot_log.h" |
ampembeng | 15:6f2798e45099 | 20 | |
ampembeng | 15:6f2798e45099 | 21 | static void MQTTForceDisconnect(Client *c); |
ampembeng | 15:6f2798e45099 | 22 | |
ampembeng | 15:6f2798e45099 | 23 | void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) { |
ampembeng | 15:6f2798e45099 | 24 | md->topicName = aTopicName; |
ampembeng | 15:6f2798e45099 | 25 | md->message = aMessage; |
ampembeng | 15:6f2798e45099 | 26 | md->applicationHandler = applicationHandler; |
ampembeng | 15:6f2798e45099 | 27 | } |
ampembeng | 15:6f2798e45099 | 28 | |
ampembeng | 15:6f2798e45099 | 29 | uint16_t getNextPacketId(Client *c) { |
ampembeng | 15:6f2798e45099 | 30 | return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1)); |
ampembeng | 15:6f2798e45099 | 31 | } |
ampembeng | 15:6f2798e45099 | 32 | |
ampembeng | 15:6f2798e45099 | 33 | MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) { |
ampembeng | 15:6f2798e45099 | 34 | int32_t sentLen = 0; |
ampembeng | 15:6f2798e45099 | 35 | uint32_t sent = 0, i; |
ampembeng | 15:6f2798e45099 | 36 | |
ampembeng | 15:6f2798e45099 | 37 | if(NULL == c || NULL == timer) { |
ampembeng | 15:6f2798e45099 | 38 | ERROR("...MQTT_NULL_VALUE_ERROR"); |
ampembeng | 15:6f2798e45099 | 39 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 40 | } |
ampembeng | 15:6f2798e45099 | 41 | |
ampembeng | 15:6f2798e45099 | 42 | if(length >= c->bufSize) { |
ampembeng | 15:6f2798e45099 | 43 | ERROR("...MQTTPACKET_BUFFER_TOO_SHORT"); |
ampembeng | 15:6f2798e45099 | 44 | return MQTTPACKET_BUFFER_TOO_SHORT; |
ampembeng | 15:6f2798e45099 | 45 | } |
ampembeng | 15:6f2798e45099 | 46 | |
ampembeng | 15:6f2798e45099 | 47 | while(sent < length && !expired(timer)) { |
ampembeng | 15:6f2798e45099 | 48 | sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer)); |
ampembeng | 15:6f2798e45099 | 49 | if(sentLen < 0) { |
ampembeng | 15:6f2798e45099 | 50 | /* there was an error writing the data */ |
ampembeng | 15:6f2798e45099 | 51 | ERROR("...there was an error writing the data"); |
ampembeng | 15:6f2798e45099 | 52 | break; |
ampembeng | 15:6f2798e45099 | 53 | } |
ampembeng | 15:6f2798e45099 | 54 | sent = sent + (uint32_t)sentLen; |
ampembeng | 15:6f2798e45099 | 55 | } |
ampembeng | 15:6f2798e45099 | 56 | |
ampembeng | 15:6f2798e45099 | 57 | if(sent == length) { |
ampembeng | 15:6f2798e45099 | 58 | /* record the fact that we have successfully sent the packet */ |
ampembeng | 15:6f2798e45099 | 59 | //countdown(&c->pingTimer, c->keepAliveInterval); |
ampembeng | 15:6f2798e45099 | 60 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 61 | } |
ampembeng | 15:6f2798e45099 | 62 | |
ampembeng | 15:6f2798e45099 | 63 | ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length); |
ampembeng | 15:6f2798e45099 | 64 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 65 | } |
ampembeng | 15:6f2798e45099 | 66 | |
ampembeng | 15:6f2798e45099 | 67 | void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) { |
ampembeng | 15:6f2798e45099 | 68 | if(NULL == destination || NULL == source) { |
ampembeng | 15:6f2798e45099 | 69 | return; |
ampembeng | 15:6f2798e45099 | 70 | } |
ampembeng | 15:6f2798e45099 | 71 | destination->willFlag = source->willFlag; |
ampembeng | 15:6f2798e45099 | 72 | destination->MQTTVersion = source->MQTTVersion; |
ampembeng | 15:6f2798e45099 | 73 | destination->clientID.cstring = source->clientID.cstring; |
ampembeng | 15:6f2798e45099 | 74 | destination->username.cstring = source->username.cstring; |
ampembeng | 15:6f2798e45099 | 75 | destination->password.cstring = source->password.cstring; |
ampembeng | 15:6f2798e45099 | 76 | destination->will.topicName.cstring = source->will.topicName.cstring; |
ampembeng | 15:6f2798e45099 | 77 | destination->will.message.cstring = source->will.message.cstring; |
ampembeng | 15:6f2798e45099 | 78 | destination->will.qos = source->will.qos; |
ampembeng | 15:6f2798e45099 | 79 | destination->will.retained = source->will.retained; |
ampembeng | 15:6f2798e45099 | 80 | destination->keepAliveInterval = source->keepAliveInterval; |
ampembeng | 15:6f2798e45099 | 81 | destination->cleansession = source->cleansession; |
ampembeng | 15:6f2798e45099 | 82 | } |
ampembeng | 15:6f2798e45099 | 83 | |
ampembeng | 15:6f2798e45099 | 84 | MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs, |
ampembeng | 15:6f2798e45099 | 85 | unsigned char *buf, size_t bufSize, unsigned char *readbuf, |
ampembeng | 15:6f2798e45099 | 86 | size_t readBufSize, uint8_t enableAutoReconnect, |
ampembeng | 15:6f2798e45099 | 87 | networkInitHandler_t networkInitHandler, |
ampembeng | 15:6f2798e45099 | 88 | TLSConnectParams *tlsConnectParams) { |
ampembeng | 15:6f2798e45099 | 89 | uint32_t i; |
ampembeng | 15:6f2798e45099 | 90 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
ampembeng | 15:6f2798e45099 | 91 | |
ampembeng | 15:6f2798e45099 | 92 | if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf |
ampembeng | 15:6f2798e45099 | 93 | || NULL == networkInitHandler) { |
ampembeng | 15:6f2798e45099 | 94 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 95 | } |
ampembeng | 15:6f2798e45099 | 96 | |
ampembeng | 15:6f2798e45099 | 97 | for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { |
ampembeng | 15:6f2798e45099 | 98 | c->messageHandlers[i].topicFilter = NULL; |
ampembeng | 15:6f2798e45099 | 99 | c->messageHandlers[i].fp = NULL; |
ampembeng | 15:6f2798e45099 | 100 | c->messageHandlers[i].applicationHandler = NULL; |
ampembeng | 15:6f2798e45099 | 101 | c->messageHandlers[i].qos = (QoS)0; |
ampembeng | 15:6f2798e45099 | 102 | } |
ampembeng | 15:6f2798e45099 | 103 | |
ampembeng | 15:6f2798e45099 | 104 | c->commandTimeoutMs = commandTimeoutMs; |
ampembeng | 15:6f2798e45099 | 105 | c->buf = buf; |
ampembeng | 15:6f2798e45099 | 106 | c->bufSize = bufSize; |
ampembeng | 15:6f2798e45099 | 107 | c->readbuf = readbuf; |
ampembeng | 15:6f2798e45099 | 108 | c->readBufSize = readBufSize; |
ampembeng | 15:6f2798e45099 | 109 | c->isConnected = 0; |
ampembeng | 15:6f2798e45099 | 110 | c->isPingOutstanding = 0; |
ampembeng | 15:6f2798e45099 | 111 | c->wasManuallyDisconnected = 0; |
ampembeng | 15:6f2798e45099 | 112 | c->counterNetworkDisconnected = 0; |
ampembeng | 15:6f2798e45099 | 113 | c->isAutoReconnectEnabled = enableAutoReconnect; |
ampembeng | 15:6f2798e45099 | 114 | c->defaultMessageHandler = NULL; |
ampembeng | 15:6f2798e45099 | 115 | c->disconnectHandler = NULL; |
ampembeng | 15:6f2798e45099 | 116 | copyMQTTConnectData(&(c->options), &default_options); |
ampembeng | 15:6f2798e45099 | 117 | |
ampembeng | 15:6f2798e45099 | 118 | c->networkInitHandler = networkInitHandler; |
ampembeng | 15:6f2798e45099 | 119 | c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort; |
ampembeng | 15:6f2798e45099 | 120 | c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL; |
ampembeng | 15:6f2798e45099 | 121 | c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation; |
ampembeng | 15:6f2798e45099 | 122 | c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation; |
ampembeng | 15:6f2798e45099 | 123 | c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation; |
ampembeng | 15:6f2798e45099 | 124 | c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms; |
ampembeng | 15:6f2798e45099 | 125 | c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag; |
ampembeng | 15:6f2798e45099 | 126 | |
ampembeng | 15:6f2798e45099 | 127 | InitTimer(&(c->pingTimer)); |
ampembeng | 15:6f2798e45099 | 128 | InitTimer(&(c->reconnectDelayTimer)); |
ampembeng | 15:6f2798e45099 | 129 | |
ampembeng | 15:6f2798e45099 | 130 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 131 | } |
ampembeng | 15:6f2798e45099 | 132 | |
ampembeng | 15:6f2798e45099 | 133 | MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) { |
ampembeng | 15:6f2798e45099 | 134 | unsigned char i; |
ampembeng | 15:6f2798e45099 | 135 | uint32_t multiplier = 1; |
ampembeng | 15:6f2798e45099 | 136 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 137 | const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
ampembeng | 15:6f2798e45099 | 138 | |
ampembeng | 15:6f2798e45099 | 139 | if(NULL == c || NULL == value) { |
ampembeng | 15:6f2798e45099 | 140 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 141 | } |
ampembeng | 15:6f2798e45099 | 142 | |
ampembeng | 15:6f2798e45099 | 143 | *value = 0; |
ampembeng | 15:6f2798e45099 | 144 | |
ampembeng | 15:6f2798e45099 | 145 | do { |
ampembeng | 15:6f2798e45099 | 146 | if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { |
ampembeng | 15:6f2798e45099 | 147 | /* bad data */ |
ampembeng | 15:6f2798e45099 | 148 | return MQTTPACKET_READ_ERROR; |
ampembeng | 15:6f2798e45099 | 149 | } |
ampembeng | 15:6f2798e45099 | 150 | |
ampembeng | 15:6f2798e45099 | 151 | if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) { |
ampembeng | 15:6f2798e45099 | 152 | /* The value argument is the important value. len is just used temporarily |
ampembeng | 15:6f2798e45099 | 153 | * and never used by the calling function for anything else */ |
ampembeng | 15:6f2798e45099 | 154 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 155 | } |
ampembeng | 15:6f2798e45099 | 156 | |
ampembeng | 15:6f2798e45099 | 157 | *value += ((i & 127) * multiplier); |
ampembeng | 15:6f2798e45099 | 158 | multiplier *= 128; |
ampembeng | 15:6f2798e45099 | 159 | }while((i & 128) != 0); |
ampembeng | 15:6f2798e45099 | 160 | |
ampembeng | 15:6f2798e45099 | 161 | /* The value argument is the important value. len is just used temporarily |
ampembeng | 15:6f2798e45099 | 162 | * and never used by the calling function for anything else */ |
ampembeng | 15:6f2798e45099 | 163 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 164 | } |
ampembeng | 15:6f2798e45099 | 165 | |
ampembeng | 15:6f2798e45099 | 166 | MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) { |
ampembeng | 15:6f2798e45099 | 167 | MQTTHeader header = {0}; |
ampembeng | 15:6f2798e45099 | 168 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 169 | uint32_t rem_len = 0; |
ampembeng | 15:6f2798e45099 | 170 | uint32_t total_bytes_read = 0; |
ampembeng | 15:6f2798e45099 | 171 | uint32_t bytes_to_be_read = 0; |
ampembeng | 15:6f2798e45099 | 172 | int32_t ret_val = 0; |
ampembeng | 15:6f2798e45099 | 173 | MQTTReturnCode rc; |
ampembeng | 15:6f2798e45099 | 174 | |
ampembeng | 15:6f2798e45099 | 175 | if(NULL == c || NULL == timer) { |
ampembeng | 15:6f2798e45099 | 176 | ERROR("readPacket() MQTT_NULL_VALUE_ERROR"); |
ampembeng | 15:6f2798e45099 | 177 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 178 | } |
ampembeng | 15:6f2798e45099 | 179 | |
ampembeng | 15:6f2798e45099 | 180 | /* 1. read the header byte. This has the packet type in it */ |
ampembeng | 15:6f2798e45099 | 181 | if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) { |
ampembeng | 15:6f2798e45099 | 182 | /* If a network disconnect has occurred it would have been caught by keepalive already. |
ampembeng | 15:6f2798e45099 | 183 | * If nothing is found at this point means there was nothing to read. Not 100% correct, |
ampembeng | 15:6f2798e45099 | 184 | * but the only way to be sure is to pass proper error codes from the network stack |
ampembeng | 15:6f2798e45099 | 185 | * which the mbedtls/openssl implementations do not return */ |
ampembeng | 15:6f2798e45099 | 186 | return MQTT_NOTHING_TO_READ; |
ampembeng | 15:6f2798e45099 | 187 | } |
ampembeng | 15:6f2798e45099 | 188 | |
ampembeng | 15:6f2798e45099 | 189 | len = 1; |
ampembeng | 15:6f2798e45099 | 190 | /* 2. read the remaining length. This is variable in itself */ |
ampembeng | 15:6f2798e45099 | 191 | rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer)); |
ampembeng | 15:6f2798e45099 | 192 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 193 | ERROR("readPacket() SUCCESS != rc"); |
ampembeng | 15:6f2798e45099 | 194 | return rc; |
ampembeng | 15:6f2798e45099 | 195 | } |
ampembeng | 15:6f2798e45099 | 196 | |
ampembeng | 15:6f2798e45099 | 197 | /* if the buffer is too short then the message will be dropped silently */ |
ampembeng | 15:6f2798e45099 | 198 | if (rem_len >= c->readBufSize) { |
ampembeng | 15:6f2798e45099 | 199 | bytes_to_be_read = c->readBufSize; |
ampembeng | 15:6f2798e45099 | 200 | do { |
ampembeng | 15:6f2798e45099 | 201 | ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer)); |
ampembeng | 15:6f2798e45099 | 202 | if (ret_val > 0) { |
ampembeng | 15:6f2798e45099 | 203 | total_bytes_read += ret_val; |
ampembeng | 15:6f2798e45099 | 204 | if((rem_len - total_bytes_read) >= c->readBufSize){ |
ampembeng | 15:6f2798e45099 | 205 | bytes_to_be_read = c->readBufSize; |
ampembeng | 15:6f2798e45099 | 206 | } |
ampembeng | 15:6f2798e45099 | 207 | else{ |
ampembeng | 15:6f2798e45099 | 208 | bytes_to_be_read = rem_len - total_bytes_read; |
ampembeng | 15:6f2798e45099 | 209 | } |
ampembeng | 15:6f2798e45099 | 210 | } |
ampembeng | 15:6f2798e45099 | 211 | } while (total_bytes_read < rem_len && ret_val > 0); |
ampembeng | 15:6f2798e45099 | 212 | return MQTTPACKET_BUFFER_TOO_SHORT; |
ampembeng | 15:6f2798e45099 | 213 | } |
ampembeng | 15:6f2798e45099 | 214 | |
ampembeng | 15:6f2798e45099 | 215 | /* put the original remaining length back into the buffer */ |
ampembeng | 15:6f2798e45099 | 216 | len += MQTTPacket_encode(c->readbuf + 1, rem_len); |
ampembeng | 15:6f2798e45099 | 217 | |
ampembeng | 15:6f2798e45099 | 218 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
ampembeng | 15:6f2798e45099 | 219 | if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) { |
ampembeng | 15:6f2798e45099 | 220 | ERROR("readPacket() FAILURE"); |
ampembeng | 15:6f2798e45099 | 221 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 222 | } |
ampembeng | 15:6f2798e45099 | 223 | |
ampembeng | 15:6f2798e45099 | 224 | header.byte = c->readbuf[0]; |
ampembeng | 15:6f2798e45099 | 225 | *packet_type = header.bits.type; |
ampembeng | 15:6f2798e45099 | 226 | |
ampembeng | 15:6f2798e45099 | 227 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 228 | } |
ampembeng | 15:6f2798e45099 | 229 | |
ampembeng | 15:6f2798e45099 | 230 | // assume topic filter and name is in correct format |
ampembeng | 15:6f2798e45099 | 231 | // # can only be at end |
ampembeng | 15:6f2798e45099 | 232 | // + and # can only be next to separator |
ampembeng | 15:6f2798e45099 | 233 | char isTopicMatched(char *topicFilter, MQTTString *topicName) { |
ampembeng | 15:6f2798e45099 | 234 | char *curf = NULL; |
ampembeng | 15:6f2798e45099 | 235 | char *curn = NULL; |
ampembeng | 15:6f2798e45099 | 236 | char *curn_end = NULL; |
ampembeng | 15:6f2798e45099 | 237 | |
ampembeng | 15:6f2798e45099 | 238 | if(NULL == topicFilter || NULL == topicName) { |
ampembeng | 15:6f2798e45099 | 239 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 240 | } |
ampembeng | 15:6f2798e45099 | 241 | |
ampembeng | 15:6f2798e45099 | 242 | curf = topicFilter; |
ampembeng | 15:6f2798e45099 | 243 | curn = topicName->lenstring.data; |
ampembeng | 15:6f2798e45099 | 244 | curn_end = curn + topicName->lenstring.len; |
ampembeng | 15:6f2798e45099 | 245 | |
ampembeng | 15:6f2798e45099 | 246 | while(*curf && (curn < curn_end)) { |
ampembeng | 15:6f2798e45099 | 247 | if(*curn == '/' && *curf != '/') { |
ampembeng | 15:6f2798e45099 | 248 | break; |
ampembeng | 15:6f2798e45099 | 249 | } |
ampembeng | 15:6f2798e45099 | 250 | if(*curf != '+' && *curf != '#' && *curf != *curn) { |
ampembeng | 15:6f2798e45099 | 251 | break; |
ampembeng | 15:6f2798e45099 | 252 | } |
ampembeng | 15:6f2798e45099 | 253 | if(*curf == '+') { |
ampembeng | 15:6f2798e45099 | 254 | /* skip until we meet the next separator, or end of string */ |
ampembeng | 15:6f2798e45099 | 255 | char *nextpos = curn + 1; |
ampembeng | 15:6f2798e45099 | 256 | while(nextpos < curn_end && *nextpos != '/') |
ampembeng | 15:6f2798e45099 | 257 | nextpos = ++curn + 1; |
ampembeng | 15:6f2798e45099 | 258 | } else if(*curf == '#') { |
ampembeng | 15:6f2798e45099 | 259 | /* skip until end of string */ |
ampembeng | 15:6f2798e45099 | 260 | curn = curn_end - 1; |
ampembeng | 15:6f2798e45099 | 261 | } |
ampembeng | 15:6f2798e45099 | 262 | |
ampembeng | 15:6f2798e45099 | 263 | curf++; |
ampembeng | 15:6f2798e45099 | 264 | curn++; |
ampembeng | 15:6f2798e45099 | 265 | }; |
ampembeng | 15:6f2798e45099 | 266 | |
ampembeng | 15:6f2798e45099 | 267 | return (curn == curn_end) && (*curf == '\0'); |
ampembeng | 15:6f2798e45099 | 268 | } |
ampembeng | 15:6f2798e45099 | 269 | |
ampembeng | 15:6f2798e45099 | 270 | MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) { |
ampembeng | 15:6f2798e45099 | 271 | uint32_t i; |
ampembeng | 15:6f2798e45099 | 272 | MessageData md; |
ampembeng | 15:6f2798e45099 | 273 | |
ampembeng | 15:6f2798e45099 | 274 | if(NULL == c || NULL == topicName || NULL == message) { |
ampembeng | 15:6f2798e45099 | 275 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 276 | } |
ampembeng | 15:6f2798e45099 | 277 | |
ampembeng | 15:6f2798e45099 | 278 | // we have to find the right message handler - indexed by topic |
ampembeng | 15:6f2798e45099 | 279 | for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { |
ampembeng | 15:6f2798e45099 | 280 | if((c->messageHandlers[i].topicFilter != 0) |
ampembeng | 15:6f2798e45099 | 281 | && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || |
ampembeng | 15:6f2798e45099 | 282 | isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) { |
ampembeng | 15:6f2798e45099 | 283 | if(c->messageHandlers[i].fp != NULL) { |
ampembeng | 15:6f2798e45099 | 284 | NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler); |
ampembeng | 15:6f2798e45099 | 285 | c->messageHandlers[i].fp(&md); |
ampembeng | 15:6f2798e45099 | 286 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 287 | } |
ampembeng | 15:6f2798e45099 | 288 | } |
ampembeng | 15:6f2798e45099 | 289 | } |
ampembeng | 15:6f2798e45099 | 290 | |
ampembeng | 15:6f2798e45099 | 291 | if(NULL != c->defaultMessageHandler) { |
ampembeng | 15:6f2798e45099 | 292 | NewMessageData(&md, topicName, message, NULL); |
ampembeng | 15:6f2798e45099 | 293 | c->defaultMessageHandler(&md); |
ampembeng | 15:6f2798e45099 | 294 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 295 | } |
ampembeng | 15:6f2798e45099 | 296 | |
ampembeng | 15:6f2798e45099 | 297 | /* Message handler not found for topic */ |
ampembeng | 15:6f2798e45099 | 298 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 299 | } |
ampembeng | 15:6f2798e45099 | 300 | |
ampembeng | 15:6f2798e45099 | 301 | MQTTReturnCode handleDisconnect(Client *c) { |
ampembeng | 15:6f2798e45099 | 302 | MQTTReturnCode rc; |
ampembeng | 15:6f2798e45099 | 303 | |
ampembeng | 15:6f2798e45099 | 304 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 305 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 306 | } |
ampembeng | 15:6f2798e45099 | 307 | |
ampembeng | 15:6f2798e45099 | 308 | rc = MQTTDisconnect(c); |
ampembeng | 15:6f2798e45099 | 309 | if(rc != SUCCESS){ |
ampembeng | 15:6f2798e45099 | 310 | // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack |
ampembeng | 15:6f2798e45099 | 311 | MQTTForceDisconnect(c); |
ampembeng | 15:6f2798e45099 | 312 | } |
ampembeng | 15:6f2798e45099 | 313 | |
ampembeng | 15:6f2798e45099 | 314 | if(NULL != c->disconnectHandler) { |
ampembeng | 15:6f2798e45099 | 315 | c->disconnectHandler(); |
ampembeng | 15:6f2798e45099 | 316 | } |
ampembeng | 15:6f2798e45099 | 317 | |
ampembeng | 15:6f2798e45099 | 318 | /* Reset to 0 since this was not a manual disconnect */ |
ampembeng | 15:6f2798e45099 | 319 | c->wasManuallyDisconnected = 0; |
ampembeng | 15:6f2798e45099 | 320 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 321 | } |
ampembeng | 15:6f2798e45099 | 322 | |
ampembeng | 15:6f2798e45099 | 323 | MQTTReturnCode MQTTAttemptReconnect(Client *c) { |
ampembeng | 15:6f2798e45099 | 324 | MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT; |
ampembeng | 15:6f2798e45099 | 325 | |
ampembeng | 15:6f2798e45099 | 326 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 327 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 328 | } |
ampembeng | 15:6f2798e45099 | 329 | |
ampembeng | 15:6f2798e45099 | 330 | if(1 == c->isConnected) { |
ampembeng | 15:6f2798e45099 | 331 | return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 332 | } |
ampembeng | 15:6f2798e45099 | 333 | |
ampembeng | 15:6f2798e45099 | 334 | /* Ignoring return code. failures expected if network is disconnected */ |
ampembeng | 15:6f2798e45099 | 335 | rc = MQTTConnect(c, NULL); |
ampembeng | 15:6f2798e45099 | 336 | |
ampembeng | 15:6f2798e45099 | 337 | /* If still disconnected handle disconnect */ |
ampembeng | 15:6f2798e45099 | 338 | if(0 == c->isConnected) { |
ampembeng | 15:6f2798e45099 | 339 | return MQTT_ATTEMPTING_RECONNECT; |
ampembeng | 15:6f2798e45099 | 340 | } |
ampembeng | 15:6f2798e45099 | 341 | |
ampembeng | 15:6f2798e45099 | 342 | rc = MQTTResubscribe(c); |
ampembeng | 15:6f2798e45099 | 343 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 344 | return rc; |
ampembeng | 15:6f2798e45099 | 345 | } |
ampembeng | 15:6f2798e45099 | 346 | |
ampembeng | 15:6f2798e45099 | 347 | return MQTT_NETWORK_RECONNECTED; |
ampembeng | 15:6f2798e45099 | 348 | } |
ampembeng | 15:6f2798e45099 | 349 | |
ampembeng | 15:6f2798e45099 | 350 | MQTTReturnCode handleReconnect(Client *c) { |
ampembeng | 15:6f2798e45099 | 351 | int8_t isPhysicalLayerConnected = 1; |
ampembeng | 15:6f2798e45099 | 352 | MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED; |
ampembeng | 15:6f2798e45099 | 353 | |
ampembeng | 15:6f2798e45099 | 354 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 355 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 356 | } |
ampembeng | 15:6f2798e45099 | 357 | |
ampembeng | 15:6f2798e45099 | 358 | if(!expired(&(c->reconnectDelayTimer))) { |
ampembeng | 15:6f2798e45099 | 359 | /* Timer has not expired. Not time to attempt reconnect yet. |
ampembeng | 15:6f2798e45099 | 360 | * Return attempting reconnect */ |
ampembeng | 15:6f2798e45099 | 361 | return MQTT_ATTEMPTING_RECONNECT; |
ampembeng | 15:6f2798e45099 | 362 | } |
ampembeng | 15:6f2798e45099 | 363 | |
ampembeng | 15:6f2798e45099 | 364 | if(NULL != c->networkStack.isConnected) { |
ampembeng | 15:6f2798e45099 | 365 | isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 366 | } |
ampembeng | 15:6f2798e45099 | 367 | |
ampembeng | 15:6f2798e45099 | 368 | if(isPhysicalLayerConnected) { |
ampembeng | 15:6f2798e45099 | 369 | rc = MQTTAttemptReconnect(c); |
ampembeng | 15:6f2798e45099 | 370 | if(MQTT_NETWORK_RECONNECTED == rc) { |
ampembeng | 15:6f2798e45099 | 371 | return MQTT_NETWORK_RECONNECTED; |
ampembeng | 15:6f2798e45099 | 372 | } |
ampembeng | 15:6f2798e45099 | 373 | } |
ampembeng | 15:6f2798e45099 | 374 | |
ampembeng | 15:6f2798e45099 | 375 | c->currentReconnectWaitInterval *= 2; |
ampembeng | 15:6f2798e45099 | 376 | |
ampembeng | 15:6f2798e45099 | 377 | if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { |
ampembeng | 15:6f2798e45099 | 378 | return MQTT_RECONNECT_TIMED_OUT; |
ampembeng | 15:6f2798e45099 | 379 | } |
ampembeng | 15:6f2798e45099 | 380 | countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); |
ampembeng | 15:6f2798e45099 | 381 | return rc; |
ampembeng | 15:6f2798e45099 | 382 | } |
ampembeng | 15:6f2798e45099 | 383 | |
ampembeng | 15:6f2798e45099 | 384 | MQTTReturnCode keepalive(Client *c) { |
ampembeng | 15:6f2798e45099 | 385 | MQTTReturnCode rc = SUCCESS; |
ampembeng | 15:6f2798e45099 | 386 | Timer timer; |
ampembeng | 15:6f2798e45099 | 387 | uint32_t serialized_len = 0; |
ampembeng | 15:6f2798e45099 | 388 | |
ampembeng | 15:6f2798e45099 | 389 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 390 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 391 | } |
ampembeng | 15:6f2798e45099 | 392 | |
ampembeng | 15:6f2798e45099 | 393 | if(0 == c->keepAliveInterval) { |
ampembeng | 15:6f2798e45099 | 394 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 395 | } |
ampembeng | 15:6f2798e45099 | 396 | |
ampembeng | 15:6f2798e45099 | 397 | if(!expired(&c->pingTimer)) { |
ampembeng | 15:6f2798e45099 | 398 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 399 | } |
ampembeng | 15:6f2798e45099 | 400 | |
ampembeng | 15:6f2798e45099 | 401 | if(c->isPingOutstanding) { |
ampembeng | 15:6f2798e45099 | 402 | return handleDisconnect(c); |
ampembeng | 15:6f2798e45099 | 403 | } |
ampembeng | 15:6f2798e45099 | 404 | |
ampembeng | 15:6f2798e45099 | 405 | /* there is no ping outstanding - send one */ |
ampembeng | 15:6f2798e45099 | 406 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 407 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 408 | rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len); |
ampembeng | 15:6f2798e45099 | 409 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 410 | return rc; |
ampembeng | 15:6f2798e45099 | 411 | } |
ampembeng | 15:6f2798e45099 | 412 | |
ampembeng | 15:6f2798e45099 | 413 | /* send the ping packet */ |
ampembeng | 15:6f2798e45099 | 414 | rc = sendPacket(c, serialized_len, &timer); |
ampembeng | 15:6f2798e45099 | 415 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 416 | //If sending a PING fails we can no longer determine if we are connected. In this case we decide we are disconnected and begin reconnection attempts |
ampembeng | 15:6f2798e45099 | 417 | return handleDisconnect(c); |
ampembeng | 15:6f2798e45099 | 418 | } |
ampembeng | 15:6f2798e45099 | 419 | |
ampembeng | 15:6f2798e45099 | 420 | c->isPingOutstanding = 1; |
ampembeng | 15:6f2798e45099 | 421 | /* start a timer to wait for PINGRESP from server */ |
ampembeng | 15:6f2798e45099 | 422 | countdown(&c->pingTimer, c->keepAliveInterval / 2); |
ampembeng | 15:6f2798e45099 | 423 | |
ampembeng | 15:6f2798e45099 | 424 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 425 | } |
ampembeng | 15:6f2798e45099 | 426 | |
ampembeng | 15:6f2798e45099 | 427 | MQTTReturnCode handlePublish(Client *c, Timer *timer) { |
ampembeng | 15:6f2798e45099 | 428 | MQTTString topicName; |
ampembeng | 15:6f2798e45099 | 429 | MQTTMessage msg; |
ampembeng | 15:6f2798e45099 | 430 | MQTTReturnCode rc; |
ampembeng | 15:6f2798e45099 | 431 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 432 | |
ampembeng | 15:6f2798e45099 | 433 | rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained, |
ampembeng | 15:6f2798e45099 | 434 | (uint16_t *)&msg.id, &topicName, |
ampembeng | 15:6f2798e45099 | 435 | (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf, |
ampembeng | 15:6f2798e45099 | 436 | c->readBufSize); |
ampembeng | 15:6f2798e45099 | 437 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 438 | return rc; |
ampembeng | 15:6f2798e45099 | 439 | } |
ampembeng | 15:6f2798e45099 | 440 | |
ampembeng | 15:6f2798e45099 | 441 | rc = deliverMessage(c, &topicName, &msg); |
ampembeng | 15:6f2798e45099 | 442 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 443 | return rc; |
ampembeng | 15:6f2798e45099 | 444 | } |
ampembeng | 15:6f2798e45099 | 445 | |
ampembeng | 15:6f2798e45099 | 446 | if(QOS0 == msg.qos) { |
ampembeng | 15:6f2798e45099 | 447 | /* No further processing required for QOS0 */ |
ampembeng | 15:6f2798e45099 | 448 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 449 | } |
ampembeng | 15:6f2798e45099 | 450 | |
ampembeng | 15:6f2798e45099 | 451 | if(QOS1 == msg.qos) { |
ampembeng | 15:6f2798e45099 | 452 | rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len); |
ampembeng | 15:6f2798e45099 | 453 | } else { /* Message is not QOS0 or 1 means only option left is QOS2 */ |
ampembeng | 15:6f2798e45099 | 454 | rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len); |
ampembeng | 15:6f2798e45099 | 455 | } |
ampembeng | 15:6f2798e45099 | 456 | |
ampembeng | 15:6f2798e45099 | 457 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 458 | return rc; |
ampembeng | 15:6f2798e45099 | 459 | } |
ampembeng | 15:6f2798e45099 | 460 | |
ampembeng | 15:6f2798e45099 | 461 | rc = sendPacket(c, len, timer); |
ampembeng | 15:6f2798e45099 | 462 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 463 | return rc; |
ampembeng | 15:6f2798e45099 | 464 | } |
ampembeng | 15:6f2798e45099 | 465 | |
ampembeng | 15:6f2798e45099 | 466 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 467 | } |
ampembeng | 15:6f2798e45099 | 468 | |
ampembeng | 15:6f2798e45099 | 469 | MQTTReturnCode handlePubrec(Client *c, Timer *timer) { |
ampembeng | 15:6f2798e45099 | 470 | uint16_t packet_id; |
ampembeng | 15:6f2798e45099 | 471 | unsigned char dup, type; |
ampembeng | 15:6f2798e45099 | 472 | MQTTReturnCode rc; |
ampembeng | 15:6f2798e45099 | 473 | uint32_t len; |
ampembeng | 15:6f2798e45099 | 474 | |
ampembeng | 15:6f2798e45099 | 475 | rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 476 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 477 | return rc; |
ampembeng | 15:6f2798e45099 | 478 | } |
ampembeng | 15:6f2798e45099 | 479 | |
ampembeng | 15:6f2798e45099 | 480 | rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len); |
ampembeng | 15:6f2798e45099 | 481 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 482 | return rc; |
ampembeng | 15:6f2798e45099 | 483 | } |
ampembeng | 15:6f2798e45099 | 484 | |
ampembeng | 15:6f2798e45099 | 485 | /* send the PUBREL packet */ |
ampembeng | 15:6f2798e45099 | 486 | rc = sendPacket(c, len, timer); |
ampembeng | 15:6f2798e45099 | 487 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 488 | /* there was a problem */ |
ampembeng | 15:6f2798e45099 | 489 | return rc; |
ampembeng | 15:6f2798e45099 | 490 | } |
ampembeng | 15:6f2798e45099 | 491 | |
ampembeng | 15:6f2798e45099 | 492 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 493 | } |
ampembeng | 15:6f2798e45099 | 494 | |
ampembeng | 15:6f2798e45099 | 495 | MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) { |
ampembeng | 15:6f2798e45099 | 496 | MQTTReturnCode rc; |
ampembeng | 15:6f2798e45099 | 497 | if(NULL == c || NULL == timer) { |
ampembeng | 15:6f2798e45099 | 498 | ERROR("cycle() MQTT_NULL_VALUE_ERROR"); |
ampembeng | 15:6f2798e45099 | 499 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 500 | } |
ampembeng | 15:6f2798e45099 | 501 | |
ampembeng | 15:6f2798e45099 | 502 | /* read the socket, see what work is due */ |
ampembeng | 15:6f2798e45099 | 503 | rc = readPacket(c, timer, packet_type); |
ampembeng | 15:6f2798e45099 | 504 | if(MQTT_NOTHING_TO_READ == rc) { |
ampembeng | 15:6f2798e45099 | 505 | /* Nothing to read, not a cycle failure */ |
ampembeng | 15:6f2798e45099 | 506 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 507 | } |
ampembeng | 15:6f2798e45099 | 508 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 509 | ERROR("cycle() SUCCESS != rc"); |
ampembeng | 15:6f2798e45099 | 510 | return rc; |
ampembeng | 15:6f2798e45099 | 511 | } |
ampembeng | 15:6f2798e45099 | 512 | |
ampembeng | 15:6f2798e45099 | 513 | switch(*packet_type) { |
ampembeng | 15:6f2798e45099 | 514 | case CONNACK: |
ampembeng | 15:6f2798e45099 | 515 | case PUBACK: |
ampembeng | 15:6f2798e45099 | 516 | case SUBACK: |
ampembeng | 15:6f2798e45099 | 517 | case UNSUBACK: |
ampembeng | 15:6f2798e45099 | 518 | break; |
ampembeng | 15:6f2798e45099 | 519 | case PUBLISH: { |
ampembeng | 15:6f2798e45099 | 520 | rc = handlePublish(c, timer); |
ampembeng | 15:6f2798e45099 | 521 | break; |
ampembeng | 15:6f2798e45099 | 522 | } |
ampembeng | 15:6f2798e45099 | 523 | case PUBREC: { |
ampembeng | 15:6f2798e45099 | 524 | rc = handlePubrec(c, timer); |
ampembeng | 15:6f2798e45099 | 525 | break; |
ampembeng | 15:6f2798e45099 | 526 | } |
ampembeng | 15:6f2798e45099 | 527 | case PUBCOMP: |
ampembeng | 15:6f2798e45099 | 528 | break; |
ampembeng | 15:6f2798e45099 | 529 | case PINGRESP: { |
ampembeng | 15:6f2798e45099 | 530 | c->isPingOutstanding = 0; |
ampembeng | 15:6f2798e45099 | 531 | countdown(&c->pingTimer, c->keepAliveInterval); |
ampembeng | 15:6f2798e45099 | 532 | break; |
ampembeng | 15:6f2798e45099 | 533 | } |
ampembeng | 15:6f2798e45099 | 534 | default: { |
ampembeng | 15:6f2798e45099 | 535 | /* Either unknown packet type or Failure occurred |
ampembeng | 15:6f2798e45099 | 536 | * Should not happen */ |
ampembeng | 15:6f2798e45099 | 537 | ERROR("cycle() Either unknown packet type or Failure occurred"); |
ampembeng | 15:6f2798e45099 | 538 | return MQTT_BUFFER_RX_MESSAGE_INVALID; |
ampembeng | 15:6f2798e45099 | 539 | break; |
ampembeng | 15:6f2798e45099 | 540 | } |
ampembeng | 15:6f2798e45099 | 541 | } |
ampembeng | 15:6f2798e45099 | 542 | |
ampembeng | 15:6f2798e45099 | 543 | return rc; |
ampembeng | 15:6f2798e45099 | 544 | } |
ampembeng | 15:6f2798e45099 | 545 | |
ampembeng | 15:6f2798e45099 | 546 | MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) { |
ampembeng | 15:6f2798e45099 | 547 | MQTTReturnCode rc = SUCCESS; |
ampembeng | 15:6f2798e45099 | 548 | Timer timer; |
ampembeng | 15:6f2798e45099 | 549 | uint8_t packet_type; |
ampembeng | 15:6f2798e45099 | 550 | |
ampembeng | 15:6f2798e45099 | 551 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 552 | ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR"); |
ampembeng | 15:6f2798e45099 | 553 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 554 | } |
ampembeng | 15:6f2798e45099 | 555 | |
ampembeng | 15:6f2798e45099 | 556 | /* Check if network was manually disconnected */ |
ampembeng | 15:6f2798e45099 | 557 | if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) { |
ampembeng | 15:6f2798e45099 | 558 | ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED"); |
ampembeng | 15:6f2798e45099 | 559 | return MQTT_NETWORK_MANUALLY_DISCONNECTED; |
ampembeng | 15:6f2798e45099 | 560 | } |
ampembeng | 15:6f2798e45099 | 561 | |
ampembeng | 15:6f2798e45099 | 562 | /* Check if network is disconnected and auto-reconnect is not enabled */ |
ampembeng | 15:6f2798e45099 | 563 | if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) { |
ampembeng | 15:6f2798e45099 | 564 | ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR"); |
ampembeng | 15:6f2798e45099 | 565 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 566 | } |
ampembeng | 15:6f2798e45099 | 567 | |
ampembeng | 15:6f2798e45099 | 568 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 569 | countdown_ms(&timer, timeout_ms); |
ampembeng | 15:6f2798e45099 | 570 | |
ampembeng | 15:6f2798e45099 | 571 | while(!expired(&timer)) { |
ampembeng | 15:6f2798e45099 | 572 | if(0 == c->isConnected) { |
ampembeng | 15:6f2798e45099 | 573 | if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { |
ampembeng | 15:6f2798e45099 | 574 | rc = MQTT_RECONNECT_TIMED_OUT; |
ampembeng | 15:6f2798e45099 | 575 | ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT"); |
ampembeng | 15:6f2798e45099 | 576 | break; |
ampembeng | 15:6f2798e45099 | 577 | } |
ampembeng | 15:6f2798e45099 | 578 | |
ampembeng | 15:6f2798e45099 | 579 | rc = handleReconnect(c); |
ampembeng | 15:6f2798e45099 | 580 | /* Network reconnect attempted, check if yield timer expired before |
ampembeng | 15:6f2798e45099 | 581 | * doing anything else */ |
ampembeng | 15:6f2798e45099 | 582 | continue; |
ampembeng | 15:6f2798e45099 | 583 | } |
ampembeng | 15:6f2798e45099 | 584 | |
ampembeng | 15:6f2798e45099 | 585 | rc = cycle(c, &timer, &packet_type); |
ampembeng | 15:6f2798e45099 | 586 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 587 | ERROR("MQTTYield() SUCCESS != rc"); |
ampembeng | 15:6f2798e45099 | 588 | break; |
ampembeng | 15:6f2798e45099 | 589 | } |
ampembeng | 15:6f2798e45099 | 590 | |
ampembeng | 15:6f2798e45099 | 591 | rc = keepalive(c); |
ampembeng | 15:6f2798e45099 | 592 | if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) { |
ampembeng | 15:6f2798e45099 | 593 | c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL; |
ampembeng | 15:6f2798e45099 | 594 | countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); |
ampembeng | 15:6f2798e45099 | 595 | c->counterNetworkDisconnected++; |
ampembeng | 15:6f2798e45099 | 596 | /* Depending on timer values, it is possible that yield timer has expired |
ampembeng | 15:6f2798e45099 | 597 | * Set to rc to attempting reconnect to inform client that autoreconnect |
ampembeng | 15:6f2798e45099 | 598 | * attempt has started */ |
ampembeng | 15:6f2798e45099 | 599 | INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT"); |
ampembeng | 15:6f2798e45099 | 600 | rc = MQTT_ATTEMPTING_RECONNECT; |
ampembeng | 15:6f2798e45099 | 601 | } else if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 602 | ERROR("MQTTYield() SUCCESS != rc"); |
ampembeng | 15:6f2798e45099 | 603 | break; |
ampembeng | 15:6f2798e45099 | 604 | } |
ampembeng | 15:6f2798e45099 | 605 | } |
ampembeng | 15:6f2798e45099 | 606 | |
ampembeng | 15:6f2798e45099 | 607 | return rc; |
ampembeng | 15:6f2798e45099 | 608 | } |
ampembeng | 15:6f2798e45099 | 609 | |
ampembeng | 15:6f2798e45099 | 610 | /* only used in single-threaded mode where one command at a time is in process */ |
ampembeng | 15:6f2798e45099 | 611 | MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) { |
ampembeng | 15:6f2798e45099 | 612 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 613 | uint8_t read_packet_type = 0, retry = 2; |
ampembeng | 15:6f2798e45099 | 614 | |
ampembeng | 15:6f2798e45099 | 615 | |
ampembeng | 15:6f2798e45099 | 616 | |
ampembeng | 15:6f2798e45099 | 617 | if(NULL == c || NULL == timer) { |
ampembeng | 15:6f2798e45099 | 618 | ERROR("waitfor() MQTT_NULL_VALUE_ERROR"); |
ampembeng | 15:6f2798e45099 | 619 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 620 | } |
ampembeng | 15:6f2798e45099 | 621 | |
ampembeng | 15:6f2798e45099 | 622 | do { |
ampembeng | 15:6f2798e45099 | 623 | if(expired(timer)) { |
ampembeng | 15:6f2798e45099 | 624 | /* we timed out */ |
ampembeng | 15:6f2798e45099 | 625 | ERROR("waitfor() timer expired"); |
ampembeng | 15:6f2798e45099 | 626 | break; |
ampembeng | 15:6f2798e45099 | 627 | |
ampembeng | 15:6f2798e45099 | 628 | //countdown_ms(timer, 10); |
ampembeng | 15:6f2798e45099 | 629 | } |
ampembeng | 15:6f2798e45099 | 630 | |
ampembeng | 15:6f2798e45099 | 631 | rc = cycle(c, timer, &read_packet_type); |
ampembeng | 15:6f2798e45099 | 632 | }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type); |
ampembeng | 15:6f2798e45099 | 633 | |
ampembeng | 15:6f2798e45099 | 634 | if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) { |
ampembeng | 15:6f2798e45099 | 635 | ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR"); |
ampembeng | 15:6f2798e45099 | 636 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 637 | } |
ampembeng | 15:6f2798e45099 | 638 | |
ampembeng | 15:6f2798e45099 | 639 | /* Something failed or we didn't receive the expected packet, return error code */ |
ampembeng | 15:6f2798e45099 | 640 | return rc; |
ampembeng | 15:6f2798e45099 | 641 | } |
ampembeng | 15:6f2798e45099 | 642 | |
ampembeng | 15:6f2798e45099 | 643 | MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) { |
ampembeng | 15:6f2798e45099 | 644 | Timer connect_timer; |
ampembeng | 15:6f2798e45099 | 645 | MQTTReturnCode connack_rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 646 | char sessionPresent = 0; |
ampembeng | 15:6f2798e45099 | 647 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 648 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 649 | |
ampembeng | 15:6f2798e45099 | 650 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 651 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 652 | } |
ampembeng | 15:6f2798e45099 | 653 | |
ampembeng | 15:6f2798e45099 | 654 | DEBUG("...connect_timer"); |
ampembeng | 15:6f2798e45099 | 655 | InitTimer(&connect_timer); |
ampembeng | 15:6f2798e45099 | 656 | countdown_ms(&connect_timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 657 | |
ampembeng | 15:6f2798e45099 | 658 | if(c->isConnected) { |
ampembeng | 15:6f2798e45099 | 659 | /* Don't send connect packet again if we are already connected */ |
ampembeng | 15:6f2798e45099 | 660 | ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR"); |
ampembeng | 15:6f2798e45099 | 661 | return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 662 | } |
ampembeng | 15:6f2798e45099 | 663 | |
ampembeng | 15:6f2798e45099 | 664 | if(NULL != options) { |
ampembeng | 15:6f2798e45099 | 665 | /* override default options if new options were supplied */ |
ampembeng | 15:6f2798e45099 | 666 | copyMQTTConnectData(&(c->options), options); |
ampembeng | 15:6f2798e45099 | 667 | } |
ampembeng | 15:6f2798e45099 | 668 | |
ampembeng | 15:6f2798e45099 | 669 | DEBUG("...TLS Connect"); |
ampembeng | 15:6f2798e45099 | 670 | c->networkInitHandler(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 671 | rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams); |
ampembeng | 15:6f2798e45099 | 672 | if(0 != rc) { |
ampembeng | 15:6f2798e45099 | 673 | /* TLS Connect failed, return error */ |
ampembeng | 15:6f2798e45099 | 674 | ERROR("...TLS Connect failed, return error"); |
ampembeng | 15:6f2798e45099 | 675 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 676 | } |
ampembeng | 15:6f2798e45099 | 677 | |
ampembeng | 15:6f2798e45099 | 678 | c->keepAliveInterval = c->options.keepAliveInterval; |
ampembeng | 15:6f2798e45099 | 679 | rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len); |
ampembeng | 15:6f2798e45099 | 680 | if(SUCCESS != rc || 0 >= len) { |
ampembeng | 15:6f2798e45099 | 681 | ERROR("...MQTTSerialize_connect FAIL"); |
ampembeng | 15:6f2798e45099 | 682 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 683 | } |
ampembeng | 15:6f2798e45099 | 684 | |
ampembeng | 15:6f2798e45099 | 685 | /* send the connect packet */ |
ampembeng | 15:6f2798e45099 | 686 | rc = sendPacket(c, len, &connect_timer); |
ampembeng | 15:6f2798e45099 | 687 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 688 | ERROR("...sendPacket FAIL"); |
ampembeng | 15:6f2798e45099 | 689 | return rc; |
ampembeng | 15:6f2798e45099 | 690 | } |
ampembeng | 15:6f2798e45099 | 691 | |
ampembeng | 15:6f2798e45099 | 692 | /* this will be a blocking call, wait for the CONNACK */ |
ampembeng | 15:6f2798e45099 | 693 | rc = waitfor(c, CONNACK, &connect_timer); |
ampembeng | 15:6f2798e45099 | 694 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 695 | ERROR("...waitfor FAIL"); |
ampembeng | 15:6f2798e45099 | 696 | return rc; |
ampembeng | 15:6f2798e45099 | 697 | } |
ampembeng | 15:6f2798e45099 | 698 | |
ampembeng | 15:6f2798e45099 | 699 | /* Received CONNACK, check the return code */ |
ampembeng | 15:6f2798e45099 | 700 | rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 701 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 702 | ERROR("...MQTTDeserialize_connack FAIL"); |
ampembeng | 15:6f2798e45099 | 703 | return rc; |
ampembeng | 15:6f2798e45099 | 704 | } |
ampembeng | 15:6f2798e45099 | 705 | |
ampembeng | 15:6f2798e45099 | 706 | if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) { |
ampembeng | 15:6f2798e45099 | 707 | ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL"); |
ampembeng | 15:6f2798e45099 | 708 | return connack_rc; |
ampembeng | 15:6f2798e45099 | 709 | } |
ampembeng | 15:6f2798e45099 | 710 | |
ampembeng | 15:6f2798e45099 | 711 | DEBUG("...isConnected"); |
ampembeng | 15:6f2798e45099 | 712 | c->isConnected = 1; |
ampembeng | 15:6f2798e45099 | 713 | c->wasManuallyDisconnected = 0; |
ampembeng | 15:6f2798e45099 | 714 | c->isPingOutstanding = 0; |
ampembeng | 15:6f2798e45099 | 715 | countdown(&c->pingTimer, c->keepAliveInterval); |
ampembeng | 15:6f2798e45099 | 716 | |
ampembeng | 15:6f2798e45099 | 717 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 718 | } |
ampembeng | 15:6f2798e45099 | 719 | |
ampembeng | 15:6f2798e45099 | 720 | /* Return MAX_MESSAGE_HANDLERS value if no free index is available */ |
ampembeng | 15:6f2798e45099 | 721 | uint32_t GetFreeMessageHandlerIndex(Client *c) { |
ampembeng | 15:6f2798e45099 | 722 | uint32_t itr; |
ampembeng | 15:6f2798e45099 | 723 | for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) { |
ampembeng | 15:6f2798e45099 | 724 | if(c->messageHandlers[itr].topicFilter == NULL) { |
ampembeng | 15:6f2798e45099 | 725 | break; |
ampembeng | 15:6f2798e45099 | 726 | } |
ampembeng | 15:6f2798e45099 | 727 | } |
ampembeng | 15:6f2798e45099 | 728 | |
ampembeng | 15:6f2798e45099 | 729 | return itr; |
ampembeng | 15:6f2798e45099 | 730 | } |
ampembeng | 15:6f2798e45099 | 731 | |
ampembeng | 15:6f2798e45099 | 732 | MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos, |
ampembeng | 15:6f2798e45099 | 733 | messageHandler messageHandler, pApplicationHandler_t applicationHandler) { |
ampembeng | 15:6f2798e45099 | 734 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 735 | Timer timer; |
ampembeng | 15:6f2798e45099 | 736 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 737 | uint32_t indexOfFreeMessageHandler; |
ampembeng | 15:6f2798e45099 | 738 | uint32_t count = 0; |
ampembeng | 15:6f2798e45099 | 739 | QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; |
ampembeng | 15:6f2798e45099 | 740 | uint16_t packetId; |
ampembeng | 15:6f2798e45099 | 741 | MQTTString topic = MQTTString_initializer; |
ampembeng | 15:6f2798e45099 | 742 | |
ampembeng | 15:6f2798e45099 | 743 | if(NULL == c || NULL == topicFilter |
ampembeng | 15:6f2798e45099 | 744 | || NULL == messageHandler || NULL == applicationHandler) { |
ampembeng | 15:6f2798e45099 | 745 | ERROR("...MQTT_NULL_VALUE_ERROR FAIL"); |
ampembeng | 15:6f2798e45099 | 746 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 747 | } |
ampembeng | 15:6f2798e45099 | 748 | |
ampembeng | 15:6f2798e45099 | 749 | if(!c->isConnected) { |
ampembeng | 15:6f2798e45099 | 750 | ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL"); |
ampembeng | 15:6f2798e45099 | 751 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 752 | } |
ampembeng | 15:6f2798e45099 | 753 | |
ampembeng | 15:6f2798e45099 | 754 | topic.cstring = (char *)topicFilter; |
ampembeng | 15:6f2798e45099 | 755 | |
ampembeng | 15:6f2798e45099 | 756 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 757 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 758 | |
ampembeng | 15:6f2798e45099 | 759 | rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len); |
ampembeng | 15:6f2798e45099 | 760 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 761 | ERROR("...MQTTSerialize_subscribe FAIL"); |
ampembeng | 15:6f2798e45099 | 762 | return rc; |
ampembeng | 15:6f2798e45099 | 763 | } |
ampembeng | 15:6f2798e45099 | 764 | |
ampembeng | 15:6f2798e45099 | 765 | indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c); |
ampembeng | 15:6f2798e45099 | 766 | if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) { |
ampembeng | 15:6f2798e45099 | 767 | ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL"); |
ampembeng | 15:6f2798e45099 | 768 | return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR; |
ampembeng | 15:6f2798e45099 | 769 | } |
ampembeng | 15:6f2798e45099 | 770 | |
ampembeng | 15:6f2798e45099 | 771 | /* send the subscribe packet */ |
ampembeng | 15:6f2798e45099 | 772 | rc = sendPacket(c, len, &timer); |
ampembeng | 15:6f2798e45099 | 773 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 774 | ERROR("...send the subscribe packet FAIL"); |
ampembeng | 15:6f2798e45099 | 775 | return rc; |
ampembeng | 15:6f2798e45099 | 776 | } |
ampembeng | 15:6f2798e45099 | 777 | |
ampembeng | 15:6f2798e45099 | 778 | /* wait for suback */ |
ampembeng | 15:6f2798e45099 | 779 | rc = waitfor(c, SUBACK, &timer); |
ampembeng | 15:6f2798e45099 | 780 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 781 | ERROR("...wait for suback FAIL"); |
ampembeng | 15:6f2798e45099 | 782 | return rc; |
ampembeng | 15:6f2798e45099 | 783 | } |
ampembeng | 15:6f2798e45099 | 784 | |
ampembeng | 15:6f2798e45099 | 785 | /* Granted QoS can be 0, 1 or 2 */ |
ampembeng | 15:6f2798e45099 | 786 | rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 787 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 788 | ERROR("...Granted QoS can be 0, 1 or 2"); |
ampembeng | 15:6f2798e45099 | 789 | return rc; |
ampembeng | 15:6f2798e45099 | 790 | } |
ampembeng | 15:6f2798e45099 | 791 | |
ampembeng | 15:6f2798e45099 | 792 | c->messageHandlers[indexOfFreeMessageHandler].topicFilter = |
ampembeng | 15:6f2798e45099 | 793 | topicFilter; |
ampembeng | 15:6f2798e45099 | 794 | c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler; |
ampembeng | 15:6f2798e45099 | 795 | c->messageHandlers[indexOfFreeMessageHandler].applicationHandler = |
ampembeng | 15:6f2798e45099 | 796 | applicationHandler; |
ampembeng | 15:6f2798e45099 | 797 | c->messageHandlers[indexOfFreeMessageHandler].qos = qos; |
ampembeng | 15:6f2798e45099 | 798 | |
ampembeng | 15:6f2798e45099 | 799 | DEBUG("...MQTTSubscribe SUCCESS"); |
ampembeng | 15:6f2798e45099 | 800 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 801 | } |
ampembeng | 15:6f2798e45099 | 802 | |
ampembeng | 15:6f2798e45099 | 803 | MQTTReturnCode MQTTResubscribe(Client *c) { |
ampembeng | 15:6f2798e45099 | 804 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 805 | Timer timer; |
ampembeng | 15:6f2798e45099 | 806 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 807 | uint32_t count = 0; |
ampembeng | 15:6f2798e45099 | 808 | QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; |
ampembeng | 15:6f2798e45099 | 809 | uint16_t packetId; |
ampembeng | 15:6f2798e45099 | 810 | uint32_t existingSubCount = 0; |
ampembeng | 15:6f2798e45099 | 811 | uint32_t itr = 0; |
ampembeng | 15:6f2798e45099 | 812 | |
ampembeng | 15:6f2798e45099 | 813 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 814 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 815 | } |
ampembeng | 15:6f2798e45099 | 816 | |
ampembeng | 15:6f2798e45099 | 817 | if(!c->isConnected) { |
ampembeng | 15:6f2798e45099 | 818 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 819 | } |
ampembeng | 15:6f2798e45099 | 820 | |
ampembeng | 15:6f2798e45099 | 821 | existingSubCount = GetFreeMessageHandlerIndex(c); |
ampembeng | 15:6f2798e45099 | 822 | |
ampembeng | 15:6f2798e45099 | 823 | for(itr = 0; itr < existingSubCount; itr++) { |
ampembeng | 15:6f2798e45099 | 824 | MQTTString topic = MQTTString_initializer; |
ampembeng | 15:6f2798e45099 | 825 | topic.cstring = (char *)c->messageHandlers[itr].topicFilter; |
ampembeng | 15:6f2798e45099 | 826 | |
ampembeng | 15:6f2798e45099 | 827 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 828 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 829 | |
ampembeng | 15:6f2798e45099 | 830 | rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, |
ampembeng | 15:6f2798e45099 | 831 | &topic, &(c->messageHandlers[itr].qos), &len); |
ampembeng | 15:6f2798e45099 | 832 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 833 | return rc; |
ampembeng | 15:6f2798e45099 | 834 | } |
ampembeng | 15:6f2798e45099 | 835 | |
ampembeng | 15:6f2798e45099 | 836 | /* send the subscribe packet */ |
ampembeng | 15:6f2798e45099 | 837 | rc = sendPacket(c, len, &timer); |
ampembeng | 15:6f2798e45099 | 838 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 839 | return rc; |
ampembeng | 15:6f2798e45099 | 840 | } |
ampembeng | 15:6f2798e45099 | 841 | |
ampembeng | 15:6f2798e45099 | 842 | /* wait for suback */ |
ampembeng | 15:6f2798e45099 | 843 | rc = waitfor(c, SUBACK, &timer); |
ampembeng | 15:6f2798e45099 | 844 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 845 | return rc; |
ampembeng | 15:6f2798e45099 | 846 | } |
ampembeng | 15:6f2798e45099 | 847 | |
ampembeng | 15:6f2798e45099 | 848 | /* Granted QoS can be 0, 1 or 2 */ |
ampembeng | 15:6f2798e45099 | 849 | rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 850 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 851 | return rc; |
ampembeng | 15:6f2798e45099 | 852 | } |
ampembeng | 15:6f2798e45099 | 853 | } |
ampembeng | 15:6f2798e45099 | 854 | |
ampembeng | 15:6f2798e45099 | 855 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 856 | } |
ampembeng | 15:6f2798e45099 | 857 | |
ampembeng | 15:6f2798e45099 | 858 | MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) { |
ampembeng | 15:6f2798e45099 | 859 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 860 | Timer timer; |
ampembeng | 15:6f2798e45099 | 861 | MQTTString topic = MQTTString_initializer; |
ampembeng | 15:6f2798e45099 | 862 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 863 | uint32_t i = 0; |
ampembeng | 15:6f2798e45099 | 864 | uint16_t packet_id; |
ampembeng | 15:6f2798e45099 | 865 | |
ampembeng | 15:6f2798e45099 | 866 | if(NULL == c || NULL == topicFilter) { |
ampembeng | 15:6f2798e45099 | 867 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 868 | } |
ampembeng | 15:6f2798e45099 | 869 | |
ampembeng | 15:6f2798e45099 | 870 | topic.cstring = (char *)topicFilter; |
ampembeng | 15:6f2798e45099 | 871 | |
ampembeng | 15:6f2798e45099 | 872 | if(!c->isConnected) { |
ampembeng | 15:6f2798e45099 | 873 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 874 | } |
ampembeng | 15:6f2798e45099 | 875 | |
ampembeng | 15:6f2798e45099 | 876 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 877 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 878 | |
ampembeng | 15:6f2798e45099 | 879 | rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len); |
ampembeng | 15:6f2798e45099 | 880 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 881 | return rc; |
ampembeng | 15:6f2798e45099 | 882 | } |
ampembeng | 15:6f2798e45099 | 883 | |
ampembeng | 15:6f2798e45099 | 884 | /* send the unsubscribe packet */ |
ampembeng | 15:6f2798e45099 | 885 | rc = sendPacket(c, len, &timer); |
ampembeng | 15:6f2798e45099 | 886 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 887 | return rc; |
ampembeng | 15:6f2798e45099 | 888 | } |
ampembeng | 15:6f2798e45099 | 889 | |
ampembeng | 15:6f2798e45099 | 890 | rc = waitfor(c, UNSUBACK, &timer); |
ampembeng | 15:6f2798e45099 | 891 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 892 | return rc; |
ampembeng | 15:6f2798e45099 | 893 | } |
ampembeng | 15:6f2798e45099 | 894 | |
ampembeng | 15:6f2798e45099 | 895 | rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 896 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 897 | return rc; |
ampembeng | 15:6f2798e45099 | 898 | } |
ampembeng | 15:6f2798e45099 | 899 | |
ampembeng | 15:6f2798e45099 | 900 | /* Remove from message handler array */ |
ampembeng | 15:6f2798e45099 | 901 | for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { |
ampembeng | 15:6f2798e45099 | 902 | if(c->messageHandlers[i].topicFilter != NULL && |
ampembeng | 15:6f2798e45099 | 903 | (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) { |
ampembeng | 15:6f2798e45099 | 904 | c->messageHandlers[i].topicFilter = NULL; |
ampembeng | 15:6f2798e45099 | 905 | /* We don't want to break here, if the same topic is registered |
ampembeng | 15:6f2798e45099 | 906 | * with 2 callbacks. Unlikely scenario */ |
ampembeng | 15:6f2798e45099 | 907 | } |
ampembeng | 15:6f2798e45099 | 908 | } |
ampembeng | 15:6f2798e45099 | 909 | |
ampembeng | 15:6f2798e45099 | 910 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 911 | } |
ampembeng | 15:6f2798e45099 | 912 | |
ampembeng | 15:6f2798e45099 | 913 | MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) { |
ampembeng | 15:6f2798e45099 | 914 | Timer timer; |
ampembeng | 15:6f2798e45099 | 915 | MQTTString topic = MQTTString_initializer; |
ampembeng | 15:6f2798e45099 | 916 | uint32_t len = 0; |
ampembeng | 15:6f2798e45099 | 917 | uint8_t waitForAck = 0; |
ampembeng | 15:6f2798e45099 | 918 | uint8_t packetType = PUBACK; |
ampembeng | 15:6f2798e45099 | 919 | uint16_t packet_id; |
ampembeng | 15:6f2798e45099 | 920 | unsigned char dup, type; |
ampembeng | 15:6f2798e45099 | 921 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 922 | |
ampembeng | 15:6f2798e45099 | 923 | if(NULL == c || NULL == topicName || NULL == message) { |
ampembeng | 15:6f2798e45099 | 924 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 925 | } |
ampembeng | 15:6f2798e45099 | 926 | |
ampembeng | 15:6f2798e45099 | 927 | topic.cstring = (char *)topicName; |
ampembeng | 15:6f2798e45099 | 928 | |
ampembeng | 15:6f2798e45099 | 929 | if(!c->isConnected) { |
ampembeng | 15:6f2798e45099 | 930 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 931 | } |
ampembeng | 15:6f2798e45099 | 932 | |
ampembeng | 15:6f2798e45099 | 933 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 934 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 935 | |
ampembeng | 15:6f2798e45099 | 936 | if(QOS1 == message->qos || QOS2 == message->qos) { |
ampembeng | 15:6f2798e45099 | 937 | message->id = getNextPacketId(c); |
ampembeng | 15:6f2798e45099 | 938 | waitForAck = 1; |
ampembeng | 15:6f2798e45099 | 939 | if(QOS2 == message->qos) { |
ampembeng | 15:6f2798e45099 | 940 | packetType = PUBCOMP; |
ampembeng | 15:6f2798e45099 | 941 | } |
ampembeng | 15:6f2798e45099 | 942 | } |
ampembeng | 15:6f2798e45099 | 943 | |
ampembeng | 15:6f2798e45099 | 944 | rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id, |
ampembeng | 15:6f2798e45099 | 945 | topic, (unsigned char*)message->payload, message->payloadlen, &len); |
ampembeng | 15:6f2798e45099 | 946 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 947 | return rc; |
ampembeng | 15:6f2798e45099 | 948 | } |
ampembeng | 15:6f2798e45099 | 949 | |
ampembeng | 15:6f2798e45099 | 950 | /* send the publish packet */ |
ampembeng | 15:6f2798e45099 | 951 | rc = sendPacket(c, len, &timer); |
ampembeng | 15:6f2798e45099 | 952 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 953 | return rc; |
ampembeng | 15:6f2798e45099 | 954 | } |
ampembeng | 15:6f2798e45099 | 955 | |
ampembeng | 15:6f2798e45099 | 956 | /* Wait for ack if QoS1 or QoS2 */ |
ampembeng | 15:6f2798e45099 | 957 | if(1 == waitForAck) { |
ampembeng | 15:6f2798e45099 | 958 | rc = waitfor(c, packetType, &timer); |
ampembeng | 15:6f2798e45099 | 959 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 960 | return rc; |
ampembeng | 15:6f2798e45099 | 961 | } |
ampembeng | 15:6f2798e45099 | 962 | |
ampembeng | 15:6f2798e45099 | 963 | rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); |
ampembeng | 15:6f2798e45099 | 964 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 965 | return rc; |
ampembeng | 15:6f2798e45099 | 966 | } |
ampembeng | 15:6f2798e45099 | 967 | } |
ampembeng | 15:6f2798e45099 | 968 | |
ampembeng | 15:6f2798e45099 | 969 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 970 | } |
ampembeng | 15:6f2798e45099 | 971 | /** |
ampembeng | 15:6f2798e45099 | 972 | * This is for the case when the sendPacket Fails. |
ampembeng | 15:6f2798e45099 | 973 | */ |
ampembeng | 15:6f2798e45099 | 974 | static void MQTTForceDisconnect(Client *c){ |
ampembeng | 15:6f2798e45099 | 975 | c->isConnected = 0; |
ampembeng | 15:6f2798e45099 | 976 | c->networkStack.disconnect(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 977 | c->networkStack.destroy(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 978 | } |
ampembeng | 15:6f2798e45099 | 979 | |
ampembeng | 15:6f2798e45099 | 980 | MQTTReturnCode MQTTDisconnect(Client *c) { |
ampembeng | 15:6f2798e45099 | 981 | MQTTReturnCode rc = FAILURE; |
ampembeng | 15:6f2798e45099 | 982 | /* We might wait for incomplete incoming publishes to complete */ |
ampembeng | 15:6f2798e45099 | 983 | Timer timer; |
ampembeng | 15:6f2798e45099 | 984 | uint32_t serialized_len = 0; |
ampembeng | 15:6f2798e45099 | 985 | |
ampembeng | 15:6f2798e45099 | 986 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 987 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 988 | } |
ampembeng | 15:6f2798e45099 | 989 | |
ampembeng | 15:6f2798e45099 | 990 | if(0 == c->isConnected) { |
ampembeng | 15:6f2798e45099 | 991 | /* Network is already disconnected. Do nothing */ |
ampembeng | 15:6f2798e45099 | 992 | return MQTT_NETWORK_DISCONNECTED_ERROR; |
ampembeng | 15:6f2798e45099 | 993 | } |
ampembeng | 15:6f2798e45099 | 994 | |
ampembeng | 15:6f2798e45099 | 995 | rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len); |
ampembeng | 15:6f2798e45099 | 996 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 997 | return rc; |
ampembeng | 15:6f2798e45099 | 998 | } |
ampembeng | 15:6f2798e45099 | 999 | |
ampembeng | 15:6f2798e45099 | 1000 | InitTimer(&timer); |
ampembeng | 15:6f2798e45099 | 1001 | countdown_ms(&timer, c->commandTimeoutMs); |
ampembeng | 15:6f2798e45099 | 1002 | |
ampembeng | 15:6f2798e45099 | 1003 | /* send the disconnect packet */ |
ampembeng | 15:6f2798e45099 | 1004 | if(serialized_len > 0) { |
ampembeng | 15:6f2798e45099 | 1005 | rc = sendPacket(c, serialized_len, &timer); |
ampembeng | 15:6f2798e45099 | 1006 | if(SUCCESS != rc) { |
ampembeng | 15:6f2798e45099 | 1007 | return rc; |
ampembeng | 15:6f2798e45099 | 1008 | } |
ampembeng | 15:6f2798e45099 | 1009 | } |
ampembeng | 15:6f2798e45099 | 1010 | |
ampembeng | 15:6f2798e45099 | 1011 | /* Clean network stack */ |
ampembeng | 15:6f2798e45099 | 1012 | c->networkStack.disconnect(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 1013 | rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack)); |
ampembeng | 15:6f2798e45099 | 1014 | if(0 != rc) { |
ampembeng | 15:6f2798e45099 | 1015 | /* TLS Destroy failed, return error */ |
ampembeng | 15:6f2798e45099 | 1016 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 1017 | } |
ampembeng | 15:6f2798e45099 | 1018 | |
ampembeng | 15:6f2798e45099 | 1019 | c->isConnected = 0; |
ampembeng | 15:6f2798e45099 | 1020 | |
ampembeng | 15:6f2798e45099 | 1021 | /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */ |
ampembeng | 15:6f2798e45099 | 1022 | c->wasManuallyDisconnected = 1; |
ampembeng | 15:6f2798e45099 | 1023 | |
ampembeng | 15:6f2798e45099 | 1024 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 1025 | } |
ampembeng | 15:6f2798e45099 | 1026 | |
ampembeng | 15:6f2798e45099 | 1027 | uint8_t MQTTIsConnected(Client *c) { |
ampembeng | 15:6f2798e45099 | 1028 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 1029 | return 0; |
ampembeng | 15:6f2798e45099 | 1030 | } |
ampembeng | 15:6f2798e45099 | 1031 | |
ampembeng | 15:6f2798e45099 | 1032 | return c->isConnected; |
ampembeng | 15:6f2798e45099 | 1033 | } |
ampembeng | 15:6f2798e45099 | 1034 | |
ampembeng | 15:6f2798e45099 | 1035 | uint8_t MQTTIsAutoReconnectEnabled(Client *c) { |
ampembeng | 15:6f2798e45099 | 1036 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 1037 | return 0; |
ampembeng | 15:6f2798e45099 | 1038 | } |
ampembeng | 15:6f2798e45099 | 1039 | |
ampembeng | 15:6f2798e45099 | 1040 | return c->isAutoReconnectEnabled; |
ampembeng | 15:6f2798e45099 | 1041 | } |
ampembeng | 15:6f2798e45099 | 1042 | |
ampembeng | 15:6f2798e45099 | 1043 | MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) { |
ampembeng | 15:6f2798e45099 | 1044 | if(NULL == c || NULL == disconnectHandler) { |
ampembeng | 15:6f2798e45099 | 1045 | return MQTT_NULL_VALUE_ERROR; |
ampembeng | 15:6f2798e45099 | 1046 | } |
ampembeng | 15:6f2798e45099 | 1047 | |
ampembeng | 15:6f2798e45099 | 1048 | c->disconnectHandler = disconnectHandler; |
ampembeng | 15:6f2798e45099 | 1049 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 1050 | } |
ampembeng | 15:6f2798e45099 | 1051 | |
ampembeng | 15:6f2798e45099 | 1052 | MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) { |
ampembeng | 15:6f2798e45099 | 1053 | if(NULL == c) { |
ampembeng | 15:6f2798e45099 | 1054 | return FAILURE; |
ampembeng | 15:6f2798e45099 | 1055 | } |
ampembeng | 15:6f2798e45099 | 1056 | c->isAutoReconnectEnabled = value; |
ampembeng | 15:6f2798e45099 | 1057 | return SUCCESS; |
ampembeng | 15:6f2798e45099 | 1058 | } |
ampembeng | 15:6f2798e45099 | 1059 | |
ampembeng | 15:6f2798e45099 | 1060 | uint32_t MQTTGetNetworkDisconnectedCount(Client *c) { |
ampembeng | 15:6f2798e45099 | 1061 | return c->counterNetworkDisconnected; |
ampembeng | 15:6f2798e45099 | 1062 | } |
ampembeng | 15:6f2798e45099 | 1063 | |
ampembeng | 15:6f2798e45099 | 1064 | void MQTTResetNetworkDisconnectedCount(Client *c) { |
ampembeng | 15:6f2798e45099 | 1065 | c->counterNetworkDisconnected = 0; |
ampembeng | 15:6f2798e45099 | 1066 | } |
ampembeng | 15:6f2798e45099 | 1067 | |
ampembeng | 15:6f2798e45099 | 1068 |