Changes to enabled on-line compiler

Committer:
JMF
Date:
Wed May 30 20:59:51 2018 +0000
Revision:
0:082731ede69f
Initial commit

Who changed what in which revision?

UserRevisionLine numberNew contents of line
JMF 0:082731ede69f 1 /*
JMF 0:082731ede69f 2 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
JMF 0:082731ede69f 3 *
JMF 0:082731ede69f 4 * Licensed under the Apache License, Version 2.0 (the "License").
JMF 0:082731ede69f 5 * You may not use this file except in compliance with the License.
JMF 0:082731ede69f 6 * A copy of the License is located at
JMF 0:082731ede69f 7 *
JMF 0:082731ede69f 8 * http://aws.amazon.com/apache2.0
JMF 0:082731ede69f 9 *
JMF 0:082731ede69f 10 * or in the "license" file accompanying this file. This file is distributed
JMF 0:082731ede69f 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
JMF 0:082731ede69f 12 * express or implied. See the License for the specific language governing
JMF 0:082731ede69f 13 * permissions and limitations under the License.
JMF 0:082731ede69f 14 */
JMF 0:082731ede69f 15
JMF 0:082731ede69f 16 // Based on Eclipse Paho.
JMF 0:082731ede69f 17 /*******************************************************************************
JMF 0:082731ede69f 18 * Copyright (c) 2014 IBM Corp.
JMF 0:082731ede69f 19 *
JMF 0:082731ede69f 20 * All rights reserved. This program and the accompanying materials
JMF 0:082731ede69f 21 * are made available under the terms of the Eclipse Public License v1.0
JMF 0:082731ede69f 22 * and Eclipse Distribution License v1.0 which accompany this distribution.
JMF 0:082731ede69f 23 *
JMF 0:082731ede69f 24 * The Eclipse Public License is available at
JMF 0:082731ede69f 25 * http://www.eclipse.org/legal/epl-v10.html
JMF 0:082731ede69f 26 * and the Eclipse Distribution License is available at
JMF 0:082731ede69f 27 * http://www.eclipse.org/org/documents/edl-v10.php.
JMF 0:082731ede69f 28 *
JMF 0:082731ede69f 29 * Contributors:
JMF 0:082731ede69f 30 * Ian Craggs - initial API and implementation and/or initial documentation
JMF 0:082731ede69f 31 * Sergio R. Caprile - non-blocking packet read functions for stream transport
JMF 0:082731ede69f 32 *******************************************************************************/
JMF 0:082731ede69f 33
JMF 0:082731ede69f 34 /**
JMF 0:082731ede69f 35 * @file aws_iot_mqtt_client_common_internal.c
JMF 0:082731ede69f 36 * @brief MQTT client internal API definitions
JMF 0:082731ede69f 37 */
JMF 0:082731ede69f 38
JMF 0:082731ede69f 39 #ifdef __cplusplus
JMF 0:082731ede69f 40 extern "C" {
JMF 0:082731ede69f 41 #endif
JMF 0:082731ede69f 42
JMF 0:082731ede69f 43 #include <aws_iot_mqtt_client.h>
JMF 0:082731ede69f 44 #include "aws_iot_mqtt_client_common_internal.h"
JMF 0:082731ede69f 45
JMF 0:082731ede69f 46 /* Max length of packet header */
JMF 0:082731ede69f 47 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
JMF 0:082731ede69f 48
JMF 0:082731ede69f 49 /**
JMF 0:082731ede69f 50 * Encodes the message length according to the MQTT algorithm
JMF 0:082731ede69f 51 * @param buf the buffer into which the encoded data is written
JMF 0:082731ede69f 52 * @param length the length to be encoded
JMF 0:082731ede69f 53 * @return the number of bytes written to buffer
JMF 0:082731ede69f 54 */
JMF 0:082731ede69f 55 size_t aws_iot_mqtt_internal_write_len_to_buffer(unsigned char *buf, uint32_t length) {
JMF 0:082731ede69f 56 size_t outLen = 0;
JMF 0:082731ede69f 57 unsigned char encodedByte;
JMF 0:082731ede69f 58
JMF 0:082731ede69f 59 FUNC_ENTRY;
JMF 0:082731ede69f 60 do {
JMF 0:082731ede69f 61 encodedByte = (unsigned char) (length % 128);
JMF 0:082731ede69f 62 length /= 128;
JMF 0:082731ede69f 63 /* if there are more digits to encode, set the top bit of this digit */
JMF 0:082731ede69f 64 if(length > 0) {
JMF 0:082731ede69f 65 encodedByte |= 0x80;
JMF 0:082731ede69f 66 }
JMF 0:082731ede69f 67 buf[outLen++] = encodedByte;
JMF 0:082731ede69f 68 } while(length > 0);
JMF 0:082731ede69f 69
JMF 0:082731ede69f 70 FUNC_EXIT_RC(outLen);
JMF 0:082731ede69f 71 }
JMF 0:082731ede69f 72
JMF 0:082731ede69f 73 /**
JMF 0:082731ede69f 74 * Decodes the message length according to the MQTT algorithm
JMF 0:082731ede69f 75 * @param the buffer containing the message
JMF 0:082731ede69f 76 * @param value the decoded length returned
JMF 0:082731ede69f 77 * @return the number of bytes read from the socket
JMF 0:082731ede69f 78 */
JMF 0:082731ede69f 79 IoT_Error_t aws_iot_mqtt_internal_decode_remaining_length_from_buffer(unsigned char *buf, uint32_t *decodedLen,
JMF 0:082731ede69f 80 uint32_t *readBytesLen) {
JMF 0:082731ede69f 81 unsigned char encodedByte;
JMF 0:082731ede69f 82 uint32_t multiplier, len;
JMF 0:082731ede69f 83 FUNC_ENTRY;
JMF 0:082731ede69f 84
JMF 0:082731ede69f 85 multiplier = 1;
JMF 0:082731ede69f 86 len = 0;
JMF 0:082731ede69f 87 *decodedLen = 0;
JMF 0:082731ede69f 88
JMF 0:082731ede69f 89 do {
JMF 0:082731ede69f 90 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
JMF 0:082731ede69f 91 /* bad data */
JMF 0:082731ede69f 92 FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR);
JMF 0:082731ede69f 93 }
JMF 0:082731ede69f 94 encodedByte = *buf;
JMF 0:082731ede69f 95 buf++;
JMF 0:082731ede69f 96 *decodedLen += (encodedByte & 127) * multiplier;
JMF 0:082731ede69f 97 multiplier *= 128;
JMF 0:082731ede69f 98 } while((encodedByte & 128) != 0);
JMF 0:082731ede69f 99
JMF 0:082731ede69f 100 *readBytesLen = len;
JMF 0:082731ede69f 101
JMF 0:082731ede69f 102 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 103 }
JMF 0:082731ede69f 104
JMF 0:082731ede69f 105 uint32_t aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(uint32_t rem_len) {
JMF 0:082731ede69f 106 rem_len += 1; /* header byte */
JMF 0:082731ede69f 107 /* now remaining_length field (MQTT 3.1.1 - 2.2.3)*/
JMF 0:082731ede69f 108 if(rem_len < 128) {
JMF 0:082731ede69f 109 rem_len += 1;
JMF 0:082731ede69f 110 } else if(rem_len < 16384) {
JMF 0:082731ede69f 111 rem_len += 2;
JMF 0:082731ede69f 112 } else if(rem_len < 2097152) {
JMF 0:082731ede69f 113 rem_len += 3;
JMF 0:082731ede69f 114 } else {
JMF 0:082731ede69f 115 rem_len += 4;
JMF 0:082731ede69f 116 }
JMF 0:082731ede69f 117 return rem_len;
JMF 0:082731ede69f 118 }
JMF 0:082731ede69f 119
JMF 0:082731ede69f 120 /**
JMF 0:082731ede69f 121 * Calculates uint16 packet id from two bytes read from the input buffer
JMF 0:082731ede69f 122 * Checks Endianness at runtime
JMF 0:082731ede69f 123 *
JMF 0:082731ede69f 124 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
JMF 0:082731ede69f 125 * @return the value calculated
JMF 0:082731ede69f 126 */
JMF 0:082731ede69f 127 uint16_t aws_iot_mqtt_internal_read_uint16_t(unsigned char **pptr) {
JMF 0:082731ede69f 128 unsigned char *ptr = *pptr;
JMF 0:082731ede69f 129 uint16_t len = 0;
JMF 0:082731ede69f 130 uint8_t firstByte = (uint8_t) (*ptr);
JMF 0:082731ede69f 131 uint8_t secondByte = (uint8_t) (*(ptr + 1));
JMF 0:082731ede69f 132 len = (uint16_t) (secondByte + (256 * firstByte));
JMF 0:082731ede69f 133
JMF 0:082731ede69f 134 *pptr += 2;
JMF 0:082731ede69f 135 return len;
JMF 0:082731ede69f 136 }
JMF 0:082731ede69f 137
JMF 0:082731ede69f 138 /**
JMF 0:082731ede69f 139 * Writes an integer as 2 bytes to an output buffer.
JMF 0:082731ede69f 140 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
JMF 0:082731ede69f 141 * @param anInt the integer to write
JMF 0:082731ede69f 142 */
JMF 0:082731ede69f 143 void aws_iot_mqtt_internal_write_uint_16(unsigned char **pptr, uint16_t anInt) {
JMF 0:082731ede69f 144 **pptr = (unsigned char) (anInt / 256);
JMF 0:082731ede69f 145 (*pptr)++;
JMF 0:082731ede69f 146 **pptr = (unsigned char) (anInt % 256);
JMF 0:082731ede69f 147 (*pptr)++;
JMF 0:082731ede69f 148 }
JMF 0:082731ede69f 149
JMF 0:082731ede69f 150 /**
JMF 0:082731ede69f 151 * Reads one character from the input buffer.
JMF 0:082731ede69f 152 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
JMF 0:082731ede69f 153 * @return the character read
JMF 0:082731ede69f 154 */
JMF 0:082731ede69f 155 unsigned char aws_iot_mqtt_internal_read_char(unsigned char **pptr) {
JMF 0:082731ede69f 156 unsigned char c = **pptr;
JMF 0:082731ede69f 157 (*pptr)++;
JMF 0:082731ede69f 158 return c;
JMF 0:082731ede69f 159 }
JMF 0:082731ede69f 160
JMF 0:082731ede69f 161 /**
JMF 0:082731ede69f 162 * Writes one character to an output buffer.
JMF 0:082731ede69f 163 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
JMF 0:082731ede69f 164 * @param c the character to write
JMF 0:082731ede69f 165 */
JMF 0:082731ede69f 166 void aws_iot_mqtt_internal_write_char(unsigned char **pptr, unsigned char c) {
JMF 0:082731ede69f 167 **pptr = c;
JMF 0:082731ede69f 168 (*pptr)++;
JMF 0:082731ede69f 169 }
JMF 0:082731ede69f 170
JMF 0:082731ede69f 171 void aws_iot_mqtt_internal_write_utf8_string(unsigned char **pptr, const char *string, uint16_t stringLen) {
JMF 0:082731ede69f 172 /* Nothing that calls this function will have a stringLen with a size larger than 2 bytes (MQTT 3.1.1 - 1.5.3) */
JMF 0:082731ede69f 173 aws_iot_mqtt_internal_write_uint_16(pptr, stringLen);
JMF 0:082731ede69f 174 if(stringLen > 0) {
JMF 0:082731ede69f 175 memcpy(*pptr, string, stringLen);
JMF 0:082731ede69f 176 *pptr += stringLen;
JMF 0:082731ede69f 177 }
JMF 0:082731ede69f 178 }
JMF 0:082731ede69f 179
JMF 0:082731ede69f 180 /**
JMF 0:082731ede69f 181 * Initialize the MQTTHeader structure. Used to ensure that Header bits are
JMF 0:082731ede69f 182 * always initialized using the proper mappings. No Endianness issues here since
JMF 0:082731ede69f 183 * the individual fields are all less than a byte. Also generates no warnings since
JMF 0:082731ede69f 184 * all fields are initialized using hex constants
JMF 0:082731ede69f 185 */
JMF 0:082731ede69f 186 IoT_Error_t aws_iot_mqtt_internal_init_header(MQTTHeader *pHeader, MessageTypes message_type,
JMF 0:082731ede69f 187 QoS qos, uint8_t dup, uint8_t retained) {
JMF 0:082731ede69f 188 FUNC_ENTRY;
JMF 0:082731ede69f 189
JMF 0:082731ede69f 190 if(NULL == pHeader) {
JMF 0:082731ede69f 191 FUNC_EXIT_RC(NULL_VALUE_ERROR);
JMF 0:082731ede69f 192 }
JMF 0:082731ede69f 193
JMF 0:082731ede69f 194 /* Set all bits to zero */
JMF 0:082731ede69f 195 pHeader->byte = 0;
JMF 0:082731ede69f 196 uint8_t type = 0;
JMF 0:082731ede69f 197 switch(message_type) {
JMF 0:082731ede69f 198 case UNKNOWN:
JMF 0:082731ede69f 199 /* Should never happen */
JMF 0:082731ede69f 200 return FAILURE;
JMF 0:082731ede69f 201 case CONNECT:
JMF 0:082731ede69f 202 type = 0x01;
JMF 0:082731ede69f 203 break;
JMF 0:082731ede69f 204 case CONNACK:
JMF 0:082731ede69f 205 type = 0x02;
JMF 0:082731ede69f 206 break;
JMF 0:082731ede69f 207 case PUBLISH:
JMF 0:082731ede69f 208 type = 0x03;
JMF 0:082731ede69f 209 break;
JMF 0:082731ede69f 210 case PUBACK:
JMF 0:082731ede69f 211 type = 0x04;
JMF 0:082731ede69f 212 break;
JMF 0:082731ede69f 213 case PUBREC:
JMF 0:082731ede69f 214 type = 0x05;
JMF 0:082731ede69f 215 break;
JMF 0:082731ede69f 216 case PUBREL:
JMF 0:082731ede69f 217 type = 0x06;
JMF 0:082731ede69f 218 break;
JMF 0:082731ede69f 219 case PUBCOMP:
JMF 0:082731ede69f 220 type = 0x07;
JMF 0:082731ede69f 221 break;
JMF 0:082731ede69f 222 case SUBSCRIBE:
JMF 0:082731ede69f 223 type = 0x08;
JMF 0:082731ede69f 224 break;
JMF 0:082731ede69f 225 case SUBACK:
JMF 0:082731ede69f 226 type = 0x09;
JMF 0:082731ede69f 227 break;
JMF 0:082731ede69f 228 case UNSUBSCRIBE:
JMF 0:082731ede69f 229 type = 0x0A;
JMF 0:082731ede69f 230 break;
JMF 0:082731ede69f 231 case UNSUBACK:
JMF 0:082731ede69f 232 type = 0x0B;
JMF 0:082731ede69f 233 break;
JMF 0:082731ede69f 234 case PINGREQ:
JMF 0:082731ede69f 235 type = 0x0C;
JMF 0:082731ede69f 236 break;
JMF 0:082731ede69f 237 case PINGRESP:
JMF 0:082731ede69f 238 type = 0x0D;
JMF 0:082731ede69f 239 break;
JMF 0:082731ede69f 240 case DISCONNECT:
JMF 0:082731ede69f 241 type = 0x0E;
JMF 0:082731ede69f 242 break;
JMF 0:082731ede69f 243 default:
JMF 0:082731ede69f 244 /* Should never happen */
JMF 0:082731ede69f 245 FUNC_EXIT_RC(FAILURE);
JMF 0:082731ede69f 246 }
JMF 0:082731ede69f 247
JMF 0:082731ede69f 248 pHeader->byte = type << 4;
JMF 0:082731ede69f 249 pHeader->byte |= dup << 3;
JMF 0:082731ede69f 250
JMF 0:082731ede69f 251 switch(qos) {
JMF 0:082731ede69f 252 case QOS0:
JMF 0:082731ede69f 253 break;
JMF 0:082731ede69f 254 case QOS1:
JMF 0:082731ede69f 255 pHeader->byte |= 1 << 1;
JMF 0:082731ede69f 256 break;
JMF 0:082731ede69f 257 default:
JMF 0:082731ede69f 258 /* Using QOS0 as default */
JMF 0:082731ede69f 259 break;
JMF 0:082731ede69f 260 }
JMF 0:082731ede69f 261
JMF 0:082731ede69f 262 pHeader->byte |= (1 == retained) ? 0x01 : 0x00;
JMF 0:082731ede69f 263
JMF 0:082731ede69f 264 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 265 }
JMF 0:082731ede69f 266
JMF 0:082731ede69f 267 IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t length, awsTimer *pTimer) {
JMF 0:082731ede69f 268
JMF 0:082731ede69f 269 size_t sentLen, sent;
JMF 0:082731ede69f 270 IoT_Error_t rc;
JMF 0:082731ede69f 271
JMF 0:082731ede69f 272 FUNC_ENTRY;
JMF 0:082731ede69f 273
JMF 0:082731ede69f 274 if(NULL == pClient || NULL == pTimer) {
JMF 0:082731ede69f 275 FUNC_EXIT_RC(NULL_VALUE_ERROR);
JMF 0:082731ede69f 276 }
JMF 0:082731ede69f 277
JMF 0:082731ede69f 278 if(length >= pClient->clientData.writeBufSize) {
JMF 0:082731ede69f 279 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR);
JMF 0:082731ede69f 280 }
JMF 0:082731ede69f 281
JMF 0:082731ede69f 282 #ifdef _ENABLE_THREAD_SUPPORT_
JMF 0:082731ede69f 283 rc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_write_mutex));
JMF 0:082731ede69f 284 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 285 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 286 }
JMF 0:082731ede69f 287 #endif
JMF 0:082731ede69f 288
JMF 0:082731ede69f 289 sentLen = 0;
JMF 0:082731ede69f 290 sent = 0;
JMF 0:082731ede69f 291
JMF 0:082731ede69f 292 while(sent < length && !has_timer_expired(pTimer)) {
JMF 0:082731ede69f 293 rc = pClient->networkStack.write(&(pClient->networkStack),
JMF 0:082731ede69f 294 &pClient->clientData.writeBuf[sent],
JMF 0:082731ede69f 295 (length - sent),
JMF 0:082731ede69f 296 pTimer,
JMF 0:082731ede69f 297 &sentLen);
JMF 0:082731ede69f 298 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 299 /* there was an error writing the data */
JMF 0:082731ede69f 300 break;
JMF 0:082731ede69f 301 }
JMF 0:082731ede69f 302 sent += sentLen;
JMF 0:082731ede69f 303 }
JMF 0:082731ede69f 304
JMF 0:082731ede69f 305 #ifdef _ENABLE_THREAD_SUPPORT_
JMF 0:082731ede69f 306 rc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_write_mutex));
JMF 0:082731ede69f 307 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 308 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 309 }
JMF 0:082731ede69f 310 #endif
JMF 0:082731ede69f 311
JMF 0:082731ede69f 312 if(sent == length) {
JMF 0:082731ede69f 313 /* record the fact that we have successfully sent the packet */
JMF 0:082731ede69f 314 //countdown_sec(&c->pingTimer, c->clientData.keepAliveInterval);
JMF 0:082731ede69f 315 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 316 }
JMF 0:082731ede69f 317
JMF 0:082731ede69f 318 FUNC_EXIT_RC(rc)
JMF 0:082731ede69f 319 }
JMF 0:082731ede69f 320
JMF 0:082731ede69f 321 static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Client *pClient,
JMF 0:082731ede69f 322 size_t *rem_len, awsTimer *pTimer) {
JMF 0:082731ede69f 323 unsigned char encodedByte;
JMF 0:082731ede69f 324 size_t multiplier, len;
JMF 0:082731ede69f 325 IoT_Error_t rc;
JMF 0:082731ede69f 326
JMF 0:082731ede69f 327 FUNC_ENTRY;
JMF 0:082731ede69f 328
JMF 0:082731ede69f 329 multiplier = 1;
JMF 0:082731ede69f 330 len = 0;
JMF 0:082731ede69f 331 *rem_len = 0;
JMF 0:082731ede69f 332
JMF 0:082731ede69f 333 do {
JMF 0:082731ede69f 334 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
JMF 0:082731ede69f 335 /* bad data */
JMF 0:082731ede69f 336 FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR);
JMF 0:082731ede69f 337 }
JMF 0:082731ede69f 338
JMF 0:082731ede69f 339 rc = pClient->networkStack.read(&(pClient->networkStack), &encodedByte, 1, pTimer, &len);
JMF 0:082731ede69f 340 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 341 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 342 }
JMF 0:082731ede69f 343
JMF 0:082731ede69f 344 *rem_len += ((encodedByte & 127) * multiplier);
JMF 0:082731ede69f 345 multiplier *= 128;
JMF 0:082731ede69f 346 } while((encodedByte & 128) != 0);
JMF 0:082731ede69f 347
JMF 0:082731ede69f 348 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 349 }
JMF 0:082731ede69f 350
JMF 0:082731ede69f 351 static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) {
JMF 0:082731ede69f 352 size_t len, rem_len, total_bytes_read, bytes_to_be_read, read_len;
JMF 0:082731ede69f 353 IoT_Error_t rc;
JMF 0:082731ede69f 354 MQTTHeader header = {0};
JMF 0:082731ede69f 355 awsTimer packetTimer;
JMF 0:082731ede69f 356 init_timer(&packetTimer);
JMF 0:082731ede69f 357 countdown_ms(&packetTimer, pClient->clientData.packetTimeoutMs);
JMF 0:082731ede69f 358
JMF 0:082731ede69f 359 rem_len = 0;
JMF 0:082731ede69f 360 total_bytes_read = 0;
JMF 0:082731ede69f 361 bytes_to_be_read = 0;
JMF 0:082731ede69f 362 read_len = 0;
JMF 0:082731ede69f 363
JMF 0:082731ede69f 364 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, 1, pTimer, &read_len);
JMF 0:082731ede69f 365 //printf("JMF:%s:%d read %d from networkStack.read\n",__FILE__,__LINE__,rc);
JMF 0:082731ede69f 366
JMF 0:082731ede69f 367 /* 1. read the header byte. This has the packet type in it */
JMF 0:082731ede69f 368 if(NETWORK_SSL_NOTHING_TO_READ == rc) {
JMF 0:082731ede69f 369 return MQTT_NOTHING_TO_READ;
JMF 0:082731ede69f 370 } else if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 371 return rc;
JMF 0:082731ede69f 372 }
JMF 0:082731ede69f 373
JMF 0:082731ede69f 374 len = 1;
JMF 0:082731ede69f 375
JMF 0:082731ede69f 376 /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to
JMF 0:082731ede69f 377 * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving
JMF 0:082731ede69f 378 * if the remaining time in pTimer is too short.
JMF 0:082731ede69f 379 */
JMF 0:082731ede69f 380 pTimer = &packetTimer;
JMF 0:082731ede69f 381
JMF 0:082731ede69f 382 /* 2. read the remaining length. This is variable in itself */
JMF 0:082731ede69f 383 rc = _aws_iot_mqtt_internal_decode_packet_remaining_len(pClient, &rem_len, pTimer);
JMF 0:082731ede69f 384 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 385 return rc;
JMF 0:082731ede69f 386 }
JMF 0:082731ede69f 387
JMF 0:082731ede69f 388 /* if the buffer is too short then the message will be dropped silently */
JMF 0:082731ede69f 389 if(rem_len >= pClient->clientData.readBufSize) {
JMF 0:082731ede69f 390 bytes_to_be_read = pClient->clientData.readBufSize;
JMF 0:082731ede69f 391 do {
JMF 0:082731ede69f 392 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, bytes_to_be_read,
JMF 0:082731ede69f 393 pTimer, &read_len);
JMF 0:082731ede69f 394 if(AWS_SUCCESS == rc) {
JMF 0:082731ede69f 395 total_bytes_read += read_len;
JMF 0:082731ede69f 396 if((rem_len - total_bytes_read) >= pClient->clientData.readBufSize) {
JMF 0:082731ede69f 397 bytes_to_be_read = pClient->clientData.readBufSize;
JMF 0:082731ede69f 398 } else {
JMF 0:082731ede69f 399 bytes_to_be_read = rem_len - total_bytes_read;
JMF 0:082731ede69f 400 }
JMF 0:082731ede69f 401 }
JMF 0:082731ede69f 402 } while(total_bytes_read < rem_len && AWS_SUCCESS == rc);
JMF 0:082731ede69f 403 return MQTT_RX_BUFFER_TOO_SHORT_ERROR;
JMF 0:082731ede69f 404 }
JMF 0:082731ede69f 405
JMF 0:082731ede69f 406 /* put the original remaining length into the read buffer */
JMF 0:082731ede69f 407 len += aws_iot_mqtt_internal_write_len_to_buffer(pClient->clientData.readBuf + 1, (uint32_t) rem_len);
JMF 0:082731ede69f 408
JMF 0:082731ede69f 409 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
JMF 0:082731ede69f 410 if(rem_len > 0) {
JMF 0:082731ede69f 411 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf + len, rem_len, pTimer,
JMF 0:082731ede69f 412 &read_len);
JMF 0:082731ede69f 413 if(AWS_SUCCESS != rc || read_len != rem_len) {
JMF 0:082731ede69f 414 return FAILURE;
JMF 0:082731ede69f 415 }
JMF 0:082731ede69f 416 }
JMF 0:082731ede69f 417
JMF 0:082731ede69f 418 header.byte = pClient->clientData.readBuf[0];
JMF 0:082731ede69f 419 *pPacketType = MQTT_HEADER_FIELD_TYPE(header.byte);
JMF 0:082731ede69f 420
JMF 0:082731ede69f 421 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 422 }
JMF 0:082731ede69f 423
JMF 0:082731ede69f 424 // assume topic filter and name is in correct format
JMF 0:082731ede69f 425 // # can only be at end
JMF 0:082731ede69f 426 // + and # can only be next to separator
JMF 0:082731ede69f 427 static bool _aws_iot_mqtt_internal_is_topic_matched(char *pTopicFilter, char *pTopicName, uint16_t topicNameLen) {
JMF 0:082731ede69f 428
JMF 0:082731ede69f 429 char *curf, *curn, *curn_end;
JMF 0:082731ede69f 430
JMF 0:082731ede69f 431 if(NULL == pTopicFilter || NULL == pTopicName) {
JMF 0:082731ede69f 432 return false;
JMF 0:082731ede69f 433 }
JMF 0:082731ede69f 434
JMF 0:082731ede69f 435 curf = pTopicFilter;
JMF 0:082731ede69f 436 curn = pTopicName;
JMF 0:082731ede69f 437 curn_end = curn + topicNameLen;
JMF 0:082731ede69f 438
JMF 0:082731ede69f 439 while(*curf && (curn < curn_end)) {
JMF 0:082731ede69f 440 if(*curn == '/' && *curf != '/') {
JMF 0:082731ede69f 441 break;
JMF 0:082731ede69f 442 }
JMF 0:082731ede69f 443 if(*curf != '+' && *curf != '#' && *curf != *curn) {
JMF 0:082731ede69f 444 break;
JMF 0:082731ede69f 445 }
JMF 0:082731ede69f 446 if(*curf == '+') {
JMF 0:082731ede69f 447 /* skip until we meet the next separator, or end of string */
JMF 0:082731ede69f 448 char *nextpos = curn + 1;
JMF 0:082731ede69f 449 while(nextpos < curn_end && *nextpos != '/')
JMF 0:082731ede69f 450 nextpos = ++curn + 1;
JMF 0:082731ede69f 451 } else if(*curf == '#') {
JMF 0:082731ede69f 452 /* skip until end of string */
JMF 0:082731ede69f 453 curn = curn_end - 1;
JMF 0:082731ede69f 454 }
JMF 0:082731ede69f 455
JMF 0:082731ede69f 456 curf++;
JMF 0:082731ede69f 457 curn++;
JMF 0:082731ede69f 458 };
JMF 0:082731ede69f 459
JMF 0:082731ede69f 460 return (curn == curn_end) && (*curf == '\0');
JMF 0:082731ede69f 461 }
JMF 0:082731ede69f 462
JMF 0:082731ede69f 463 static IoT_Error_t _aws_iot_mqtt_internal_deliver_message(AWS_IoT_Client *pClient, char *pTopicName,
JMF 0:082731ede69f 464 uint16_t topicNameLen,
JMF 0:082731ede69f 465 IoT_Publish_Message_Params *pMessageParams) {
JMF 0:082731ede69f 466 uint32_t itr;
JMF 0:082731ede69f 467 IoT_Error_t rc;
JMF 0:082731ede69f 468 ClientState clientState;
JMF 0:082731ede69f 469
JMF 0:082731ede69f 470 FUNC_ENTRY;
JMF 0:082731ede69f 471
JMF 0:082731ede69f 472 if(NULL == pTopicName) {
JMF 0:082731ede69f 473 FUNC_EXIT_RC(NULL_VALUE_ERROR);
JMF 0:082731ede69f 474 }
JMF 0:082731ede69f 475
JMF 0:082731ede69f 476 /* This function can be called from all MQTT APIs
JMF 0:082731ede69f 477 * But while callback return is in progress, Yield should not be called.
JMF 0:082731ede69f 478 * The state for CB_RETURN accomplishes that, as yield cannot be called while in that state */
JMF 0:082731ede69f 479 clientState = aws_iot_mqtt_get_client_state(pClient);
JMF 0:082731ede69f 480 aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN);
JMF 0:082731ede69f 481
JMF 0:082731ede69f 482 /* Find the right message handler - indexed by topic */
JMF 0:082731ede69f 483 for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; ++itr) {
JMF 0:082731ede69f 484 if(NULL != pClient->clientData.messageHandlers[itr].topicName) {
JMF 0:082731ede69f 485 if(((topicNameLen == pClient->clientData.messageHandlers[itr].topicNameLen)
JMF 0:082731ede69f 486 &&
JMF 0:082731ede69f 487 (strncmp(pTopicName, (char *) pClient->clientData.messageHandlers[itr].topicName, topicNameLen) == 0))
JMF 0:082731ede69f 488 || _aws_iot_mqtt_internal_is_topic_matched((char *) pClient->clientData.messageHandlers[itr].topicName,
JMF 0:082731ede69f 489 pTopicName, topicNameLen)) {
JMF 0:082731ede69f 490 if(NULL != pClient->clientData.messageHandlers[itr].pApplicationHandler) {
JMF 0:082731ede69f 491 pClient->clientData.messageHandlers[itr].pApplicationHandler(pClient, pTopicName, topicNameLen,
JMF 0:082731ede69f 492 pMessageParams,
JMF 0:082731ede69f 493 pClient->clientData.messageHandlers[itr].pApplicationHandlerData);
JMF 0:082731ede69f 494 }
JMF 0:082731ede69f 495 }
JMF 0:082731ede69f 496 }
JMF 0:082731ede69f 497 }
JMF 0:082731ede69f 498 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN, clientState);
JMF 0:082731ede69f 499
JMF 0:082731ede69f 500 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 501 }
JMF 0:082731ede69f 502
JMF 0:082731ede69f 503 static IoT_Error_t _aws_iot_mqtt_internal_handle_publish(AWS_IoT_Client *pClient, awsTimer *pTimer) {
JMF 0:082731ede69f 504 char *topicName;
JMF 0:082731ede69f 505 uint16_t topicNameLen;
JMF 0:082731ede69f 506 uint32_t len;
JMF 0:082731ede69f 507 IoT_Error_t rc;
JMF 0:082731ede69f 508 IoT_Publish_Message_Params msg;
JMF 0:082731ede69f 509
JMF 0:082731ede69f 510 FUNC_ENTRY;
JMF 0:082731ede69f 511
JMF 0:082731ede69f 512 topicName = NULL;
JMF 0:082731ede69f 513 topicNameLen = 0;
JMF 0:082731ede69f 514 len = 0;
JMF 0:082731ede69f 515
JMF 0:082731ede69f 516 rc = aws_iot_mqtt_internal_deserialize_publish(&msg.isDup, &msg.qos, &msg.isRetained,
JMF 0:082731ede69f 517 &msg.id, &topicName, &topicNameLen,
JMF 0:082731ede69f 518 (unsigned char **) &msg.payload, &msg.payloadLen,
JMF 0:082731ede69f 519 pClient->clientData.readBuf,
JMF 0:082731ede69f 520 pClient->clientData.readBufSize);
JMF 0:082731ede69f 521
JMF 0:082731ede69f 522 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 523 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 524 }
JMF 0:082731ede69f 525
JMF 0:082731ede69f 526 rc = _aws_iot_mqtt_internal_deliver_message(pClient, topicName, topicNameLen, &msg);
JMF 0:082731ede69f 527 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 528 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 529 }
JMF 0:082731ede69f 530
JMF 0:082731ede69f 531 if(QOS0 == msg.qos) {
JMF 0:082731ede69f 532 /* No further processing required for QoS0 */
JMF 0:082731ede69f 533 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 534 }
JMF 0:082731ede69f 535
JMF 0:082731ede69f 536 /* Message assumed to be QoS1 since we do not support QoS2 at this time */
JMF 0:082731ede69f 537 rc = aws_iot_mqtt_internal_serialize_ack(pClient->clientData.writeBuf, pClient->clientData.writeBufSize,
JMF 0:082731ede69f 538 PUBACK, 0, msg.id, &len);
JMF 0:082731ede69f 539
JMF 0:082731ede69f 540 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 541 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 542 }
JMF 0:082731ede69f 543
JMF 0:082731ede69f 544 rc = aws_iot_mqtt_internal_send_packet(pClient, len, pTimer);
JMF 0:082731ede69f 545 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 546 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 547 }
JMF 0:082731ede69f 548
JMF 0:082731ede69f 549 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 550 }
JMF 0:082731ede69f 551
JMF 0:082731ede69f 552 IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) {
JMF 0:082731ede69f 553 IoT_Error_t rc;
JMF 0:082731ede69f 554 //printf("JMF! in aws_iot_mqtt_internal_cycle_read\n");
JMF 0:082731ede69f 555 #ifdef _ENABLE_THREAD_SUPPORT_
JMF 0:082731ede69f 556 IoT_Error_t threadRc;
JMF 0:082731ede69f 557 #endif
JMF 0:082731ede69f 558
JMF 0:082731ede69f 559 if(NULL == pClient || NULL == pTimer) {
JMF 0:082731ede69f 560 return NULL_VALUE_ERROR;
JMF 0:082731ede69f 561 }
JMF 0:082731ede69f 562
JMF 0:082731ede69f 563 #ifdef _ENABLE_THREAD_SUPPORT_
JMF 0:082731ede69f 564 threadRc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_read_mutex));
JMF 0:082731ede69f 565 if(AWS_SUCCESS != threadRc) {
JMF 0:082731ede69f 566 FUNC_EXIT_RC(threadRc);
JMF 0:082731ede69f 567 }
JMF 0:082731ede69f 568 #endif
JMF 0:082731ede69f 569
JMF 0:082731ede69f 570 /* read the socket, see what work is due */
JMF 0:082731ede69f 571 rc = _aws_iot_mqtt_internal_read_packet(pClient, pTimer, pPacketType);
JMF 0:082731ede69f 572 //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
JMF 0:082731ede69f 573
JMF 0:082731ede69f 574 #ifdef _ENABLE_THREAD_SUPPORT_
JMF 0:082731ede69f 575 threadRc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_read_mutex));
JMF 0:082731ede69f 576 if(AWS_SUCCESS != threadRc && (MQTT_NOTHING_TO_READ == rc || AWS_SUCCESS == rc)) {
JMF 0:082731ede69f 577 return threadRc;
JMF 0:082731ede69f 578 }
JMF 0:082731ede69f 579 #endif
JMF 0:082731ede69f 580
JMF 0:082731ede69f 581 if(MQTT_NOTHING_TO_READ == rc) {
JMF 0:082731ede69f 582 /* Nothing to read, not a cycle failure */
JMF 0:082731ede69f 583 //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
JMF 0:082731ede69f 584 return AWS_SUCCESS;
JMF 0:082731ede69f 585 } else if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 586 //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
JMF 0:082731ede69f 587 return rc;
JMF 0:082731ede69f 588 }
JMF 0:082731ede69f 589 //printf("JMF: cycle_read got a char, switch on %d\n",*pPacketType);
JMF 0:082731ede69f 590
JMF 0:082731ede69f 591 switch(*pPacketType) {
JMF 0:082731ede69f 592 case CONNACK:
JMF 0:082731ede69f 593 case PUBACK:
JMF 0:082731ede69f 594 case SUBACK:
JMF 0:082731ede69f 595 case UNSUBACK:
JMF 0:082731ede69f 596 /* SDK is blocking, these responses will be forwarded to calling function to process */
JMF 0:082731ede69f 597 break;
JMF 0:082731ede69f 598 case PUBLISH: {
JMF 0:082731ede69f 599 rc = _aws_iot_mqtt_internal_handle_publish(pClient, pTimer);
JMF 0:082731ede69f 600 break;
JMF 0:082731ede69f 601 }
JMF 0:082731ede69f 602 case PUBREC:
JMF 0:082731ede69f 603 case PUBCOMP:
JMF 0:082731ede69f 604 /* QoS2 not supported at this time */
JMF 0:082731ede69f 605 break;
JMF 0:082731ede69f 606 case PINGRESP: {
JMF 0:082731ede69f 607 pClient->clientStatus.isPingOutstanding = 0;
JMF 0:082731ede69f 608 countdown_sec(&pClient->pingTimer, pClient->clientData.keepAliveInterval);
JMF 0:082731ede69f 609 break;
JMF 0:082731ede69f 610 }
JMF 0:082731ede69f 611 default: {
JMF 0:082731ede69f 612 /* Either unknown packet type or Failure occurred
JMF 0:082731ede69f 613 * Should not happen */
JMF 0:082731ede69f 614 rc = MQTT_RX_MESSAGE_PACKET_TYPE_INVALID_ERROR;
JMF 0:082731ede69f 615 break;
JMF 0:082731ede69f 616 }
JMF 0:082731ede69f 617 }
JMF 0:082731ede69f 618
JMF 0:082731ede69f 619 //printf("JMF:%s:%d EXITING rc=%d\n",__FILE__,__LINE__,rc);
JMF 0:082731ede69f 620 return rc;
JMF 0:082731ede69f 621 }
JMF 0:082731ede69f 622
JMF 0:082731ede69f 623 /* only used in single-threaded mode where one command at a time is in process */
JMF 0:082731ede69f 624 IoT_Error_t aws_iot_mqtt_internal_wait_for_read(AWS_IoT_Client *pClient, uint8_t packetType, awsTimer *pTimer) {
JMF 0:082731ede69f 625 IoT_Error_t rc;
JMF 0:082731ede69f 626 uint8_t read_packet_type;
JMF 0:082731ede69f 627
JMF 0:082731ede69f 628 FUNC_ENTRY;
JMF 0:082731ede69f 629 if(NULL == pClient || NULL == pTimer) {
JMF 0:082731ede69f 630 FUNC_EXIT_RC(NULL_VALUE_ERROR);
JMF 0:082731ede69f 631 }
JMF 0:082731ede69f 632
JMF 0:082731ede69f 633 read_packet_type = 0;
JMF 0:082731ede69f 634 do {
JMF 0:082731ede69f 635 if(has_timer_expired(pTimer)) {
JMF 0:082731ede69f 636 /* we timed out */
JMF 0:082731ede69f 637 rc = MQTT_REQUEST_TIMEOUT_ERROR;
JMF 0:082731ede69f 638 break;
JMF 0:082731ede69f 639 }
JMF 0:082731ede69f 640 rc = aws_iot_mqtt_internal_cycle_read(pClient, pTimer, &read_packet_type);
JMF 0:082731ede69f 641 } while(((AWS_SUCCESS == rc) || (MQTT_NOTHING_TO_READ == rc)) && (read_packet_type != packetType));
JMF 0:082731ede69f 642
JMF 0:082731ede69f 643 /* If rc is AWS_SUCCESS, we have received the expected
JMF 0:082731ede69f 644 * MQTT packet. Otherwise rc tells the error. */
JMF 0:082731ede69f 645 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 646 }
JMF 0:082731ede69f 647
JMF 0:082731ede69f 648 /**
JMF 0:082731ede69f 649 * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket
JMF 0:082731ede69f 650 * @param buf the buffer into which the packet will be serialized
JMF 0:082731ede69f 651 * @param buflen the length in bytes of the supplied buffer, to avoid overruns
JMF 0:082731ede69f 652 * @param packettype the message type
JMF 0:082731ede69f 653 * @param serialized length
JMF 0:082731ede69f 654 * @return IoT_Error_t indicating function execution status
JMF 0:082731ede69f 655 */
JMF 0:082731ede69f 656 IoT_Error_t aws_iot_mqtt_internal_serialize_zero(unsigned char *pTxBuf, size_t txBufLen, MessageTypes packetType,
JMF 0:082731ede69f 657 size_t *pSerializedLength) {
JMF 0:082731ede69f 658 unsigned char *ptr;
JMF 0:082731ede69f 659 IoT_Error_t rc;
JMF 0:082731ede69f 660 MQTTHeader header = {0};
JMF 0:082731ede69f 661
JMF 0:082731ede69f 662 FUNC_ENTRY;
JMF 0:082731ede69f 663 if(NULL == pTxBuf || NULL == pSerializedLength) {
JMF 0:082731ede69f 664 FUNC_EXIT_RC(NULL_VALUE_ERROR);
JMF 0:082731ede69f 665 }
JMF 0:082731ede69f 666
JMF 0:082731ede69f 667 /* Buffer should have at least 2 bytes for the header */
JMF 0:082731ede69f 668 if(4 > txBufLen) {
JMF 0:082731ede69f 669 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR);
JMF 0:082731ede69f 670 }
JMF 0:082731ede69f 671
JMF 0:082731ede69f 672 ptr = pTxBuf;
JMF 0:082731ede69f 673
JMF 0:082731ede69f 674 rc = aws_iot_mqtt_internal_init_header(&header, packetType, QOS0, 0, 0);
JMF 0:082731ede69f 675 if(AWS_SUCCESS != rc) {
JMF 0:082731ede69f 676 FUNC_EXIT_RC(rc);
JMF 0:082731ede69f 677 }
JMF 0:082731ede69f 678
JMF 0:082731ede69f 679 /* write header */
JMF 0:082731ede69f 680 aws_iot_mqtt_internal_write_char(&ptr, header.byte);
JMF 0:082731ede69f 681
JMF 0:082731ede69f 682 /* write remaining length */
JMF 0:082731ede69f 683 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 0);
JMF 0:082731ede69f 684 *pSerializedLength = (uint32_t) (ptr - pTxBuf);
JMF 0:082731ede69f 685
JMF 0:082731ede69f 686 FUNC_EXIT_RC(AWS_SUCCESS);
JMF 0:082731ede69f 687 }
JMF 0:082731ede69f 688
JMF 0:082731ede69f 689 #ifdef __cplusplus
JMF 0:082731ede69f 690 }
JMF 0:082731ede69f 691 #endif