Fork of my MQTTGateway
Diff: MQTTSManager/MQTTThreadedClient.cpp
- Revision:
- 0:f1d3878b8dd9
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSManager/MQTTThreadedClient.cpp Sat Apr 08 14:45:51 2017 +0000
@@ -0,0 +1,941 @@
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTThreadedClient.h"
+#include "mbedtls/platform.h"
+#include "mbedtls/ssl.h"
+#include "mbedtls/entropy.h"
+#include "mbedtls/ctr_drbg.h"
+#include "mbedtls/error.h"
+
+#ifdef DEBUG
+#define DBG(fmt, args...) printf(fmt, ## args)
+#else
+#define DBG(fmt, args...) /* Don't do anything in release builds */
+#endif
+
+static MemoryPool<MQTT::PubMessage, 4> mpool;
+static Queue<MQTT::PubMessage, 4> mqueue;
+
+// SSL/TLS variables
+mbedtls_entropy_context _entropy;
+mbedtls_ctr_drbg_context _ctr_drbg;
+mbedtls_x509_crt _cacert;
+mbedtls_ssl_context _ssl;
+mbedtls_ssl_config _ssl_conf;
+mbedtls_ssl_session saved_session;
+
+namespace MQTT {
+
+/**
+ * Receive callback for mbed TLS
+ */
+static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
+{
+ int recv = -1;
+ TCPSocket *socket = static_cast<TCPSocket *>(ctx);
+ socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
+ recv = socket->recv(buf, len);
+
+ if (NSAPI_ERROR_WOULD_BLOCK == recv) {
+ return MBEDTLS_ERR_SSL_WANT_READ;
+ } else if (recv < 0) {
+ return -1;
+ } else {
+ return recv;
+ }
+}
+
+/**
+ * Send callback for mbed TLS
+ */
+static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
+{
+ int sent = -1;
+ TCPSocket *socket = static_cast<TCPSocket *>(ctx);
+ socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
+ sent = socket->send(buf, len);
+
+ if(NSAPI_ERROR_WOULD_BLOCK == sent) {
+ return MBEDTLS_ERR_SSL_WANT_WRITE;
+ } else if (sent < 0) {
+ return -1;
+ } else {
+ return sent;
+ }
+}
+
+#if DEBUG_LEVEL > 0
+/**
+ * Debug callback for mbed TLS
+ * Just prints on the USB serial port
+ */
+static void my_debug(void *ctx, int level, const char *file, int line,
+ const char *str)
+{
+ const char *p, *basename;
+ (void) ctx;
+
+ /* Extract basename from file */
+ for(p = basename = file; *p != '\0'; p++) {
+ if(*p == '/' || *p == '\\') {
+ basename = p + 1;
+ }
+ }
+
+ if (_debug) {
+ mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
+ }
+}
+
+/**
+ * Certificate verification callback for mbed TLS
+ * Here we only use it to display information on each cert in the chain
+ */
+static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
+{
+ const uint32_t buf_size = 1024;
+ char *buf = new char[buf_size];
+ (void) data;
+
+ if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
+ mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt);
+ if (_debug) mbedtls_printf("%s", buf);
+
+ if (*flags == 0)
+ if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
+ else {
+ mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags);
+ if (_debug) mbedtls_printf("%s\n", buf);
+ }
+
+ delete[] buf;
+ return 0;
+}
+#endif
+
+
+void MQTTThreadedClient::setupTLS()
+{
+ if (useTLS)
+ {
+ mbedtls_entropy_init(&_entropy);
+ mbedtls_ctr_drbg_init(&_ctr_drbg);
+ mbedtls_x509_crt_init(&_cacert);
+ mbedtls_ssl_init(&_ssl);
+ mbedtls_ssl_config_init(&_ssl_conf);
+ memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );
+ }
+}
+
+void MQTTThreadedClient::freeTLS()
+{
+ if (useTLS)
+ {
+ mbedtls_entropy_free(&_entropy);
+ mbedtls_ctr_drbg_free(&_ctr_drbg);
+ mbedtls_x509_crt_free(&_cacert);
+ mbedtls_ssl_free(&_ssl);
+ mbedtls_ssl_config_free(&_ssl_conf);
+ }
+}
+
+int MQTTThreadedClient::initTLS()
+{
+ int ret;
+
+ DBG("Initializing TLS ...\r\n");
+ DBG("mbedtls_ctr_drdbg_seed ...\r\n");
+ if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
+ (const unsigned char *) DRBG_PERS,
+ sizeof (DRBG_PERS))) != 0) {
+ mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
+ _error = ret;
+ return -1;
+ }
+ DBG("mbedtls_x509_crt_parse ...\r\n");
+ if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
+ strlen(ssl_ca_pem) + 1)) != 0) {
+ mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
+ _error = ret;
+ return -1;
+ }
+
+ DBG("mbedtls_ssl_config_defaults ...\r\n");
+ if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
+ MBEDTLS_SSL_IS_CLIENT,
+ MBEDTLS_SSL_TRANSPORT_STREAM,
+ MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
+ mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
+ _error = ret;
+ return -1;
+ }
+
+ DBG("mbedtls_ssl_config_ca_chain ...\r\n");
+ mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
+ DBG("mbedtls_ssl_conf_rng ...\r\n");
+ mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
+
+ /* It is possible to disable authentication by passing
+ * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
+ */
+ DBG("mbedtls_ssl_conf_authmode ...\r\n");
+ mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
+
+#if DEBUG_LEVEL > 0
+ mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
+ mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
+ mbedtls_debug_set_threshold(DEBUG_LEVEL);
+#endif
+
+ DBG("mbedtls_ssl_setup ...\r\n");
+ if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
+ mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
+ _error = ret;
+ return -1;
+ }
+
+ return 0;
+}
+
+int MQTTThreadedClient::doTLSHandshake()
+{
+ int ret;
+
+ /* Start the handshake, the rest will be done in onReceive() */
+ DBG("Starting the TLS handshake...\r\n");
+ ret = mbedtls_ssl_handshake(&_ssl);
+ if (ret < 0)
+ {
+ if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
+ ret != MBEDTLS_ERR_SSL_WANT_WRITE)
+ mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
+ else
+ {
+ // do not close the socket if timed out
+ ret = TIMEOUT;
+ }
+ return ret;
+ }
+
+ /* Handshake done, time to print info */
+ DBG("TLS connection to %s:%d established\r\n",
+ host.c_str(), port);
+
+ const uint32_t buf_size = 1024;
+ char *buf = new char[buf_size];
+ mbedtls_x509_crt_info(buf, buf_size, "\r ",
+ mbedtls_ssl_get_peer_cert(&_ssl));
+
+ DBG("Server certificate:\r\n%s\r", buf);
+ // Verify server cert ...
+ uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
+ if( flags != 0 )
+ {
+ mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
+ DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
+ // free server cert ... before error return
+ delete [] buf;
+ return -1;
+ }
+
+ DBG("Certificate verification passed\r\n\r\n");
+ // delete server cert after verification
+ delete [] buf;
+
+#if defined(MBEDTLS_SSL_CLI_C)
+ // TODO: Save the session here for reconnect.
+ if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
+ {
+ mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
+ hasSavedSession = false;
+ return -1;
+ }
+#endif
+ DBG("Session saved for reconnect ...\r\n");
+ hasSavedSession = true;
+
+ return 0;
+}
+
+int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ if (useTLS)
+ {
+ // Do SSL/TLS read
+ rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
+ if (MBEDTLS_ERR_SSL_WANT_READ == rc)
+ return TIMEOUT;
+ else
+ return rc;
+ } else {
+ // non-blocking socket ...
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->recv( (void *) buffer, size);
+
+ // return 0 bytes if timeout ...
+ if (NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc; // return the number of bytes received or error
+ }
+}
+
+int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ if (useTLS) {
+ // Do SSL/TLS write
+ rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
+ if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
+ return TIMEOUT;
+ else
+ return rc;
+ } else {
+
+ // set the write timeout
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->send(buffer, size);
+
+ if ( NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc;
+ }
+}
+
+int MQTTThreadedClient::readPacketLength(int* value)
+{
+ int rc = MQTTPACKET_READ_ERROR;
+ unsigned char c;
+ int multiplier = 1;
+ int len = 0;
+ const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
+
+ *value = 0;
+ do
+ {
+ if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+ {
+ rc = MQTTPACKET_READ_ERROR; /* bad data */
+ goto exit;
+ }
+
+ rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
+ if (rc != 1)
+ {
+ rc = MQTTPACKET_READ_ERROR;
+ goto exit;
+ }
+
+ *value += (c & 127) * multiplier;
+ multiplier *= 128;
+ } while ((c & 128) != 0);
+
+ rc = MQTTPACKET_READ_COMPLETE;
+
+exit:
+ if (rc == MQTTPACKET_READ_ERROR )
+ len = -1;
+
+ return len;
+}
+
+int MQTTThreadedClient::sendPacket(size_t length)
+{
+ int rc = FAILURE;
+ int sent = 0;
+
+ while (sent < length)
+ {
+ rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
+ if (rc < 0) // there was an error writing the data
+ break;
+ sent += rc;
+ }
+
+ if (sent == length)
+ rc = SUCCESS;
+ else
+ rc = FAILURE;
+
+ return rc;
+}
+/**
+ * Reads the entire packet to readbuf and returns
+ * the type of packet when successful, otherwise
+ * a negative error code is returned.
+ **/
+int MQTTThreadedClient::readPacket()
+{
+ int rc = FAILURE;
+ MQTTHeader header = {0};
+ int len = 0;
+ int rem_len = 0;
+
+ /* 1. read the header byte. This has the packet type in it */
+ if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
+ goto exit;
+
+ len = 1;
+ /* 2. read the remaining length. This is variable in itself */
+ if ( readPacketLength(&rem_len) < 0 )
+ goto exit;
+
+ len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
+
+ if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
+ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+ if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
+ goto exit;
+
+ // Convert the header to type
+ // and update rc
+ header.byte = readbuf[0];
+ rc = header.bits.type;
+
+exit:
+
+ return rc;
+}
+
+/**
+ * Read until a specified packet type is received, or untill the specified
+ * timeout dropping packets along the way.
+ **/
+int MQTTThreadedClient::readUntil(int packetType, int timeout)
+{
+ int pType = FAILURE;
+ Timer timer;
+
+ timer.start();
+ do {
+ pType = readPacket();
+ if (pType < 0)
+ break;
+
+ if (timer.read_ms() > timeout)
+ {
+ pType = FAILURE;
+ break;
+ }
+ }while(pType != packetType);
+
+ return pType;
+}
+
+
+int MQTTThreadedClient::login()
+{
+ int rc = FAILURE;
+ int len = 0;
+
+ if (!isConnected)
+ {
+ DBG("Session not connected! \r\n");
+ return rc;
+ }
+
+ // Copy the keepAliveInterval value to local
+ // MQTT specifies in seconds, we have to multiply that
+ // amount for our 32 bit timers which accepts ms.
+ keepAliveInterval = (connect_options.keepAliveInterval * 1000);
+
+ DBG("Login with: \r\n");
+ DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
+ DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
+
+ if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
+ {
+ DBG("Error serializing connect packet ...\r\n");
+ return rc;
+ }
+ if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
+ {
+ DBG("Error sending the connect request packet ...\r\n");
+ return rc;
+ }
+
+ // Wait for the CONNACK
+ if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
+ {
+ unsigned char connack_rc = 255;
+ bool sessionPresent = false;
+ DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
+ if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = connack_rc;
+ else
+ rc = FAILURE;
+ }
+ else
+ rc = FAILURE;
+
+ if (rc == SUCCESS)
+ {
+ DBG("Connected!!! ... starting connection timers ...\r\n");
+ resetConnectionTimer();
+ }
+
+ DBG("Returning with rc = %d\r\n", rc);
+
+ return rc;
+}
+
+
+void MQTTThreadedClient::disconnect()
+{
+ if (isConnected)
+ {
+ if( useTLS
+ && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
+ )
+ {
+ DBG( "Session reset returned an error \r\n");
+ }
+
+ isConnected = false;
+ tcpSocket->close();
+ }
+}
+
+int MQTTThreadedClient::connect()
+{
+ int ret = FAILURE;
+
+ if ((network == NULL) || (tcpSocket == NULL)
+ || host.empty())
+ {
+ DBG("Network settings not set! \r\n");
+ return ret;
+ }
+
+ if (useTLS)
+ {
+ if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
+ return ret;
+ }
+#if defined(MBEDTLS_SSL_CLI_C)
+ if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
+ mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
+ return ret;
+ }
+#endif
+ }
+
+ tcpSocket->open(network);
+ if (useTLS)
+ {
+ DBG("mbedtls_ssl_set_hostname ...\r\n");
+ mbedtls_ssl_set_hostname(&_ssl, host.c_str());
+ DBG("mbedtls_ssl_set_bio ...\r\n");
+ mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
+ ssl_send, ssl_recv, NULL );
+ }
+
+ if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
+ {
+ DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
+ return ret;
+ }else
+ isConnected = true;
+
+ if (useTLS)
+ {
+
+ if (doTLSHandshake() < 0)
+ {
+ DBG("TLS Handshake failed! \r\n");
+ return FAILURE;
+ }else
+ DBG("TLS Handshake complete!! \r\n");
+ }
+
+ return login();
+}
+
+void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
+{
+ // Copy the settings for reconnection
+ host = chost;
+ port = cport;
+ connect_options = options;
+}
+
+int MQTTThreadedClient::publish(PubMessage& msg)
+{
+#if 0
+ int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
+ // TODO: handle id values when the function is called later
+ if (id == 0)
+ return FAILURE;
+ else
+ return SUCCESS;
+#endif
+ PubMessage *message = mpool.alloc();
+ // Simple copy
+ *message = msg;
+
+ // Push the data to the thread
+ DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+ mqueue.put(message);
+
+ return SUCCESS;
+}
+
+int MQTTThreadedClient::sendPublish(PubMessage& message)
+{
+ MQTTString topicString = MQTTString_initializer;
+
+ if (!isConnected)
+ {
+ DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+ return FAILURE;
+ }
+
+ topicString.cstring = (char*) &message.topic[0];
+ int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
+ topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
+ if (len <= 0)
+ {
+ DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+ return FAILURE;
+ }
+
+ if (sendPacket(len) == SUCCESS)
+ {
+ DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+ return SUCCESS;
+ }
+
+ DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+ return FAILURE;
+}
+
+void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
+{
+ // Push the subscription into the map ...
+ FP<void,MessageData &> fp;
+ fp.attach(function);
+
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+}
+
+int MQTTThreadedClient::processSubscriptions()
+{
+ int numsubscribed = 0;
+
+ if (!isConnected)
+ {
+ DBG("Session not connected!!\r\n");
+ return 0;
+ }
+
+ DBG("Processing subscribed topics ....\r\n");
+
+ std::map<std::string, FP<void, MessageData &> >::iterator it;
+ for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
+ {
+ int rc = FAILURE;
+ int len = 0;
+ //TODO: We only subscribe to QoS = 0 for now
+ QoS qos = QOS0;
+
+ MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
+ DBG("Subscribing to topic [%s]\r\n", topic.cstring);
+
+
+ len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ if (len <= 0) {
+ DBG("Error serializing subscribe packet ...\r\n");
+ continue;
+ }
+
+ if ((rc = sendPacket(len)) != SUCCESS) {
+ DBG("Error sending subscribe packet [%d]\r\n", rc);
+ continue;
+ }
+
+ DBG("Waiting for subscription ack ...\r\n");
+ // Wait for SUBACK, dropping packets read along the way ...
+ if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
+ int count = 0, grantedQoS = -1;
+ unsigned short mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = grantedQoS; // 0, 1, 2 or 0x80
+ // For as long as we do not get 0x80 ..
+ if (rc != 0x80)
+ {
+ // Reset connection timers here ...
+ resetConnectionTimer();
+ DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
+ numsubscribed++;
+ } else {
+ DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
+ }
+ } else
+ DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
+ } // end for loop
+
+ return numsubscribed;
+}
+
+bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
+{
+ char* curf = topicFilter;
+ char* curn = topicName.lenstring.data;
+ char* curn_end = curn + topicName.lenstring.len;
+
+ while (*curf && curn < curn_end)
+ {
+ if (*curn == '/' && *curf != '/')
+ break;
+ if (*curf != '+' && *curf != '#' && *curf != *curn)
+ break;
+ if (*curf == '+')
+ { // skip until we meet the next separator, or end of string
+ char* nextpos = curn + 1;
+ while (nextpos < curn_end && *nextpos != '/')
+ nextpos = ++curn + 1;
+ }
+ else if (*curf == '#')
+ curn = curn_end - 1; // skip until end of string
+ curf++;
+ curn++;
+ };
+
+ return (curn == curn_end) && (*curf == '\0');
+}
+
+int MQTTThreadedClient::handlePublishMsg()
+{
+ MQTTString topicName = MQTTString_initializer;
+ Message msg;
+ int intQoS;
+ DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+ if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
+ &intQoS,
+ (unsigned char*)&msg.retained,
+ (unsigned short*)&msg.id,
+ &topicName,
+ (unsigned char**)&msg.payload,
+ (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+ {
+ DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+ return -1;
+ }
+
+ std::string topic;
+ if (topicName.lenstring.len > 0)
+ {
+ topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
+ }else
+ topic = (const char *) topicName.cstring;
+
+ DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+
+ msg.qos = (QoS) intQoS;
+
+
+ // Call the handlers for each topic
+ if (topicCBMap.find(topic) != topicCBMap.end())
+ {
+ // Call the callback function
+ if (topicCBMap[topic].attached())
+ {
+ DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+ MessageData md(topicName, msg);
+ topicCBMap[topic](md);
+
+ return 1;
+ }
+ }
+
+ // TODO: depending on the QoS
+ // we send data to the server = PUBACK or PUBREC
+ switch(intQoS)
+ {
+ case QOS0:
+ // We send back nothing ...
+ break;
+ case QOS1:
+ // TODO: implement
+ break;
+ case QOS2:
+ // TODO: implement
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+void MQTTThreadedClient::resetConnectionTimer()
+{
+ if (keepAliveInterval > 0)
+ {
+ comTimer.reset();
+ comTimer.start();
+ }
+}
+
+bool MQTTThreadedClient::hasConnectionTimedOut()
+{
+ if (keepAliveInterval > 0 ) {
+ // Check connection timer
+ if (comTimer.read_ms() > keepAliveInterval)
+ return true;
+ else
+ return false;
+ }
+
+ return false;
+}
+
+void MQTTThreadedClient::sendPingRequest()
+{
+ int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
+ if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
+ {
+ printf("MQTT Ping request sent successfully ...\r\n");
+ }
+}
+
+void MQTTThreadedClient::startListener()
+{
+ int pType;
+ int numsubs;
+ // Continuesly listens for packets and dispatch
+ // message handlers ...
+ if (useTLS)
+ {
+ initTLS();
+ }
+
+ while(true)
+ {
+
+ // Attempt to reconnect and login
+ if ( connect() < 0 )
+ {
+ disconnect();
+ // Wait for a few secs and reconnect ...
+ Thread::wait(6000);
+ continue;
+ }
+
+ numsubs = processSubscriptions();
+ DBG("Subscribed %d topics ...\r\n", numsubs);
+
+ // loop read
+ while(true)
+ {
+ pType = readPacket();
+ switch(pType)
+ {
+ case TIMEOUT:
+ // No data available from the network ...
+ break;
+ case FAILURE:
+ {
+ DBG("readPacket returned failure \r\n");
+ goto reconnect;
+ }
+ case BUFFER_OVERFLOW:
+ {
+ // TODO: Network error, do we disconnect and reconnect?
+ DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+ MBED_ASSERT(false);
+ }
+ break;
+ /**
+ * The rest of the return codes below (all positive) is about MQTT
+ * response codes
+ **/
+ case CONNACK:
+ case PUBACK:
+ case SUBACK:
+ break;
+ case PUBLISH:
+ {
+ DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+ // We receive data from the MQTT server ..
+ if (handlePublishMsg() < 0) {
+ DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+ break;
+ }
+ }
+ break;
+ case PINGRESP:
+ {
+ printf("MQTT Got ping response ...\r\n");
+ resetConnectionTimer();
+ }
+ break;
+ default:
+ DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+ }
+
+ // Check if its time to send a keepAlive packet
+ if (hasConnectionTimedOut()) {
+ // Queue the ping request so that other
+ // pending operations queued above will go first
+ queue.call(this, &MQTTThreadedClient::sendPingRequest);
+ }
+
+ // Check if we have messages on the message queue
+ osEvent evt = mqueue.get(10);
+ if (evt.status == osEventMessage) {
+
+ DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+
+ // Unpack the message
+ PubMessage * message = (PubMessage *)evt.value.p;
+
+ // Send the packet, do not queue the call
+ // like the ping above ..
+ if ( sendPublish(*message) == SUCCESS) {
+ // Reset timers if we have been able to send successfully
+ resetConnectionTimer();
+ } else {
+ // Disconnected?
+ goto reconnect;
+ }
+
+ // Free the message from mempool after using
+ mpool.free(message);
+ }
+
+ // Dispatch any queued events ...
+ queue.dispatch(100);
+ } // end while loop
+
+reconnect:
+ // reconnect?
+ DBG("Client disconnected!! ... retrying ...\r\n");
+ disconnect();
+
+ };
+}
+
+void MQTTThreadedClient::stopListener()
+{
+ // TODO: Set a signal/flag that the running thread
+ // will check if its ok to stop ...
+}
+
+}
\ No newline at end of file