A threaded, secure MQTT client example. It uses Mbed TLS for the SSL/TLS connection and currently supports QoS 0 only. The example has been tested on a K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.cpp
00001 #include "mbed.h" 00002 #include "rtos.h" 00003 #include "MQTTThreadedClient.h" 00004 #include "mbedtls/platform.h" 00005 #include "mbedtls/ssl.h" 00006 #include "mbedtls/entropy.h" 00007 #include "mbedtls/ctr_drbg.h" 00008 #include "mbedtls/error.h" 00009 00010 static MemoryPool<MQTT::PubMessage, 8> mpool; 00011 static Queue<MQTT::PubMessage, 8> mqueue; 00012 00013 // SSL/TLS variables 00014 mbedtls_entropy_context _entropy; 00015 mbedtls_ctr_drbg_context _ctr_drbg; 00016 mbedtls_x509_crt _cacert; 00017 mbedtls_ssl_context _ssl; 00018 mbedtls_ssl_config _ssl_conf; 00019 mbedtls_ssl_session saved_session; 00020 00021 namespace MQTT { 00022 00023 /** 00024 * Receive callback for mbed TLS 00025 */ 00026 static int ssl_recv(void *ctx, unsigned char *buf, size_t len) 00027 { 00028 int recv = -1; 00029 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00030 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00031 recv = socket->recv(buf, len); 00032 00033 if (NSAPI_ERROR_WOULD_BLOCK == recv) { 00034 return MBEDTLS_ERR_SSL_WANT_READ; 00035 } else if (recv < 0) { 00036 return -1; 00037 } else { 00038 return recv; 00039 } 00040 } 00041 00042 /** 00043 * Send callback for mbed TLS 00044 */ 00045 static int ssl_send(void *ctx, const unsigned char *buf, size_t len) 00046 { 00047 int sent = -1; 00048 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00049 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00050 sent = socket->send(buf, len); 00051 00052 if(NSAPI_ERROR_WOULD_BLOCK == sent) { 00053 return MBEDTLS_ERR_SSL_WANT_WRITE; 00054 } else if (sent < 0) { 00055 return -1; 00056 } else { 00057 return sent; 00058 } 00059 } 00060 00061 #if DEBUG_LEVEL > 0 00062 /** 00063 * Debug callback for mbed TLS 00064 * Just prints on the USB serial port 00065 */ 00066 static void my_debug(void *ctx, int level, const char *file, int line, 00067 const char *str) 00068 { 00069 const char *p, *basename; 00070 (void) ctx; 00071 00072 /* Extract basename from file */ 00073 for(p = basename = 
file; *p != '\0'; p++) { 00074 if(*p == '/' || *p == '\\') { 00075 basename = p + 1; 00076 } 00077 } 00078 00079 if (_debug) { 00080 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); 00081 } 00082 } 00083 00084 /** 00085 * Certificate verification callback for mbed TLS 00086 * Here we only use it to display information on each cert in the chain 00087 */ 00088 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) 00089 { 00090 const uint32_t buf_size = 1024; 00091 char *buf = new char[buf_size]; 00092 (void) data; 00093 00094 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); 00095 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); 00096 if (_debug) mbedtls_printf("%s", buf); 00097 00098 if (*flags == 0) 00099 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); 00100 else { 00101 mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); 00102 if (_debug) mbedtls_printf("%s\n", buf); 00103 } 00104 00105 delete[] buf; 00106 return 0; 00107 } 00108 #endif 00109 00110 00111 void MQTTThreadedClient::setupTLS() 00112 { 00113 if (useTLS) 00114 { 00115 mbedtls_entropy_init(&_entropy); 00116 mbedtls_ctr_drbg_init(&_ctr_drbg); 00117 mbedtls_x509_crt_init(&_cacert); 00118 mbedtls_ssl_init(&_ssl); 00119 mbedtls_ssl_config_init(&_ssl_conf); 00120 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); 00121 } 00122 } 00123 00124 void MQTTThreadedClient::freeTLS() 00125 { 00126 if (useTLS) 00127 { 00128 mbedtls_entropy_free(&_entropy); 00129 mbedtls_ctr_drbg_free(&_ctr_drbg); 00130 mbedtls_x509_crt_free(&_cacert); 00131 mbedtls_ssl_free(&_ssl); 00132 mbedtls_ssl_config_free(&_ssl_conf); 00133 } 00134 } 00135 00136 int MQTTThreadedClient::initTLS() 00137 { 00138 int ret; 00139 00140 DBG("Initializing TLS ...\r\n"); 00141 DBG("mbedtls_ctr_drdbg_seed ...\r\n"); 00142 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, 00143 (const unsigned char *) 
DRBG_PERS, 00144 sizeof (DRBG_PERS))) != 0) { 00145 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); 00146 _error = ret; 00147 return -1; 00148 } 00149 DBG("mbedtls_x509_crt_parse ...\r\n"); 00150 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, 00151 strlen(ssl_ca_pem) + 1)) != 0) { 00152 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); 00153 _error = ret; 00154 return -1; 00155 } 00156 00157 DBG("mbedtls_ssl_config_defaults ...\r\n"); 00158 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, 00159 MBEDTLS_SSL_IS_CLIENT, 00160 MBEDTLS_SSL_TRANSPORT_STREAM, 00161 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { 00162 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); 00163 _error = ret; 00164 return -1; 00165 } 00166 00167 DBG("mbedtls_ssl_config_ca_chain ...\r\n"); 00168 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); 00169 DBG("mbedtls_ssl_conf_rng ...\r\n"); 00170 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); 00171 00172 /* It is possible to disable authentication by passing 00173 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() 00174 */ 00175 DBG("mbedtls_ssl_conf_authmode ...\r\n"); 00176 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); 00177 00178 #if DEBUG_LEVEL > 0 00179 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); 00180 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); 00181 mbedtls_debug_set_threshold(DEBUG_LEVEL); 00182 #endif 00183 00184 DBG("mbedtls_ssl_setup ...\r\n"); 00185 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { 00186 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); 00187 _error = ret; 00188 return -1; 00189 } 00190 00191 return 0; 00192 } 00193 00194 int MQTTThreadedClient::doTLSHandshake() 00195 { 00196 int ret; 00197 00198 /* Start the handshake, the rest will be done in onReceive() */ 00199 printf("Starting the TLS handshake...\r\n"); 00200 ret = 
mbedtls_ssl_handshake(&_ssl); 00201 if (ret < 0) 00202 { 00203 if (ret != MBEDTLS_ERR_SSL_WANT_READ && 00204 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 00205 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); 00206 else 00207 { 00208 // do not close the socket if timed out 00209 ret = TIMEOUT; 00210 } 00211 return ret; 00212 } 00213 00214 /* Handshake done, time to print info */ 00215 printf("TLS connection to %s:%d established\r\n", 00216 host.c_str(), port); 00217 00218 const uint32_t buf_size = 1024; 00219 char *buf = new char[buf_size]; 00220 mbedtls_x509_crt_info(buf, buf_size, "\r ", 00221 mbedtls_ssl_get_peer_cert(&_ssl)); 00222 00223 printf("Server certificate:\r\n%s\r", buf); 00224 // Verify server cert ... 00225 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); 00226 if( flags != 0 ) 00227 { 00228 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); 00229 printf("Certificate verification failed:\r\n%s\r\r\n", buf); 00230 // free server cert ... before error return 00231 delete [] buf; 00232 return -1; 00233 } 00234 00235 printf("Certificate verification passed\r\n\r\n"); 00236 // delete server cert after verification 00237 delete [] buf; 00238 00239 #if defined(MBEDTLS_SSL_CLI_C) 00240 printf("Saving SSL/TLS session ...\r\n"); 00241 // TODO: Save the session here for reconnect. 
00242 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) 00243 { 00244 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); 00245 hasSavedSession = false; 00246 return -1; 00247 } 00248 printf("Session saved for reconnect ...\r\n"); 00249 #endif 00250 00251 hasSavedSession = true; 00252 00253 return 0; 00254 } 00255 00256 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) 00257 { 00258 int rc; 00259 00260 if (tcpSocket == NULL) 00261 return -1; 00262 00263 if (useTLS) 00264 { 00265 // Do SSL/TLS read 00266 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); 00267 if (MBEDTLS_ERR_SSL_WANT_READ == rc) 00268 return TIMEOUT; 00269 else 00270 return rc; 00271 } else { 00272 // non-blocking socket ... 00273 tcpSocket->set_timeout(timeout); 00274 rc = tcpSocket->recv( (void *) buffer, size); 00275 00276 // return 0 bytes if timeout ... 00277 if (NSAPI_ERROR_WOULD_BLOCK == rc) 00278 return TIMEOUT; 00279 else 00280 return rc; // return the number of bytes received or error 00281 } 00282 } 00283 00284 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) 00285 { 00286 int rc; 00287 00288 if (tcpSocket == NULL) 00289 return -1; 00290 00291 if (useTLS) { 00292 // Do SSL/TLS write 00293 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); 00294 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) 00295 return TIMEOUT; 00296 else 00297 return rc; 00298 } else { 00299 00300 // set the write timeout 00301 tcpSocket->set_timeout(timeout); 00302 rc = tcpSocket->send(buffer, size); 00303 00304 if ( NSAPI_ERROR_WOULD_BLOCK == rc) 00305 return TIMEOUT; 00306 else 00307 return rc; 00308 } 00309 } 00310 00311 int MQTTThreadedClient::readPacketLength(int* value) 00312 { 00313 int rc = MQTTPACKET_READ_ERROR; 00314 unsigned char c; 00315 int multiplier = 1; 00316 int len = 0; 00317 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00318 00319 *value = 0; 00320 do 00321 { 00322 if 
(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00323 { 00324 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00325 goto exit; 00326 } 00327 00328 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); 00329 if (rc != 1) 00330 { 00331 rc = MQTTPACKET_READ_ERROR; 00332 goto exit; 00333 } 00334 00335 *value += (c & 127) * multiplier; 00336 multiplier *= 128; 00337 } while ((c & 128) != 0); 00338 00339 rc = MQTTPACKET_READ_COMPLETE; 00340 00341 exit: 00342 if (rc == MQTTPACKET_READ_ERROR ) 00343 len = -1; 00344 00345 return len; 00346 } 00347 00348 int MQTTThreadedClient::sendPacket(size_t length) 00349 { 00350 int rc = FAILURE; 00351 int sent = 0; 00352 00353 while (sent < length) 00354 { 00355 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); 00356 if (rc < 0) // there was an error writing the data 00357 break; 00358 sent += rc; 00359 } 00360 00361 if (sent == length) 00362 rc = SUCCESS; 00363 else 00364 rc = FAILURE; 00365 00366 return rc; 00367 } 00368 /** 00369 * Reads the entire packet to readbuf and returns 00370 * the type of packet when successful, otherwise 00371 * a negative error code is returned. 00372 **/ 00373 int MQTTThreadedClient::readPacket() 00374 { 00375 int rc = FAILURE; 00376 MQTTHeader header = {0}; 00377 int len = 0; 00378 int rem_len = 0; 00379 00380 /* 1. read the header byte. This has the packet type in it */ 00381 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) 00382 goto exit; 00383 00384 len = 1; 00385 /* 2. read the remaining length. This is variable in itself */ 00386 if ( readPacketLength(&rem_len) < 0 ) 00387 goto exit; 00388 00389 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00390 00391 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00392 { 00393 rc = BUFFER_OVERFLOW; 00394 goto exit; 00395 } 00396 00397 /* 3. 
read the rest of the buffer using a callback to supply the rest of the data */ 00398 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) 00399 goto exit; 00400 00401 // Convert the header to type 00402 // and update rc 00403 header.byte = readbuf[0]; 00404 rc = header.bits.type; 00405 00406 exit: 00407 00408 return rc; 00409 } 00410 00411 /** 00412 * Read until a specified packet type is received, or untill the specified 00413 * timeout dropping packets along the way. 00414 **/ 00415 int MQTTThreadedClient::readUntil(int packetType, int timeout) 00416 { 00417 int pType = FAILURE; 00418 Timer timer; 00419 00420 timer.start(); 00421 do { 00422 pType = readPacket(); 00423 if (pType < 0) 00424 break; 00425 00426 if (timer.read_ms() > timeout) 00427 { 00428 pType = FAILURE; 00429 break; 00430 } 00431 }while(pType != packetType); 00432 00433 return pType; 00434 } 00435 00436 00437 int MQTTThreadedClient::login() 00438 { 00439 int rc = FAILURE; 00440 int len = 0; 00441 00442 if (!isConnected) 00443 { 00444 DBG("Session not connected! \r\n"); 00445 return rc; 00446 } 00447 00448 // Copy the keepAliveInterval value to local 00449 // MQTT specifies in seconds, we have to multiply that 00450 // amount for our 32 bit timers which accepts ms. 
00451 keepAliveInterval = (connect_options.keepAliveInterval * 1000); 00452 00453 DBG("Login with: \r\n"); 00454 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); 00455 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); 00456 00457 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) 00458 { 00459 DBG("Error serializing connect packet ...\r\n"); 00460 return rc; 00461 } 00462 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet 00463 { 00464 DBG("Error sending the connect request packet ...\r\n"); 00465 return rc; 00466 } 00467 00468 // Wait for the CONNACK 00469 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) 00470 { 00471 unsigned char connack_rc = 255; 00472 bool sessionPresent = false; 00473 DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); 00474 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00475 rc = connack_rc; 00476 else 00477 rc = FAILURE; 00478 } 00479 else 00480 rc = FAILURE; 00481 00482 if (rc == SUCCESS) 00483 { 00484 DBG("Connected!!! ... starting connection timers ...\r\n"); 00485 resetConnectionTimer(); 00486 } 00487 00488 DBG("Returning with rc = %d\r\n", rc); 00489 00490 return rc; 00491 } 00492 00493 00494 void MQTTThreadedClient::disconnect() 00495 { 00496 if (isConnected) 00497 { 00498 if( useTLS 00499 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) 00500 ) 00501 { 00502 DBG( "Session reset returned an error \r\n"); 00503 } 00504 00505 isConnected = false; 00506 tcpSocket->close(); 00507 } 00508 } 00509 00510 int MQTTThreadedClient::connect() 00511 { 00512 int ret = FAILURE; 00513 00514 if ((network == NULL) || (tcpSocket == NULL) 00515 || host.empty()) 00516 { 00517 DBG("Network settings not set! \r\n"); 00518 return ret; 00519 } 00520 00521 if (useTLS) 00522 { 00523 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { 00524 mbedtls_printf( " failed\n ! 
mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); 00525 return ret; 00526 } 00527 #if defined(MBEDTLS_SSL_CLI_C) 00528 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { 00529 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); 00530 return ret; 00531 } 00532 #endif 00533 } 00534 00535 tcpSocket->open(network); 00536 if (useTLS) 00537 { 00538 DBG("mbedtls_ssl_set_hostname ...\r\n"); 00539 mbedtls_ssl_set_hostname(&_ssl, host.c_str()); 00540 DBG("mbedtls_ssl_set_bio ...\r\n"); 00541 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), 00542 ssl_send, ssl_recv, NULL ); 00543 } 00544 00545 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) 00546 { 00547 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); 00548 return ret; 00549 }else 00550 isConnected = true; 00551 00552 if (useTLS) 00553 { 00554 00555 if (doTLSHandshake() < 0) 00556 { 00557 DBG("TLS Handshake failed! \r\n"); 00558 return FAILURE; 00559 }else 00560 DBG("TLS Handshake complete!! 
\r\n"); 00561 } 00562 00563 return login(); 00564 } 00565 00566 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) 00567 { 00568 // Copy the settings for reconnection 00569 host = chost; 00570 port = cport; 00571 connect_options = options; 00572 } 00573 00574 int MQTTThreadedClient::publish(PubMessage& msg) 00575 { 00576 #if 0 00577 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); 00578 // TODO: handle id values when the function is called later 00579 if (id == 0) 00580 return FAILURE; 00581 else 00582 return SUCCESS; 00583 #endif 00584 PubMessage *message = mpool.alloc(); 00585 // Simple copy 00586 *message = msg; 00587 00588 // Push the data to the thread 00589 DBG("Pushing data to consumer thread ...\r\n"); 00590 mqueue.put(message); 00591 00592 return SUCCESS; 00593 } 00594 00595 int MQTTThreadedClient::sendPublish(PubMessage& message) 00596 { 00597 MQTTString topicString = MQTTString_initializer; 00598 00599 if (!isConnected) 00600 { 00601 DBG("Not connected!!! ...\r\n"); 00602 return FAILURE; 00603 } 00604 00605 topicString.cstring = (char*) &message.topic[0]; 00606 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, 00607 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); 00608 if (len <= 0) 00609 { 00610 DBG("Failed serializing message ...\r\n"); 00611 return FAILURE; 00612 } 00613 00614 if (sendPacket(len) == SUCCESS) 00615 { 00616 DBG("Successfully sent publish packet to server ...\r\n"); 00617 return SUCCESS; 00618 } 00619 00620 DBG("Failed to send publish packet to server ...\r\n"); 00621 return FAILURE; 00622 } 00623 00624 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) 00625 { 00626 // Push the subscription into the map ... 
00627 FP<void,MessageData &> fp; 00628 fp.attach(function); 00629 00630 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); 00631 } 00632 00633 int MQTTThreadedClient::processSubscriptions() 00634 { 00635 int numsubscribed = 0; 00636 00637 if (!isConnected) 00638 { 00639 DBG("Session not connected!!\r\n"); 00640 return 0; 00641 } 00642 00643 DBG("Processing subscribed topics ....\r\n"); 00644 00645 std::map<std::string, FP<void, MessageData &> >::iterator it; 00646 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 00647 { 00648 int rc = FAILURE; 00649 int len = 0; 00650 //TODO: We only subscribe to QoS = 0 for now 00651 QoS qos = QOS0; 00652 00653 MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; 00654 DBG("Subscribing to topic [%s]\r\n", topic.cstring); 00655 00656 00657 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00658 if (len <= 0) { 00659 DBG("Error serializing subscribe packet ...\r\n"); 00660 continue; 00661 } 00662 00663 if ((rc = sendPacket(len)) != SUCCESS) { 00664 DBG("Error sending subscribe packet [%d]\r\n", rc); 00665 continue; 00666 } 00667 00668 DBG("Waiting for subscription ack ...\r\n"); 00669 // Wait for SUBACK, dropping packets read along the way ... 00670 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback 00671 int count = 0, grantedQoS = -1; 00672 unsigned short mypacketid; 00673 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00674 rc = grantedQoS; // 0, 1, 2 or 0x80 00675 // For as long as we do not get 0x80 .. 00676 if (rc != 0x80) 00677 { 00678 // Reset connection timers here ... 00679 resetConnectionTimer(); 00680 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); 00681 numsubscribed++; 00682 } else { 00683 DBG("Failed to subscribe to topic %s ... 
(not authorized?)\r\n", it->first.c_str()); 00684 } 00685 } else 00686 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); 00687 } // end for loop 00688 00689 return numsubscribed; 00690 } 00691 00692 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) 00693 { 00694 char* curf = topicFilter; 00695 char* curn = topicName.lenstring.data; 00696 char* curn_end = curn + topicName.lenstring.len; 00697 00698 while (*curf && curn < curn_end) 00699 { 00700 if (*curn == '/' && *curf != '/') 00701 break; 00702 if (*curf != '+' && *curf != '#' && *curf != *curn) 00703 break; 00704 if (*curf == '+') 00705 { // skip until we meet the next separator, or end of string 00706 char* nextpos = curn + 1; 00707 while (nextpos < curn_end && *nextpos != '/') 00708 nextpos = ++curn + 1; 00709 } 00710 else if (*curf == '#') 00711 curn = curn_end - 1; // skip until end of string 00712 curf++; 00713 curn++; 00714 }; 00715 00716 return (curn == curn_end) && (*curf == '\0'); 00717 } 00718 00719 int MQTTThreadedClient::handlePublishMsg() 00720 { 00721 MQTTString topicName = MQTTString_initializer; 00722 Message msg; 00723 int intQoS; 00724 DBG("Deserializing publish message ...\r\n"); 00725 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 00726 &intQoS, 00727 (unsigned char*)&msg.retained, 00728 (unsigned short*)&msg.id, 00729 &topicName, 00730 (unsigned char**)&msg.payload, 00731 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00732 { 00733 DBG("Error deserializing published message ...\r\n"); 00734 return -1; 00735 } 00736 00737 std::string topic; 00738 if (topicName.lenstring.len > 0) 00739 { 00740 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); 00741 }else 00742 topic = (const char *) topicName.cstring; 00743 00744 DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); 00745 00746 msg.qos = (QoS) intQoS; 00747 00748 00749 // Call the handlers 
for each topic 00750 if (topicCBMap.find(topic) != topicCBMap.end()) 00751 { 00752 // Call the callback function 00753 if (topicCBMap[topic].attached()) 00754 { 00755 DBG("Invoking function handler for topic ...\r\n"); 00756 MessageData md(topicName, msg); 00757 topicCBMap[topic](md); 00758 00759 return 1; 00760 } 00761 } 00762 00763 // TODO: depending on the QoS 00764 // we send data to the server = PUBACK or PUBREC 00765 switch(intQoS) 00766 { 00767 case QOS0: 00768 // We send back nothing ... 00769 break; 00770 case QOS1: 00771 // TODO: implement 00772 break; 00773 case QOS2: 00774 // TODO: implement 00775 break; 00776 default: 00777 break; 00778 } 00779 00780 return 0; 00781 } 00782 00783 void MQTTThreadedClient::resetConnectionTimer() 00784 { 00785 if (keepAliveInterval > 0) 00786 { 00787 comTimer.reset(); 00788 comTimer.start(); 00789 } 00790 } 00791 00792 bool MQTTThreadedClient::hasConnectionTimedOut() 00793 { 00794 if (keepAliveInterval > 0 ) { 00795 // Check connection timer 00796 if (comTimer.read_ms() > keepAliveInterval) 00797 return true; 00798 else 00799 return false; 00800 } 00801 00802 return false; 00803 } 00804 00805 void MQTTThreadedClient::sendPingRequest() 00806 { 00807 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00808 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet 00809 { 00810 DBG("Ping request sent successfully ...\r\n"); 00811 } 00812 } 00813 00814 void MQTTThreadedClient::startListener() 00815 { 00816 int pType; 00817 int numsubs; 00818 // Continuesly listens for packets and dispatch 00819 // message handlers ... 00820 if (useTLS) 00821 { 00822 initTLS(); 00823 } 00824 00825 while(true) 00826 { 00827 00828 // Attempt to reconnect and login 00829 if ( connect() < 0 ) 00830 { 00831 disconnect(); 00832 // Wait for a few secs and reconnect ... 
00833 Thread::wait(6000); 00834 continue; 00835 } 00836 00837 numsubs = processSubscriptions(); 00838 DBG("Subscribed %d topics ...\r\n", numsubs); 00839 00840 // loop read 00841 while(true) 00842 { 00843 pType = readPacket(); 00844 switch(pType) 00845 { 00846 case TIMEOUT: 00847 // No data available from the network ... 00848 break; 00849 case FAILURE: 00850 { 00851 DBG("readPacket returned failure \r\n"); 00852 goto reconnect; 00853 } 00854 case BUFFER_OVERFLOW: 00855 { 00856 // TODO: Network error, do we disconnect and reconnect? 00857 DBG("Failure or buffer overflow problem ... \r\n"); 00858 MBED_ASSERT(false); 00859 } 00860 break; 00861 /** 00862 * The rest of the return codes below (all positive) is about MQTT 00863 * response codes 00864 **/ 00865 case CONNACK: 00866 case PUBACK: 00867 case SUBACK: 00868 break; 00869 case PUBLISH: 00870 { 00871 DBG("Publish received!....\r\n"); 00872 // We receive data from the MQTT server .. 00873 if (handlePublishMsg() < 0) { 00874 DBG("Error handling PUBLISH message ... \r\n"); 00875 break; 00876 } 00877 } 00878 break; 00879 case PINGRESP: 00880 { 00881 DBG("Got ping response ...\r\n"); 00882 resetConnectionTimer(); 00883 } 00884 break; 00885 default: 00886 DBG("Unknown/Not handled message from server pType[%d]\r\n", pType); 00887 } 00888 00889 // Check if its time to send a keepAlive packet 00890 if (hasConnectionTimedOut()) { 00891 // Queue the ping request so that other 00892 // pending operations queued above will go first 00893 queue.call(this, &MQTTThreadedClient::sendPingRequest); 00894 } 00895 00896 // Check if we have messages on the message queue 00897 osEvent evt = mqueue.get(10); 00898 if (evt.status == osEventMessage) { 00899 00900 DBG("Got message to publish! ... \r\n"); 00901 00902 // Unpack the message 00903 PubMessage * message = (PubMessage *)evt.value.p; 00904 00905 // Send the packet, do not queue the call 00906 // like the ping above .. 
00907 if ( sendPublish(*message) == SUCCESS) { 00908 // Reset timers if we have been able to send successfully 00909 resetConnectionTimer(); 00910 } else { 00911 // Disconnected? 00912 goto reconnect; 00913 } 00914 00915 // Free the message from mempool after using 00916 mpool.free(message); 00917 } 00918 00919 // Dispatch any queued events ... 00920 queue.dispatch(100); 00921 } // end while loop 00922 00923 reconnect: 00924 // reconnect? 00925 DBG("Client disconnected!! ... retrying ...\r\n"); 00926 disconnect(); 00927 00928 }; 00929 } 00930 00931 void MQTTThreadedClient::stopListener() 00932 { 00933 // TODO: Set a signal/flag that the running thread 00934 // will check if its ok to stop ... 00935 } 00936 00937 }
Generated on Thu Jul 14 2022 03:56:17 by 1.7.2