![](/media/cache/profiles/5f55d0baa59f4bc1dc393149183f1492.jpg.50x50_q85.jpg)
Changes to enabled on-line compiler
Diff: src/aws_iot_mqtt_client_common_internal.c
- Revision:
- 0:082731ede69f
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/aws_iot_mqtt_client_common_internal.c Wed May 30 20:59:51 2018 +0000 @@ -0,0 +1,691 @@ +/* +* Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +* +* Licensed under the Apache License, Version 2.0 (the "License"). +* You may not use this file except in compliance with the License. +* A copy of the License is located at +* +* http://aws.amazon.com/apache2.0 +* +* or in the "license" file accompanying this file. This file is distributed +* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +* express or implied. See the License for the specific language governing +* permissions and limitations under the License. +*/ + +// Based on Eclipse Paho. +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Sergio R. Caprile - non-blocking packet read functions for stream transport + *******************************************************************************/ + +/** + * @file aws_iot_mqtt_client_common_internal.c + * @brief MQTT client internal API definitions + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <aws_iot_mqtt_client.h> +#include "aws_iot_mqtt_client_common_internal.h" + +/* Max length of packet header */ +#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 + +/** + * Encodes the message length according to the MQTT algorithm + * @param buf the buffer into which the encoded data is written + * @param length the length to be encoded + * @return the number of bytes written to buffer + */ +size_t aws_iot_mqtt_internal_write_len_to_buffer(unsigned char *buf, uint32_t length) { + size_t outLen = 0; + unsigned char encodedByte; + + FUNC_ENTRY; + do { + encodedByte = (unsigned char) (length % 128); + length /= 128; + /* if there are more digits to encode, set the top bit of this digit */ + if(length > 0) { + encodedByte |= 0x80; + } + buf[outLen++] = encodedByte; + } while(length > 0); + + FUNC_EXIT_RC(outLen); +} + +/** + * Decodes the message length according to the MQTT algorithm + * @param the buffer containing the message + * @param value the decoded length returned + * @return the number of bytes read from the socket + */ +IoT_Error_t aws_iot_mqtt_internal_decode_remaining_length_from_buffer(unsigned char *buf, uint32_t *decodedLen, + uint32_t *readBytesLen) { + unsigned char encodedByte; + uint32_t multiplier, len; + FUNC_ENTRY; + + multiplier = 1; + len = 0; + *decodedLen = 0; + + do { + if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { + /* bad data */ + FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); + } + encodedByte = *buf; + buf++; + *decodedLen += (encodedByte & 127) * multiplier; + multiplier *= 128; + } while((encodedByte & 128) != 0); + + *readBytesLen = len; + + FUNC_EXIT_RC(AWS_SUCCESS); +} + +uint32_t aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(uint32_t rem_len) { + rem_len += 1; /* header byte */ + /* now remaining_length field (MQTT 3.1.1 - 2.2.3)*/ + if(rem_len < 128) { + rem_len += 1; + } else if(rem_len < 16384) { + rem_len += 2; + } else if(rem_len < 2097152) { + rem_len += 3; + } else { + rem_len += 4; + } + return rem_len; +} + +/** + * Calculates uint16 packet id from two bytes read from the input buffer + * Checks Endianness at runtime + * + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the value calculated + */ +uint16_t aws_iot_mqtt_internal_read_uint16_t(unsigned char **pptr) { + unsigned char *ptr = *pptr; + uint16_t len = 0; + uint8_t firstByte = (uint8_t) (*ptr); + uint8_t secondByte = (uint8_t) (*(ptr + 1)); + len = (uint16_t) (secondByte + (256 * firstByte)); + + *pptr += 2; + return len; +} + +/** + * Writes an integer as 2 bytes to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param anInt the integer to write + */ +void aws_iot_mqtt_internal_write_uint_16(unsigned char **pptr, uint16_t anInt) { + **pptr = (unsigned char) (anInt / 256); + (*pptr)++; + **pptr = (unsigned char) (anInt % 256); + (*pptr)++; +} + +/** + * Reads one character from the input buffer. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the character read + */ +unsigned char aws_iot_mqtt_internal_read_char(unsigned char **pptr) { + unsigned char c = **pptr; + (*pptr)++; + return c; +} + +/** + * Writes one character to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param c the character to write + */ +void aws_iot_mqtt_internal_write_char(unsigned char **pptr, unsigned char c) { + **pptr = c; + (*pptr)++; +} + +void aws_iot_mqtt_internal_write_utf8_string(unsigned char **pptr, const char *string, uint16_t stringLen) { + /* Nothing that calls this function will have a stringLen with a size larger than 2 bytes (MQTT 3.1.1 - 1.5.3) */ + aws_iot_mqtt_internal_write_uint_16(pptr, stringLen); + if(stringLen > 0) { + memcpy(*pptr, string, stringLen); + *pptr += stringLen; + } +} + +/** + * Initialize the MQTTHeader structure. Used to ensure that Header bits are + * always initialized using the proper mappings. No Endianness issues here since + * the individual fields are all less than a byte. Also generates no warnings since + * all fields are initialized using hex constants + */ +IoT_Error_t aws_iot_mqtt_internal_init_header(MQTTHeader *pHeader, MessageTypes message_type, + QoS qos, uint8_t dup, uint8_t retained) { + FUNC_ENTRY; + + if(NULL == pHeader) { + FUNC_EXIT_RC(NULL_VALUE_ERROR); + } + + /* Set all bits to zero */ + pHeader->byte = 0; + uint8_t type = 0; + switch(message_type) { + case UNKNOWN: + /* Should never happen */ + return FAILURE; + case CONNECT: + type = 0x01; + break; + case CONNACK: + type = 0x02; + break; + case PUBLISH: + type = 0x03; + break; + case PUBACK: + type = 0x04; + break; + case PUBREC: + type = 0x05; + break; + case PUBREL: + type = 0x06; + break; + case PUBCOMP: + type = 0x07; + break; + case SUBSCRIBE: + type = 0x08; + break; + case SUBACK: + type = 0x09; + break; + case UNSUBSCRIBE: + type = 0x0A; + break; + case UNSUBACK: + type = 0x0B; + break; + case PINGREQ: + type = 0x0C; + break; + case PINGRESP: + type = 0x0D; + break; + case DISCONNECT: + type = 0x0E; + break; + default: + /* Should never happen */ + FUNC_EXIT_RC(FAILURE); + } + + pHeader->byte = type << 4; + pHeader->byte |= dup << 3; + + switch(qos) { + case QOS0: + break; + case QOS1: + pHeader->byte |= 1 << 1; + break; + default: + /* Using QOS0 as default */ + break; + } + + pHeader->byte |= (1 == retained) ? 0x01 : 0x00; + + FUNC_EXIT_RC(AWS_SUCCESS); +} + +IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t length, awsTimer *pTimer) { + + size_t sentLen, sent; + IoT_Error_t rc; + + FUNC_ENTRY; + + if(NULL == pClient || NULL == pTimer) { + FUNC_EXIT_RC(NULL_VALUE_ERROR); + } + + if(length >= pClient->clientData.writeBufSize) { + FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); + } + +#ifdef _ENABLE_THREAD_SUPPORT_ + rc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } +#endif + + sentLen = 0; + sent = 0; + + while(sent < length && !has_timer_expired(pTimer)) { + rc = pClient->networkStack.write(&(pClient->networkStack), + &pClient->clientData.writeBuf[sent], + (length - sent), + pTimer, + &sentLen); + if(AWS_SUCCESS != rc) { + /* there was an error writing the data */ + break; + } + sent += sentLen; + } + +#ifdef _ENABLE_THREAD_SUPPORT_ + rc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } +#endif + + if(sent == length) { + /* record the fact that we have successfully sent the packet */ + //countdown_sec(&c->pingTimer, c->clientData.keepAliveInterval); + FUNC_EXIT_RC(AWS_SUCCESS); + } + + FUNC_EXIT_RC(rc) +} + +static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Client *pClient, + size_t *rem_len, awsTimer *pTimer) { + unsigned char encodedByte; + size_t multiplier, len; + IoT_Error_t rc; + + FUNC_ENTRY; + + multiplier = 1; + len = 0; + *rem_len = 0; + + do { + if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { + /* bad data */ + FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); + } + + rc = pClient->networkStack.read(&(pClient->networkStack), &encodedByte, 1, pTimer, &len); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + *rem_len += ((encodedByte & 127) * multiplier); + multiplier *= 128; + } while((encodedByte & 128) != 0); + + FUNC_EXIT_RC(rc); +} + +static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { + size_t len, rem_len, total_bytes_read, bytes_to_be_read, read_len; + IoT_Error_t rc; + MQTTHeader header = {0}; + awsTimer packetTimer; + init_timer(&packetTimer); + countdown_ms(&packetTimer, pClient->clientData.packetTimeoutMs); + + rem_len = 0; + total_bytes_read = 0; + bytes_to_be_read = 0; + read_len = 0; + + rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, 1, pTimer, &read_len); +//printf("JMF:%s:%d read %d from networkStack.read\n",__FILE__,__LINE__,rc); + + /* 1. read the header byte. This has the packet type in it */ + if(NETWORK_SSL_NOTHING_TO_READ == rc) { + return MQTT_NOTHING_TO_READ; + } else if(AWS_SUCCESS != rc) { + return rc; + } + + len = 1; + + /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to + * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving + * if the remaining time in pTimer is too short. + */ + pTimer = &packetTimer; + + /* 2. read the remaining length. This is variable in itself */ + rc = _aws_iot_mqtt_internal_decode_packet_remaining_len(pClient, &rem_len, pTimer); + if(AWS_SUCCESS != rc) { + return rc; + } + + /* if the buffer is too short then the message will be dropped silently */ + if(rem_len >= pClient->clientData.readBufSize) { + bytes_to_be_read = pClient->clientData.readBufSize; + do { + rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, bytes_to_be_read, + pTimer, &read_len); + if(AWS_SUCCESS == rc) { + total_bytes_read += read_len; + if((rem_len - total_bytes_read) >= pClient->clientData.readBufSize) { + bytes_to_be_read = pClient->clientData.readBufSize; + } else { + bytes_to_be_read = rem_len - total_bytes_read; + } + } + } while(total_bytes_read < rem_len && AWS_SUCCESS == rc); + return MQTT_RX_BUFFER_TOO_SHORT_ERROR; + } + + /* put the original remaining length into the read buffer */ + len += aws_iot_mqtt_internal_write_len_to_buffer(pClient->clientData.readBuf + 1, (uint32_t) rem_len); + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if(rem_len > 0) { + rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf + len, rem_len, pTimer, + &read_len); + if(AWS_SUCCESS != rc || read_len != rem_len) { + return FAILURE; + } + } + + header.byte = pClient->clientData.readBuf[0]; + *pPacketType = MQTT_HEADER_FIELD_TYPE(header.byte); + + FUNC_EXIT_RC(rc); +} + +// assume topic filter and name is in correct format +// # can only be at end +// + and # can only be next to separator +static bool _aws_iot_mqtt_internal_is_topic_matched(char *pTopicFilter, char *pTopicName, uint16_t topicNameLen) { + + char *curf, *curn, *curn_end; + + if(NULL == pTopicFilter || NULL == pTopicName) { + return false; + } + + curf = pTopicFilter; + curn = pTopicName; + curn_end = curn + topicNameLen; + + while(*curf && (curn < curn_end)) { + if(*curn == '/' && *curf != '/') { + break; + } + if(*curf != '+' && *curf != '#' && *curf != *curn) { + break; + } + if(*curf == '+') { + /* skip until we meet the next separator, or end of string */ + char *nextpos = curn + 1; + while(nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } else if(*curf == '#') { + /* skip until end of string */ + curn = curn_end - 1; + } + + curf++; + curn++; + }; + + return (curn == curn_end) && (*curf == '\0'); +} + +static IoT_Error_t _aws_iot_mqtt_internal_deliver_message(AWS_IoT_Client *pClient, char *pTopicName, + uint16_t topicNameLen, + IoT_Publish_Message_Params *pMessageParams) { + uint32_t itr; + IoT_Error_t rc; + ClientState clientState; + + FUNC_ENTRY; + + if(NULL == pTopicName) { + FUNC_EXIT_RC(NULL_VALUE_ERROR); + } + + /* This function can be called from all MQTT APIs + * But while callback return is in progress, Yield should not be called. + * The state for CB_RETURN accomplishes that, as yield cannot be called while in that state */ + clientState = aws_iot_mqtt_get_client_state(pClient); + aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN); + + /* Find the right message handler - indexed by topic */ + for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; ++itr) { + if(NULL != pClient->clientData.messageHandlers[itr].topicName) { + if(((topicNameLen == pClient->clientData.messageHandlers[itr].topicNameLen) + && + (strncmp(pTopicName, (char *) pClient->clientData.messageHandlers[itr].topicName, topicNameLen) == 0)) + || _aws_iot_mqtt_internal_is_topic_matched((char *) pClient->clientData.messageHandlers[itr].topicName, + pTopicName, topicNameLen)) { + if(NULL != pClient->clientData.messageHandlers[itr].pApplicationHandler) { + pClient->clientData.messageHandlers[itr].pApplicationHandler(pClient, pTopicName, topicNameLen, + pMessageParams, + pClient->clientData.messageHandlers[itr].pApplicationHandlerData); + } + } + } + } + rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN, clientState); + + FUNC_EXIT_RC(rc); +} + +static IoT_Error_t _aws_iot_mqtt_internal_handle_publish(AWS_IoT_Client *pClient, awsTimer *pTimer) { + char *topicName; + uint16_t topicNameLen; + uint32_t len; + IoT_Error_t rc; + IoT_Publish_Message_Params msg; + + FUNC_ENTRY; + + topicName = NULL; + topicNameLen = 0; + len = 0; + + rc = aws_iot_mqtt_internal_deserialize_publish(&msg.isDup, &msg.qos, &msg.isRetained, + &msg.id, &topicName, &topicNameLen, + (unsigned char **) &msg.payload, &msg.payloadLen, + pClient->clientData.readBuf, + pClient->clientData.readBufSize); + + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + rc = _aws_iot_mqtt_internal_deliver_message(pClient, topicName, topicNameLen, &msg); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + if(QOS0 == msg.qos) { + /* No further processing required for QoS0 */ + FUNC_EXIT_RC(AWS_SUCCESS); + } + + /* Message assumed to be QoS1 since we do not support QoS2 at this time */ + rc = aws_iot_mqtt_internal_serialize_ack(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, + PUBACK, 0, msg.id, &len); + + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + rc = aws_iot_mqtt_internal_send_packet(pClient, len, pTimer); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + FUNC_EXIT_RC(AWS_SUCCESS); +} + +IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { + IoT_Error_t rc; +//printf("JMF! in aws_iot_mqtt_internal_cycle_read\n"); +#ifdef _ENABLE_THREAD_SUPPORT_ + IoT_Error_t threadRc; +#endif + + if(NULL == pClient || NULL == pTimer) { + return NULL_VALUE_ERROR; + } + +#ifdef _ENABLE_THREAD_SUPPORT_ + threadRc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); + if(AWS_SUCCESS != threadRc) { + FUNC_EXIT_RC(threadRc); + } +#endif + + /* read the socket, see what work is due */ + rc = _aws_iot_mqtt_internal_read_packet(pClient, pTimer, pPacketType); +//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); + +#ifdef _ENABLE_THREAD_SUPPORT_ + threadRc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); + if(AWS_SUCCESS != threadRc && (MQTT_NOTHING_TO_READ == rc || AWS_SUCCESS == rc)) { + return threadRc; + } +#endif + + if(MQTT_NOTHING_TO_READ == rc) { + /* Nothing to read, not a cycle failure */ +//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); + return AWS_SUCCESS; + } else if(AWS_SUCCESS != rc) { +//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); + return rc; + } +//printf("JMF: cycle_read got a char, switch on %d\n",*pPacketType); + + switch(*pPacketType) { + case CONNACK: + case PUBACK: + case SUBACK: + case UNSUBACK: + /* SDK is blocking, these responses will be forwarded to calling function to process */ + break; + case PUBLISH: { + rc = _aws_iot_mqtt_internal_handle_publish(pClient, pTimer); + break; + } + case PUBREC: + case PUBCOMP: + /* QoS2 not supported at this time */ + break; + case PINGRESP: { + pClient->clientStatus.isPingOutstanding = 0; + countdown_sec(&pClient->pingTimer, pClient->clientData.keepAliveInterval); + break; + } + default: { + /* Either unknown packet type or Failure occurred + * Should not happen */ + rc = MQTT_RX_MESSAGE_PACKET_TYPE_INVALID_ERROR; + break; + } + } + +//printf("JMF:%s:%d EXITING rc=%d\n",__FILE__,__LINE__,rc); + return rc; +} + +/* only used in single-threaded mode where one command at a time is in process */ +IoT_Error_t aws_iot_mqtt_internal_wait_for_read(AWS_IoT_Client *pClient, uint8_t packetType, awsTimer *pTimer) { + IoT_Error_t rc; + uint8_t read_packet_type; + + FUNC_ENTRY; + if(NULL == pClient || NULL == pTimer) { + FUNC_EXIT_RC(NULL_VALUE_ERROR); + } + + read_packet_type = 0; + do { + if(has_timer_expired(pTimer)) { + /* we timed out */ + rc = MQTT_REQUEST_TIMEOUT_ERROR; + break; + } + rc = aws_iot_mqtt_internal_cycle_read(pClient, pTimer, &read_packet_type); + } while(((AWS_SUCCESS == rc) || (MQTT_NOTHING_TO_READ == rc)) && (read_packet_type != packetType)); + + /* If rc is AWS_SUCCESS, we have received the expected + * MQTT packet. Otherwise rc tells the error. */ + FUNC_EXIT_RC(rc); +} + +/** + * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @param packettype the message type + * @param serialized length + * @return IoT_Error_t indicating function execution status + */ +IoT_Error_t aws_iot_mqtt_internal_serialize_zero(unsigned char *pTxBuf, size_t txBufLen, MessageTypes packetType, + size_t *pSerializedLength) { + unsigned char *ptr; + IoT_Error_t rc; + MQTTHeader header = {0}; + + FUNC_ENTRY; + if(NULL == pTxBuf || NULL == pSerializedLength) { + FUNC_EXIT_RC(NULL_VALUE_ERROR); + } + + /* Buffer should have at least 2 bytes for the header */ + if(4 > txBufLen) { + FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); + } + + ptr = pTxBuf; + + rc = aws_iot_mqtt_internal_init_header(&header, packetType, QOS0, 0, 0); + if(AWS_SUCCESS != rc) { + FUNC_EXIT_RC(rc); + } + + /* write header */ + aws_iot_mqtt_internal_write_char(&ptr, header.byte); + + /* write remaining length */ + ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 0); + *pSerializedLength = (uint32_t) (ptr - pTxBuf); + + FUNC_EXIT_RC(AWS_SUCCESS); +} + +#ifdef __cplusplus +} +#endif