A threaded, secure MQTT client example. It uses Mbed TLS for the SSL/TLS connection and supports QoS 0 only for now. The example has been tested on a K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTThreadedClient.cpp Source File

MQTTThreadedClient.cpp

00001 #include "mbed.h"
00002 #include "rtos.h"
00003 #include "MQTTThreadedClient.h"
00004 #include "mbedtls/platform.h"
00005 #include "mbedtls/ssl.h"
00006 #include "mbedtls/entropy.h"
00007 #include "mbedtls/ctr_drbg.h"
00008 #include "mbedtls/error.h"
00009 
00010 static MemoryPool<MQTT::PubMessage, 8> mpool;
00011 static Queue<MQTT::PubMessage, 8> mqueue;
00012 
00013 // SSL/TLS variables
00014 mbedtls_entropy_context _entropy;
00015 mbedtls_ctr_drbg_context _ctr_drbg;
00016 mbedtls_x509_crt _cacert;
00017 mbedtls_ssl_context _ssl;
00018 mbedtls_ssl_config _ssl_conf;    
00019 mbedtls_ssl_session saved_session;
00020 
00021 namespace MQTT {
00022     
00023 /**
00024  * Receive callback for mbed TLS
00025  */
00026 static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
00027 {
00028     int recv = -1;
00029     TCPSocket *socket = static_cast<TCPSocket *>(ctx);
00030     socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
00031     recv = socket->recv(buf, len);
00032 
00033     if (NSAPI_ERROR_WOULD_BLOCK == recv) {
00034         return MBEDTLS_ERR_SSL_WANT_READ;
00035     } else if (recv < 0) {
00036         return -1;
00037     } else {
00038         return recv;
00039     }
00040 }
00041 
00042 /**
00043  * Send callback for mbed TLS
00044  */
00045 static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
00046 {
00047     int sent = -1;
00048     TCPSocket *socket = static_cast<TCPSocket *>(ctx);
00049     socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);    
00050     sent = socket->send(buf, len);
00051 
00052     if(NSAPI_ERROR_WOULD_BLOCK == sent) {
00053         return MBEDTLS_ERR_SSL_WANT_WRITE;
00054     } else if (sent < 0) {
00055         return -1;
00056     } else {
00057         return sent;
00058     }
00059 }
00060 
00061 #if DEBUG_LEVEL > 0
00062 /**
00063  * Debug callback for mbed TLS
00064  * Just prints on the USB serial port
00065  */
00066 static void my_debug(void *ctx, int level, const char *file, int line,
00067                      const char *str)
00068 {
00069     const char *p, *basename;
00070     (void) ctx;
00071 
00072     /* Extract basename from file */
00073     for(p = basename = file; *p != '\0'; p++) {
00074         if(*p == '/' || *p == '\\') {
00075             basename = p + 1;
00076         }
00077     }
00078 
00079     if (_debug) {
00080         mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
00081     }
00082 }
00083 
00084 /**
00085  * Certificate verification callback for mbed TLS
00086  * Here we only use it to display information on each cert in the chain
00087  */
00088 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
00089 {
00090     const uint32_t buf_size = 1024;
00091     char *buf = new char[buf_size];
00092     (void) data;
00093 
00094     if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
00095     mbedtls_x509_crt_info(buf, buf_size - 1, "  ", crt);
00096     if (_debug) mbedtls_printf("%s", buf);
00097 
00098     if (*flags == 0)
00099         if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
00100         else {
00101             mbedtls_x509_crt_verify_info(buf, buf_size, "  ! ", *flags);
00102             if (_debug) mbedtls_printf("%s\n", buf);
00103         }
00104 
00105     delete[] buf;
00106     return 0;
00107 }
00108 #endif
00109 
00110 
00111 void MQTTThreadedClient::setupTLS()    
00112 {
00113         if (useTLS)
00114         {
00115             mbedtls_entropy_init(&_entropy);
00116             mbedtls_ctr_drbg_init(&_ctr_drbg);
00117             mbedtls_x509_crt_init(&_cacert);
00118             mbedtls_ssl_init(&_ssl);
00119             mbedtls_ssl_config_init(&_ssl_conf);        
00120             memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );            
00121         }    
00122 }
00123 
00124 void MQTTThreadedClient::freeTLS()
00125 {
00126         if (useTLS)
00127         {
00128             mbedtls_entropy_free(&_entropy);
00129             mbedtls_ctr_drbg_free(&_ctr_drbg);
00130             mbedtls_x509_crt_free(&_cacert);
00131             mbedtls_ssl_free(&_ssl);
00132             mbedtls_ssl_config_free(&_ssl_conf);               
00133         }    
00134 }
00135 
00136 int MQTTThreadedClient::initTLS()
00137 {
00138         int ret;
00139         
00140         DBG("Initializing TLS ...\r\n");
00141         DBG("mbedtls_ctr_drdbg_seed ...\r\n");
00142         if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
00143                           (const unsigned char *) DRBG_PERS,
00144                           sizeof (DRBG_PERS))) != 0) {
00145             mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
00146             _error = ret;
00147             return -1;
00148         }
00149         DBG("mbedtls_x509_crt_parse ...\r\n");
00150         if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
00151                            strlen(ssl_ca_pem) + 1)) != 0) {
00152             mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
00153             _error = ret;
00154             return -1;
00155         }
00156 
00157         DBG("mbedtls_ssl_config_defaults ...\r\n");
00158         if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
00159                         MBEDTLS_SSL_IS_CLIENT,
00160                         MBEDTLS_SSL_TRANSPORT_STREAM,
00161                         MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
00162             mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
00163             _error = ret;
00164             return -1;
00165         }
00166 
00167         DBG("mbedtls_ssl_config_ca_chain ...\r\n");
00168         mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
00169         DBG("mbedtls_ssl_conf_rng ...\r\n");
00170         mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
00171 
00172         /* It is possible to disable authentication by passing
00173          * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
00174          */
00175         DBG("mbedtls_ssl_conf_authmode ...\r\n");         
00176         mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
00177 
00178 #if DEBUG_LEVEL > 0
00179         mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
00180         mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
00181         mbedtls_debug_set_threshold(DEBUG_LEVEL);
00182 #endif
00183 
00184         DBG("mbedtls_ssl_setup ...\r\n");         
00185         if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
00186             mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
00187             _error = ret;
00188             return -1;
00189         }
00190         
00191         return 0;
00192 }
00193 
00194 int MQTTThreadedClient::doTLSHandshake()
00195 {
00196         int ret;
00197         
00198         /* Start the handshake, the rest will be done in onReceive() */
00199         printf("Starting the TLS handshake...\r\n");
00200         ret = mbedtls_ssl_handshake(&_ssl);
00201         if (ret < 0) 
00202         {
00203             if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
00204                 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
00205                     mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
00206             else 
00207             {
00208                 // do not close the socket if timed out
00209                 ret = TIMEOUT;
00210             }
00211             return ret;
00212         }
00213 
00214         /* Handshake done, time to print info */
00215         printf("TLS connection to %s:%d established\r\n", 
00216             host.c_str(), port);
00217 
00218         const uint32_t buf_size = 1024;
00219         char *buf = new char[buf_size];
00220         mbedtls_x509_crt_info(buf, buf_size, "\r    ",
00221                         mbedtls_ssl_get_peer_cert(&_ssl));
00222                         
00223         printf("Server certificate:\r\n%s\r", buf);
00224         // Verify server cert ...
00225         uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
00226         if( flags != 0 )
00227         {
00228             mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
00229             printf("Certificate verification failed:\r\n%s\r\r\n", buf);
00230             // free server cert ... before error return
00231             delete [] buf;
00232             return -1;
00233         }
00234         
00235         printf("Certificate verification passed\r\n\r\n");
00236         // delete server cert after verification
00237         delete [] buf;
00238         
00239 #if defined(MBEDTLS_SSL_CLI_C)        
00240                 printf("Saving SSL/TLS session ...\r\n");
00241         // TODO: Save the session here for reconnect.
00242         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
00243         {
00244             mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
00245             hasSavedSession = false;
00246             return -1;
00247         }  
00248         printf("Session saved for reconnect ...\r\n");              
00249 #endif        
00250      
00251         hasSavedSession = true;
00252         
00253         return 0;
00254 }
00255 
00256 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
00257 {
00258     int rc;
00259 
00260     if (tcpSocket == NULL)
00261         return -1;
00262 
00263     if (useTLS) 
00264     {
00265         // Do SSL/TLS read
00266         rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
00267         if (MBEDTLS_ERR_SSL_WANT_READ == rc)
00268             return TIMEOUT;
00269         else
00270             return rc;
00271     } else {
00272         // non-blocking socket ...
00273         tcpSocket->set_timeout(timeout);
00274         rc = tcpSocket->recv( (void *) buffer, size);
00275 
00276         // return 0 bytes if timeout ...
00277         if (NSAPI_ERROR_WOULD_BLOCK == rc)
00278             return TIMEOUT;
00279         else
00280             return rc; // return the number of bytes received or error
00281     }
00282 }
00283 
00284 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
00285 {
00286     int rc;
00287     
00288     if (tcpSocket == NULL)
00289         return -1;
00290     
00291     if (useTLS) {
00292         // Do SSL/TLS write
00293         rc =  mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
00294         if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
00295             return TIMEOUT;
00296         else
00297             return rc;
00298     } else {
00299 
00300         // set the write timeout
00301         tcpSocket->set_timeout(timeout);
00302         rc = tcpSocket->send(buffer, size);
00303 
00304         if ( NSAPI_ERROR_WOULD_BLOCK == rc)
00305             return TIMEOUT;
00306         else
00307             return rc;
00308     }
00309 }
00310 
00311 int MQTTThreadedClient::readPacketLength(int* value)
00312 {
00313     int rc = MQTTPACKET_READ_ERROR;
00314     unsigned char c;
00315     int multiplier = 1;
00316     int len = 0;
00317     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00318 
00319     *value = 0;
00320     do
00321     {
00322         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00323         {
00324             rc = MQTTPACKET_READ_ERROR; /* bad data */
00325             goto exit;
00326         }
00327         
00328         rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
00329         if (rc != 1)
00330         {
00331             rc = MQTTPACKET_READ_ERROR;
00332             goto exit;
00333         }
00334             
00335         *value += (c & 127) * multiplier;
00336         multiplier *= 128;
00337     } while ((c & 128) != 0);
00338     
00339     rc = MQTTPACKET_READ_COMPLETE;
00340         
00341 exit:
00342     if (rc == MQTTPACKET_READ_ERROR )
00343         len = -1;
00344     
00345     return len;
00346 }
00347 
00348 int MQTTThreadedClient::sendPacket(size_t length)
00349 {
00350     int rc = FAILURE;
00351     int sent = 0;
00352 
00353     while (sent < length)
00354     {
00355         rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
00356         if (rc < 0)  // there was an error writing the data
00357             break;
00358         sent += rc;
00359     }
00360     
00361     if (sent == length)
00362         rc = SUCCESS;
00363     else
00364         rc = FAILURE;
00365         
00366     return rc;
00367 }
00368 /**
00369  * Reads the entire packet to readbuf and returns
00370  * the type of packet when successful, otherwise
00371  * a negative error code is returned.
00372  **/
00373 int MQTTThreadedClient::readPacket()
00374 {
00375     int rc = FAILURE;
00376     MQTTHeader header = {0};
00377     int len = 0;
00378     int rem_len = 0;
00379 
00380     /* 1. read the header byte.  This has the packet type in it */
00381     if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
00382         goto exit;
00383 
00384     len = 1;
00385     /* 2. read the remaining length.  This is variable in itself */
00386     if ( readPacketLength(&rem_len) < 0 )
00387         goto exit;
00388         
00389     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
00390 
00391     if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
00392     {
00393         rc = BUFFER_OVERFLOW;
00394         goto exit;
00395     }
00396 
00397     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00398     if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
00399         goto exit;
00400 
00401     // Convert the header to type
00402     // and update rc
00403     header.byte = readbuf[0];
00404     rc = header.bits.type;
00405     
00406 exit:
00407 
00408     return rc;    
00409 }
00410 
00411 /**
00412  * Read until a specified packet type is received, or untill the specified
00413  * timeout dropping packets along the way.
00414  **/
00415 int MQTTThreadedClient::readUntil(int packetType, int timeout)
00416 {
00417     int pType = FAILURE;
00418     Timer timer;
00419     
00420     timer.start();
00421     do {
00422         pType = readPacket();
00423         if (pType < 0)
00424             break;
00425             
00426         if (timer.read_ms() > timeout)
00427         {
00428             pType = FAILURE;
00429             break;
00430         }
00431     }while(pType != packetType);
00432     
00433     return pType;    
00434 }
00435 
00436 
00437 int MQTTThreadedClient::login()
00438 {
00439     int rc = FAILURE;
00440     int len = 0;
00441 
00442     if (!isConnected)
00443     {
00444         DBG("Session not connected! \r\n");
00445         return rc;
00446     }
00447         
00448     // Copy the keepAliveInterval value to local
00449     // MQTT specifies in seconds, we have to multiply that
00450     // amount for our 32 bit timers which accepts ms.
00451     keepAliveInterval = (connect_options.keepAliveInterval * 1000);
00452     
00453     DBG("Login with: \r\n");
00454     DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
00455     DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
00456     
00457     if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
00458     {
00459         DBG("Error serializing connect packet ...\r\n");
00460         return rc;
00461     }
00462     if ((rc = sendPacket((size_t) len)) != SUCCESS)  // send the connect packet
00463     {
00464         DBG("Error sending the connect request packet ...\r\n");
00465         return rc; 
00466     }
00467     
00468     // Wait for the CONNACK 
00469     if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
00470     {
00471         unsigned char connack_rc = 255;
00472         bool sessionPresent = false;
00473         DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
00474         if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00475             rc = connack_rc;
00476         else
00477             rc = FAILURE;
00478     }
00479     else
00480         rc = FAILURE;
00481 
00482     if (rc == SUCCESS)
00483     {
00484         DBG("Connected!!! ... starting connection timers ...\r\n");
00485         resetConnectionTimer();
00486     }
00487     
00488     DBG("Returning with rc = %d\r\n", rc);
00489     
00490     return rc;    
00491 }
00492 
00493 
00494 void MQTTThreadedClient::disconnect()
00495 {
00496     if (isConnected)
00497     {
00498         if( useTLS 
00499             && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
00500            )
00501         {
00502             DBG( "Session reset returned an error \r\n");
00503         }        
00504         
00505         isConnected = false;
00506         tcpSocket->close();      
00507     }
00508 }
00509 
00510 int MQTTThreadedClient::connect()
00511 {
00512     int ret = FAILURE;
00513 
00514     if ((network == NULL) || (tcpSocket == NULL)
00515         || host.empty())
00516     {
00517         DBG("Network settings not set! \r\n");
00518         return ret;
00519     }
00520     
00521     if (useTLS) 
00522     {
00523         if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
00524             mbedtls_printf( " failed\n  ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
00525             return ret;
00526         }
00527 #if defined(MBEDTLS_SSL_CLI_C)
00528         if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
00529             mbedtls_printf( " failed\n  ! mbedtls_ssl_conf_session returned %d\n\n", ret );
00530             return ret;
00531         }
00532 #endif        
00533     }
00534         
00535     tcpSocket->open(network);
00536     if (useTLS)
00537     {
00538         DBG("mbedtls_ssl_set_hostname ...\r\n");         
00539         mbedtls_ssl_set_hostname(&_ssl, host.c_str());
00540         DBG("mbedtls_ssl_set_bio ...\r\n");         
00541         mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
00542                                    ssl_send, ssl_recv, NULL );
00543     }
00544     
00545     if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
00546     {
00547          DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
00548          return ret;
00549     }else
00550          isConnected = true;
00551     
00552     if (useTLS) 
00553     {
00554         
00555         if (doTLSHandshake() < 0)
00556         {
00557             DBG("TLS Handshake failed! \r\n");
00558             return FAILURE;
00559         }else
00560             DBG("TLS Handshake complete!! \r\n");
00561     }
00562     
00563     return login();
00564 }
00565 
00566 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
00567 {
00568     // Copy the settings for reconnection
00569     host = chost;
00570     port = cport;
00571     connect_options = options;    
00572 }
00573 
00574 int MQTTThreadedClient::publish(PubMessage& msg)
00575 {
00576 #if 0
00577     int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
00578     // TODO: handle id values when the function is called later
00579     if (id == 0)
00580         return FAILURE;
00581     else
00582         return SUCCESS;
00583 #endif
00584     PubMessage *message = mpool.alloc();
00585     // Simple copy
00586     *message = msg;
00587     
00588     // Push the data to the thread
00589     DBG("Pushing data to consumer thread ...\r\n");
00590     mqueue.put(message);
00591     
00592     return SUCCESS;
00593 }
00594 
00595 int MQTTThreadedClient::sendPublish(PubMessage& message)
00596 {
00597      MQTTString topicString = MQTTString_initializer;
00598      
00599      if (!isConnected) 
00600      {
00601         DBG("Not connected!!! ...\r\n");
00602         return FAILURE;
00603      }
00604         
00605      topicString.cstring = (char*) &message.topic[0];
00606      int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
00607               topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
00608      if (len <= 0)
00609      {
00610          DBG("Failed serializing message ...\r\n");
00611          return FAILURE;
00612      }
00613      
00614      if (sendPacket(len) == SUCCESS)
00615      {
00616          DBG("Successfully sent publish packet to server ...\r\n");
00617          return SUCCESS;
00618      }
00619     
00620     DBG("Failed to send publish packet to server ...\r\n");
00621     return FAILURE;
00622 }
00623 
00624 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
00625 {
00626     // Push the subscription into the map ...
00627     FP<void,MessageData &> fp;
00628     fp.attach(function);
00629     
00630     topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));    
00631 } 
00632 
00633 int MQTTThreadedClient::processSubscriptions()
00634 {
00635     int numsubscribed = 0;
00636     
00637     if (!isConnected) 
00638     {
00639             DBG("Session not connected!!\r\n");
00640             return 0;
00641     }
00642     
00643     DBG("Processing subscribed topics ....\r\n");
00644     
00645     std::map<std::string, FP<void, MessageData &> >::iterator it;
00646     for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 
00647     {
00648         int rc = FAILURE;
00649         int len = 0;
00650         //TODO: We only subscribe to QoS = 0 for now
00651         QoS qos = QOS0;
00652 
00653         MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
00654         DBG("Subscribing to topic [%s]\r\n", topic.cstring);
00655 
00656 
00657         len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
00658         if (len <= 0) {
00659             DBG("Error serializing subscribe packet ...\r\n");
00660             continue;
00661         }
00662 
00663         if ((rc = sendPacket(len)) != SUCCESS) {
00664             DBG("Error sending subscribe packet [%d]\r\n", rc);
00665             continue;
00666         }
00667 
00668         DBG("Waiting for subscription ack ...\r\n");
00669         // Wait for SUBACK, dropping packets read along the way ...
00670         if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
00671             int count = 0, grantedQoS = -1;
00672             unsigned short mypacketid;
00673             if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
00674                 rc = grantedQoS; // 0, 1, 2 or 0x80
00675             // For as long as we do not get 0x80 ..
00676             if (rc != 0x80) 
00677             {
00678                 // Reset connection timers here ...
00679                 resetConnectionTimer();
00680                 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
00681                 numsubscribed++;
00682             } else {
00683                 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
00684             }
00685         } else 
00686             DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
00687     } // end for loop
00688     
00689     return numsubscribed;    
00690 }
00691 
00692 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
00693 {
00694     char* curf = topicFilter;
00695     char* curn = topicName.lenstring.data;
00696     char* curn_end = curn + topicName.lenstring.len;
00697 
00698     while (*curf && curn < curn_end)
00699     {
00700         if (*curn == '/' && *curf != '/')
00701             break;
00702         if (*curf != '+' && *curf != '#' && *curf != *curn)
00703             break;
00704         if (*curf == '+')
00705         {   // skip until we meet the next separator, or end of string
00706             char* nextpos = curn + 1;
00707             while (nextpos < curn_end && *nextpos != '/')
00708                 nextpos = ++curn + 1;
00709         }
00710         else if (*curf == '#')
00711             curn = curn_end - 1;    // skip until end of string
00712         curf++;
00713         curn++;
00714     };
00715 
00716     return (curn == curn_end) && (*curf == '\0');
00717 }
00718 
00719 int MQTTThreadedClient::handlePublishMsg()
00720 {
00721     MQTTString topicName = MQTTString_initializer;
00722     Message msg;
00723     int intQoS;
00724     DBG("Deserializing publish message ...\r\n");
00725     if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
00726             &intQoS, 
00727             (unsigned char*)&msg.retained, 
00728             (unsigned short*)&msg.id, 
00729             &topicName,
00730             (unsigned char**)&msg.payload, 
00731             (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
00732     {
00733         DBG("Error deserializing published message ...\r\n");
00734         return -1;
00735     }
00736 
00737     std::string topic;
00738     if (topicName.lenstring.len > 0)
00739     {
00740         topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
00741     }else
00742         topic = (const char *) topicName.cstring;
00743     
00744     DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
00745     
00746     msg.qos = (QoS) intQoS;
00747 
00748     
00749     // Call the handlers for each topic 
00750     if (topicCBMap.find(topic) != topicCBMap.end())
00751     {
00752         // Call the callback function 
00753         if (topicCBMap[topic].attached())
00754         {
00755             DBG("Invoking function handler for topic ...\r\n");
00756             MessageData md(topicName, msg);            
00757             topicCBMap[topic](md);
00758             
00759             return 1;
00760         }
00761     }
00762     
00763     // TODO: depending on the QoS
00764     // we send data to the server = PUBACK or PUBREC
00765     switch(intQoS)
00766     {
00767         case QOS0:
00768             // We send back nothing ...
00769             break;
00770         case QOS1:
00771             // TODO: implement
00772             break;
00773         case QOS2:
00774             // TODO: implement
00775             break;
00776         default:
00777             break;
00778     }
00779     
00780     return 0;
00781 }
00782 
00783 void MQTTThreadedClient::resetConnectionTimer()
00784 {
00785     if (keepAliveInterval > 0)
00786     {
00787         comTimer.reset();
00788         comTimer.start();
00789     }
00790 }
00791 
00792 bool MQTTThreadedClient::hasConnectionTimedOut()
00793 {
00794     if (keepAliveInterval > 0 ) {
00795         // Check connection timer
00796         if (comTimer.read_ms() > keepAliveInterval)
00797             return true;
00798         else
00799             return false;
00800     }
00801 
00802     return false;
00803 }
00804         
00805 void MQTTThreadedClient::sendPingRequest()
00806 {
00807     int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
00808     if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
00809     {
00810         DBG("Ping request sent successfully ...\r\n");
00811     }
00812 }
00813 
00814 void MQTTThreadedClient::startListener()
00815 {
00816     int pType;
00817     int numsubs;
00818     // Continuesly listens for packets and dispatch
00819     // message handlers ...
00820     if (useTLS)
00821     {
00822         initTLS();
00823     }
00824             
00825     while(true)
00826     {
00827 
00828         // Attempt to reconnect and login
00829         if ( connect() < 0 )
00830         {
00831             disconnect();
00832             // Wait for a few secs and reconnect ...
00833             Thread::wait(6000);
00834             continue;
00835         }
00836         
00837         numsubs = processSubscriptions();
00838         DBG("Subscribed %d topics ...\r\n", numsubs);
00839          
00840         // loop read    
00841         while(true) 
00842         {
00843             pType = readPacket();
00844             switch(pType) 
00845             {
00846                 case TIMEOUT:
00847                     // No data available from the network ...
00848                     break;
00849                 case FAILURE:
00850                     {
00851                         DBG("readPacket returned failure \r\n");
00852                         goto reconnect;
00853                     }
00854                 case BUFFER_OVERFLOW: 
00855                     {
00856                         // TODO: Network error, do we disconnect and reconnect?
00857                         DBG("Failure or buffer overflow problem ... \r\n");
00858                         MBED_ASSERT(false);
00859                     }
00860                     break;
00861                 /**
00862                 *  The rest of the return codes below (all positive) is about MQTT
00863                  * response codes
00864                  **/
00865                 case CONNACK:
00866                 case PUBACK:
00867                 case SUBACK:
00868                     break;
00869                 case PUBLISH: 
00870                     {
00871                         DBG("Publish received!....\r\n");
00872                         // We receive data from the MQTT server ..
00873                         if (handlePublishMsg() < 0) {
00874                             DBG("Error handling PUBLISH message ... \r\n");
00875                             break;
00876                         }
00877                     }
00878                     break;
00879                 case PINGRESP: 
00880                     {
00881                         DBG("Got ping response ...\r\n");
00882                         resetConnectionTimer();
00883                     }
00884                     break;
00885                 default:
00886                     DBG("Unknown/Not handled message from server pType[%d]\r\n", pType);
00887             }
00888 
00889             // Check if its time to send a keepAlive packet
00890             if (hasConnectionTimedOut()) {
00891                 // Queue the ping request so that other
00892                 // pending operations queued above will go first
00893                 queue.call(this, &MQTTThreadedClient::sendPingRequest);
00894             }
00895 
00896             // Check if we have messages on the message queue
00897             osEvent evt = mqueue.get(10);
00898             if (evt.status == osEventMessage) {
00899 
00900                 DBG("Got message to publish! ... \r\n");
00901 
00902                 // Unpack the message
00903                 PubMessage * message = (PubMessage *)evt.value.p;
00904 
00905                 // Send the packet, do not queue the call
00906                 // like the ping above ..
00907                 if ( sendPublish(*message) == SUCCESS) {
00908                     // Reset timers if we have been able to send successfully
00909                     resetConnectionTimer();
00910                 } else {
00911                     // Disconnected?
00912                     goto reconnect;
00913                 }
00914 
00915                 // Free the message from mempool  after using
00916                 mpool.free(message);
00917             }
00918 
00919             // Dispatch any queued events ...
00920             queue.dispatch(100);
00921         } // end while loop
00922 
00923 reconnect:
00924         // reconnect?
00925         DBG("Client disconnected!! ... retrying ...\r\n");
00926         disconnect();
00927         
00928     };
00929 }
00930 
00931 void MQTTThreadedClient::stopListener()
00932 {
00933     // TODO: Set a signal/flag that the running thread 
00934     // will check if its ok to stop ...
00935 }
00936 
00937 }