Fork of my original MQTTGateway

Dependencies:   mbed-http

MQTTSManager/MQTTThreadedClient.cpp

Committer:
vpcola
Date:
2017-04-08
Revision:
0:a1734fe1ec4b

File content as of revision 0:a1734fe1ec4b:

#include "mbed.h"
#include "rtos.h"
#include "MQTTThreadedClient.h"
#include "mbedtls/platform.h"
#include "mbedtls/ssl.h"
#include "mbedtls/entropy.h"
#include "mbedtls/ctr_drbg.h"
#include "mbedtls/error.h"

#ifdef DEBUG
#define DBG(fmt, args...)    printf(fmt, ## args)
#else
#define DBG(fmt, args...)    /* Don't do anything in release builds */
#endif

static MemoryPool<MQTT::PubMessage, 4> mpool;
static Queue<MQTT::PubMessage, 4> mqueue;

// SSL/TLS variables
mbedtls_entropy_context _entropy;
mbedtls_ctr_drbg_context _ctr_drbg;
mbedtls_x509_crt _cacert;
mbedtls_ssl_context _ssl;
mbedtls_ssl_config _ssl_conf;    
mbedtls_ssl_session saved_session;

namespace MQTT {
    
/**
 * Receive callback for mbed TLS
 */
static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
{
    int recv = -1;
    TCPSocket *socket = static_cast<TCPSocket *>(ctx);
    socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
    recv = socket->recv(buf, len);

    if (NSAPI_ERROR_WOULD_BLOCK == recv) {
        return MBEDTLS_ERR_SSL_WANT_READ;
    } else if (recv < 0) {
        return -1;
    } else {
        return recv;
    }
}

/**
 * Send callback for mbed TLS
 */
static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
{
    int sent = -1;
    TCPSocket *socket = static_cast<TCPSocket *>(ctx);
    socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);    
    sent = socket->send(buf, len);

    if(NSAPI_ERROR_WOULD_BLOCK == sent) {
        return MBEDTLS_ERR_SSL_WANT_WRITE;
    } else if (sent < 0) {
        return -1;
    } else {
        return sent;
    }
}

#if DEBUG_LEVEL > 0
/**
 * Debug callback for mbed TLS
 * Just prints on the USB serial port
 */
static void my_debug(void *ctx, int level, const char *file, int line,
                     const char *str)
{
    const char *p, *basename;
    (void) ctx;

    /* Extract basename from file */
    for(p = basename = file; *p != '\0'; p++) {
        if(*p == '/' || *p == '\\') {
            basename = p + 1;
        }
    }

    if (_debug) {
        mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
    }
}

/**
 * Certificate verification callback for mbed TLS
 * Here we only use it to display information on each cert in the chain
 */
static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
{
    const uint32_t buf_size = 1024;
    char *buf = new char[buf_size];
    (void) data;

    if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
    mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
    if (_debug) mbedtls_printf("%s", buf);

    if (*flags == 0)
        if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
        else {
            mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
            if (_debug) mbedtls_printf("%s\n", buf);
        }

    delete[] buf;
    return 0;
}
#endif


void MQTTThreadedClient::setupTLS()    
{
        if (useTLS)
        {
            mbedtls_entropy_init(&_entropy);
            mbedtls_ctr_drbg_init(&_ctr_drbg);
            mbedtls_x509_crt_init(&_cacert);
            mbedtls_ssl_init(&_ssl);
            mbedtls_ssl_config_init(&_ssl_conf);        
            memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );            
        }    
}

void MQTTThreadedClient::freeTLS()
{
        if (useTLS)
        {
            mbedtls_entropy_free(&_entropy);
            mbedtls_ctr_drbg_free(&_ctr_drbg);
            mbedtls_x509_crt_free(&_cacert);
            mbedtls_ssl_free(&_ssl);
            mbedtls_ssl_config_free(&_ssl_conf);               
        }    
}

