Jim Flynn
/
aws-iot-device-sdk-mbed-c
Changes to enabled on-line compiler
src/aws_iot_mqtt_client_common_internal.c
- Committer:
- JMF
- Date:
- 2018-05-30
- Revision:
- 0:082731ede69f
File content as of revision 0:082731ede69f:
/* * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ // Based on Eclipse Paho. /******************************************************************************* * Copyright (c) 2014 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial API and implementation and/or initial documentation * Sergio R. Caprile - non-blocking packet read functions for stream transport *******************************************************************************/ /** * @file aws_iot_mqtt_client_common_internal.c * @brief MQTT client internal API definitions */ #ifdef __cplusplus extern "C" { #endif #include <aws_iot_mqtt_client.h> #include "aws_iot_mqtt_client_common_internal.h" /* Max length of packet header */ #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 /** * Encodes the message length according to the MQTT algorithm * @param buf the buffer into which the encoded data is written * @param length the length to be encoded * @return the number of bytes written to buffer */ size_t aws_iot_mqtt_internal_write_len_to_buffer(unsigned char *buf, uint32_t length) { size_t outLen = 0; unsigned char encodedByte; FUNC_ENTRY; do { encodedByte = (unsigned char) (length % 128); length /= 128; /* if there are more digits to encode, set the top bit of this digit */ if(length > 0) { encodedByte |= 0x80; } buf[outLen++] = encodedByte; } while(length > 0); FUNC_EXIT_RC(outLen); } /** * Decodes the message length according to the MQTT algorithm * @param the buffer containing the message * @param value the decoded length returned * @return the number of bytes read from the socket */ IoT_Error_t aws_iot_mqtt_internal_decode_remaining_length_from_buffer(unsigned char *buf, uint32_t *decodedLen, uint32_t *readBytesLen) { unsigned char encodedByte; uint32_t multiplier, len; FUNC_ENTRY; multiplier = 1; len = 0; *decodedLen = 0; do { if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { /* bad data */ FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); } encodedByte = *buf; buf++; *decodedLen += (encodedByte & 127) * multiplier; multiplier *= 128; } while((encodedByte & 128) != 0); *readBytesLen = len; FUNC_EXIT_RC(AWS_SUCCESS); } uint32_t aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(uint32_t rem_len) { rem_len += 1; /* header byte */ /* now remaining_length field (MQTT 3.1.1 - 2.2.3)*/ if(rem_len < 128) { rem_len += 1; } else if(rem_len < 16384) { rem_len += 2; } else if(rem_len < 2097152) { rem_len += 3; } else { rem_len += 4; } return rem_len; } /** * Calculates uint16 packet id from two bytes read from the input buffer * Checks Endianness at runtime * * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned * @return the value calculated */ uint16_t aws_iot_mqtt_internal_read_uint16_t(unsigned char **pptr) { unsigned char *ptr = *pptr; uint16_t len = 0; uint8_t firstByte = (uint8_t) (*ptr); uint8_t secondByte = (uint8_t) (*(ptr + 1)); len = (uint16_t) (secondByte + (256 * firstByte)); *pptr += 2; return len; } /** * Writes an integer as 2 bytes to an output buffer. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned * @param anInt the integer to write */ void aws_iot_mqtt_internal_write_uint_16(unsigned char **pptr, uint16_t anInt) { **pptr = (unsigned char) (anInt / 256); (*pptr)++; **pptr = (unsigned char) (anInt % 256); (*pptr)++; } /** * Reads one character from the input buffer. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned * @return the character read */ unsigned char aws_iot_mqtt_internal_read_char(unsigned char **pptr) { unsigned char c = **pptr; (*pptr)++; return c; } /** * Writes one character to an output buffer. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned * @param c the character to write */ void aws_iot_mqtt_internal_write_char(unsigned char **pptr, unsigned char c) { **pptr = c; (*pptr)++; } void aws_iot_mqtt_internal_write_utf8_string(unsigned char **pptr, const char *string, uint16_t stringLen) { /* Nothing that calls this function will have a stringLen with a size larger than 2 bytes (MQTT 3.1.1 - 1.5.3) */ aws_iot_mqtt_internal_write_uint_16(pptr, stringLen); if(stringLen > 0) { memcpy(*pptr, string, stringLen); *pptr += stringLen; } } /** * Initialize the MQTTHeader structure. Used to ensure that Header bits are * always initialized using the proper mappings. No Endianness issues here since * the individual fields are all less than a byte. Also generates no warnings since * all fields are initialized using hex constants */ IoT_Error_t aws_iot_mqtt_internal_init_header(MQTTHeader *pHeader, MessageTypes message_type, QoS qos, uint8_t dup, uint8_t retained) { FUNC_ENTRY; if(NULL == pHeader) { FUNC_EXIT_RC(NULL_VALUE_ERROR); } /* Set all bits to zero */ pHeader->byte = 0; uint8_t type = 0; switch(message_type) { case UNKNOWN: /* Should never happen */ return FAILURE; case CONNECT: type = 0x01; break; case CONNACK: type = 0x02; break; case PUBLISH: type = 0x03; break; case PUBACK: type = 0x04; break; case PUBREC: type = 0x05; break; case PUBREL: type = 0x06; break; case PUBCOMP: type = 0x07; break; case SUBSCRIBE: type = 0x08; break; case SUBACK: type = 0x09; break; case UNSUBSCRIBE: type = 0x0A; break; case UNSUBACK: type = 0x0B; break; case PINGREQ: type = 0x0C; break; case PINGRESP: type = 0x0D; break; case DISCONNECT: type = 0x0E; break; default: /* Should never happen */ FUNC_EXIT_RC(FAILURE); } pHeader->byte = type << 4; pHeader->byte |= dup << 3; switch(qos) { case QOS0: break; case QOS1: pHeader->byte |= 1 << 1; break; default: /* Using QOS0 as default */ break; } pHeader->byte |= (1 == retained) ? 0x01 : 0x00; FUNC_EXIT_RC(AWS_SUCCESS); } IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t length, awsTimer *pTimer) { size_t sentLen, sent; IoT_Error_t rc; FUNC_ENTRY; if(NULL == pClient || NULL == pTimer) { FUNC_EXIT_RC(NULL_VALUE_ERROR); } if(length >= pClient->clientData.writeBufSize) { FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); } #ifdef _ENABLE_THREAD_SUPPORT_ rc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } #endif sentLen = 0; sent = 0; while(sent < length && !has_timer_expired(pTimer)) { rc = pClient->networkStack.write(&(pClient->networkStack), &pClient->clientData.writeBuf[sent], (length - sent), pTimer, &sentLen); if(AWS_SUCCESS != rc) { /* there was an error writing the data */ break; } sent += sentLen; } #ifdef _ENABLE_THREAD_SUPPORT_ rc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } #endif if(sent == length) { /* record the fact that we have successfully sent the packet */ //countdown_sec(&c->pingTimer, c->clientData.keepAliveInterval); FUNC_EXIT_RC(AWS_SUCCESS); } FUNC_EXIT_RC(rc) } static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Client *pClient, size_t *rem_len, awsTimer *pTimer) { unsigned char encodedByte; size_t multiplier, len; IoT_Error_t rc; FUNC_ENTRY; multiplier = 1; len = 0; *rem_len = 0; do { if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { /* bad data */ FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); } rc = pClient->networkStack.read(&(pClient->networkStack), &encodedByte, 1, pTimer, &len); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } *rem_len += ((encodedByte & 127) * multiplier); multiplier *= 128; } while((encodedByte & 128) != 0); FUNC_EXIT_RC(rc); } static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { size_t len, rem_len, total_bytes_read, bytes_to_be_read, read_len; IoT_Error_t rc; MQTTHeader header = {0}; awsTimer packetTimer; init_timer(&packetTimer); countdown_ms(&packetTimer, pClient->clientData.packetTimeoutMs); rem_len = 0; total_bytes_read = 0; bytes_to_be_read = 0; read_len = 0; rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, 1, pTimer, &read_len); //printf("JMF:%s:%d read %d from networkStack.read\n",__FILE__,__LINE__,rc); /* 1. read the header byte. This has the packet type in it */ if(NETWORK_SSL_NOTHING_TO_READ == rc) { return MQTT_NOTHING_TO_READ; } else if(AWS_SUCCESS != rc) { return rc; } len = 1; /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving * if the remaining time in pTimer is too short. */ pTimer = &packetTimer; /* 2. read the remaining length. This is variable in itself */ rc = _aws_iot_mqtt_internal_decode_packet_remaining_len(pClient, &rem_len, pTimer); if(AWS_SUCCESS != rc) { return rc; } /* if the buffer is too short then the message will be dropped silently */ if(rem_len >= pClient->clientData.readBufSize) { bytes_to_be_read = pClient->clientData.readBufSize; do { rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, bytes_to_be_read, pTimer, &read_len); if(AWS_SUCCESS == rc) { total_bytes_read += read_len; if((rem_len - total_bytes_read) >= pClient->clientData.readBufSize) { bytes_to_be_read = pClient->clientData.readBufSize; } else { bytes_to_be_read = rem_len - total_bytes_read; } } } while(total_bytes_read < rem_len && AWS_SUCCESS == rc); return MQTT_RX_BUFFER_TOO_SHORT_ERROR; } /* put the original remaining length into the read buffer */ len += aws_iot_mqtt_internal_write_len_to_buffer(pClient->clientData.readBuf + 1, (uint32_t) rem_len); /* 3. read the rest of the buffer using a callback to supply the rest of the data */ if(rem_len > 0) { rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf + len, rem_len, pTimer, &read_len); if(AWS_SUCCESS != rc || read_len != rem_len) { return FAILURE; } } header.byte = pClient->clientData.readBuf[0]; *pPacketType = MQTT_HEADER_FIELD_TYPE(header.byte); FUNC_EXIT_RC(rc); } // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator static bool _aws_iot_mqtt_internal_is_topic_matched(char *pTopicFilter, char *pTopicName, uint16_t topicNameLen) { char *curf, *curn, *curn_end; if(NULL == pTopicFilter || NULL == pTopicName) { return false; } curf = pTopicFilter; curn = pTopicName; curn_end = curn + topicNameLen; while(*curf && (curn < curn_end)) { if(*curn == '/' && *curf != '/') { break; } if(*curf != '+' && *curf != '#' && *curf != *curn) { break; } if(*curf == '+') { /* skip until we meet the next separator, or end of string */ char *nextpos = curn + 1; while(nextpos < curn_end && *nextpos != '/') nextpos = ++curn + 1; } else if(*curf == '#') { /* skip until end of string */ curn = curn_end - 1; } curf++; curn++; }; return (curn == curn_end) && (*curf == '\0'); } static IoT_Error_t _aws_iot_mqtt_internal_deliver_message(AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLen, IoT_Publish_Message_Params *pMessageParams) { uint32_t itr; IoT_Error_t rc; ClientState clientState; FUNC_ENTRY; if(NULL == pTopicName) { FUNC_EXIT_RC(NULL_VALUE_ERROR); } /* This function can be called from all MQTT APIs * But while callback return is in progress, Yield should not be called. * The state for CB_RETURN accomplishes that, as yield cannot be called while in that state */ clientState = aws_iot_mqtt_get_client_state(pClient); aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN); /* Find the right message handler - indexed by topic */ for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; ++itr) { if(NULL != pClient->clientData.messageHandlers[itr].topicName) { if(((topicNameLen == pClient->clientData.messageHandlers[itr].topicNameLen) && (strncmp(pTopicName, (char *) pClient->clientData.messageHandlers[itr].topicName, topicNameLen) == 0)) || _aws_iot_mqtt_internal_is_topic_matched((char *) pClient->clientData.messageHandlers[itr].topicName, pTopicName, topicNameLen)) { if(NULL != pClient->clientData.messageHandlers[itr].pApplicationHandler) { pClient->clientData.messageHandlers[itr].pApplicationHandler(pClient, pTopicName, topicNameLen, pMessageParams, pClient->clientData.messageHandlers[itr].pApplicationHandlerData); } } } } rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN, clientState); FUNC_EXIT_RC(rc); } static IoT_Error_t _aws_iot_mqtt_internal_handle_publish(AWS_IoT_Client *pClient, awsTimer *pTimer) { char *topicName; uint16_t topicNameLen; uint32_t len; IoT_Error_t rc; IoT_Publish_Message_Params msg; FUNC_ENTRY; topicName = NULL; topicNameLen = 0; len = 0; rc = aws_iot_mqtt_internal_deserialize_publish(&msg.isDup, &msg.qos, &msg.isRetained, &msg.id, &topicName, &topicNameLen, (unsigned char **) &msg.payload, &msg.payloadLen, pClient->clientData.readBuf, pClient->clientData.readBufSize); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } rc = _aws_iot_mqtt_internal_deliver_message(pClient, topicName, topicNameLen, &msg); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } if(QOS0 == msg.qos) { /* No further processing required for QoS0 */ FUNC_EXIT_RC(AWS_SUCCESS); } /* Message assumed to be QoS1 since we do not support QoS2 at this time */ rc = aws_iot_mqtt_internal_serialize_ack(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, PUBACK, 0, msg.id, &len); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } rc = aws_iot_mqtt_internal_send_packet(pClient, len, pTimer); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } FUNC_EXIT_RC(AWS_SUCCESS); } IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { IoT_Error_t rc; //printf("JMF! in aws_iot_mqtt_internal_cycle_read\n"); #ifdef _ENABLE_THREAD_SUPPORT_ IoT_Error_t threadRc; #endif if(NULL == pClient || NULL == pTimer) { return NULL_VALUE_ERROR; } #ifdef _ENABLE_THREAD_SUPPORT_ threadRc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); if(AWS_SUCCESS != threadRc) { FUNC_EXIT_RC(threadRc); } #endif /* read the socket, see what work is due */ rc = _aws_iot_mqtt_internal_read_packet(pClient, pTimer, pPacketType); //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); #ifdef _ENABLE_THREAD_SUPPORT_ threadRc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); if(AWS_SUCCESS != threadRc && (MQTT_NOTHING_TO_READ == rc || AWS_SUCCESS == rc)) { return threadRc; } #endif if(MQTT_NOTHING_TO_READ == rc) { /* Nothing to read, not a cycle failure */ //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); return AWS_SUCCESS; } else if(AWS_SUCCESS != rc) { //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); return rc; } //printf("JMF: cycle_read got a char, switch on %d\n",*pPacketType); switch(*pPacketType) { case CONNACK: case PUBACK: case SUBACK: case UNSUBACK: /* SDK is blocking, these responses will be forwarded to calling function to process */ break; case PUBLISH: { rc = _aws_iot_mqtt_internal_handle_publish(pClient, pTimer); break; } case PUBREC: case PUBCOMP: /* QoS2 not supported at this time */ break; case PINGRESP: { pClient->clientStatus.isPingOutstanding = 0; countdown_sec(&pClient->pingTimer, pClient->clientData.keepAliveInterval); break; } default: { /* Either unknown packet type or Failure occurred * Should not happen */ rc = MQTT_RX_MESSAGE_PACKET_TYPE_INVALID_ERROR; break; } } //printf("JMF:%s:%d EXITING rc=%d\n",__FILE__,__LINE__,rc); return rc; } /* only used in single-threaded mode where one command at a time is in process */ IoT_Error_t aws_iot_mqtt_internal_wait_for_read(AWS_IoT_Client *pClient, uint8_t packetType, awsTimer *pTimer) { IoT_Error_t rc; uint8_t read_packet_type; FUNC_ENTRY; if(NULL == pClient || NULL == pTimer) { FUNC_EXIT_RC(NULL_VALUE_ERROR); } read_packet_type = 0; do { if(has_timer_expired(pTimer)) { /* we timed out */ rc = MQTT_REQUEST_TIMEOUT_ERROR; break; } rc = aws_iot_mqtt_internal_cycle_read(pClient, pTimer, &read_packet_type); } while(((AWS_SUCCESS == rc) || (MQTT_NOTHING_TO_READ == rc)) && (read_packet_type != packetType)); /* If rc is AWS_SUCCESS, we have received the expected * MQTT packet. Otherwise rc tells the error. */ FUNC_EXIT_RC(rc); } /** * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket * @param buf the buffer into which the packet will be serialized * @param buflen the length in bytes of the supplied buffer, to avoid overruns * @param packettype the message type * @param serialized length * @return IoT_Error_t indicating function execution status */ IoT_Error_t aws_iot_mqtt_internal_serialize_zero(unsigned char *pTxBuf, size_t txBufLen, MessageTypes packetType, size_t *pSerializedLength) { unsigned char *ptr; IoT_Error_t rc; MQTTHeader header = {0}; FUNC_ENTRY; if(NULL == pTxBuf || NULL == pSerializedLength) { FUNC_EXIT_RC(NULL_VALUE_ERROR); } /* Buffer should have at least 2 bytes for the header */ if(4 > txBufLen) { FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); } ptr = pTxBuf; rc = aws_iot_mqtt_internal_init_header(&header, packetType, QOS0, 0, 0); if(AWS_SUCCESS != rc) { FUNC_EXIT_RC(rc); } /* write header */ aws_iot_mqtt_internal_write_char(&ptr, header.byte); /* write remaining length */ ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 0); *pSerializedLength = (uint32_t) (ptr - pTxBuf); FUNC_EXIT_RC(AWS_SUCCESS); } #ifdef __cplusplus } #endif