Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTClient.cpp
00001 /* 00002 MQTTClient.cpp - A simple client for MQTT. 00003 Nicholas O'Leary 00004 http://knolleary.net 00005 00006 Ported to mbed by Zoltan Hudak <hudakz@outlook.com> 00007 */ 00008 #include "MQTTClient.h" 00009 #include <string.h> 00010 #include <time.h> 00011 00012 #define UIPETHERNET_DEBUG 1 00013 00014 /** 00015 * @brief 00016 * @note 00017 * @param 00018 * @retval 00019 */ 00020 MQTTClient::MQTTClient() : 00021 _client(NULL), 00022 _stream(NULL), 00023 _onMessage(NULL) 00024 { } 00025 00026 /** 00027 * @brief 00028 * @note 00029 * @param 00030 * @retval 00031 */ 00032 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) : 00033 _ip(ip), 00034 _domain(NULL), 00035 _port(port), 00036 _stream(NULL), 00037 _onMessage(NULL) 00038 { } 00039 00040 /** 00041 * @brief 00042 * @note 00043 * @param 00044 * @retval 00045 */ 00046 MQTTClient::MQTTClient(const char* domain, uint16_t port) : 00047 _domain((char*)domain), 00048 _port(port), 00049 _stream(NULL), 00050 _onMessage(NULL) 00051 { } 00052 00053 /** 00054 * @brief 00055 * @note 00056 * @param 00057 * @retval 00058 */ 00059 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) : 00060 _ip(ip), 00061 _domain(NULL), 00062 _port(port), 00063 _stream(&stream), 00064 _onMessage(NULL) 00065 { } 00066 00067 /** 00068 * @brief 00069 * @note 00070 * @param 00071 * @retval 00072 */ 00073 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) : 00074 _domain((char*)domain), 00075 _port(port), 00076 _stream(&stream), 00077 _onMessage(NULL) 00078 { } 00079 00080 /** 00081 * @brief 00082 * @note 00083 * @param 00084 * @retval 00085 */ 00086 bool MQTTClient::connect(const char* id) 00087 { 00088 return connect(id, NULL, NULL, 0, 0, 0, 0); 00089 } 00090 00091 /** 00092 * @brief 00093 * @note 00094 * @param 00095 * @retval 00096 */ 00097 bool MQTTClient::connect(const char* id, const char* user, const char* pass) 00098 { 00099 return connect(id, user, pass, 0, 0, 0, 0); 00100 } 00101 00102 /** 
00103 * @brief 00104 * @note 00105 * @param 00106 * @retval 00107 */ 00108 bool MQTTClient::connect 00109 ( 00110 const char* id, 00111 const char* willTopic, 00112 uint8_t willQos, 00113 uint8_t willRetain, 00114 const char* willMessage 00115 ) 00116 { 00117 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage); 00118 } 00119 00120 /** 00121 * @brief 00122 * @note 00123 * @param 00124 * @retval 00125 */ 00126 bool MQTTClient::connect 00127 ( 00128 const char* id, 00129 const char* user, 00130 const char* pass, 00131 const char* willTopic, 00132 uint8_t willQos, 00133 uint8_t willRetain, 00134 const char* willMessage 00135 ) 00136 { 00137 if (!connected()) { 00138 #ifdef UIPETHERNET_DEBUG 00139 printf("MQTTClient::connect \r\n"); 00140 #endif 00141 int result = 0; 00142 00143 if (_domain != NULL) { 00144 result = !_client.connect(this->_domain, this->_port); 00145 } 00146 else { 00147 result = _client.connect(this->_ip, this->_port); 00148 } 00149 00150 if (result) { 00151 _nextMsgId = 1; 00152 00153 uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; 00154 00155 // Leave room in the buffer for header and variable length field 00156 uint16_t pos = 5; 00157 uint16_t j; 00158 for (j = 0; j < sizeof(d); j++) { 00159 _buffer[pos++] = d[j]; 00160 } 00161 00162 uint8_t v; 00163 if (willTopic) { 00164 v = 0x06 | (willQos << 3) | (willRetain << 5); 00165 } 00166 else { 00167 v = 0x02; 00168 } 00169 00170 if (user != NULL) { 00171 v = v | 0x80; 00172 00173 if (pass != NULL) { 00174 v = v | (0x80 >> 1); 00175 } 00176 } 00177 00178 _buffer[pos++] = v; 00179 00180 _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8); 00181 _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF); 00182 pos = _writeString(id, _buffer, pos); 00183 if (willTopic) { 00184 pos = _writeString(willTopic, _buffer, pos); 00185 pos = _writeString(willMessage, _buffer, pos); 00186 } 00187 00188 if (user != NULL) { 00189 pos = _writeString(user, _buffer, pos); 00190 if (pass != 
NULL) { 00191 pos = _writeString(pass, _buffer, pos); 00192 } 00193 } 00194 00195 _write(MQTTCONNECT, _buffer, pos - 5); 00196 00197 _lastInActivity = _lastOutActivity = time(NULL); 00198 00199 while (!_client.available()) { 00200 unsigned long t = time(NULL); 00201 if (t - _lastInActivity > MQTT_KEEPALIVE) { 00202 _client.stop(); 00203 #ifdef UIPETHERNET_DEBUG 00204 printf("MQTTClient::connect available \r\n"); 00205 #endif 00206 return false; 00207 } 00208 } 00209 00210 uint8_t len; 00211 00212 if (_readPacket(&len) == 4 && _buffer[3] == 0) { 00213 _lastInActivity = time(NULL); 00214 _pingOutstanding = false; 00215 #ifdef UIPETHERNET_DEBUG 00216 printf("MQTTClient::connect 4 \r\n"); 00217 #endif 00218 return true; 00219 } 00220 } 00221 00222 _client.stop(); 00223 } 00224 #ifdef UIPETHERNET_DEBUG 00225 printf("MQTTClient::connect End \r\n"); 00226 #endif 00227 00228 return false; 00229 } 00230 00231 /** 00232 * @brief 00233 * @note 00234 * @param 00235 * @retval 00236 */ 00237 uint8_t MQTTClient::_readByte() 00238 { 00239 while (!_client.available()) { } 00240 00241 return _client.recv(); 00242 } 00243 00244 /** 00245 * @brief 00246 * @note 00247 * @param 00248 * @retval 00249 */ 00250 uint16_t MQTTClient::_readPacket(uint8_t* length) 00251 { 00252 #ifdef UIPETHERNET_DEBUG 00253 printf("MQTTClient::_readPacket \r\n"); 00254 #endif 00255 00256 00257 uint16_t len = 0; 00258 _buffer[len++] = _readByte(); 00259 00260 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH; 00261 uint32_t multiplier = 1; 00262 uint16_t pos = 0; 00263 uint8_t digit = 0; 00264 uint16_t skip = 0; 00265 uint8_t start = 0; 00266 00267 do { 00268 digit = _readByte(); 00269 _buffer[len++] = digit; 00270 pos += (digit & 127) * multiplier; 00271 multiplier *= 128; 00272 } while ((digit & 128) != 0); 00273 *length = len - 1; 00274 00275 if (isPublish) { 00276 // Read in topic length to calculate bytes to skip over for Stream writing 00277 _buffer[len++] = _readByte(); 00278 _buffer[len++] = 
_readByte(); 00279 skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2]; 00280 start = 2; 00281 if (_buffer[0] & MQTTQOS1) { 00282 // skip message id 00283 skip += 2; 00284 } 00285 } 00286 00287 for (uint16_t i = start; i < pos; i++) { 00288 digit = _readByte(); 00289 if (this->_stream) { 00290 if (isPublish && len -*length - 2 > skip) { 00291 this->_stream->putc(digit); 00292 } 00293 } 00294 00295 if (len < MQTT_MAX_PACKET_SIZE) { 00296 _buffer[len] = digit; 00297 } 00298 00299 len++; 00300 } 00301 00302 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) { 00303 len = 0; // This will cause the packet to be ignored. 00304 } 00305 00306 #ifdef UIPETHERNET_DEBUG 00307 printf("MQTTClient::_readPacket End \r\n"); 00308 #endif 00309 00310 return len; 00311 } 00312 00313 /** 00314 * @brief 00315 * @note 00316 * @param 00317 * @retval 00318 */ 00319 bool MQTTClient::poll() 00320 { 00321 00322 #ifdef UIPETHERNET_DEBUG 00323 // printf("bool MQTTClient::poll() \r\n"); 00324 #endif 00325 00326 if (connected()) { 00327 time_t now = time(NULL); 00328 if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) { 00329 if (_pingOutstanding) { 00330 _client.stop(); 00331 return false; 00332 } 00333 else { 00334 _buffer[0] = MQTTPINGREQ; 00335 _buffer[1] = 0; 00336 _client.send(_buffer, 2); 00337 _lastOutActivity = now; 00338 _lastInActivity = now; 00339 _pingOutstanding = true; 00340 } 00341 } 00342 00343 if (_client.available()) { 00344 uint8_t len; 00345 uint16_t length = _readPacket(&len); 00346 uint16_t msgId = 0; 00347 uint8_t* payload; 00348 if (length > 0) { 00349 _lastInActivity = now; 00350 00351 uint8_t type = _buffer[0] & 0xF0; 00352 if (type == MQTTPUBLISH) { 00353 if (_onMessage) { 00354 uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2]; 00355 char topic[topicLen + 1]; 00356 for (uint16_t i = 0; i < topicLen; i++) { 00357 topic[i] = _buffer[len + 3 + i]; 00358 } 00359 00360 topic[topicLen] = '\0'; 00361 00362 // msgId 
only present for QOS>0 00363 if ((_buffer[0] & 0x06) == MQTTQOS1) { 00364 msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1]; 00365 payload = _buffer + len + 3 + topicLen + 2; 00366 _onMessage(topic, payload, length - len - 3 - topicLen - 2); 00367 00368 _buffer[0] = MQTTPUBACK; 00369 _buffer[1] = 2; 00370 _buffer[2] = (msgId >> 8); 00371 _buffer[3] = (msgId & 0xFF); 00372 _client.send(_buffer, 4); 00373 _lastOutActivity = now; 00374 } 00375 else { 00376 payload = _buffer + len + 3 + topicLen; 00377 _onMessage(topic, payload, length - len - 3 - topicLen); 00378 } 00379 } 00380 } 00381 else 00382 if (type == MQTTPINGREQ) { 00383 _buffer[0] = MQTTPINGRESP; 00384 _buffer[1] = 0; 00385 _client.send(_buffer, 2); 00386 } 00387 else 00388 if (type == MQTTPINGRESP) { 00389 _pingOutstanding = false; 00390 } 00391 } 00392 } 00393 00394 return true; 00395 } 00396 00397 #ifdef UIPETHERNET_DEBUG 00398 printf("bool MQTTClient::poll() End \r\n"); 00399 #endif 00400 00401 return false; 00402 } 00403 00404 /** 00405 * @brief 00406 * @note 00407 * @param 00408 * @retval 00409 */ 00410 bool MQTTClient::publish(const char* topic, const char* payload) 00411 { 00412 return publish(topic, (uint8_t*)payload, strlen(payload), false); 00413 } 00414 00415 /** 00416 * @brief 00417 * @note 00418 * @param 00419 * @retval 00420 */ 00421 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length) 00422 { 00423 return publish(topic, payload, length, false); 00424 } 00425 00426 /** 00427 * @brief 00428 * @note 00429 * @param 00430 * @retval 00431 */ 00432 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained) 00433 { 00434 00435 #ifdef UIPETHERNET_DEBUG 00436 printf("MQTTClient::publish \r\n"); 00437 #endif 00438 00439 if (connected()) { 00440 // Leave room in the buffer for header and variable length field 00441 uint16_t length = 5; 00442 length = _writeString(topic, _buffer, length); 00443 00444 uint16_t i; 00445 
for (i = 0; i < plength; i++) { 00446 _buffer[length++] = payload[i]; 00447 } 00448 00449 uint8_t header = MQTTPUBLISH; 00450 if (retained) { 00451 header |= 1; 00452 } 00453 00454 return _write(header, _buffer, length - 5); 00455 } 00456 00457 #ifdef UIPETHERNET_DEBUG 00458 printf("MQTTClient::publish End \r\n"); 00459 #endif 00460 00461 return false; 00462 } 00463 00464 /** 00465 * @brief 00466 * @note 00467 * @param 00468 * @retval 00469 */ 00470 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length) 00471 { 00472 uint8_t digitBuf[4]; 00473 uint8_t digitLen = 0; 00474 uint8_t digit; 00475 uint8_t pos = 0; 00476 uint8_t rc; 00477 uint8_t len = length; 00478 do { 00479 digit = len % 128; 00480 len = len / 128; 00481 if (len > 0) { 00482 digit |= 0x80; 00483 } 00484 00485 digitBuf[pos++] = digit; 00486 digitLen++; 00487 } while (len > 0); 00488 00489 buf[4 - digitLen] = header; 00490 for (int i = 0; i < digitLen; i++) { 00491 buf[5 - digitLen + i] = digitBuf[i]; 00492 } 00493 00494 rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen); 00495 00496 _lastOutActivity = time(NULL); 00497 return(rc == 1 + digitLen + length); 00498 } 00499 00500 /** 00501 * @brief 00502 * @note 00503 * @param 00504 * @retval 00505 */ 00506 bool MQTTClient::subscribe(const char* topic) 00507 { 00508 bool result = subscribe(topic, 0); 00509 #if MBED_MAJOR_VERSION == 2 00510 wait_ms(50); 00511 #else 00512 thread_sleep_for(50); 00513 #endif 00514 00515 return result; 00516 } 00517 00518 /** 00519 * @brief 00520 * @note 00521 * @param 00522 * @retval 00523 */ 00524 bool MQTTClient::subscribe(const char* topic, uint8_t qos) 00525 { 00526 #ifdef UIPETHERNET_DEBUG 00527 printf("MQTTClient::subscribe \r\n"); 00528 #endif 00529 00530 if (qos > 1) 00531 return false; 00532 00533 if (connected()) { 00534 // Leave room in the buffer for header and variable length field 00535 uint16_t length = 5; 00536 _nextMsgId++; 00537 if (_nextMsgId == 0) { 00538 _nextMsgId = 1; 00539 } 
00540 00541 _buffer[length++] = (_nextMsgId >> 8); 00542 _buffer[length++] = (_nextMsgId & 0xFF); 00543 length = _writeString(topic, _buffer, length); 00544 _buffer[length++] = qos; 00545 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5); 00546 } 00547 00548 #ifdef UIPETHERNET_DEBUG 00549 printf("MQTTClient::subscribe End \r\n"); 00550 #endif 00551 00552 return false; 00553 } 00554 00555 /** 00556 * @brief 00557 * @note 00558 * @param 00559 * @retval 00560 */ 00561 bool MQTTClient::unsubscribe(const char* topic) 00562 { 00563 #ifdef UIPETHERNET_DEBUG 00564 printf("MQTTClient::unsubscribe \r\n"); 00565 #endif 00566 00567 if (connected()) { 00568 uint16_t length = 5; 00569 _nextMsgId++; 00570 if (_nextMsgId == 0) { 00571 _nextMsgId = 1; 00572 } 00573 00574 _buffer[length++] = (_nextMsgId >> 8); 00575 _buffer[length++] = (_nextMsgId & 0xFF); 00576 length = _writeString(topic, _buffer, length); 00577 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5); 00578 } 00579 00580 #ifdef UIPETHERNET_DEBUG 00581 printf("MQTTClient::unsubscribe End \r\n"); 00582 #endif 00583 00584 return false; 00585 } 00586 00587 /** 00588 * @brief 00589 * @note 00590 * @param 00591 * @retval 00592 */ 00593 void MQTTClient::disconnect() 00594 { 00595 #ifdef UIPETHERNET_DEBUG 00596 printf("MQTTClient::disconnect() \r\n"); 00597 #endif 00598 00599 _buffer[0] = MQTTDISCONNECT; 00600 _buffer[1] = 0; 00601 _client.send(_buffer, 2); 00602 _client.close(); 00603 _client.stop(); 00604 _lastInActivity = _lastOutActivity = time(NULL); 00605 00606 #ifdef UIPETHERNET_DEBUG 00607 printf("MQTTClient::disconnect() End \r\n"); 00608 #endif 00609 00610 } 00611 00612 /** 00613 * @brief 00614 * @note 00615 * @param 00616 * @retval 00617 */ 00618 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length) 00619 { 00620 char* idp = (char*)string; 00621 uint16_t i = 0; 00622 00623 length += 2; 00624 while (*idp) { 00625 buf[length++] = *idp++; 00626 i++; 00627 } 00628 
00629 buf[length - i - 2] = (i >> 8); 00630 buf[length - i - 1] = (i & 0xFF); 00631 return length; 00632 } 00633 00634 /** 00635 * @brief 00636 * @note 00637 * @param 00638 * @retval 00639 */ 00640 bool MQTTClient::connected() 00641 { 00642 bool rc = (int)_client.connected(); 00643 00644 if (!rc) 00645 _client.stop(); 00646 00647 return rc; 00648 } 00649 00650 /** 00651 * @brief 00652 * @note 00653 * @param 00654 * @retval 00655 */ 00656 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc) 00657 { 00658 _onMessage = fnc; 00659 }
Generated on Sat Jul 16 2022 08:45:21 by Doxygen 1.7.2