Fork of my original MQTTGateway

Dependencies:   mbed-http

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTThreadedClient.cpp Source File

MQTTThreadedClient.cpp

00001 #include "mbed.h"
00002 #include "rtos.h"
00003 #include "MQTTThreadedClient.h"
00004 #include "mbedtls/platform.h"
00005 #include "mbedtls/ssl.h"
00006 #include "mbedtls/entropy.h"
00007 #include "mbedtls/ctr_drbg.h"
00008 #include "mbedtls/error.h"
00009 
00010 #ifdef DEBUG
00011 #define DBG(fmt, args...)    printf(fmt, ## args)
00012 #else
00013 #define DBG(fmt, args...)    /* Don't do anything in release builds */
00014 #endif
00015 
00016 static MemoryPool<MQTT::PubMessage, 4> mpool;
00017 static Queue<MQTT::PubMessage, 4> mqueue;
00018 
00019 // SSL/TLS variables
00020 mbedtls_entropy_context _entropy;
00021 mbedtls_ctr_drbg_context _ctr_drbg;
00022 mbedtls_x509_crt _cacert;
00023 mbedtls_ssl_context _ssl;
00024 mbedtls_ssl_config _ssl_conf;    
00025 mbedtls_ssl_session saved_session;
00026 
00027 namespace MQTT {
00028     
00029 /**
00030  * Receive callback for mbed TLS
00031  */
00032 static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
00033 {
00034     int recv = -1;
00035     TCPSocket *socket = static_cast<TCPSocket *>(ctx);
00036     socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
00037     recv = socket->recv(buf, len);
00038 
00039     if (NSAPI_ERROR_WOULD_BLOCK == recv) {
00040         return MBEDTLS_ERR_SSL_WANT_READ;
00041     } else if (recv < 0) {
00042         return -1;
00043     } else {
00044         return recv;
00045     }
00046 }
00047 
00048 /**
00049  * Send callback for mbed TLS
00050  */
00051 static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
00052 {
00053     int sent = -1;
00054     TCPSocket *socket = static_cast<TCPSocket *>(ctx);
00055     socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);    
00056     sent = socket->send(buf, len);
00057 
00058     if(NSAPI_ERROR_WOULD_BLOCK == sent) {
00059         return MBEDTLS_ERR_SSL_WANT_WRITE;
00060     } else if (sent < 0) {
00061         return -1;
00062     } else {
00063         return sent;
00064     }
00065 }
00066 
00067 #if DEBUG_LEVEL > 0
00068 /**
00069  * Debug callback for mbed TLS
00070  * Just prints on the USB serial port
00071  */
00072 static void my_debug(void *ctx, int level, const char *file, int line,
00073                      const char *str)
00074 {
00075     const char *p, *basename;
00076     (void) ctx;
00077 
00078     /* Extract basename from file */
00079     for(p = basename = file; *p != '\0'; p++) {
00080         if(*p == '/' || *p == '\\') {
00081             basename = p + 1;
00082         }
00083     }
00084 
00085     if (_debug) {
00086         mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
00087     }
00088 }
00089 
00090 /**
00091  * Certificate verification callback for mbed TLS
00092  * Here we only use it to display information on each cert in the chain
00093  */
00094 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
00095 {
00096     const uint32_t buf_size = 1024;
00097     char *buf = new char[buf_size];
00098     (void) data;
00099 
00100     if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
00101     mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
00102     if (_debug) mbedtls_printf("%s", buf);
00103 
00104     if (*flags == 0)
00105         if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
00106         else {
00107             mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
00108             if (_debug) mbedtls_printf("%s\n", buf);
00109         }
00110 
00111     delete[] buf;
00112     return 0;
00113 }
00114 #endif
00115 
00116 
00117 void MQTTThreadedClient::setupTLS()    
00118 {
00119         if (useTLS)
00120         {
00121             mbedtls_entropy_init(&_entropy);
00122             mbedtls_ctr_drbg_init(&_ctr_drbg);
00123             mbedtls_x509_crt_init(&_cacert);
00124             mbedtls_ssl_init(&_ssl);
00125             mbedtls_ssl_config_init(&_ssl_conf);        
00126             memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );            
00127         }    
00128 }
00129 
00130 void MQTTThreadedClient::freeTLS()
00131 {
00132         if (useTLS)
00133         {
00134             mbedtls_entropy_free(&_entropy);
00135             mbedtls_ctr_drbg_free(&_ctr_drbg);
00136             mbedtls_x509_crt_free(&_cacert);
00137             mbedtls_ssl_free(&_ssl);
00138             mbedtls_ssl_config_free(&_ssl_conf);               
00139         }    
00140 }
00141 
00142 int MQTTThreadedClient::initTLS()
00143 {
00144         int ret;
00145         
00146         DBG("Initializing TLS ...\r\n");
00147         DBG("mbedtls_ctr_drdbg_seed ...\r\n");
00148         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
00149                           (const unsigned char *) DRBG_PERS,
00150                           sizeof (DRBG_PERS))) != 0) {
00151             mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
00152             _error = ret;
00153             return -1;
00154         }
00155         DBG("mbedtls_x509_crt_parse ...\r\n");
00156         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
00157                            strlen(ssl_ca_pem) + 1)) != 0) {
00158             mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
00159             _error = ret;
00160             return -1;
00161         }
00162 
00163         DBG("mbedtls_ssl_config_defaults ...\r\n");
00164         if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
00165                         MBEDTLS_SSL_IS_CLIENT,
00166                         MBEDTLS_SSL_TRANSPORT_STREAM,
00167                         MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
00168             mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
00169             _error = ret;
00170             return -1;
00171         }
00172 
00173         DBG("mbedtls_ssl_config_ca_chain ...\r\n");
00174         mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
00175         DBG("mbedtls_ssl_conf_rng ...\r\n");
00176         mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
00177 
00178         /* It is possible to disable authentication by passing
00179          * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
00180          */
00181         DBG("mbedtls_ssl_conf_authmode ...\r\n");         
00182         mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
00183 
00184 #if DEBUG_LEVEL > 0
00185         mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
00186         mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
00187         mbedtls_debug_set_threshold(DEBUG_LEVEL);
00188 #endif
00189 
00190         DBG("mbedtls_ssl_setup ...\r\n");         
00191         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
00192             mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
00193             _error = ret;
00194             return -1;
00195         }
00196         
00197         return 0;
00198 }
00199 
00200 int MQTTThreadedClient::doTLSHandshake()
00201 {
00202         int ret;
00203         
00204         /* Start the handshake, the rest will be done in onReceive() */
00205         DBG("Starting the TLS handshake...\r\n");
00206         ret = mbedtls_ssl_handshake(&_ssl);
00207         if (ret < 0) 
00208         {
00209             if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
00210                 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
00211                     mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
00212             else 
00213             {
00214                 // do not close the socket if timed out
00215                 ret = TIMEOUT;
00216             }
00217             return ret;
00218         }
00219 
00220         /* Handshake done, time to print info */
00221         DBG("TLS connection to %s:%d established\r\n", 
00222             host.c_str(), port);
00223 
00224         const uint32_t buf_size = 1024;
00225         char *buf = new char[buf_size];
00226         mbedtls_x509_crt_info(buf, buf_size, "\r    ",
00227                         mbedtls_ssl_get_peer_cert(&_ssl));
00228                         
00229         DBG("Server certificate:\r\n%s\r", buf);
00230         // Verify server cert ...
00231         uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
00232         if( flags != 0 )
00233         {
00234             mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
00235             DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
00236             // free server cert ... before error return
00237             delete [] buf;
00238             return -1;
00239         }
00240         
00241         DBG("Certificate verification passed\r\n\r\n");
00242         // delete server cert after verification
00243         delete [] buf;
00244         
00245 #if defined(MBEDTLS_SSL_CLI_C)        
00246         // TODO: Save the session here for reconnect.
00247         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
00248         {
00249             mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
00250             hasSavedSession = false;
00251             return -1;
00252         }  
00253 #endif        
00254         DBG("Session saved for reconnect ...\r\n");     
00255         hasSavedSession = true;
00256         
00257         return 0;
00258 }
00259 
00260 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
00261 {
00262     int rc;
00263 
00264     if (tcpSocket == NULL)
00265         return -1;
00266 
00267     if (useTLS) 
00268     {
00269         // Do SSL/TLS read
00270         rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
00271         if (MBEDTLS_ERR_SSL_WANT_READ == rc)
00272             return TIMEOUT;
00273         else
00274             return rc;
00275     } else {
00276         // non-blocking socket ...
00277         tcpSocket->set_timeout(timeout);
00278         rc = tcpSocket->recv( (void *) buffer, size);
00279 
00280         // return 0 bytes if timeout ...
00281         if (NSAPI_ERROR_WOULD_BLOCK == rc)
00282             return TIMEOUT;
00283         else
00284             return rc; // return the number of bytes received or error
00285     }
00286 }
00287 
00288 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
00289 {
00290     int rc;
00291     
00292     if (tcpSocket == NULL)
00293         return -1;
00294     
00295     if (useTLS) {
00296         // Do SSL/TLS write
00297         rc =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
00298         if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
00299             return TIMEOUT;
00300         else
00301             return rc;
00302     } else {
00303 
00304         // set the write timeout
00305         tcpSocket->set_timeout(timeout);
00306         rc = tcpSocket->send(buffer, size);
00307 
00308         if ( NSAPI_ERROR_WOULD_BLOCK == rc)
00309             return TIMEOUT;
00310         else
00311             return rc;
00312     }
00313 }
00314 
00315 int MQTTThreadedClient::readPacketLength(int* value)
00316 {
00317     int rc = MQTTPACKET_READ_ERROR;
00318     unsigned char c;
00319     int multiplier = 1;
00320     int len = 0;
00321     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00322 
00323     *value = 0;
00324     do
00325     {
00326         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00327         {
00328             rc = MQTTPACKET_READ_ERROR; /* bad data */
00329             goto exit;
00330         }
00331         
00332         rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
00333         if (rc != 1)
00334         {
00335             rc = MQTTPACKET_READ_ERROR;
00336             goto exit;
00337         }
00338             
00339         *value += (c & 127) * multiplier;
00340         multiplier *= 128;
00341     } while ((c & 128) != 0);
00342     
00343     rc = MQTTPACKET_READ_COMPLETE;
00344         
00345 exit:
00346     if (rc == MQTTPACKET_READ_ERROR )
00347         len = -1;
00348     
00349     return len;
00350 }
00351 
00352 int MQTTThreadedClient::sendPacket(size_t length)
00353 {
00354     int rc = FAILURE;
00355     int sent = 0;
00356 
00357     while (sent < length)
00358     {
00359         rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
00360         if (rc < 0)  // there was an error writing the data
00361             break;
00362         sent += rc;
00363     }
00364     
00365     if (sent == length)
00366         rc = SUCCESS;
00367     else
00368         rc = FAILURE;
00369         
00370     return rc;
00371 }
00372 /**
00373  * Reads the entire packet to readbuf and returns
00374  * the type of packet when successful, otherwise
00375  * a negative error code is returned.
00376  **/
00377 int MQTTThreadedClient::readPacket()
00378 {
00379     int rc = FAILURE;
00380     MQTTHeader header = {0};
00381     int len = 0;
00382     int rem_len = 0;
00383 
00384     /* 1. read the header byte.  This has the packet type in it */
00385     if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
00386         goto exit;
00387 
00388     len = 1;
00389     /* 2. read the remaining length.  This is variable in itself */
00390     if ( readPacketLength(&rem_len) < 0 )
00391         goto exit;
00392         
00393     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00394 
00395     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00396     {
00397         rc = BUFFER_OVERFLOW;
00398         goto exit;
00399     }
00400 
00401     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00402     if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
00403         goto exit;
00404 
00405     // Convert the header to type
00406     // and update rc
00407     header.byte = readbuf[0];
00408     rc = header.bits.type;
00409     
00410 exit:
00411 
00412     return rc;    
00413 }
00414 
00415 /**
00416  * Read until a specified packet type is received, or untill the specified
00417  * timeout dropping packets along the way.
00418  **/
00419 int MQTTThreadedClient::readUntil(int packetType, int timeout)
00420 {
00421     int pType = FAILURE;
00422     Timer timer;
00423     
00424     timer.start();
00425     do {
00426         pType = readPacket();
00427         if (pType < 0)
00428             break;
00429             
00430         if (timer.read_ms() > timeout)
00431         {
00432             pType = FAILURE;
00433             break;
00434         }
00435     }while(pType != packetType);
00436     
00437     return pType;    
00438 }
00439 
00440 
00441 int MQTTThreadedClient::login()
00442 {
00443     int rc = FAILURE;
00444     int len = 0;
00445 
00446     if (!isConnected)
00447     {
00448         DBG("Session not connected! \r\n");
00449         return rc;
00450     }
00451         
00452     // Copy the keepAliveInterval value to local
00453     // MQTT specifies in seconds, we have to multiply that
00454     // amount for our 32 bit timers which accepts ms.
00455     keepAliveInterval = (connect_options.keepAliveInterval * 1000);
00456     
00457     DBG("Login with: \r\n");
00458     DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
00459     DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
00460     
00461     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
00462     {
00463         DBG("Error serializing connect packet ...\r\n");
00464         return rc;
00465     }
00466     if ((rc = sendPacket((size_t) len)) != SUCCESS)  // send the connect packet
00467     {
00468         DBG("Error sending the connect request packet ...\r\n");
00469         return rc; 
00470     }
00471     
00472     // Wait for the CONNACK 
00473     if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
00474     {
00475         unsigned char connack_rc = 255;
00476         bool sessionPresent = false;
00477         DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
00478         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00479             rc = connack_rc;
00480         else
00481             rc = FAILURE;
00482     }
00483     else
00484         rc = FAILURE;
00485 
00486     if (rc == SUCCESS)
00487     {
00488         DBG("Connected!!! ... starting connection timers ...\r\n");
00489         resetConnectionTimer();
00490     }
00491     
00492     DBG("Returning with rc = %d\r\n", rc);
00493     
00494     return rc;    
00495 }
00496 
00497 
00498 void MQTTThreadedClient::disconnect()
00499 {
00500     if (isConnected)
00501     {
00502         if( useTLS 
00503             && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
00504            )
00505         {
00506             DBG( "Session reset returned an error \r\n");
00507         }        
00508         
00509         isConnected = false;
00510         tcpSocket->close();      
00511     }
00512 }
00513 
00514 int MQTTThreadedClient::connect()
00515 {
00516     int ret = FAILURE;
00517 
00518     if ((network == NULL) || (tcpSocket == NULL)
00519         || host.empty())
00520     {
00521         DBG("Network settings not set! \r\n");
00522         return ret;
00523     }
00524     
00525     if (useTLS) 
00526     {
00527         if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
00528             mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
00529             return ret;
00530         }
00531 #if defined(MBEDTLS_SSL_CLI_C)
00532         if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
00533             mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
00534             return ret;
00535         }
00536 #endif        
00537     }
00538         
00539     tcpSocket->open(network);
00540     if (useTLS)
00541     {
00542         DBG("mbedtls_ssl_set_hostname ...\r\n");         
00543         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
00544         DBG("mbedtls_ssl_set_bio ...\r\n");         
00545         mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
00546                                    ssl_send, ssl_recv, NULL );
00547     }
00548     
00549     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
00550     {
00551          DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
00552          return ret;
00553     }else
00554          isConnected = true;
00555     
00556     if (useTLS) 
00557     {
00558         
00559         if (doTLSHandshake() < 0)
00560         {
00561             DBG("TLS Handshake failed! \r\n");
00562             return FAILURE;
00563         }else
00564             DBG("TLS Handshake complete!! \r\n");
00565     }
00566     
00567     return login();
00568 }
00569 
00570 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
00571 {
00572     // Copy the settings for reconnection
00573     host = chost;
00574     port = cport;
00575     connect_options = options;    
00576 }
00577 
00578 int MQTTThreadedClient::publish(PubMessage& msg)
00579 {
00580 #if 0
00581     int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
00582     // TODO: handle id values when the function is called later
00583     if (id == 0)
00584         return FAILURE;
00585     else
00586         return SUCCESS;
00587 #endif
00588     PubMessage *message = mpool.alloc();
00589     // Simple copy
00590     *message = msg;
00591     
00592     // Push the data to the thread
00593     DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
00594     mqueue.put(message);
00595     
00596     return SUCCESS;
00597 }
00598 
00599 int MQTTThreadedClient::sendPublish(PubMessage& message)
00600 {
00601      MQTTString topicString = MQTTString_initializer;
00602      
00603      if (!isConnected) 
00604      {
00605         DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
00606         return FAILURE;
00607      }
00608         
00609      topicString.cstring = (char*) &message.topic[0];
00610      int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
00611               topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
00612      if (len <= 0)
00613      {
00614          DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
00615          return FAILURE;
00616      }
00617      
00618      if (sendPacket(len) == SUCCESS)
00619      {
00620          DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
00621          return SUCCESS;
00622      }
00623     
00624     DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
00625     return FAILURE;
00626 }
00627 
00628 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
00629 {
00630     // Push the subscription into the map ...
00631     FP<void,MessageData &> fp;
00632     fp.attach(function);
00633     
00634     topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
00635 } 
00636 
00637 int MQTTThreadedClient::processSubscriptions()
00638 {
00639     int numsubscribed = 0;
00640     
00641     if (!isConnected) 
00642     {
00643             DBG("Session not connected!!\r\n");
00644             return 0;
00645     }
00646     
00647     DBG("Processing subscribed topics ....\r\n");
00648     
00649     std::map<std::string, FP<void, MessageData &> >::iterator it;
00650     for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
00651     {
00652         int rc = FAILURE;
00653         int len = 0;
00654         //TODO: We only subscribe to QoS = 0 for now
00655         QoS qos = QOS0;
00656 
00657         MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
00658         DBG("Subscribing to topic [%s]\r\n", topic.cstring);
00659 
00660 
00661         len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00662         if (len <= 0) {
00663             DBG("Error serializing subscribe packet ...\r\n");
00664             continue;
00665         }
00666 
00667         if ((rc = sendPacket(len)) != SUCCESS) {
00668             DBG("Error sending subscribe packet [%d]\r\n", rc);
00669             continue;
00670         }
00671 
00672         DBG("Waiting for subscription ack ...\r\n");
00673         // Wait for SUBACK, dropping packets read along the way ...
00674         if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
00675             int count = 0, grantedQoS = -1;
00676             unsigned short mypacketid;
00677             if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00678                 rc = grantedQoS; // 0, 1, 2 or 0x80
00679             // For as long as we do not get 0x80 ..
00680             if (rc != 0x80) 
00681             {
00682                 // Reset connection timers here ...
00683                 resetConnectionTimer();
00684                 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
00685                 numsubscribed++;
00686             } else {
00687                 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
00688             }
00689         } else 
00690             DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
00691     } // end for loop
00692     
00693     return numsubscribed;    
00694 }
00695 
00696 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
00697 {
00698     char* curf = topicFilter;
00699     char* curn = topicName.lenstring.data;
00700     char* curn_end = curn + topicName.lenstring.len;
00701 
00702     while (*curf && curn < curn_end)
00703     {
00704         if (*curn == '/' && *curf != '/')
00705             break;
00706         if (*curf != '+' && *curf != '#' && *curf != *curn)
00707             break;
00708         if (*curf == '+')
00709         {   // skip until we meet the next separator, or end of string
00710             char* nextpos = curn + 1;
00711             while (nextpos < curn_end && *nextpos != '/')
00712                 nextpos = ++curn + 1;
00713         }
00714         else if (*curf == '#')
00715             curn = curn_end - 1;    // skip until end of string
00716         curf++;
00717         curn++;
00718     };
00719 
00720     return (curn == curn_end) && (*curf == '\0');
00721 }
00722 
00723 int MQTTThreadedClient::handlePublishMsg()
00724 {
00725     MQTTString topicName = MQTTString_initializer;
00726     Message msg;
00727     int intQoS;
00728     DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
00729     if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
00730             &intQoS, 
00731             (unsigned char*)&msg.retained, 
00732             (unsigned short*)&msg.id, 
00733             &topicName,
00734             (unsigned char**)&msg.payload, 
00735             (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00736     {
00737         DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
00738         return -1;
00739     }
00740 
00741     std::string topic;
00742     if (topicName.lenstring.len > 0)
00743     {
00744         topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
00745     }else
00746         topic = (const char *) topicName.cstring;
00747     
00748     DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
00749     
00750     msg.qos = (QoS) intQoS;
00751 
00752     
00753     // Call the handlers for each topic 
00754     if (topicCBMap.find(topic) != topicCBMap.end())
00755     {
00756         // Call the callback function 
00757         if (topicCBMap[topic].attached())
00758         {
00759             DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
00760             MessageData md(topicName, msg);            
00761             topicCBMap[topic](md);
00762             
00763             return 1;
00764         }
00765     }
00766     
00767     // TODO: depending on the QoS
00768     // we send data to the server = PUBACK or PUBREC
00769     switch(intQoS)
00770     {
00771         case QOS0:
00772             // We send back nothing ...
00773             break;
00774         case QOS1:
00775             // TODO: implement
00776             break;
00777         case QOS2:
00778             // TODO: implement
00779             break;
00780         default:
00781             break;
00782     }
00783     
00784     return 0;
00785 }
00786 
00787 void MQTTThreadedClient::resetConnectionTimer()
00788 {
00789     if (keepAliveInterval > 0)
00790     {
00791         comTimer.reset();
00792         comTimer.start();
00793     }
00794 }
00795 
00796 bool MQTTThreadedClient::hasConnectionTimedOut()
00797 {
00798     if (keepAliveInterval > 0 ) {
00799         // Check connection timer
00800         if (comTimer.read_ms() > keepAliveInterval)
00801             return true;
00802         else
00803             return false;
00804     }
00805 
00806     return false;
00807 }
00808         
00809 void MQTTThreadedClient::sendPingRequest()
00810 {
00811     int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00812     if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
00813     {
00814         printf("MQTT Ping request sent successfully ...\r\n");
00815     }
00816 }
00817 
00818 void MQTTThreadedClient::startListener()
00819 {
00820     int pType;
00821     int numsubs;
00822     // Continuesly listens for packets and dispatch
00823     // message handlers ...
00824     if (useTLS)
00825     {
00826         initTLS();
00827     }
00828             
00829     while(true)
00830     {
00831 
00832         // Attempt to reconnect and login
00833         if ( connect() < 0 )
00834         {
00835             disconnect();
00836             // Wait for a few secs and reconnect ...
00837             Thread::wait(6000);
00838             continue;
00839         }
00840         
00841         numsubs = processSubscriptions();
00842         DBG("Subscribed %d topics ...\r\n", numsubs);
00843          
00844         // loop read    
00845         while(true) 
00846         {
00847             pType = readPacket();
00848             switch(pType) 
00849             {
00850                 case TIMEOUT:
00851                     // No data available from the network ...
00852                     break;
00853                 case FAILURE:
00854                     {
00855                         DBG("readPacket returned failure \r\n");
00856                         goto reconnect;
00857                     }
00858                 case BUFFER_OVERFLOW: 
00859                     {
00860                         // TODO: Network error, do we disconnect and reconnect?
00861                         DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
00862                         MBED_ASSERT(false);
00863                     }
00864                     break;
00865                 /**
00866                 *  The rest of the return codes below (all positive) is about MQTT
00867                  * response codes
00868                  **/
00869                 case CONNACK:
00870                 case PUBACK:
00871                 case SUBACK:
00872                     break;
00873                 case PUBLISH: 
00874                     {
00875                         DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
00876                         // We receive data from the MQTT server ..
00877                         if (handlePublishMsg() < 0) {
00878                             DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
00879                             break;
00880                         }
00881                     }
00882                     break;
00883                 case PINGRESP: 
00884                     {
00885                         printf("MQTT Got ping response ...\r\n");
00886                         resetConnectionTimer();
00887                     }
00888                     break;
00889                 default:
00890                     DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
00891             }
00892 
00893             // Check if its time to send a keepAlive packet
00894             if (hasConnectionTimedOut()) {
00895                 // Queue the ping request so that other
00896                 // pending operations queued above will go first
00897                 queue.call(this, &MQTTThreadedClient::sendPingRequest);
00898             }
00899 
00900             // Check if we have messages on the message queue
00901             osEvent evt = mqueue.get(10);
00902             if (evt.status == osEventMessage) {
00903 
00904                 DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
00905 
00906                 // Unpack the message
00907                 PubMessage * message = (PubMessage *)evt.value.p;
00908 
00909                 // Send the packet, do not queue the call
00910                 // like the ping above ..
00911                 if ( sendPublish(*message) == SUCCESS) {
00912                     // Reset timers if we have been able to send successfully
00913                     resetConnectionTimer();
00914                 } else {
00915                     // Disconnected?
00916                     goto reconnect;
00917                 }
00918 
00919                 // Free the message from mempool  after using
00920                 mpool.free(message);
00921             }
00922 
00923             // Dispatch any queued events ...
00924             queue.dispatch(100);
00925         } // end while loop
00926 
00927 reconnect:
00928         // reconnect?
00929         DBG("Client disconnected!! ... retrying ...\r\n");
00930         disconnect();
00931         
00932     };
00933 }
00934 
00935 void MQTTThreadedClient::stopListener()
00936 {
00937     // TODO: Set a signal/flag that the running thread 
00938     // will check if its ok to stop ...
00939 }
00940 
00941 }