+* Copyright 2015-2016, Inc. or its affiliates. All Rights Reserved.
+* Licensed under the Apache License, Version 2.0 (the "License").
+* You may not use this file except in compliance with the License.
+* A copy of the License is located at
+* or in the "license" file accompanying this file. This file is distributed
+* express or implied. See the License for the specific language governing
+* permissions and limitations under the License.
+// Based on Eclipse Paho.
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *
+ * and the Eclipse Distribution License is available at
+ *
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Sergio R. Caprile - non-blocking packet read functions for stream transport
+ *******************************************************************************/
+ * @file aws_iot_mqtt_client_common_internal.c
+ * @brief MQTT client internal API definitions
+ */
+#ifdef __cplusplus
+extern "C" {
+#include <aws_iot_mqtt_client.h>
+#include "aws_iot_mqtt_client_common_internal.h"
+/* Max length of packet header */
+ * Encodes the message length according to the MQTT algorithm
+ * @param buf the buffer into which the encoded data is written
+ * @param length the length to be encoded
+ * @return the number of bytes written to buffer
+ */
+size_t aws_iot_mqtt_internal_write_len_to_buffer(unsigned char *buf, uint32_t length) {
+    size_t outLen = 0;
+    unsigned char encodedByte;
+    do {
+        encodedByte = (unsigned char) (length % 128);
+        length /= 128;
+        /* if there are more digits to encode, set the top bit of this digit */
+        if(length > 0) {
+            encodedByte |= 0x80;
+        }
+        buf[outLen++] = encodedByte;
+    } while(length > 0);
+    FUNC_EXIT_RC(outLen);
+ * Decodes the message length according to the MQTT algorithm
+ * @param the buffer containing the message
+ * @param value the decoded length returned
+ * @return the number of bytes read from the socket
+ */
+IoT_Error_t aws_iot_mqtt_internal_decode_remaining_length_from_buffer(unsigned char *buf, uint32_t *decodedLen,
+                                                                      uint32_t *readBytesLen) {
+    unsigned char encodedByte;
+    uint32_t multiplier, len;
+    multiplier = 1;
+    len = 0;
+    *decodedLen = 0;
+    do {
+        if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
+            /* bad data */
+        }
+        encodedByte = *buf;
+        buf++;
+        *decodedLen += (encodedByte & 127) * multiplier;
+        multiplier *= 128;
+    } while((encodedByte & 128) != 0);
+    *readBytesLen = len;
+uint32_t aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(uint32_t rem_len) {
+    rem_len += 1; /* header byte */
+    /* now remaining_length field (MQTT 3.1.1 - 2.2.3)*/
+    if(rem_len < 128) {
+        rem_len += 1;
+    } else if(rem_len < 16384) {
+        rem_len += 2;
+    } else if(rem_len < 2097152) {
+        rem_len += 3;
+    } else {
+        rem_len += 4;
+    }
+    return rem_len;
+ * Calculates uint16 packet id from two bytes read from the input buffer
+ * Checks Endianness at runtime
+ *
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @return the value calculated
+ */
+uint16_t aws_iot_mqtt_internal_read_uint16_t(unsigned char **pptr) {
+    unsigned char *ptr = *pptr;
+    uint16_t len = 0;
+    uint8_t firstByte = (uint8_t) (*ptr);
+    uint8_t secondByte = (uint8_t) (*(ptr + 1));
+    len = (uint16_t) (secondByte + (256 * firstByte));
+    *pptr += 2;
+    return len;
+ * Writes an integer as 2 bytes to an output buffer.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param anInt the integer to write
+ */
+void aws_iot_mqtt_internal_write_uint_16(unsigned char **pptr, uint16_t anInt) {
+    **pptr = (unsigned char) (anInt / 256);
+    (*pptr)++;
+    **pptr = (unsigned char) (anInt % 256);
+    (*pptr)++;
+ * Reads one character from the input buffer.
+ * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
+ * @return the character read
+ */
+unsigned char aws_iot_mqtt_internal_read_char(unsigned char **pptr) {
+    unsigned char c = **pptr;
+    (*pptr)++;
+    return c;
+ * Writes one character to an output buffer.
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
+ * @param c the character to write
+ */
+void aws_iot_mqtt_internal_write_char(unsigned char **pptr, unsigned char c) {
+    **pptr = c;
+    (*pptr)++;
+void aws_iot_mqtt_internal_write_utf8_string(unsigned char **pptr, const char *string, uint16_t stringLen) {
+    /* Nothing that calls this function will have a stringLen with a size larger than 2 bytes (MQTT 3.1.1 - 1.5.3) */
+    aws_iot_mqtt_internal_write_uint_16(pptr, stringLen);
+    if(stringLen > 0) {
+        memcpy(*pptr, string, stringLen);
+        *pptr += stringLen;
+    }
+ * Initialize the MQTTHeader structure. Used to ensure that Header bits are
+ * always initialized using the proper mappings. No Endianness issues here since
+ * the individual fields are all less than a byte. Also generates no warnings since
+ * all fields are initialized using hex constants
+ */
+IoT_Error_t aws_iot_mqtt_internal_init_header(MQTTHeader *pHeader, MessageTypes message_type,
+                                              QoS qos, uint8_t dup, uint8_t retained) {
+    if(NULL == pHeader) {
+    }
+    /* Set all bits to zero */
+    pHeader->byte = 0;
+    uint8_t type = 0;
+    switch(message_type) {
+        case UNKNOWN:
+            /* Should never happen */
+            return FAILURE;
+        case CONNECT:
+            type = 0x01;
+            break;
+        case CONNACK:
+            type = 0x02;
+            break;
+        case PUBLISH:
+            type = 0x03;
+            break;
+        case PUBACK:
+            type = 0x04;
+            break;
+        case PUBREC:
+            type = 0x05;
+            break;
+        case PUBREL:
+            type = 0x06;
+            break;
+        case PUBCOMP:
+            type = 0x07;
+            break;
+        case SUBSCRIBE:
+            type = 0x08;
+            break;
+        case SUBACK:
+            type = 0x09;
+            break;
+        case UNSUBSCRIBE:
+            type = 0x0A;
+            break;
+        case UNSUBACK:
+            type = 0x0B;
+            break;
+        case PINGREQ:
+            type = 0x0C;
+            break;
+        case PINGRESP:
+            type = 0x0D;
+            break;
+        case DISCONNECT:
+            type = 0x0E;
+            break;
+        default:
+            /* Should never happen */
+    }
+    pHeader->byte = type << 4;
+    pHeader->byte |= dup << 3;
+    switch(qos) {
+        case QOS0:
+            break;
+        case QOS1:
+            pHeader->byte |= 1 << 1;
+            break;
+        default:
+            /* Using QOS0 as default */
+            break;
+    }
+    pHeader->byte |= (1 == retained) ? 0x01 : 0x00;
+IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t length, awsTimer *pTimer) {
+    size_t sentLen, sent;
+    IoT_Error_t rc;
+    if(NULL == pClient || NULL == pTimer) {
+    }
+    if(length >= pClient->clientData.writeBufSize) {
+    }
+    rc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_write_mutex));
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    sentLen = 0;
+    sent = 0;
+    while(sent < length && !has_timer_expired(pTimer)) {
+        rc = pClient->networkStack.write(&(pClient->networkStack),
+                         &pClient->clientData.writeBuf[sent],
+                         (length - sent),
+                         pTimer,
+                         &sentLen);
+        if(AWS_SUCCESS != rc) {
+            /* there was an error writing the data */
+            break;
+        }
+        sent += sentLen;
+    }
+    rc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_write_mutex));
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    if(sent == length) {
+        /* record the fact that we have successfully sent the packet */
+        //countdown_sec(&c->pingTimer, c->clientData.keepAliveInterval);
+    }
+    FUNC_EXIT_RC(rc) 
+static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Client *pClient,
+                                                                      size_t *rem_len, awsTimer *pTimer) {
+    unsigned char encodedByte;
+    size_t multiplier, len;
+    IoT_Error_t rc;
+    multiplier = 1;
+    len = 0;
+    *rem_len = 0;
+    do {
+        if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
+            /* bad data */
+        }
+        rc = pClient->>networkStack), &encodedByte, 1, pTimer, &len);
+        if(AWS_SUCCESS != rc) {
+            FUNC_EXIT_RC(rc);
+        }
+        *rem_len += ((encodedByte & 127) * multiplier);
+        multiplier *= 128;
+    } while((encodedByte & 128) != 0);
+    FUNC_EXIT_RC(rc);
+static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) {
+    size_t len, rem_len, total_bytes_read, bytes_to_be_read, read_len;
+    IoT_Error_t rc;
+    MQTTHeader header = {0};
+    awsTimer packetTimer;
+    init_timer(&packetTimer);
+    countdown_ms(&packetTimer, pClient->clientData.packetTimeoutMs);
+    rem_len = 0;
+    total_bytes_read = 0;
+    bytes_to_be_read = 0;
+    read_len = 0;
+    rc = pClient->>networkStack), pClient->clientData.readBuf, 1, pTimer, &read_len);
+//printf("JMF:%s:%d read %d from\n",__FILE__,__LINE__,rc);
+    /* 1. read the header byte.  This has the packet type in it */
+        return MQTT_NOTHING_TO_READ;
+    } else if(AWS_SUCCESS != rc) {
+        return rc;
+    }
+    len = 1;
+    /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to
+     * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving
+     * if the remaining time in pTimer is too short.
+     */
+    pTimer = &packetTimer;
+    /* 2. read the remaining length.  This is variable in itself */
+    rc = _aws_iot_mqtt_internal_decode_packet_remaining_len(pClient, &rem_len, pTimer);
+    if(AWS_SUCCESS != rc) {
+        return rc;
+    }
+    /* if the buffer is too short then the message will be dropped silently */
+    if(rem_len >= pClient->clientData.readBufSize) {
+        bytes_to_be_read = pClient->clientData.readBufSize;
+        do {
+            rc = pClient->>networkStack), pClient->clientData.readBuf, bytes_to_be_read,
+                                            pTimer, &read_len);
+            if(AWS_SUCCESS == rc) {
+                total_bytes_read += read_len;
+                if((rem_len - total_bytes_read) >= pClient->clientData.readBufSize) {
+                    bytes_to_be_read = pClient->clientData.readBufSize;
+                } else {
+                    bytes_to_be_read = rem_len - total_bytes_read;
+                }
+            }
+        } while(total_bytes_read < rem_len && AWS_SUCCESS == rc);
+    }
+    /* put the original remaining length into the read buffer */
+    len += aws_iot_mqtt_internal_write_len_to_buffer(pClient->clientData.readBuf + 1, (uint32_t) rem_len);
+    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+    if(rem_len > 0) {
+        rc = pClient->>networkStack), pClient->clientData.readBuf + len, rem_len, pTimer,
+                                        &read_len);
+        if(AWS_SUCCESS != rc || read_len != rem_len) {
+            return FAILURE;
+        }
+    }
+    header.byte = pClient->clientData.readBuf[0];
+    *pPacketType = MQTT_HEADER_FIELD_TYPE(header.byte);
+    FUNC_EXIT_RC(rc);
+// assume topic filter and name is in correct format
+// # can only be at end
+// + and # can only be next to separator
+static bool _aws_iot_mqtt_internal_is_topic_matched(char *pTopicFilter, char *pTopicName, uint16_t topicNameLen) {
+    char *curf, *curn, *curn_end;
+    if(NULL == pTopicFilter || NULL == pTopicName) {
+        return false;
+    }
+    curf = pTopicFilter;
+    curn = pTopicName;
+    curn_end = curn + topicNameLen;
+    while(*curf && (curn < curn_end)) {
+        if(*curn == '/' && *curf != '/') {
+            break;
+        }
+        if(*curf != '+' && *curf != '#' && *curf != *curn) {
+            break;
+        }
+        if(*curf == '+') {
+            /* skip until we meet the next separator, or end of string */
+            char *nextpos = curn + 1;
+            while(nextpos < curn_end && *nextpos != '/')
+                nextpos = ++curn + 1;
+        } else if(*curf == '#') {
+            /* skip until end of string */
+            curn = curn_end - 1;
+        }
+        curf++;
+        curn++;
+    };
+    return (curn == curn_end) && (*curf == '\0');
+static IoT_Error_t _aws_iot_mqtt_internal_deliver_message(AWS_IoT_Client *pClient, char *pTopicName,
+                                                          uint16_t topicNameLen,
+                                                          IoT_Publish_Message_Params *pMessageParams) {
+    uint32_t itr;
+    IoT_Error_t rc;
+    ClientState clientState;
+    if(NULL == pTopicName) {
+    }
+    /* This function can be called from all MQTT APIs
+     * But while callback return is in progress, Yield should not be called.
+     * The state for CB_RETURN accomplishes that, as yield cannot be called while in that state */
+    clientState = aws_iot_mqtt_get_client_state(pClient);
+    aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN);
+    /* Find the right message handler - indexed by topic */
+    for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; ++itr) {
+        if(NULL != pClient->clientData.messageHandlers[itr].topicName) {
+            if(((topicNameLen == pClient->clientData.messageHandlers[itr].topicNameLen)
+                &&
+                (strncmp(pTopicName, (char *) pClient->clientData.messageHandlers[itr].topicName, topicNameLen) == 0))
+               || _aws_iot_mqtt_internal_is_topic_matched((char *) pClient->clientData.messageHandlers[itr].topicName,
+                                                          pTopicName, topicNameLen)) {
+                if(NULL != pClient->clientData.messageHandlers[itr].pApplicationHandler) {
+                    pClient->clientData.messageHandlers[itr].pApplicationHandler(pClient, pTopicName, topicNameLen,
+                                                                                 pMessageParams,
+                                                                                 pClient->clientData.messageHandlers[itr].pApplicationHandlerData);
+                }
+            }
+        }
+    }
+    rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN, clientState);
+    FUNC_EXIT_RC(rc);
+static IoT_Error_t _aws_iot_mqtt_internal_handle_publish(AWS_IoT_Client *pClient, awsTimer *pTimer) {
+    char *topicName;
+    uint16_t topicNameLen;
+    uint32_t len;
+    IoT_Error_t rc;
+    IoT_Publish_Message_Params msg;
+    topicName = NULL;
+    topicNameLen = 0;
+    len = 0;
+    rc = aws_iot_mqtt_internal_deserialize_publish(&msg.isDup, &msg.qos, &msg.isRetained,
+                                                   &, &topicName, &topicNameLen,
+                                                   (unsigned char **) &msg.payload, &msg.payloadLen,
+                                                   pClient->clientData.readBuf,
+                                                   pClient->clientData.readBufSize);
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    rc = _aws_iot_mqtt_internal_deliver_message(pClient, topicName, topicNameLen, &msg);
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    if(QOS0 == msg.qos) {
+        /* No further processing required for QoS0 */
+    }
+    /* Message assumed to be QoS1 since we do not support QoS2 at this time */
+    rc = aws_iot_mqtt_internal_serialize_ack(pClient->clientData.writeBuf, pClient->clientData.writeBufSize,
+                                             PUBACK, 0,, &len);
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    rc = aws_iot_mqtt_internal_send_packet(pClient, len, pTimer);
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) {
+    IoT_Error_t rc;
+//printf("JMF! in aws_iot_mqtt_internal_cycle_read\n");
+    IoT_Error_t threadRc;
+    if(NULL == pClient || NULL == pTimer) {
+        return NULL_VALUE_ERROR;
+    }
+    threadRc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_read_mutex));
+    if(AWS_SUCCESS != threadRc) {
+        FUNC_EXIT_RC(threadRc);
+    }
+    /* read the socket, see what work is due */
+    rc = _aws_iot_mqtt_internal_read_packet(pClient, pTimer, pPacketType);
+//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
+    threadRc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_read_mutex));
+    if(AWS_SUCCESS != threadRc && (MQTT_NOTHING_TO_READ == rc || AWS_SUCCESS == rc)) {
+        return threadRc;
+    }
+    if(MQTT_NOTHING_TO_READ == rc) {
+        /* Nothing to read, not a cycle failure */
+//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
+        return AWS_SUCCESS;
+    } else if(AWS_SUCCESS != rc) {
+//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc);
+        return rc;
+    }
+//printf("JMF: cycle_read got a char, switch on %d\n",*pPacketType);
+    switch(*pPacketType) {
+        case CONNACK:
+        case PUBACK:
+        case SUBACK:
+        case UNSUBACK:
+            /* SDK is blocking, these responses will be forwarded to calling function to process */
+            break;
+        case PUBLISH: {
+            rc = _aws_iot_mqtt_internal_handle_publish(pClient, pTimer);
+            break;
+        }
+        case PUBREC:
+        case PUBCOMP:
+            /* QoS2 not supported at this time */
+            break;
+        case PINGRESP: {
+            pClient->clientStatus.isPingOutstanding = 0;
+            countdown_sec(&pClient->pingTimer, pClient->clientData.keepAliveInterval);
+            break;
+        }
+        default: {
+            /* Either unknown packet type or Failure occurred
+             * Should not happen */
+            break;
+        }
+    }
+//printf("JMF:%s:%d EXITING rc=%d\n",__FILE__,__LINE__,rc);
+    return rc;
+/* only used in single-threaded mode where one command at a time is in process */
+IoT_Error_t aws_iot_mqtt_internal_wait_for_read(AWS_IoT_Client *pClient, uint8_t packetType, awsTimer *pTimer) {
+    IoT_Error_t rc;
+    uint8_t read_packet_type;
+    if(NULL == pClient || NULL == pTimer) {
+    }
+    read_packet_type = 0;
+    do {
+        if(has_timer_expired(pTimer)) {
+            /* we timed out */
+            break;
+        }
+        rc = aws_iot_mqtt_internal_cycle_read(pClient, pTimer, &read_packet_type);
+    } while(((AWS_SUCCESS == rc) || (MQTT_NOTHING_TO_READ == rc)) && (read_packet_type != packetType));
+    /* If rc is AWS_SUCCESS, we have received the expected
+     * MQTT packet. Otherwise rc tells the error. */
+    FUNC_EXIT_RC(rc);
+  * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket
+  * @param buf the buffer into which the packet will be serialized
+  * @param buflen the length in bytes of the supplied buffer, to avoid overruns
+  * @param packettype the message type
+  * @param serialized length
+  * @return IoT_Error_t indicating function execution status
+  */
+IoT_Error_t aws_iot_mqtt_internal_serialize_zero(unsigned char *pTxBuf, size_t txBufLen, MessageTypes packetType,
+                                                 size_t *pSerializedLength) {
+    unsigned char *ptr;
+    IoT_Error_t rc;
+    MQTTHeader header = {0};
+    if(NULL == pTxBuf || NULL == pSerializedLength) {
+    }
+    /* Buffer should have at least 2 bytes for the header */
+    if(4 > txBufLen) {
+    }
+    ptr = pTxBuf;
+    rc = aws_iot_mqtt_internal_init_header(&header, packetType, QOS0, 0, 0);
+    if(AWS_SUCCESS != rc) {
+        FUNC_EXIT_RC(rc);
+    }
+    /* write header */
+    aws_iot_mqtt_internal_write_char(&ptr, header.byte);
+    /* write remaining length */
+    ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 0);
+    *pSerializedLength = (uint32_t) (ptr - pTxBuf);
+#ifdef __cplusplus