int MQTTThreadedClient::initTLS()
{
        int ret;
        
        DBG("Initializing TLS ...\r\n");
        DBG("mbedtls_ctr_drdbg_seed ...\r\n");
        if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
                          (const unsigned char *) DRBG_PERS,
                          sizeof (DRBG_PERS))) != 0) {
            mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
            _error = ret;
            return -1;
        }
        DBG("mbedtls_x509_crt_parse ...\r\n");
        if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
                           strlen(ssl_ca_pem) + 1)) != 0) {
            mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
            _error = ret;
            return -1;
        }

        DBG("mbedtls_ssl_config_defaults ...\r\n");
        if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
                        MBEDTLS_SSL_IS_CLIENT,
                        MBEDTLS_SSL_TRANSPORT_STREAM,
                        MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
            mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
            _error = ret;
            return -1;
        }

        DBG("mbedtls_ssl_config_ca_chain ...\r\n");
        mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
        DBG("mbedtls_ssl_conf_rng ...\r\n");
        mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);

        /* It is possible to disable authentication by passing
         * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
         */
        DBG("mbedtls_ssl_conf_authmode ...\r\n");         
        mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);

#if DEBUG_LEVEL > 0
        mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
        mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
        mbedtls_debug_set_threshold(DEBUG_LEVEL);
#endif

        DBG("mbedtls_ssl_setup ...\r\n");         
        if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
            mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
            _error = ret;
            return -1;
        }
        
        return 0;
}

int MQTTThreadedClient::doTLSHandshake()
{
        int ret;
        
        /* Start the handshake, the rest will be done in onReceive() */
        DBG("Starting the TLS handshake...\r\n");
        ret = mbedtls_ssl_handshake(&_ssl);
        if (ret < 0) 
        {
            if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
                ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
                    mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
            else 
            {
                // do not close the socket if timed out
                ret = TIMEOUT;
            }
            return ret;
        }

        /* Handshake done, time to print info */
        DBG("TLS connection to %s:%d established\r\n", 
            host.c_str(), port);

        const uint32_t buf_size = 1024;
        char *buf = new char[buf_size];
        mbedtls_x509_crt_info(buf, buf_size, "\r    ",
                        mbedtls_ssl_get_peer_cert(&_ssl));
                        
        DBG("Server certificate:\r\n%s\r", buf);
        // Verify server cert ...
        uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
        if( flags != 0 )
        {
            mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
            DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
            // free server cert ... before error return
            delete [] buf;
            return -1;
        }
        
        DBG("Certificate verification passed\r\n\r\n");
        // delete server cert after verification
        delete [] buf;
        
#if defined(MBEDTLS_SSL_CLI_C)        
        // TODO: Save the session here for reconnect.
        if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
        {
            mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
            hasSavedSession = false;
            return -1;
        }  
#endif        
        DBG("Session saved for reconnect ...\r\n");     
        hasSavedSession = true;
        
        return 0;
}

int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
{
    int rc;

    if (tcpSocket == NULL)
        return -1;

    if (useTLS) 
    {
        // Do SSL/TLS read
        rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
        if (MBEDTLS_ERR_SSL_WANT_READ == rc)
            return TIMEOUT;
        else
            return rc;
    } else {
        // non-blocking socket ...
        tcpSocket->set_timeout(timeout);
        rc = tcpSocket->recv( (void *) buffer, size);

        // return 0 bytes if timeout ...
        if (NSAPI_ERROR_WOULD_BLOCK == rc)
            return TIMEOUT;
        else
            return rc; // return the number of bytes received or error
    }
}

int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
{
    int rc;
    
    if (tcpSocket == NULL)
        return -1;
    
    if (useTLS) {
        // Do SSL/TLS write
        rc =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
        if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
            return TIMEOUT;
        else
            return rc;
    } else {

        // set the write timeout
        tcpSocket->set_timeout(timeout);
        rc = tcpSocket->send(buffer, size);

        if ( NSAPI_ERROR_WOULD_BLOCK == rc)
            return TIMEOUT;
        else
            return rc;
    }
}

int MQTTThreadedClient::readPacketLength(int* value)
{
    int rc = MQTTPACKET_READ_ERROR;
    unsigned char c;
    int multiplier = 1;
    int len = 0;
    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

    *value = 0;
    do
    {
        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
        {
            rc = MQTTPACKET_READ_ERROR; /* bad data */
            goto exit;
        }
        
        rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
        if (rc != 1)
        {
            rc = MQTTPACKET_READ_ERROR;
            goto exit;
        }
            
        *value += (c & 127) * multiplier;
        multiplier *= 128;
    } while ((c & 128) != 0);
    
    rc = MQTTPACKET_READ_COMPLETE;
        
exit:
    if (rc == MQTTPACKET_READ_ERROR )
        len = -1;
    
    return len;
}

