Vergil Cola
/
MQTTGateway2
Fork of my original MQTTGateway
Embed:
(wiki syntax)
Show/hide line numbers
MQTTThreadedClient.cpp
00001 #include "mbed.h" 00002 #include "rtos.h" 00003 #include "MQTTThreadedClient.h" 00004 #include "mbedtls/platform.h" 00005 #include "mbedtls/ssl.h" 00006 #include "mbedtls/entropy.h" 00007 #include "mbedtls/ctr_drbg.h" 00008 #include "mbedtls/error.h" 00009 00010 #ifdef DEBUG 00011 #define DBG(fmt, args...) printf(fmt, ## args) 00012 #else 00013 #define DBG(fmt, args...) /* Don't do anything in release builds */ 00014 #endif 00015 00016 static MemoryPool<MQTT::PubMessage, 4> mpool; 00017 static Queue<MQTT::PubMessage, 4> mqueue; 00018 00019 // SSL/TLS variables 00020 mbedtls_entropy_context _entropy; 00021 mbedtls_ctr_drbg_context _ctr_drbg; 00022 mbedtls_x509_crt _cacert; 00023 mbedtls_ssl_context _ssl; 00024 mbedtls_ssl_config _ssl_conf; 00025 mbedtls_ssl_session saved_session; 00026 00027 namespace MQTT { 00028 00029 /** 00030 * Receive callback for mbed TLS 00031 */ 00032 static int ssl_recv(void *ctx, unsigned char *buf, size_t len) 00033 { 00034 int recv = -1; 00035 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00036 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00037 recv = socket->recv(buf, len); 00038 00039 if (NSAPI_ERROR_WOULD_BLOCK == recv) { 00040 return MBEDTLS_ERR_SSL_WANT_READ; 00041 } else if (recv < 0) { 00042 return -1; 00043 } else { 00044 return recv; 00045 } 00046 } 00047 00048 /** 00049 * Send callback for mbed TLS 00050 */ 00051 static int ssl_send(void *ctx, const unsigned char *buf, size_t len) 00052 { 00053 int sent = -1; 00054 TCPSocket *socket = static_cast<TCPSocket *>(ctx); 00055 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); 00056 sent = socket->send(buf, len); 00057 00058 if(NSAPI_ERROR_WOULD_BLOCK == sent) { 00059 return MBEDTLS_ERR_SSL_WANT_WRITE; 00060 } else if (sent < 0) { 00061 return -1; 00062 } else { 00063 return sent; 00064 } 00065 } 00066 00067 #if DEBUG_LEVEL > 0 00068 /** 00069 * Debug callback for mbed TLS 00070 * Just prints on the USB serial port 00071 */ 00072 static void my_debug(void *ctx, int level, const 
char *file, int line, 00073 const char *str) 00074 { 00075 const char *p, *basename; 00076 (void) ctx; 00077 00078 /* Extract basename from file */ 00079 for(p = basename = file; *p != '\0'; p++) { 00080 if(*p == '/' || *p == '\\') { 00081 basename = p + 1; 00082 } 00083 } 00084 00085 if (_debug) { 00086 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); 00087 } 00088 } 00089 00090 /** 00091 * Certificate verification callback for mbed TLS 00092 * Here we only use it to display information on each cert in the chain 00093 */ 00094 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) 00095 { 00096 const uint32_t buf_size = 1024; 00097 char *buf = new char[buf_size]; 00098 (void) data; 00099 00100 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); 00101 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); 00102 if (_debug) mbedtls_printf("%s", buf); 00103 00104 if (*flags == 0) 00105 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); 00106 else { 00107 mbedtls_x509_crt_verify_info(buf, buf_size, " ! 
", *flags); 00108 if (_debug) mbedtls_printf("%s\n", buf); 00109 } 00110 00111 delete[] buf; 00112 return 0; 00113 } 00114 #endif 00115 00116 00117 void MQTTThreadedClient::setupTLS() 00118 { 00119 if (useTLS) 00120 { 00121 mbedtls_entropy_init(&_entropy); 00122 mbedtls_ctr_drbg_init(&_ctr_drbg); 00123 mbedtls_x509_crt_init(&_cacert); 00124 mbedtls_ssl_init(&_ssl); 00125 mbedtls_ssl_config_init(&_ssl_conf); 00126 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); 00127 } 00128 } 00129 00130 void MQTTThreadedClient::freeTLS() 00131 { 00132 if (useTLS) 00133 { 00134 mbedtls_entropy_free(&_entropy); 00135 mbedtls_ctr_drbg_free(&_ctr_drbg); 00136 mbedtls_x509_crt_free(&_cacert); 00137 mbedtls_ssl_free(&_ssl); 00138 mbedtls_ssl_config_free(&_ssl_conf); 00139 } 00140 } 00141 00142 int MQTTThreadedClient::initTLS() 00143 { 00144 int ret; 00145 00146 DBG("Initializing TLS ...\r\n"); 00147 DBG("mbedtls_ctr_drdbg_seed ...\r\n"); 00148 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, 00149 (const unsigned char *) DRBG_PERS, 00150 sizeof (DRBG_PERS))) != 0) { 00151 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); 00152 _error = ret; 00153 return -1; 00154 } 00155 DBG("mbedtls_x509_crt_parse ...\r\n"); 00156 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, 00157 strlen(ssl_ca_pem) + 1)) != 0) { 00158 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); 00159 _error = ret; 00160 return -1; 00161 } 00162 00163 DBG("mbedtls_ssl_config_defaults ...\r\n"); 00164 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, 00165 MBEDTLS_SSL_IS_CLIENT, 00166 MBEDTLS_SSL_TRANSPORT_STREAM, 00167 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { 00168 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); 00169 _error = ret; 00170 return -1; 00171 } 00172 00173 DBG("mbedtls_ssl_config_ca_chain ...\r\n"); 00174 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); 00175 DBG("mbedtls_ssl_conf_rng 
...\r\n"); 00176 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); 00177 00178 /* It is possible to disable authentication by passing 00179 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() 00180 */ 00181 DBG("mbedtls_ssl_conf_authmode ...\r\n"); 00182 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); 00183 00184 #if DEBUG_LEVEL > 0 00185 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); 00186 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); 00187 mbedtls_debug_set_threshold(DEBUG_LEVEL); 00188 #endif 00189 00190 DBG("mbedtls_ssl_setup ...\r\n"); 00191 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { 00192 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); 00193 _error = ret; 00194 return -1; 00195 } 00196 00197 return 0; 00198 } 00199 00200 int MQTTThreadedClient::doTLSHandshake() 00201 { 00202 int ret; 00203 00204 /* Start the handshake, the rest will be done in onReceive() */ 00205 DBG("Starting the TLS handshake...\r\n"); 00206 ret = mbedtls_ssl_handshake(&_ssl); 00207 if (ret < 0) 00208 { 00209 if (ret != MBEDTLS_ERR_SSL_WANT_READ && 00210 ret != MBEDTLS_ERR_SSL_WANT_WRITE) 00211 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); 00212 else 00213 { 00214 // do not close the socket if timed out 00215 ret = TIMEOUT; 00216 } 00217 return ret; 00218 } 00219 00220 /* Handshake done, time to print info */ 00221 DBG("TLS connection to %s:%d established\r\n", 00222 host.c_str(), port); 00223 00224 const uint32_t buf_size = 1024; 00225 char *buf = new char[buf_size]; 00226 mbedtls_x509_crt_info(buf, buf_size, "\r ", 00227 mbedtls_ssl_get_peer_cert(&_ssl)); 00228 00229 DBG("Server certificate:\r\n%s\r", buf); 00230 // Verify server cert ... 00231 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); 00232 if( flags != 0 ) 00233 { 00234 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! 
", flags); 00235 DBG("Certificate verification failed:\r\n%s\r\r\n", buf); 00236 // free server cert ... before error return 00237 delete [] buf; 00238 return -1; 00239 } 00240 00241 DBG("Certificate verification passed\r\n\r\n"); 00242 // delete server cert after verification 00243 delete [] buf; 00244 00245 #if defined(MBEDTLS_SSL_CLI_C) 00246 // TODO: Save the session here for reconnect. 00247 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) 00248 { 00249 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); 00250 hasSavedSession = false; 00251 return -1; 00252 } 00253 #endif 00254 DBG("Session saved for reconnect ...\r\n"); 00255 hasSavedSession = true; 00256 00257 return 0; 00258 } 00259 00260 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) 00261 { 00262 int rc; 00263 00264 if (tcpSocket == NULL) 00265 return -1; 00266 00267 if (useTLS) 00268 { 00269 // Do SSL/TLS read 00270 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); 00271 if (MBEDTLS_ERR_SSL_WANT_READ == rc) 00272 return TIMEOUT; 00273 else 00274 return rc; 00275 } else { 00276 // non-blocking socket ... 00277 tcpSocket->set_timeout(timeout); 00278 rc = tcpSocket->recv( (void *) buffer, size); 00279 00280 // return 0 bytes if timeout ... 
00281 if (NSAPI_ERROR_WOULD_BLOCK == rc) 00282 return TIMEOUT; 00283 else 00284 return rc; // return the number of bytes received or error 00285 } 00286 } 00287 00288 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) 00289 { 00290 int rc; 00291 00292 if (tcpSocket == NULL) 00293 return -1; 00294 00295 if (useTLS) { 00296 // Do SSL/TLS write 00297 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); 00298 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) 00299 return TIMEOUT; 00300 else 00301 return rc; 00302 } else { 00303 00304 // set the write timeout 00305 tcpSocket->set_timeout(timeout); 00306 rc = tcpSocket->send(buffer, size); 00307 00308 if ( NSAPI_ERROR_WOULD_BLOCK == rc) 00309 return TIMEOUT; 00310 else 00311 return rc; 00312 } 00313 } 00314 00315 int MQTTThreadedClient::readPacketLength(int* value) 00316 { 00317 int rc = MQTTPACKET_READ_ERROR; 00318 unsigned char c; 00319 int multiplier = 1; 00320 int len = 0; 00321 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00322 00323 *value = 0; 00324 do 00325 { 00326 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00327 { 00328 rc = MQTTPACKET_READ_ERROR; /* bad data */ 00329 goto exit; 00330 } 00331 00332 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); 00333 if (rc != 1) 00334 { 00335 rc = MQTTPACKET_READ_ERROR; 00336 goto exit; 00337 } 00338 00339 *value += (c & 127) * multiplier; 00340 multiplier *= 128; 00341 } while ((c & 128) != 0); 00342 00343 rc = MQTTPACKET_READ_COMPLETE; 00344 00345 exit: 00346 if (rc == MQTTPACKET_READ_ERROR ) 00347 len = -1; 00348 00349 return len; 00350 } 00351 00352 int MQTTThreadedClient::sendPacket(size_t length) 00353 { 00354 int rc = FAILURE; 00355 int sent = 0; 00356 00357 while (sent < length) 00358 { 00359 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); 00360 if (rc < 0) // there was an error writing the data 00361 break; 00362 sent += rc; 00363 } 00364 00365 if (sent == length) 
00366 rc = SUCCESS; 00367 else 00368 rc = FAILURE; 00369 00370 return rc; 00371 } 00372 /** 00373 * Reads the entire packet to readbuf and returns 00374 * the type of packet when successful, otherwise 00375 * a negative error code is returned. 00376 **/ 00377 int MQTTThreadedClient::readPacket() 00378 { 00379 int rc = FAILURE; 00380 MQTTHeader header = {0}; 00381 int len = 0; 00382 int rem_len = 0; 00383 00384 /* 1. read the header byte. This has the packet type in it */ 00385 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) 00386 goto exit; 00387 00388 len = 1; 00389 /* 2. read the remaining length. This is variable in itself */ 00390 if ( readPacketLength(&rem_len) < 0 ) 00391 goto exit; 00392 00393 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ 00394 00395 if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) 00396 { 00397 rc = BUFFER_OVERFLOW; 00398 goto exit; 00399 } 00400 00401 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00402 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) 00403 goto exit; 00404 00405 // Convert the header to type 00406 // and update rc 00407 header.byte = readbuf[0]; 00408 rc = header.bits.type; 00409 00410 exit: 00411 00412 return rc; 00413 } 00414 00415 /** 00416 * Read until a specified packet type is received, or untill the specified 00417 * timeout dropping packets along the way. 
00418 **/ 00419 int MQTTThreadedClient::readUntil(int packetType, int timeout) 00420 { 00421 int pType = FAILURE; 00422 Timer timer; 00423 00424 timer.start(); 00425 do { 00426 pType = readPacket(); 00427 if (pType < 0) 00428 break; 00429 00430 if (timer.read_ms() > timeout) 00431 { 00432 pType = FAILURE; 00433 break; 00434 } 00435 }while(pType != packetType); 00436 00437 return pType; 00438 } 00439 00440 00441 int MQTTThreadedClient::login() 00442 { 00443 int rc = FAILURE; 00444 int len = 0; 00445 00446 if (!isConnected) 00447 { 00448 DBG("Session not connected! \r\n"); 00449 return rc; 00450 } 00451 00452 // Copy the keepAliveInterval value to local 00453 // MQTT specifies in seconds, we have to multiply that 00454 // amount for our 32 bit timers which accepts ms. 00455 keepAliveInterval = (connect_options.keepAliveInterval * 1000); 00456 00457 DBG("Login with: \r\n"); 00458 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); 00459 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); 00460 00461 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) 00462 { 00463 DBG("Error serializing connect packet ...\r\n"); 00464 return rc; 00465 } 00466 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet 00467 { 00468 DBG("Error sending the connect request packet ...\r\n"); 00469 return rc; 00470 } 00471 00472 // Wait for the CONNACK 00473 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) 00474 { 00475 unsigned char connack_rc = 255; 00476 bool sessionPresent = false; 00477 DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); 00478 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00479 rc = connack_rc; 00480 else 00481 rc = FAILURE; 00482 } 00483 else 00484 rc = FAILURE; 00485 00486 if (rc == SUCCESS) 00487 { 00488 DBG("Connected!!! ... 
starting connection timers ...\r\n"); 00489 resetConnectionTimer(); 00490 } 00491 00492 DBG("Returning with rc = %d\r\n", rc); 00493 00494 return rc; 00495 } 00496 00497 00498 void MQTTThreadedClient::disconnect() 00499 { 00500 if (isConnected) 00501 { 00502 if( useTLS 00503 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) 00504 ) 00505 { 00506 DBG( "Session reset returned an error \r\n"); 00507 } 00508 00509 isConnected = false; 00510 tcpSocket->close(); 00511 } 00512 } 00513 00514 int MQTTThreadedClient::connect() 00515 { 00516 int ret = FAILURE; 00517 00518 if ((network == NULL) || (tcpSocket == NULL) 00519 || host.empty()) 00520 { 00521 DBG("Network settings not set! \r\n"); 00522 return ret; 00523 } 00524 00525 if (useTLS) 00526 { 00527 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { 00528 mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); 00529 return ret; 00530 } 00531 #if defined(MBEDTLS_SSL_CLI_C) 00532 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { 00533 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); 00534 return ret; 00535 } 00536 #endif 00537 } 00538 00539 tcpSocket->open(network); 00540 if (useTLS) 00541 { 00542 DBG("mbedtls_ssl_set_hostname ...\r\n"); 00543 mbedtls_ssl_set_hostname(&_ssl, host.c_str()); 00544 DBG("mbedtls_ssl_set_bio ...\r\n"); 00545 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), 00546 ssl_send, ssl_recv, NULL ); 00547 } 00548 00549 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) 00550 { 00551 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); 00552 return ret; 00553 }else 00554 isConnected = true; 00555 00556 if (useTLS) 00557 { 00558 00559 if (doTLSHandshake() < 0) 00560 { 00561 DBG("TLS Handshake failed! \r\n"); 00562 return FAILURE; 00563 }else 00564 DBG("TLS Handshake complete!! 
\r\n"); 00565 } 00566 00567 return login(); 00568 } 00569 00570 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) 00571 { 00572 // Copy the settings for reconnection 00573 host = chost; 00574 port = cport; 00575 connect_options = options; 00576 } 00577 00578 int MQTTThreadedClient::publish(PubMessage& msg) 00579 { 00580 #if 0 00581 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); 00582 // TODO: handle id values when the function is called later 00583 if (id == 0) 00584 return FAILURE; 00585 else 00586 return SUCCESS; 00587 #endif 00588 PubMessage *message = mpool.alloc(); 00589 // Simple copy 00590 *message = msg; 00591 00592 // Push the data to the thread 00593 DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); 00594 mqueue.put(message); 00595 00596 return SUCCESS; 00597 } 00598 00599 int MQTTThreadedClient::sendPublish(PubMessage& message) 00600 { 00601 MQTTString topicString = MQTTString_initializer; 00602 00603 if (!isConnected) 00604 { 00605 DBG("[Thread:%d] Not connected!!! 
...\r\n", Thread::gettid()); 00606 return FAILURE; 00607 } 00608 00609 topicString.cstring = (char*) &message.topic[0]; 00610 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, 00611 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); 00612 if (len <= 0) 00613 { 00614 DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); 00615 return FAILURE; 00616 } 00617 00618 if (sendPacket(len) == SUCCESS) 00619 { 00620 DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); 00621 return SUCCESS; 00622 } 00623 00624 DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); 00625 return FAILURE; 00626 } 00627 00628 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) 00629 { 00630 // Push the subscription into the map ... 00631 FP<void,MessageData &> fp; 00632 fp.attach(function); 00633 00634 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); 00635 } 00636 00637 int MQTTThreadedClient::processSubscriptions() 00638 { 00639 int numsubscribed = 0; 00640 00641 if (!isConnected) 00642 { 00643 DBG("Session not connected!!\r\n"); 00644 return 0; 00645 } 00646 00647 DBG("Processing subscribed topics ....\r\n"); 00648 00649 std::map<std::string, FP<void, MessageData &> >::iterator it; 00650 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) 00651 { 00652 int rc = FAILURE; 00653 int len = 0; 00654 //TODO: We only subscribe to QoS = 0 for now 00655 QoS qos = QOS0; 00656 00657 MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; 00658 DBG("Subscribing to topic [%s]\r\n", topic.cstring); 00659 00660 00661 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); 00662 if (len <= 0) { 00663 DBG("Error serializing subscribe packet ...\r\n"); 00664 continue; 00665 } 00666 00667 if ((rc = 
sendPacket(len)) != SUCCESS) { 00668 DBG("Error sending subscribe packet [%d]\r\n", rc); 00669 continue; 00670 } 00671 00672 DBG("Waiting for subscription ack ...\r\n"); 00673 // Wait for SUBACK, dropping packets read along the way ... 00674 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback 00675 int count = 0, grantedQoS = -1; 00676 unsigned short mypacketid; 00677 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) 00678 rc = grantedQoS; // 0, 1, 2 or 0x80 00679 // For as long as we do not get 0x80 .. 00680 if (rc != 0x80) 00681 { 00682 // Reset connection timers here ... 00683 resetConnectionTimer(); 00684 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); 00685 numsubscribed++; 00686 } else { 00687 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); 00688 } 00689 } else 00690 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); 00691 } // end for loop 00692 00693 return numsubscribed; 00694 } 00695 00696 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) 00697 { 00698 char* curf = topicFilter; 00699 char* curn = topicName.lenstring.data; 00700 char* curn_end = curn + topicName.lenstring.len; 00701 00702 while (*curf && curn < curn_end) 00703 { 00704 if (*curn == '/' && *curf != '/') 00705 break; 00706 if (*curf != '+' && *curf != '#' && *curf != *curn) 00707 break; 00708 if (*curf == '+') 00709 { // skip until we meet the next separator, or end of string 00710 char* nextpos = curn + 1; 00711 while (nextpos < curn_end && *nextpos != '/') 00712 nextpos = ++curn + 1; 00713 } 00714 else if (*curf == '#') 00715 curn = curn_end - 1; // skip until end of string 00716 curf++; 00717 curn++; 00718 }; 00719 00720 return (curn == curn_end) && (*curf == '\0'); 00721 } 00722 00723 int MQTTThreadedClient::handlePublishMsg() 00724 { 00725 MQTTString topicName = MQTTString_initializer; 00726 Message 
msg; 00727 int intQoS; 00728 DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); 00729 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 00730 &intQoS, 00731 (unsigned char*)&msg.retained, 00732 (unsigned short*)&msg.id, 00733 &topicName, 00734 (unsigned char**)&msg.payload, 00735 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) 00736 { 00737 DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); 00738 return -1; 00739 } 00740 00741 std::string topic; 00742 if (topicName.lenstring.len > 0) 00743 { 00744 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); 00745 }else 00746 topic = (const char *) topicName.cstring; 00747 00748 DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); 00749 00750 msg.qos = (QoS) intQoS; 00751 00752 00753 // Call the handlers for each topic 00754 if (topicCBMap.find(topic) != topicCBMap.end()) 00755 { 00756 // Call the callback function 00757 if (topicCBMap[topic].attached()) 00758 { 00759 DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); 00760 MessageData md(topicName, msg); 00761 topicCBMap[topic](md); 00762 00763 return 1; 00764 } 00765 } 00766 00767 // TODO: depending on the QoS 00768 // we send data to the server = PUBACK or PUBREC 00769 switch(intQoS) 00770 { 00771 case QOS0: 00772 // We send back nothing ... 
00773 break; 00774 case QOS1: 00775 // TODO: implement 00776 break; 00777 case QOS2: 00778 // TODO: implement 00779 break; 00780 default: 00781 break; 00782 } 00783 00784 return 0; 00785 } 00786 00787 void MQTTThreadedClient::resetConnectionTimer() 00788 { 00789 if (keepAliveInterval > 0) 00790 { 00791 comTimer.reset(); 00792 comTimer.start(); 00793 } 00794 } 00795 00796 bool MQTTThreadedClient::hasConnectionTimedOut() 00797 { 00798 if (keepAliveInterval > 0 ) { 00799 // Check connection timer 00800 if (comTimer.read_ms() > keepAliveInterval) 00801 return true; 00802 else 00803 return false; 00804 } 00805 00806 return false; 00807 } 00808 00809 void MQTTThreadedClient::sendPingRequest() 00810 { 00811 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); 00812 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet 00813 { 00814 printf("MQTT Ping request sent successfully ...\r\n"); 00815 } 00816 } 00817 00818 void MQTTThreadedClient::startListener() 00819 { 00820 int pType; 00821 int numsubs; 00822 // Continuesly listens for packets and dispatch 00823 // message handlers ... 00824 if (useTLS) 00825 { 00826 initTLS(); 00827 } 00828 00829 while(true) 00830 { 00831 00832 // Attempt to reconnect and login 00833 if ( connect() < 0 ) 00834 { 00835 disconnect(); 00836 // Wait for a few secs and reconnect ... 00837 Thread::wait(6000); 00838 continue; 00839 } 00840 00841 numsubs = processSubscriptions(); 00842 DBG("Subscribed %d topics ...\r\n", numsubs); 00843 00844 // loop read 00845 while(true) 00846 { 00847 pType = readPacket(); 00848 switch(pType) 00849 { 00850 case TIMEOUT: 00851 // No data available from the network ... 00852 break; 00853 case FAILURE: 00854 { 00855 DBG("readPacket returned failure \r\n"); 00856 goto reconnect; 00857 } 00858 case BUFFER_OVERFLOW: 00859 { 00860 // TODO: Network error, do we disconnect and reconnect? 00861 DBG("[Thread:%d]Failure or buffer overflow problem ... 
\r\n", Thread::gettid()); 00862 MBED_ASSERT(false); 00863 } 00864 break; 00865 /** 00866 * The rest of the return codes below (all positive) is about MQTT 00867 * response codes 00868 **/ 00869 case CONNACK: 00870 case PUBACK: 00871 case SUBACK: 00872 break; 00873 case PUBLISH: 00874 { 00875 DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid()); 00876 // We receive data from the MQTT server .. 00877 if (handlePublishMsg() < 0) { 00878 DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); 00879 break; 00880 } 00881 } 00882 break; 00883 case PINGRESP: 00884 { 00885 printf("MQTT Got ping response ...\r\n"); 00886 resetConnectionTimer(); 00887 } 00888 break; 00889 default: 00890 DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); 00891 } 00892 00893 // Check if its time to send a keepAlive packet 00894 if (hasConnectionTimedOut()) { 00895 // Queue the ping request so that other 00896 // pending operations queued above will go first 00897 queue.call(this, &MQTTThreadedClient::sendPingRequest); 00898 } 00899 00900 // Check if we have messages on the message queue 00901 osEvent evt = mqueue.get(10); 00902 if (evt.status == osEventMessage) { 00903 00904 DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); 00905 00906 // Unpack the message 00907 PubMessage * message = (PubMessage *)evt.value.p; 00908 00909 // Send the packet, do not queue the call 00910 // like the ping above .. 00911 if ( sendPublish(*message) == SUCCESS) { 00912 // Reset timers if we have been able to send successfully 00913 resetConnectionTimer(); 00914 } else { 00915 // Disconnected? 00916 goto reconnect; 00917 } 00918 00919 // Free the message from mempool after using 00920 mpool.free(message); 00921 } 00922 00923 // Dispatch any queued events ... 00924 queue.dispatch(100); 00925 } // end while loop 00926 00927 reconnect: 00928 // reconnect? 00929 DBG("Client disconnected!! ... 
retrying ...\r\n"); 00930 disconnect(); 00931 00932 }; 00933 } 00934 00935 void MQTTThreadedClient::stopListener() 00936 { 00937 // TODO: Set a signal/flag that the running thread 00938 // will check if its ok to stop ... 00939 } 00940 00941 }
Generated on Tue Jul 12 2022 18:06:46 by Doxygen 1.7.2