Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.cpp
00001 #include "mbed.h" 00002 #include "rtos.h" 00003 #include "MQTTThreadedClient.h" 00004 #include "mbedtls/platform.h" 00005 #include "mbedtls/ssl.h" 00006 #include "mbedtls/entropy.h" 00007 #include "mbedtls/ctr_drbg.h" 00008 #include "mbedtls/error.h" 00009 00010 static MemoryPool<MQTT::PubMessage, 8> mpool; 00011 static Queue<MQTT::PubMessage, 8> mqueue; 00012 00013 // SSL/TLS variables 00014 mbedtls_entropy_context _entropy; 00015 mbedtls_ctr_drbg_context _ctr_drbg; 00016 mbedtls_x509_crt _cacert; 00017 mbedtls_ssl_context _ssl; 00018 mbedtls_ssl_config _ssl_conf; 00019 mbedtls_ssl_session saved_session; 00020 00021 namespace MQTT { 00022 00023 /** 00024 * Receive callback for mbed TLS 00025 */ 00026 static int ssl_recv(void *ctx, unsigned char *buf, size_t len) 00027 { 00028 int recv = -1; 00029 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00030 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00031 recv = socket->recv(buf, len); 00032 00033 if (NSAPI_ERROR_WOULD_BLOCK == recv) { 00034 return MBEDTLS_ERR_SSL_WANT_READ; 00035 } else if (recv < 0) { 00036 return -1; 00037 } else { 00038 return recv; 00039 } 00040 } 00041 00042 /** 00043 * Send callback for mbed TLS 00044 */ 00045 static int ssl_send(void *ctx, const unsigned char *buf, size_t len) 00046 { 00047 int sent = -1; 00048 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00049 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00050 sent = socket->send(buf, len); 00051 00052 if(NSAPI_ERROR_WOULD_BLOCK == sent) { 00053 return MBEDTLS_ERR_SSL_WANT_WRITE; 00054 } else if (sent < 0) { 00055 return -1; 00056 } else { 00057 return sent; 00058 } 00059 } 00060 00061 #if DEBUG_LEVEL > 0 00062 /** 00063 * Debug callback for mbed TLS 00064 * Just prints on the USB serial port 00065 */ 00066 static void my_debug(void *ctx, int level, const char *file, int line, 00067 const char *str) 00068 { 00069 const char *p, *basename; 00070 (void) ctx; 00071 00072 /* Extract basename from file */ 00073 for(p = basename = 
file; *p != '\0'; p++) { 00074 if(*p == '/' || *p == '\\') { 00075 basename = p + 1; 00076 } 00077 } 00078 00079 if (_debug) { 00080 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); 00081 } 00082 } 00083 00084 /** 00085 * Certificate verification callback for mbed TLS 00086 * Here we only use it to display information on each cert in the chain 00087 */ 00088 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) 00089 { 00090 const uint32_t buf_size = 1024; 00091 char *buf = new char[buf_size]; 00092 (void) data; 00093 00094 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); 00095 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); 00096 if (_debug) mbedtls_printf("%s", buf); 00097 00098 if (*flags == 0) 00099 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); 00100 else { 00101 mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); 00102 if (_debug) mbedtls_printf("%s\n", buf); 00103 } 00104 00105 delete[] buf; 00106 return 0; 00107 } 00108 #endif 00109 00110 00111 void MQTTThreadedClient::setupTLS() 00112 { 00113 if (useTLS) 00114 { 00115 mbedtls_entropy_init(&_entropy); 00116 mbedtls_ctr_drbg_init(&_ctr_drbg); 00117 mbedtls_x509_crt_init(&_cacert); 00118 mbedtls_ssl_init(&_ssl); 00119 mbedtls_ssl_config_init(&_ssl_conf); 00120 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); 00121 } 00122 } 00123 00124 void MQTTThreadedClient::freeTLS() 00125 { 00126 if (useTLS) 00127 { 00128 mbedtls_entropy_free(&_entropy); 00129 mbedtls_ctr_drbg_free(&_ctr_drbg); 00130 mbedtls_x509_crt_free(&_cacert); 00131 mbedtls_ssl_free(&_ssl); 00132 mbedtls_ssl_config_free(&_ssl_conf); 00133 } 00134 } 00135 00136 int MQTTThreadedClient::initTLS() 00137 { 00138 int ret; 00139 00140 DBG("Initializing TLS ...\r\n"); 00141 DBG("mbedtls_ctr_drdbg_seed ...\r\n"); 00142 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, 00143 (const unsigned char *) 
DRBG_PERS, 00144 sizeof (DRBG_PERS))) != 0) { 00145 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); 00146 _error = ret; 00147 return -1; 00148 } 00149 DBG("mbedtls_x509_crt_parse ...\r\n"); 00150 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, 00151 strlen(ssl_ca_pem) + 1)) != 0) { 00152 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); 00153 _error = ret; 00154 return -1; 00155 } 00156 00157 DBG("mbedtls_ssl_config_defaults ...\r\n"); 00158 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, 00159 MBEDTLS_SSL_IS_CLIENT, 00160 MBEDTLS_SSL_TRANSPORT_STREAM, 00161 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { 00162 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); 00163 _error = ret; 00164 return -1; 00165 } 00166 00167 DBG("mbedtls_ssl_config_ca_chain ...\r\n"); 00168 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); 00169 DBG("mbedtls_ssl_conf_rng ...\r\n"); 00170 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); 00171 00172 /* It is possible to disable authentication by passing 00173 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() 00174 */ 00175 DBG("mbedtls_ssl_conf_authmode ...\r\n"); 00176 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); 00177 00178 #if DEBUG_LEVEL > 0 00179 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); 00180 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); 00181 mbedtls_debug_set_threshold(DEBUG_LEVEL); 00182 #endif 00183 00184 DBG("mbedtls_ssl_setup ...\r\n"); 00185 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { 00186 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); 00187 _error = ret; 00188 return -1; 00189 } 00190 00191 return 0; 00192 } 00193 00194 int MQTTThreadedClient::doTLSHandshake() 00195 { 00196 int ret; 00197 00198 /* Start the handshake, the rest will be done in onReceive() */ 00199 printf("Starting the TLS handshake...\r\n"); 00200 ret = 
mbedtls_ssl_handshake(&_ssl); 00201 if (ret < 0) 00202 { 00203 if (ret != MBEDTLS_ERR_SSL_WANT_READ && 00204 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 00205 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); 00206 else 00207 { 00208 // do not close the socket if timed out 00209 ret = TIMEOUT; 00210 } 00211 return ret; 00212 } 00213 00214 /* Handshake done, time to print info */ 00215 printf("TLS connection to %s:%d established\r\n", 00216 host.c_str(), port); 00217 00218 const uint32_t buf_size = 1024; 00219 char *buf = new char[buf_size]; 00220 mbedtls_x509_crt_info(buf, buf_size, "\r ", 00221 mbedtls_ssl_get_peer_cert(&_ssl)); 00222 00223 printf("Server certificate:\r\n%s\r", buf); 00224 // Verify server cert ... 00225 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); 00226 if( flags != 0 ) 00227 { 00228 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); 00229 printf("Certificate verification failed:\r\n%s\r\r\n", buf); 00230 // free server cert ... before error return 00231 delete [] buf; 00232 return -1; 00233 } 00234 00235 printf("Certificate verification passed\r\n\r\n"); 00236 // delete server cert after verification 00237 delete [] buf; 00238 00239 #if defined(MBEDTLS_SSL_CLI_C) 00240 printf("Saving SSL/TLS session ...\r\n"); 00241 // TODO: Save the session here for reconnect. 
00242 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) 00243 { 00244 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); 00245 hasSavedSession = false; 00246 return -1; 00247 } 00248 printf("Session saved for reconnect ...\r\n"); 00249 #endif 00250 00251 hasSavedSession = true; 00252 00253 return 0; 00254 } 00255 00256 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) 00257 { 00258 int rc; 00259 00260 if (tcpSocket == NULL) 00261 return -1; 00262 00263 if (useTLS) 00264 { 00265 // Do SSL/TLS read 00266 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); 00267 if (MBEDTLS_ERR_SSL_WANT_READ == rc) 00268 return TIMEOUT; 00269 else 00270 return rc; 00271 } else { 00272 // non-blocking socket ... 00273 tcpSocket->set_timeout(timeout); 00274 rc = tcpSocket->recv( (void *) buffer, size); 00275 00276 // return 0 bytes if timeout ... 00277 if (NSAPI_ERROR_WOULD_BLOCK == rc) 00278 return TIMEOUT; 00279 else 00280 return rc; // return the number of bytes received or error 00281 } 00282 } 00283 00284 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) 00285 { 00286 int rc; 00287 00288 if (tcpSocket == NULL) 00289 return -1; 00290 00291 if (useTLS) { 00292 // Do SSL/TLS write 00293 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); 00294 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) 00295 return TIMEOUT; 00296 else 00297 return rc; 00298 } else { 00299 00300 // set the write timeout 00301 tcpSocket->set_timeout(timeout); 00302 rc = tcpSocket->send(buffer, size); 00303 00304 if ( NSAPI_ERROR_WOULD_BLOCK == rc) 00305 return TIMEOUT; 00306 else 00307 return rc; 00308 } 00309 } 00310 00311 int MQTTThreadedClient::readPacketLength(int* value) 00312 { 00313 int rc = MQTTPACKET_READ_ERROR; 00314 unsigned char c; 00315 int multiplier = 1; 00316 int len = 0; 00317 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00318 00319 *value = 0; 00320 do 00321 { 00322 if 
(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00323 { 00324 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00325 goto exit; 00326 } 00327 00328 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); 00329 if (rc != 1) 00330 { 00331 rc = MQTTPACKET_READ_ERROR; 00332 goto exit; 00333 } 00334 00335 *value += (c & 127) * multiplier; 00336 multiplier *= 128; 00337 } while ((c & 128) != 0); 00338 00339 rc = MQTTPACKET_READ_COMPLETE; 00340 00341 exit: 00342 if (rc == MQTTPACKET_READ_ERROR ) 00343 len = -1; 00344 00345 return len; 00346 } 00347 00348 int MQTTThreadedClient::sendPacket(size_t length) 00349 { 00350 int rc = FAILURE; 00351 int sent = 0; 00352 00353 while (sent < length) 00354 { 00355 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); 00356 if (rc < 0) // there was an error writing the data 00357 break; 00358 sent += rc; 00359 } 00360 00361 if (sent == length) 00362 rc = SUCCESS; 00363 else 00364 rc = FAILURE; 00365 00366 return rc; 00367 } 00368 /** 00369 * Reads the entire packet to readbuf and returns 00370 * the type of packet when successful, otherwise 00371 * a negative error code is returned. 00372 **/ 00373 int MQTTThreadedClient::readPacket() 00374 { 00375 int rc = FAILURE; 00376 MQTTHeader header = {0}; 00377 int len = 0; 00378 int rem_len = 0; 00379 00380 /* 1. read the header byte. This has the packet type in it */ 00381 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) 00382 goto exit; 00383 00384 len = 1; 00385 /* 2. read the remaining length. This is variable in itself */ 00386 if ( readPacketLength(&rem_len) < 0 ) 00387 goto exit; 00388 00389 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00390 00391 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00392 { 00393 rc = BUFFER_OVERFLOW; 00394 goto exit; 00395 } 00396 00397 /* 3. 
read the rest of the buffer using a callback to supply the rest of the data */ 00398 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) 00399 goto exit; 00400 00401 // Convert the header to type 00402 // and update rc 00403 header.byte = readbuf[0]; 00404 rc = header.bits.type; 00405 00406 exit: 00407 00408 return rc; 00409 } 00410 00411 /** 00412 * Read until a specified packet type is received, or untill the specified 00413 * timeout dropping packets along the way. 00414 **/ 00415 int MQTTThreadedClient::readUntil(int packetType, int timeout) 00416 { 00417 int pType = FAILURE; 00418 Timer timer; 00419 00420 timer.start(); 00421 do { 00422 pType = readPacket(); 00423 if (pType < 0) 00424 break; 00425 00426 if (timer.read_ms() > timeout) 00427 { 00428 pType = FAILURE; 00429 break; 00430 } 00431 }while(pType != packetType); 00432 00433 return pType; 00434 } 00435 00436 00437 int MQTTThreadedClient::login() 00438 { 00439 int rc = FAILURE; 00440 int len = 0; 00441 00442 if (!isConnected) 00443 { 00444 DBG("Session not connected! \r\n"); 00445 return rc; 00446 } 00447 00448 // Copy the keepAliveInterval value to local 00449 // MQTT specifies in seconds, we have to multiply that 00450 // amount for our 32 bit timers which accepts ms. 
00451 keepAliveInterval = (connect_options.keepAliveInterval * 1000); 00452 00453 DBG("Login with: \r\n"); 00454 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); 00455 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); 00456 00457 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) 00458 { 00459 DBG("Error serializing connect packet ...\r\n"); 00460 return rc; 00461 } 00462 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet 00463 { 00464 DBG("Error sending the connect request packet ...\r\n"); 00465 return rc; 00466 } 00467 00468 // Wait for the CONNACK 00469 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) 00470 { 00471 unsigned char connack_rc = 255; 00472 bool sessionPresent = false; 00473 DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); 00474 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00475 rc = connack_rc; 00476 else 00477 rc = FAILURE; 00478 } 00479 else 00480 rc = FAILURE; 00481 00482 if (rc == SUCCESS) 00483 { 00484 DBG("Connected!!! ... starting connection timers ...\r\n"); 00485 resetConnectionTimer(); 00486 } 00487 00488 DBG("Returning with rc = %d\r\n", rc); 00489 00490 return rc; 00491 } 00492 00493 00494 void MQTTThreadedClient::disconnect() 00495 { 00496 if (isConnected) 00497 { 00498 if( useTLS 00499 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) 00500 ) 00501 { 00502 DBG( "Session reset returned an error \r\n"); 00503 } 00504 00505 isConnected = false; 00506 tcpSocket->close(); 00507 } 00508 } 00509 00510 int MQTTThreadedClient::connect() 00511 { 00512 int ret = FAILURE; 00513 00514 if ((network == NULL) || (tcpSocket == NULL) 00515 || host.empty()) 00516 { 00517 DBG("Network settings not set! \r\n"); 00518 return ret; 00519 } 00520 00521 if (useTLS) 00522 { 00523 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { 00524 mbedtls_printf( " failed\n ! 
mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); 00525 return ret; 00526 } 00527 #if defined(MBEDTLS_SSL_CLI_C) 00528 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { 00529 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); 00530 return ret; 00531 } 00532 #endif 00533 } 00534 00535 tcpSocket->open(network); 00536 if (useTLS) 00537 { 00538 DBG("mbedtls_ssl_set_hostname ...\r\n"); 00539 mbedtls_ssl_set_hostname(&_ssl, host.c_str()); 00540 DBG("mbedtls_ssl_set_bio ...\r\n"); 00541 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), 00542 ssl_send, ssl_recv, NULL ); 00543 } 00544 00545 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) 00546 { 00547 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); 00548 return ret; 00549 }else 00550 isConnected = true; 00551 00552 if (useTLS) 00553 { 00554 00555 if (doTLSHandshake() < 0) 00556 { 00557 DBG("TLS Handshake failed! \r\n"); 00558 return FAILURE; 00559 }else 00560 DBG("TLS Handshake complete!! 
\r\n"); 00561 } 00562 00563 return login(); 00564 } 00565 00566 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) 00567 { 00568 // Copy the settings for reconnection 00569 host = chost; 00570 port = cport; 00571 connect_options = options; 00572 } 00573 00574 int MQTTThreadedClient::publish(PubMessage& msg) 00575 { 00576 #if 0 00577 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); 00578 // TODO: handle id values when the function is called later 00579 if (id == 0) 00580 return FAILURE; 00581 else 00582 return SUCCESS; 00583 #endif 00584 PubMessage *message = mpool.alloc(); 00585 // Simple copy 00586 *message = msg; 00587 00588 // Push the data to the thread 00589 DBG("Pushing data to consumer thread ...\r\n"); 00590 mqueue.put(message); 00591 00592 return SUCCESS; 00593 } 00594 00595 int MQTTThreadedClient::sendPublish(PubMessage& message) 00596 { 00597 MQTTString topicString = MQTTString_initializer; 00598 00599 if (!isConnected) 00600 { 00601 DBG("Not connected!!! ...\r\n"); 00602 return FAILURE; 00603 } 00604 00605 topicString.cstring = (char*) &message.topic[0]; 00606 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, 00607 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); 00608 if (len <= 0) 00609 { 00610 DBG("Failed serializing message ...\r\n"); 00611 return FAILURE; 00612 } 00613 00614 if (sendPacket(len) == SUCCESS) 00615 { 00616 DBG("Successfully sent publish packet to server ...\r\n"); 00617 return SUCCESS; 00618 } 00619 00620 DBG("Failed to send publish packet to server ...\r\n"); 00621 return FAILURE; 00622 } 00623 00624 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) 00625 { 00626 // Push the subscription into the map ... 
00627 FP<void,MessageData &> fp; 00628 fp.attach(function); 00629 00630 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); 00631 } 00632 00633 int MQTTThreadedClient::processSubscriptions() 00634 { 00635 int numsubscribed = 0; 00636 00637 if (!isConnected) 00638 { 00639 DBG("Session not connected!!\r\n"); 00640 return 0; 00641 } 00642 00643 DBG("Processing subscribed topics ....\r\n"); 00644 00645 std::map<std::string, FP<void, MessageData &> >::iterator it; 00646 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 00647 { 00648 int rc = FAILURE; 00649 int len = 0; 00650 //TODO: We only subscribe to QoS = 0 for now 00651 QoS qos = QOS0; 00652 00653 MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; 00654 DBG("Subscribing to topic [%s]\r\n", topic.cstring); 00655 00656 00657 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00658 if (len <= 0) { 00659 DBG("Error serializing subscribe packet ...\r\n"); 00660 continue; 00661 } 00662 00663 if ((rc = sendPacket(len)) != SUCCESS) { 00664 DBG("Error sending subscribe packet [%d]\r\n", rc); 00665 continue; 00666 } 00667 00668 DBG("Waiting for subscription ack ...\r\n"); 00669 // Wait for SUBACK, dropping packets read along the way ... 00670 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback 00671 int count = 0, grantedQoS = -1; 00672 unsigned short mypacketid; 00673 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00674 rc = grantedQoS; // 0, 1, 2 or 0x80 00675 // For as long as we do not get 0x80 .. 00676 if (rc != 0x80) 00677 { 00678 // Reset connection timers here ... 00679 resetConnectionTimer(); 00680 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); 00681 numsubscribed++; 00682 } else { 00683 DBG("Failed to subscribe to topic %s ... 
(not authorized?)\r\n", it->first.c_str()); 00684 } 00685 } else 00686 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); 00687 } // end for loop 00688 00689 return numsubscribed; 00690 } 00691 00692 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) 00693 { 00694 char* curf = topicFilter; 00695 char* curn = topicName.lenstring.data; 00696 char* curn_end = curn + topicName.lenstring.len; 00697 00698 while (*curf && curn < curn_end) 00699 { 00700 if (*curn == '/' && *curf != '/') 00701 break; 00702 if (*curf != '+' && *curf != '#' && *curf != *curn) 00703 break; 00704 if (*curf == '+') 00705 { // skip until we meet the next separator, or end of string 00706 char* nextpos = curn + 1; 00707 while (nextpos < curn_end && *nextpos != '/') 00708 nextpos = ++curn + 1; 00709 } 00710 else if (*curf == '#') 00711 curn = curn_end - 1; // skip until end of string 00712 curf++; 00713 curn++; 00714 }; 00715 00716 return (curn == curn_end) && (*curf == '\0'); 00717 } 00718 00719 int MQTTThreadedClient::handlePublishMsg() 00720 { 00721 MQTTString topicName = MQTTString_initializer; 00722 Message msg; 00723 int intQoS; 00724 DBG("Deserializing publish message ...\r\n"); 00725 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 00726 &intQoS, 00727 (unsigned char*)&msg.retained, 00728 (unsigned short*)&msg.id, 00729 &topicName, 00730 (unsigned char**)&msg.payload, 00731 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00732 { 00733 DBG("Error deserializing published message ...\r\n"); 00734 return -1; 00735 } 00736 00737 std::string topic; 00738 if (topicName.lenstring.len > 0) 00739 { 00740 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); 00741 }else 00742 topic = (const char *) topicName.cstring; 00743 00744 DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); 00745 00746 msg.qos = (QoS) intQoS; 00747 00748 00749 // Call the handlers 
for each topic 00750 if (topicCBMap.find(topic) != topicCBMap.end()) 00751 { 00752 // Call the callback function 00753 if (topicCBMap[topic].attached()) 00754 { 00755 DBG("Invoking function handler for topic ...\r\n"); 00756 MessageData md(topicName, msg); 00757 topicCBMap[topic](md); 00758 00759 return 1; 00760 } 00761 } 00762 00763 // TODO: depending on the QoS 00764 // we send data to the server = PUBACK or PUBREC 00765 switch(intQoS) 00766 { 00767 case QOS0: 00768 // We send back nothing ... 00769 break; 00770 case QOS1: 00771 // TODO: implement 00772 break; 00773 case QOS2: 00774 // TODO: implement 00775 break; 00776 default: 00777 break; 00778 } 00779 00780 return 0; 00781 } 00782 00783 void MQTTThreadedClient::resetConnectionTimer() 00784 { 00785 if (keepAliveInterval > 0) 00786 { 00787 comTimer.reset(); 00788 comTimer.start(); 00789 } 00790 } 00791 00792 bool MQTTThreadedClient::hasConnectionTimedOut() 00793 { 00794 if (keepAliveInterval > 0 ) { 00795 // Check connection timer 00796 if (comTimer.read_ms() > keepAliveInterval) 00797 return true; 00798 else 00799 return false; 00800 } 00801 00802 return false; 00803 } 00804 00805 void MQTTThreadedClient::sendPingRequest() 00806 { 00807 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00808 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet 00809 { 00810 DBG("Ping request sent successfully ...\r\n"); 00811 } 00812 } 00813 00814 void MQTTThreadedClient::startListener() 00815 { 00816 int pType; 00817 int numsubs; 00818 // Continuesly listens for packets and dispatch 00819 // message handlers ... 00820 if (useTLS) 00821 { 00822 initTLS(); 00823 } 00824 00825 while(true) 00826 { 00827 00828 // Attempt to reconnect and login 00829 if ( connect() < 0 ) 00830 { 00831 disconnect(); 00832 // Wait for a few secs and reconnect ... 
00833 Thread::wait(6000); 00834 continue; 00835 } 00836 00837 numsubs = processSubscriptions(); 00838 DBG("Subscribed %d topics ...\r\n", numsubs); 00839 00840 // loop read 00841 while(true) 00842 { 00843 pType = readPacket(); 00844 switch(pType) 00845 { 00846 case TIMEOUT: 00847 // No data available from the network ... 00848 break; 00849 case FAILURE: 00850 { 00851 DBG("readPacket returned failure \r\n"); 00852 goto reconnect; 00853 } 00854 case BUFFER_OVERFLOW: 00855 { 00856 // TODO: Network error, do we disconnect and reconnect? 00857 DBG("Failure or buffer overflow problem ... \r\n"); 00858 MBED_ASSERT(false); 00859 } 00860 break; 00861 /** 00862 * The rest of the return codes below (all positive) is about MQTT 00863 * response codes 00864 **/ 00865 case CONNACK: 00866 case PUBACK: 00867 case SUBACK: 00868 break; 00869 case PUBLISH: 00870 { 00871 DBG("Publish received!....\r\n"); 00872 // We receive data from the MQTT server .. 00873 if (handlePublishMsg() < 0) { 00874 DBG("Error handling PUBLISH message ... \r\n"); 00875 break; 00876 } 00877 } 00878 break; 00879 case PINGRESP: 00880 { 00881 DBG("Got ping response ...\r\n"); 00882 resetConnectionTimer(); 00883 } 00884 break; 00885 default: 00886 DBG("Unknown/Not handled message from server pType[%d]\r\n", pType); 00887 } 00888 00889 // Check if its time to send a keepAlive packet 00890 if (hasConnectionTimedOut()) { 00891 // Queue the ping request so that other 00892 // pending operations queued above will go first 00893 queue.call(this, &MQTTThreadedClient::sendPingRequest); 00894 } 00895 00896 // Check if we have messages on the message queue 00897 osEvent evt = mqueue.get(10); 00898 if (evt.status == osEventMessage) { 00899 00900 DBG("Got message to publish! ... \r\n"); 00901 00902 // Unpack the message 00903 PubMessage * message = (PubMessage *)evt.value.p; 00904 00905 // Send the packet, do not queue the call 00906 // like the ping above .. 
00907 if ( sendPublish(*message) == SUCCESS) { 00908 // Reset timers if we have been able to send successfully 00909 resetConnectionTimer(); 00910 } else { 00911 // Disconnected? 00912 goto reconnect; 00913 } 00914 00915 // Free the message from mempool after using 00916 mpool.free(message); 00917 } 00918 00919 // Dispatch any queued events ... 00920 queue.dispatch(100); 00921 } // end while loop 00922 00923 reconnect: 00924 // reconnect? 00925 DBG("Client disconnected!! ... retrying ...\r\n"); 00926 disconnect(); 00927 00928 }; 00929 } 00930 00931 void MQTTThreadedClient::stopListener() 00932 { 00933 // TODO: Set a signal/flag that the running thread 00934 // will check if its ok to stop ... 00935 } 00936 00937 }
Generated on Thu Jul 14 2022 03:56:17 by
 1.7.2
 1.7.2 
    