int MQTTThreadedClient::sendPacket(size_t length)
{
    int rc = FAILURE;
    int sent = 0;

    while (sent < length)
    {
        rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
        if (rc < 0)  // there was an error writing the data
            break;
        sent += rc;
    }
    
    if (sent == length)
        rc = SUCCESS;
    else
        rc = FAILURE;
        
    return rc;
}
/**
 * Reads the entire packet to readbuf and returns
 * the type of packet when successful, otherwise
 * a negative error code is returned.
 **/
int MQTTThreadedClient::readPacket()
{
    int rc = FAILURE;
    MQTTHeader header = {0};
    int len = 0;
    int rem_len = 0;

    /* 1. read the header byte.  This has the packet type in it */
    if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
        goto exit;

    len = 1;
    /* 2. read the remaining length.  This is variable in itself */
    if ( readPacketLength(&rem_len) < 0 )
        goto exit;
        
    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */

    if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
    {
        rc = BUFFER_OVERFLOW;
        goto exit;
    }

    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
    if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
        goto exit;

    // Convert the header to type
    // and update rc
    header.byte = readbuf[0];
    rc = header.bits.type;
    
exit:

    return rc;    
}

/**
 * Read until a specified packet type is received, or untill the specified
 * timeout dropping packets along the way.
 **/
int MQTTThreadedClient::readUntil(int packetType, int timeout)
{
    int pType = FAILURE;
    Timer timer;
    
    timer.start();
    do {
        pType = readPacket();
        if (pType < 0)
            break;
            
        if (timer.read_ms() > timeout)
        {
            pType = FAILURE;
            break;
        }
    }while(pType != packetType);
    
    return pType;    
}


int MQTTThreadedClient::login()
{
    int rc = FAILURE;
    int len = 0;

    if (!isConnected)
    {
        DBG("Session not connected! \r\n");
        return rc;
    }
        
    // Copy the keepAliveInterval value to local
    // MQTT specifies in seconds, we have to multiply that
    // amount for our 32 bit timers which accepts ms.
    keepAliveInterval = (connect_options.keepAliveInterval * 1000);
    
    DBG("Login with: \r\n");
    DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
    DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
    
    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
    {
        DBG("Error serializing connect packet ...\r\n");
        return rc;
    }
    if ((rc = sendPacket((size_t) len)) != SUCCESS)  // send the connect packet
    {
        DBG("Error sending the connect request packet ...\r\n");
        return rc; 
    }
    
    // Wait for the CONNACK 
    if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
    {
        unsigned char connack_rc = 255;
        bool sessionPresent = false;
        DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
        if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
            rc = connack_rc;
        else
            rc = FAILURE;
    }
    else
        rc = FAILURE;

    if (rc == SUCCESS)
    {
        DBG("Connected!!! ... starting connection timers ...\r\n");
        resetConnectionTimer();
    }
    
    DBG("Returning with rc = %d\r\n", rc);
    
    return rc;    
}


void MQTTThreadedClient::disconnect()
{
    if (isConnected)
    {
        if( useTLS 
            && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
           )
        {
            DBG( "Session reset returned an error \r\n");
        }        
        
        isConnected = false;
        tcpSocket->close();      
    }
}

int MQTTThreadedClient::connect()
{
    int ret = FAILURE;

    if ((network == NULL) || (tcpSocket == NULL)
        || host.empty())
    {
        DBG("Network settings not set! \r\n");
        return ret;
    }
    
    if (useTLS) 
    {
        if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
            mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
            return ret;
        }
#if defined(MBEDTLS_SSL_CLI_C)
        if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
            mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
            return ret;
        }
