Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTThreadedClient.cpp
00001 #include "mbed.h" 00002 #include "rtos.h" 00003 #include "MQTTThreadedClient.h" 00004 #include "mbedtls/platform.h" 00005 #include "mbedtls/ssl.h" 00006 #include "mbedtls/entropy.h" 00007 #include "mbedtls/ctr_drbg.h" 00008 #include "mbedtls/error.h" 00009 00010 #ifdef DEBUG 00011 #define DBG(fmt, args...) printf(fmt, ## args) 00012 #else 00013 #define DBG(fmt, args...) /* Don't do anything in release builds */ 00014 #endif 00015 00016 static MemoryPool<MQTT::PubMessage, 4> mpool; 00017 static Queue<MQTT::PubMessage, 4> mqueue; 00018 00019 // SSL/TLS variables 00020 mbedtls_entropy_context _entropy; 00021 mbedtls_ctr_drbg_context _ctr_drbg; 00022 mbedtls_x509_crt _cacert; 00023 mbedtls_ssl_context _ssl; 00024 mbedtls_ssl_config _ssl_conf; 00025 mbedtls_ssl_session saved_session; 00026 00027 namespace MQTT { 00028 00029 /** 00030 * Receive callback for mbed TLS 00031 */ 00032 static int ssl_recv(void *ctx, unsigned char *buf, size_t len) 00033 { 00034 int recv = -1; 00035 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00036 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00037 recv = socket->recv(buf, len); 00038 00039 if (NSAPI_ERROR_WOULD_BLOCK == recv) { 00040 return MBEDTLS_ERR_SSL_WANT_READ; 00041 } else if (recv < 0) { 00042 return -1; 00043 } else { 00044 return recv; 00045 } 00046 } 00047 00048 /** 00049 * Send callback for mbed TLS 00050 */ 00051 static int ssl_send(void *ctx, const unsigned char *buf, size_t len) 00052 { 00053 int sent = -1; 00054 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00055 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00056 sent = socket->send(buf, len); 00057 00058 if(NSAPI_ERROR_WOULD_BLOCK == sent) { 00059 return MBEDTLS_ERR_SSL_WANT_WRITE; 00060 } else if (sent < 0) { 00061 return -1; 00062 } else { 00063 return sent; 00064 } 00065 } 00066 00067 #if DEBUG_LEVEL > 0 00068 /** 00069 * Debug callback for mbed TLS 00070 * Just prints on the USB serial port 00071 */ 00072 static void my_debug(void *ctx, int level, const 
char *file, int line, 00073 const char *str) 00074 { 00075 const char *p, *basename; 00076 (void) ctx; 00077 00078 /* Extract basename from file */ 00079 for(p = basename = file; *p != '\0'; p++) { 00080 if(*p == '/' || *p == '\\') { 00081 basename = p + 1; 00082 } 00083 } 00084 00085 if (_debug) { 00086 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); 00087 } 00088 } 00089 00090 /** 00091 * Certificate verification callback for mbed TLS 00092 * Here we only use it to display information on each cert in the chain 00093 */ 00094 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) 00095 { 00096 const uint32_t buf_size = 1024; 00097 char *buf = new char[buf_size]; 00098 (void) data; 00099 00100 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); 00101 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); 00102 if (_debug) mbedtls_printf("%s", buf); 00103 00104 if (*flags == 0) 00105 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); 00106 else { 00107 mbedtls_x509_crt_verify_info(buf, buf_size, " ! 
", *flags); 00108 if (_debug) mbedtls_printf("%s\n", buf); 00109 } 00110 00111 delete[] buf; 00112 return 0; 00113 } 00114 #endif 00115 00116 00117 void MQTTThreadedClient::setupTLS() 00118 { 00119 if (useTLS) 00120 { 00121 mbedtls_entropy_init(&_entropy); 00122 mbedtls_ctr_drbg_init(&_ctr_drbg); 00123 mbedtls_x509_crt_init(&_cacert); 00124 mbedtls_ssl_init(&_ssl); 00125 mbedtls_ssl_config_init(&_ssl_conf); 00126 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); 00127 } 00128 } 00129 00130 void MQTTThreadedClient::freeTLS() 00131 { 00132 if (useTLS) 00133 { 00134 mbedtls_entropy_free(&_entropy); 00135 mbedtls_ctr_drbg_free(&_ctr_drbg); 00136 mbedtls_x509_crt_free(&_cacert); 00137 mbedtls_ssl_free(&_ssl); 00138 mbedtls_ssl_config_free(&_ssl_conf); 00139 } 00140 } 00141 00142 int MQTTThreadedClient::initTLS() 00143 { 00144 int ret; 00145 00146 DBG("Initializing TLS ...\r\n"); 00147 DBG("mbedtls_ctr_drdbg_seed ...\r\n"); 00148 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, 00149 (const unsigned char *) DRBG_PERS, 00150 sizeof (DRBG_PERS))) != 0) { 00151 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); 00152 _error = ret; 00153 return -1; 00154 } 00155 DBG("mbedtls_x509_crt_parse ...\r\n"); 00156 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, 00157 strlen(ssl_ca_pem) + 1)) != 0) { 00158 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); 00159 _error = ret; 00160 return -1; 00161 } 00162 00163 DBG("mbedtls_ssl_config_defaults ...\r\n"); 00164 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, 00165 MBEDTLS_SSL_IS_CLIENT, 00166 MBEDTLS_SSL_TRANSPORT_STREAM, 00167 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { 00168 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); 00169 _error = ret; 00170 return -1; 00171 } 00172 00173 DBG("mbedtls_ssl_config_ca_chain ...\r\n"); 00174 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); 00175 DBG("mbedtls_ssl_conf_rng 
...\r\n"); 00176 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); 00177 00178 /* It is possible to disable authentication by passing 00179 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() 00180 */ 00181 DBG("mbedtls_ssl_conf_authmode ...\r\n"); 00182 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); 00183 00184 #if DEBUG_LEVEL > 0 00185 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); 00186 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); 00187 mbedtls_debug_set_threshold(DEBUG_LEVEL); 00188 #endif 00189 00190 DBG("mbedtls_ssl_setup ...\r\n"); 00191 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { 00192 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); 00193 _error = ret; 00194 return -1; 00195 } 00196 00197 return 0; 00198 } 00199 00200 int MQTTThreadedClient::doTLSHandshake() 00201 { 00202 int ret; 00203 00204 /* Start the handshake, the rest will be done in onReceive() */ 00205 DBG("Starting the TLS handshake...\r\n"); 00206 ret = mbedtls_ssl_handshake(&_ssl); 00207 if (ret < 0) 00208 { 00209 if (ret != MBEDTLS_ERR_SSL_WANT_READ && 00210 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 00211 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); 00212 else 00213 { 00214 // do not close the socket if timed out 00215 ret = TIMEOUT; 00216 } 00217 return ret; 00218 } 00219 00220 /* Handshake done, time to print info */ 00221 DBG("TLS connection to %s:%d established\r\n", 00222 host.c_str(), port); 00223 00224 const uint32_t buf_size = 1024; 00225 char *buf = new char[buf_size]; 00226 mbedtls_x509_crt_info(buf, buf_size, "\r ", 00227 mbedtls_ssl_get_peer_cert(&_ssl)); 00228 00229 DBG("Server certificate:\r\n%s\r", buf); 00230 // Verify server cert ... 00231 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); 00232 if( flags != 0 ) 00233 { 00234 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! 
", flags); 00235 DBG("Certificate verification failed:\r\n%s\r\r\n", buf); 00236 // free server cert ... before error return 00237 delete [] buf; 00238 return -1; 00239 } 00240 00241 DBG("Certificate verification passed\r\n\r\n"); 00242 // delete server cert after verification 00243 delete [] buf; 00244 00245 #if defined(MBEDTLS_SSL_CLI_C) 00246 // TODO: Save the session here for reconnect. 00247 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) 00248 { 00249 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); 00250 hasSavedSession = false; 00251 return -1; 00252 } 00253 #endif 00254 DBG("Session saved for reconnect ...\r\n"); 00255 hasSavedSession = true; 00256 00257 return 0; 00258 } 00259 00260 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) 00261 { 00262 int rc; 00263 00264 if (tcpSocket == NULL) 00265 return -1; 00266 00267 if (useTLS) 00268 { 00269 // Do SSL/TLS read 00270 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); 00271 if (MBEDTLS_ERR_SSL_WANT_READ == rc) 00272 return TIMEOUT; 00273 else 00274 return rc; 00275 } else { 00276 // non-blocking socket ... 00277 tcpSocket->set_timeout(timeout); 00278 rc = tcpSocket->recv( (void *) buffer, size); 00279 00280 // return 0 bytes if timeout ... 
00281 if (NSAPI_ERROR_WOULD_BLOCK == rc) 00282 return TIMEOUT; 00283 else 00284 return rc; // return the number of bytes received or error 00285 } 00286 } 00287 00288 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) 00289 { 00290 int rc; 00291 00292 if (tcpSocket == NULL) 00293 return -1; 00294 00295 if (useTLS) { 00296 // Do SSL/TLS write 00297 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); 00298 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) 00299 return TIMEOUT; 00300 else 00301 return rc; 00302 } else { 00303 00304 // set the write timeout 00305 tcpSocket->set_timeout(timeout); 00306 rc = tcpSocket->send(buffer, size); 00307 00308 if ( NSAPI_ERROR_WOULD_BLOCK == rc) 00309 return TIMEOUT; 00310 else 00311 return rc; 00312 } 00313 } 00314 00315 int MQTTThreadedClient::readPacketLength(int* value) 00316 { 00317 int rc = MQTTPACKET_READ_ERROR; 00318 unsigned char c; 00319 int multiplier = 1; 00320 int len = 0; 00321 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00322 00323 *value = 0; 00324 do 00325 { 00326 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00327 { 00328 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00329 goto exit; 00330 } 00331 00332 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); 00333 if (rc != 1) 00334 { 00335 rc = MQTTPACKET_READ_ERROR; 00336 goto exit; 00337 } 00338 00339 *value += (c & 127) * multiplier; 00340 multiplier *= 128; 00341 } while ((c & 128) != 0); 00342 00343 rc = MQTTPACKET_READ_COMPLETE; 00344 00345 exit: 00346 if (rc == MQTTPACKET_READ_ERROR ) 00347 len = -1; 00348 00349 return len; 00350 } 00351 00352 int MQTTThreadedClient::sendPacket(size_t length) 00353 { 00354 int rc = FAILURE; 00355 int sent = 0; 00356 00357 while (sent < length) 00358 { 00359 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); 00360 if (rc < 0) // there was an error writing the data 00361 break; 00362 sent += rc; 00363 } 00364 00365 if (sent == length) 
00366 rc = SUCCESS; 00367 else 00368 rc = FAILURE; 00369 00370 return rc; 00371 } 00372 /** 00373 * Reads the entire packet to readbuf and returns 00374 * the type of packet when successful, otherwise 00375 * a negative error code is returned. 00376 **/ 00377 int MQTTThreadedClient::readPacket() 00378 { 00379 int rc = FAILURE; 00380 MQTTHeader header = {0}; 00381 int len = 0; 00382 int rem_len = 0; 00383 00384 /* 1. read the header byte. This has the packet type in it */ 00385 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) 00386 goto exit; 00387 00388 len = 1; 00389 /* 2. read the remaining length. This is variable in itself */ 00390 if ( readPacketLength(&rem_len) < 0 ) 00391 goto exit; 00392 00393 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00394 00395 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00396 { 00397 rc = BUFFER_OVERFLOW; 00398 goto exit; 00399 } 00400 00401 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00402 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) 00403 goto exit; 00404 00405 // Convert the header to type 00406 // and update rc 00407 header.byte = readbuf[0]; 00408 rc = header.bits.type; 00409 00410 exit: 00411 00412 return rc; 00413 } 00414 00415 /** 00416 * Read until a specified packet type is received, or untill the specified 00417 * timeout dropping packets along the way. 
00418 **/ 00419 int MQTTThreadedClient::readUntil(int packetType, int timeout) 00420 { 00421 int pType = FAILURE; 00422 Timer timer; 00423 00424 timer.start(); 00425 do { 00426 pType = readPacket(); 00427 if (pType < 0) 00428 break; 00429 00430 if (timer.read_ms() > timeout) 00431 { 00432 pType = FAILURE; 00433 break; 00434 } 00435 }while(pType != packetType); 00436 00437 return pType; 00438 } 00439 00440 00441 int MQTTThreadedClient::login() 00442 { 00443 int rc = FAILURE; 00444 int len = 0; 00445 00446 if (!isConnected) 00447 { 00448 DBG("Session not connected! \r\n"); 00449 return rc; 00450 } 00451 00452 // Copy the keepAliveInterval value to local 00453 // MQTT specifies in seconds, we have to multiply that 00454 // amount for our 32 bit timers which accepts ms. 00455 keepAliveInterval = (connect_options.keepAliveInterval * 1000); 00456 00457 DBG("Login with: \r\n"); 00458 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); 00459 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); 00460 00461 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) 00462 { 00463 DBG("Error serializing connect packet ...\r\n"); 00464 return rc; 00465 } 00466 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet 00467 { 00468 DBG("Error sending the connect request packet ...\r\n"); 00469 return rc; 00470 } 00471 00472 // Wait for the CONNACK 00473 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) 00474 { 00475 unsigned char connack_rc = 255; 00476 bool sessionPresent = false; 00477 DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); 00478 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00479 rc = connack_rc; 00480 else 00481 rc = FAILURE; 00482 } 00483 else 00484 rc = FAILURE; 00485 00486 if (rc == SUCCESS) 00487 { 00488 DBG("Connected!!! ... 
starting connection timers ...\r\n"); 00489 resetConnectionTimer(); 00490 } 00491 00492 DBG("Returning with rc = %d\r\n", rc); 00493 00494 return rc; 00495 } 00496 00497 00498 void MQTTThreadedClient::disconnect() 00499 { 00500 if (isConnected) 00501 { 00502 if( useTLS 00503 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) 00504 ) 00505 { 00506 DBG( "Session reset returned an error \r\n"); 00507 } 00508 00509 isConnected = false; 00510 tcpSocket->close(); 00511 } 00512 } 00513 00514 int MQTTThreadedClient::connect() 00515 { 00516 int ret = FAILURE; 00517 00518 if ((network == NULL) || (tcpSocket == NULL) 00519 || host.empty()) 00520 { 00521 DBG("Network settings not set! \r\n"); 00522 return ret; 00523 } 00524 00525 if (useTLS) 00526 { 00527 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { 00528 mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); 00529 return ret; 00530 } 00531 #if defined(MBEDTLS_SSL_CLI_C) 00532 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { 00533 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); 00534 return ret; 00535 } 00536 #endif 00537 } 00538 00539 tcpSocket->open(network); 00540 if (useTLS) 00541 { 00542 DBG("mbedtls_ssl_set_hostname ...\r\n"); 00543 mbedtls_ssl_set_hostname(&_ssl, host.c_str()); 00544 DBG("mbedtls_ssl_set_bio ...\r\n"); 00545 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), 00546 ssl_send, ssl_recv, NULL ); 00547 } 00548 00549 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) 00550 { 00551 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); 00552 return ret; 00553 }else 00554 isConnected = true; 00555 00556 if (useTLS) 00557 { 00558 00559 if (doTLSHandshake() < 0) 00560 { 00561 DBG("TLS Handshake failed! \r\n"); 00562 return FAILURE; 00563 }else 00564 DBG("TLS Handshake complete!! 
\r\n"); 00565 } 00566 00567 return login(); 00568 } 00569 00570 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) 00571 { 00572 // Copy the settings for reconnection 00573 host = chost; 00574 port = cport; 00575 connect_options = options; 00576 } 00577 00578 int MQTTThreadedClient::publish(PubMessage& msg) 00579 { 00580 #if 0 00581 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); 00582 // TODO: handle id values when the function is called later 00583 if (id == 0) 00584 return FAILURE; 00585 else 00586 return SUCCESS; 00587 #endif 00588 PubMessage *message = mpool.alloc(); 00589 // Simple copy 00590 *message = msg; 00591 00592 // Push the data to the thread 00593 DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); 00594 mqueue.put(message); 00595 00596 return SUCCESS; 00597 } 00598 00599 int MQTTThreadedClient::sendPublish(PubMessage& message) 00600 { 00601 MQTTString topicString = MQTTString_initializer; 00602 00603 if (!isConnected) 00604 { 00605 DBG("[Thread:%d] Not connected!!! 
...\r\n", Thread::gettid()); 00606 return FAILURE; 00607 } 00608 00609 topicString.cstring = (char*) &message.topic[0]; 00610 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, 00611 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); 00612 if (len <= 0) 00613 { 00614 DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); 00615 return FAILURE; 00616 } 00617 00618 if (sendPacket(len) == SUCCESS) 00619 { 00620 DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); 00621 return SUCCESS; 00622 } 00623 00624 DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); 00625 return FAILURE; 00626 } 00627 00628 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) 00629 { 00630 // Push the subscription into the map ... 00631 FP<void,MessageData &> fp; 00632 fp.attach(function); 00633 00634 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); 00635 } 00636 00637 int MQTTThreadedClient::processSubscriptions() 00638 { 00639 int numsubscribed = 0; 00640 00641 if (!isConnected) 00642 { 00643 DBG("Session not connected!!\r\n"); 00644 return 0; 00645 } 00646 00647 DBG("Processing subscribed topics ....\r\n"); 00648 00649 std::map<std::string, FP<void, MessageData &> >::iterator it; 00650 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 00651 { 00652 int rc = FAILURE; 00653 int len = 0; 00654 //TODO: We only subscribe to QoS = 0 for now 00655 QoS qos = QOS0; 00656 00657 MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; 00658 DBG("Subscribing to topic [%s]\r\n", topic.cstring); 00659 00660 00661 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00662 if (len <= 0) { 00663 DBG("Error serializing subscribe packet ...\r\n"); 00664 continue; 00665 } 00666 00667 if ((rc = 
sendPacket(len)) != SUCCESS) { 00668 DBG("Error sending subscribe packet [%d]\r\n", rc); 00669 continue; 00670 } 00671 00672 DBG("Waiting for subscription ack ...\r\n"); 00673 // Wait for SUBACK, dropping packets read along the way ... 00674 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback 00675 int count = 0, grantedQoS = -1; 00676 unsigned short mypacketid; 00677 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00678 rc = grantedQoS; // 0, 1, 2 or 0x80 00679 // For as long as we do not get 0x80 .. 00680 if (rc != 0x80) 00681 { 00682 // Reset connection timers here ... 00683 resetConnectionTimer(); 00684 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); 00685 numsubscribed++; 00686 } else { 00687 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); 00688 } 00689 } else 00690 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); 00691 } // end for loop 00692 00693 return numsubscribed; 00694 } 00695 00696 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) 00697 { 00698 char* curf = topicFilter; 00699 char* curn = topicName.lenstring.data; 00700 char* curn_end = curn + topicName.lenstring.len; 00701 00702 while (*curf && curn < curn_end) 00703 { 00704 if (*curn == '/' && *curf != '/') 00705 break; 00706 if (*curf != '+' && *curf != '#' && *curf != *curn) 00707 break; 00708 if (*curf == '+') 00709 { // skip until we meet the next separator, or end of string 00710 char* nextpos = curn + 1; 00711 while (nextpos < curn_end && *nextpos != '/') 00712 nextpos = ++curn + 1; 00713 } 00714 else if (*curf == '#') 00715 curn = curn_end - 1; // skip until end of string 00716 curf++; 00717 curn++; 00718 }; 00719 00720 return (curn == curn_end) && (*curf == '\0'); 00721 } 00722 00723 int MQTTThreadedClient::handlePublishMsg() 00724 { 00725 MQTTString topicName = MQTTString_initializer; 00726 Message 
msg; 00727 int intQoS; 00728 DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); 00729 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 00730 &intQoS, 00731 (unsigned char*)&msg.retained, 00732 (unsigned short*)&msg.id, 00733 &topicName, 00734 (unsigned char**)&msg.payload, 00735 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00736 { 00737 DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); 00738 return -1; 00739 } 00740 00741 std::string topic; 00742 if (topicName.lenstring.len > 0) 00743 { 00744 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); 00745 }else 00746 topic = (const char *) topicName.cstring; 00747 00748 DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); 00749 00750 msg.qos = (QoS) intQoS; 00751 00752 00753 // Call the handlers for each topic 00754 if (topicCBMap.find(topic) != topicCBMap.end()) 00755 { 00756 // Call the callback function 00757 if (topicCBMap[topic].attached()) 00758 { 00759 DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); 00760 MessageData md(topicName, msg); 00761 topicCBMap[topic](md); 00762 00763 return 1; 00764 } 00765 } 00766 00767 // TODO: depending on the QoS 00768 // we send data to the server = PUBACK or PUBREC 00769 switch(intQoS) 00770 { 00771 case QOS0: 00772 // We send back nothing ... 
00773 break; 00774 case QOS1: 00775 // TODO: implement 00776 break; 00777 case QOS2: 00778 // TODO: implement 00779 break; 00780 default: 00781 break; 00782 } 00783 00784 return 0; 00785 } 00786 00787 void MQTTThreadedClient::resetConnectionTimer() 00788 { 00789 if (keepAliveInterval > 0) 00790 { 00791 comTimer.reset(); 00792 comTimer.start(); 00793 } 00794 } 00795 00796 bool MQTTThreadedClient::hasConnectionTimedOut() 00797 { 00798 if (keepAliveInterval > 0 ) { 00799 // Check connection timer 00800 if (comTimer.read_ms() > keepAliveInterval) 00801 return true; 00802 else 00803 return false; 00804 } 00805 00806 return false; 00807 } 00808 00809 void MQTTThreadedClient::sendPingRequest() 00810 { 00811 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00812 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet 00813 { 00814 printf("MQTT Ping request sent successfully ...\r\n"); 00815 } 00816 } 00817 00818 void MQTTThreadedClient::startListener() 00819 { 00820 int pType; 00821 int numsubs; 00822 // Continuesly listens for packets and dispatch 00823 // message handlers ... 00824 if (useTLS) 00825 { 00826 initTLS(); 00827 } 00828 00829 while(true) 00830 { 00831 00832 // Attempt to reconnect and login 00833 if ( connect() < 0 ) 00834 { 00835 disconnect(); 00836 // Wait for a few secs and reconnect ... 00837 Thread::wait(6000); 00838 continue; 00839 } 00840 00841 numsubs = processSubscriptions(); 00842 DBG("Subscribed %d topics ...\r\n", numsubs); 00843 00844 // loop read 00845 while(true) 00846 { 00847 pType = readPacket(); 00848 switch(pType) 00849 { 00850 case TIMEOUT: 00851 // No data available from the network ... 00852 break; 00853 case FAILURE: 00854 { 00855 DBG("readPacket returned failure \r\n"); 00856 goto reconnect; 00857 } 00858 case BUFFER_OVERFLOW: 00859 { 00860 // TODO: Network error, do we disconnect and reconnect? 00861 DBG("[Thread:%d]Failure or buffer overflow problem ... 
\r\n", Thread::gettid()); 00862 MBED_ASSERT(false); 00863 } 00864 break; 00865 /** 00866 * The rest of the return codes below (all positive) is about MQTT 00867 * response codes 00868 **/ 00869 case CONNACK: 00870 case PUBACK: 00871 case SUBACK: 00872 break; 00873 case PUBLISH: 00874 { 00875 DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid()); 00876 // We receive data from the MQTT server .. 00877 if (handlePublishMsg() < 0) { 00878 DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); 00879 break; 00880 } 00881 } 00882 break; 00883 case PINGRESP: 00884 { 00885 printf("MQTT Got ping response ...\r\n"); 00886 resetConnectionTimer(); 00887 } 00888 break; 00889 default: 00890 DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); 00891 } 00892 00893 // Check if its time to send a keepAlive packet 00894 if (hasConnectionTimedOut()) { 00895 // Queue the ping request so that other 00896 // pending operations queued above will go first 00897 queue.call(this, &MQTTThreadedClient::sendPingRequest); 00898 } 00899 00900 // Check if we have messages on the message queue 00901 osEvent evt = mqueue.get(10); 00902 if (evt.status == osEventMessage) { 00903 00904 DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); 00905 00906 // Unpack the message 00907 PubMessage * message = (PubMessage *)evt.value.p; 00908 00909 // Send the packet, do not queue the call 00910 // like the ping above .. 00911 if ( sendPublish(*message) == SUCCESS) { 00912 // Reset timers if we have been able to send successfully 00913 resetConnectionTimer(); 00914 } else { 00915 // Disconnected? 00916 goto reconnect; 00917 } 00918 00919 // Free the message from mempool after using 00920 mpool.free(message); 00921 } 00922 00923 // Dispatch any queued events ... 00924 queue.dispatch(100); 00925 } // end while loop 00926 00927 reconnect: 00928 // reconnect? 00929 DBG("Client disconnected!! ... 
retrying ...\r\n"); 00930 disconnect(); 00931 00932 }; 00933 } 00934 00935 void MQTTThreadedClient::stopListener() 00936 { 00937 // TODO: Set a signal/flag that the running thread 00938 // will check if its ok to stop ... 00939 } 00940 00941 }
Generated on Tue Jul 12 2022 18:06:46 by Doxygen 1.7.2