#endif        
    }
        
    tcpSocket->open(network);
    if (useTLS)
    {
        DBG("mbedtls_ssl_set_hostname ...\r\n");         
        mbedtls_ssl_set_hostname(&_ssl, host.c_str());
        DBG("mbedtls_ssl_set_bio ...\r\n");         
        mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
                                   ssl_send, ssl_recv, NULL );
    }
    
    if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
    {
         DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
         return ret;
    }else
         isConnected = true;
    
    if (useTLS) 
    {
        
        if (doTLSHandshake() < 0)
        {
            DBG("TLS Handshake failed! \r\n");
            return FAILURE;
        }else
            DBG("TLS Handshake complete!! \r\n");
    }
    
    return login();
}

void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
{
    // Copy the settings for reconnection
    host = chost;
    port = cport;
    connect_options = options;    
}

int MQTTThreadedClient::publish(PubMessage& msg)
{
#if 0
    int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
    // TODO: handle id values when the function is called later
    if (id == 0)
        return FAILURE;
    else
        return SUCCESS;
#endif
    PubMessage *message = mpool.alloc();
    // Simple copy
    *message = msg;
    
    // Push the data to the thread
    DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
    mqueue.put(message);
    
    return SUCCESS;
}

int MQTTThreadedClient::sendPublish(PubMessage& message)
{
     MQTTString topicString = MQTTString_initializer;
     
     if (!isConnected) 
     {
        DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
        return FAILURE;
     }
        
     topicString.cstring = (char*) &message.topic[0];
     int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
              topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
     if (len <= 0)
     {
         DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
         return FAILURE;
     }
     
     if (sendPacket(len) == SUCCESS)
     {
         DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
         return SUCCESS;
     }
    
    DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
    return FAILURE;
}

void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
{
    // Push the subscription into the map ...
    FP<void,MessageData &> fp;
    fp.attach(function);
    
    topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
} 

int MQTTThreadedClient::processSubscriptions()
{
    int numsubscribed = 0;
    
    if (!isConnected) 
    {
            DBG("Session not connected!!\r\n");
            return 0;
    }
    
    DBG("Processing subscribed topics ....\r\n");
    
    std::map<std::string, FP<void, MessageData &> >::iterator it;
    for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
    {
        int rc = FAILURE;
        int len = 0;
        //TODO: We only subscribe to QoS = 0 for now
        QoS qos = QOS0;

        MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
        DBG("Subscribing to topic [%s]\r\n", topic.cstring);


        len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
        if (len <= 0) {
            DBG("Error serializing subscribe packet ...\r\n");
            continue;
        }

        if ((rc = sendPacket(len)) != SUCCESS) {
            DBG("Error sending subscribe packet [%d]\r\n", rc);
            continue;
        }

        DBG("Waiting for subscription ack ...\r\n");
        // Wait for SUBACK, dropping packets read along the way ...
        if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
            int count = 0, grantedQoS = -1;
            unsigned short mypacketid;
            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                rc = grantedQoS; // 0, 1, 2 or 0x80
            // For as long as we do not get 0x80 ..
            if (rc != 0x80) 
            {
                // Reset connection timers here ...
                resetConnectionTimer();
                DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
                numsubscribed++;
            } else {
                DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
            }
        } else 
            DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
    } // end for loop
    
    return numsubscribed;    
}

bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
{
    char* curf = topicFilter;
    char* curn = topicName.lenstring.data;
    char* curn_end = curn + topicName.lenstring.len;

    while (*curf && curn < curn_end)
    {
        if (*curn == '/' && *curf != '/')
            break;
        if (*curf != '+' && *curf != '#' && *curf != *curn)
            break;
        if (*curf == '+')
        {   // skip until we meet the next separator, or end of string
            char* nextpos = curn + 1;
            while (nextpos < curn_end && *nextpos != '/')
                nextpos = ++curn + 1;
        }
        else if (*curf == '#')
            curn = curn_end - 1;    // skip until end of string
        curf++;
        curn++;
    };

    return (curn == curn_end) && (*curf == '\0');
}

int MQTTThreadedClient::handlePublishMsg()
{
    MQTTString topicName = MQTTString_initializer;
    Message msg;
    int intQoS;
    DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
    if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
            &intQoS, 
            (unsigned char*)&msg.retained, 
            (unsigned short*)&msg.id, 
            &topicName,
            (unsigned char**)&msg.payload, 
            (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
    {
        DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
        return -1;
    }

    std::string topic;
    if (topicName.lenstring.len > 0)
    {
        topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
    }else
        topic = (const char *) topicName.cstring;
    
    DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
    
    msg.qos = (QoS) intQoS;

    
    // Call the handlers for each topic 
    if (topicCBMap.find(topic) != topicCBMap.end())
    {
        // Call the callback function 
        if (topicCBMap[topic].attached())
        {
            DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
            MessageData md(topicName, msg);            
            topicCBMap[topic](md);
            
            return 1;
        }
    }
    
    // TODO: depending on the QoS
    // we send data to the server = PUBACK or PUBREC
    switch(intQoS)
    {
        case QOS0:
            // We send back nothing ...
            break;
        case QOS1:
            // TODO: implement
            break;
        case QOS2:
            // TODO: implement
            break;
        default:
            break;
    }
    
    return 0;
}

void MQTTThreadedClient::resetConnectionTimer()
{
    if (keepAliveInterval > 0)
    {
        comTimer.reset();
        comTimer.start();
    }
}

bool MQTTThreadedClient::hasConnectionTimedOut()
{
    if (keepAliveInterval > 0 ) {
        // Check connection timer
        if (comTimer.read_ms() > keepAliveInterval)
            return true;
        else
            return false;
    }

    return false;
}
        
void MQTTThreadedClient::sendPingRequest()
{
    int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
    if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
    {
        printf("MQTT Ping request sent successfully ...\r\n");
    }
}

void MQTTThreadedClient::startListener()
{
    int pType;
    int numsubs;
    // Continuesly listens for packets and dispatch
    // message handlers ...
    if (useTLS)
    {
        initTLS();
    }
            
    while(true)
    {

        // Attempt to reconnect and login
        if ( connect() < 0 )
        {
            disconnect();
            // Wait for a few secs and reconnect ...
            Thread::wait(6000);
            continue;
        }
        
        numsubs = processSubscriptions();
        DBG("Subscribed %d topics ...\r\n", numsubs);
         
        // loop read    
        while(true) 
        {
            pType = readPacket();
            switch(pType) 
            {
                case TIMEOUT:
                    // No data available from the network ...
                    break;
                case FAILURE:
                    {
                        DBG("readPacket returned failure \r\n");
                        goto reconnect;
                    }
                case BUFFER_OVERFLOW: 
                    {
                        // TODO: Network error, do we disconnect and reconnect?
                        DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
                        MBED_ASSERT(false);
                    }
                    break;
                /**
                *  The rest of the return codes below (all positive) is about MQTT
                 * response codes
                 **/
                case CONNACK:
                case PUBACK:
                case SUBACK:
                    break;
                case PUBLISH: 
                    {
                        DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
                        // We receive data from the MQTT server ..
                        if (handlePublishMsg() < 0) {
                            DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
                            break;
                        }
                    }
                    break;
                case PINGRESP: 
                    {
                        printf("MQTT Got ping response ...\r\n");
                        resetConnectionTimer();
                    }
                    break;
                default:
                    DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
            }

            // Check if its time to send a keepAlive packet
            if (hasConnectionTimedOut()) {
                // Queue the ping request so that other
                // pending operations queued above will go first
                queue.call(this, &MQTTThreadedClient::sendPingRequest);
            }

            // Check if we have messages on the message queue
            osEvent evt = mqueue.get(10);
            if (evt.status == osEventMessage) {

                DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());

                // Unpack the message
                PubMessage * message = (PubMessage *)evt.value.p;

                // Send the packet, do not queue the call
                // like the ping above ..
                if ( sendPublish(*message) == SUCCESS) {
                    // Reset timers if we have been able to send successfully
                    resetConnectionTimer();
                } else {
                    // Disconnected?
                    goto reconnect;
                }

                // Free the message from mempool  after using
                mpool.free(message);
            }

            // Dispatch any queued events ...
            queue.dispatch(100);
        } // end while loop

reconnect:
        // reconnect?
        DBG("Client disconnected!! ... retrying ...\r\n");
        disconnect();
        
    };
}

void MQTTThreadedClient::stopListener()
{
    // TODO: Set a signal/flag that the running thread 
    // will check if its ok to stop ...
}

}