Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60
MQTTClient.cpp
00001 /* 00002 MQTTClient.cpp - A simple client for MQTT. 00003 Nicholas O'Leary 00004 http://knolleary.net 00005 00006 Ported to mbed by Zoltan Hudak <hudakz@outlook.com> 00007 */ 00008 #include "MQTTClient.h" 00009 #include <string.h> 00010 #include <time.h> 00011 00012 /** 00013 * @brief 00014 * @note 00015 * @param 00016 * @retval 00017 */ 00018 MQTTClient::MQTTClient() : 00019 _client(NULL), 00020 _stream(NULL), 00021 _onMessage(NULL) 00022 { } 00023 00024 /** 00025 * @brief 00026 * @note 00027 * @param 00028 * @retval 00029 */ 00030 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) : 00031 _ip(ip), 00032 _domain(NULL), 00033 _port(port), 00034 _stream(NULL), 00035 _onMessage(NULL) 00036 { } 00037 00038 /** 00039 * @brief 00040 * @note 00041 * @param 00042 * @retval 00043 */ 00044 MQTTClient::MQTTClient(const char* domain, uint16_t port) : 00045 _domain((char*)domain), 00046 _port(port), 00047 _stream(NULL), 00048 _onMessage(NULL) 00049 { } 00050 00051 /** 00052 * @brief 00053 * @note 00054 * @param 00055 * @retval 00056 */ 00057 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) : 00058 _ip(ip), 00059 _domain(NULL), 00060 _port(port), 00061 _stream(&stream), 00062 _onMessage(NULL) 00063 { } 00064 00065 /** 00066 * @brief 00067 * @note 00068 * @param 00069 * @retval 00070 */ 00071 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) : 00072 _domain((char*)domain), 00073 _port(port), 00074 _stream(&stream), 00075 _onMessage(NULL) 00076 { } 00077 00078 /** 00079 * @brief 00080 * @note 00081 * @param 00082 * @retval 00083 */ 00084 bool MQTTClient::connect(const char* id) 00085 { 00086 return connect(id, NULL, NULL, 0, 0, 0, 0); 00087 } 00088 00089 /** 00090 * @brief 00091 * @note 00092 * @param 00093 * @retval 00094 */ 00095 bool MQTTClient::connect(const char* id, const char* user, const char* pass) 00096 { 00097 return connect(id, user, pass, 0, 0, 0, 0); 00098 } 00099 00100 /** 00101 * @brief 00102 * @note 00103 * @param 00104 * @retval 00105 */ 00106 bool MQTTClient::connect 00107 ( 00108 const char* id, 00109 const char* willTopic, 00110 uint8_t willQos, 00111 uint8_t willRetain, 00112 const char* willMessage 00113 ) 00114 { 00115 return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage); 00116 } 00117 00118 /** 00119 * @brief 00120 * @note 00121 * @param 00122 * @retval 00123 */ 00124 bool MQTTClient::connect 00125 ( 00126 const char* id, 00127 const char* user, 00128 const char* pass, 00129 const char* willTopic, 00130 uint8_t willQos, 00131 uint8_t willRetain, 00132 const char* willMessage 00133 ) 00134 { 00135 if (!connected()) { 00136 int result = 0; 00137 00138 if (_domain != NULL) { 00139 result = !_client.connect(this->_domain, this->_port); 00140 } 00141 else { 00142 result = _client.connect(this->_ip, this->_port); 00143 } 00144 00145 if (result) { 00146 _nextMsgId = 1; 00147 00148 uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; 00149 00150 // Leave room in the buffer for header and variable length field 00151 uint16_t pos = 5; 00152 uint16_t j; 00153 for (j = 0; j < sizeof(d); j++) { 00154 _buffer[pos++] = d[j]; 00155 } 00156 00157 uint8_t v; 00158 if (willTopic) { 00159 v = 0x06 | (willQos << 3) | (willRetain << 5); 00160 } 00161 else { 00162 v = 0x02; 00163 } 00164 00165 if (user != NULL) { 00166 v = v | 0x80; 00167 00168 if (pass != NULL) { 00169 v = v | (0x80 >> 1); 00170 } 00171 } 00172 00173 _buffer[pos++] = v; 00174 00175 _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8); 00176 _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF); 00177 pos = _writeString(id, _buffer, pos); 00178 if (willTopic) { 00179 pos = _writeString(willTopic, _buffer, pos); 00180 pos = _writeString(willMessage, _buffer, pos); 00181 } 00182 00183 if (user != NULL) { 00184 pos = _writeString(user, _buffer, pos); 00185 if (pass != NULL) { 00186 pos = _writeString(pass, _buffer, pos); 00187 } 00188 } 00189 00190 _write(MQTTCONNECT, _buffer, pos - 5); 00191 00192 _lastInActivity = _lastOutActivity = time(NULL); 00193 00194 while (!_client.available()) { 00195 unsigned long t = time(NULL); 00196 if (t - _lastInActivity > MQTT_KEEPALIVE) { 00197 _client.stop(); 00198 return false; 00199 } 00200 } 00201 00202 uint8_t len; 00203 00204 if (_readPacket(&len) == 4 && _buffer[3] == 0) { 00205 _lastInActivity = time(NULL); 00206 _pingOutstanding = false; 00207 return true; 00208 } 00209 } 00210 00211 _client.stop(); 00212 } 00213 00214 return false; 00215 } 00216 00217 /** 00218 * @brief 00219 * @note 00220 * @param 00221 * @retval 00222 */ 00223 uint8_t MQTTClient::_readByte() 00224 { 00225 while (!_client.available()) { } 00226 00227 return _client.recv(); 00228 } 00229 00230 /** 00231 * @brief 00232 * @note 00233 * @param 00234 * @retval 00235 */ 00236 uint16_t MQTTClient::_readPacket(uint8_t* length) 00237 { 00238 uint16_t len = 0; 00239 _buffer[len++] = _readByte(); 00240 00241 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH; 00242 uint32_t multiplier = 1; 00243 uint16_t pos = 0; 00244 uint8_t digit = 0; 00245 uint16_t skip = 0; 00246 uint8_t start = 0; 00247 00248 do { 00249 digit = _readByte(); 00250 _buffer[len++] = digit; 00251 pos += (digit & 127) * multiplier; 00252 multiplier *= 128; 00253 } while ((digit & 128) != 0); 00254 *length = len - 1; 00255 00256 if (isPublish) { 00257 // Read in topic length to calculate bytes to skip over for Stream writing 00258 _buffer[len++] = _readByte(); 00259 _buffer[len++] = _readByte(); 00260 skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2]; 00261 start = 2; 00262 if (_buffer[0] & MQTTQOS1) { 00263 // skip message id 00264 skip += 2; 00265 } 00266 } 00267 00268 for (uint16_t i = start; i < pos; i++) { 00269 digit = _readByte(); 00270 if (this->_stream) { 00271 if (isPublish && len -*length - 2 > skip) { 00272 this->_stream->putc(digit); 00273 } 00274 } 00275 00276 if (len < MQTT_MAX_PACKET_SIZE) { 00277 _buffer[len] = digit; 00278 } 00279 00280 len++; 00281 } 00282 00283 if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) { 00284 len = 0; // This will cause the packet to be ignored. 00285 } 00286 00287 return len; 00288 } 00289 00290 /** 00291 * @brief 00292 * @note 00293 * @param 00294 * @retval 00295 */ 00296 bool MQTTClient::poll() 00297 { 00298 if (connected()) { 00299 time_t now = time(NULL); 00300 if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) { 00301 if (_pingOutstanding) { 00302 _client.stop(); 00303 return false; 00304 } 00305 else { 00306 _buffer[0] = MQTTPINGREQ; 00307 _buffer[1] = 0; 00308 _client.send(_buffer, 2); 00309 _lastOutActivity = now; 00310 _lastInActivity = now; 00311 _pingOutstanding = true; 00312 } 00313 } 00314 00315 if (_client.available()) { 00316 uint8_t len; 00317 uint16_t length = _readPacket(&len); 00318 uint16_t msgId = 0; 00319 uint8_t* payload; 00320 if (length > 0) { 00321 _lastInActivity = now; 00322 00323 uint8_t type = _buffer[0] & 0xF0; 00324 if (type == MQTTPUBLISH) { 00325 if (_onMessage) { 00326 uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2]; 00327 char topic[topicLen + 1]; 00328 for (uint16_t i = 0; i < topicLen; i++) { 00329 topic[i] = _buffer[len + 3 + i]; 00330 } 00331 00332 topic[topicLen] = '\0'; 00333 00334 // msgId only present for QOS>0 00335 if ((_buffer[0] & 0x06) == MQTTQOS1) { 00336 msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1]; 00337 payload = _buffer + len + 3 + topicLen + 2; 00338 _onMessage(topic, payload, length - len - 3 - topicLen - 2); 00339 00340 _buffer[0] = MQTTPUBACK; 00341 _buffer[1] = 2; 00342 _buffer[2] = (msgId >> 8); 00343 _buffer[3] = (msgId & 0xFF); 00344 _client.send(_buffer, 4); 00345 _lastOutActivity = now; 00346 } 00347 else { 00348 payload = _buffer + len + 3 + topicLen; 00349 _onMessage(topic, payload, length - len - 3 - topicLen); 00350 } 00351 } 00352 } 00353 else 00354 if (type == MQTTPINGREQ) { 00355 _buffer[0] = MQTTPINGRESP; 00356 _buffer[1] = 0; 00357 _client.send(_buffer, 2); 00358 } 00359 else 00360 if (type == MQTTPINGRESP) { 00361 _pingOutstanding = false; 00362 } 00363 } 00364 } 00365 00366 return true; 00367 } 00368 00369 return false; 00370 } 00371 00372 /** 00373 * @brief 00374 * @note 00375 * @param 00376 * @retval 00377 */ 00378 bool MQTTClient::publish(const char* topic, const char* payload) 00379 { 00380 return publish(topic, (uint8_t*)payload, strlen(payload), false); 00381 } 00382 00383 /** 00384 * @brief 00385 * @note 00386 * @param 00387 * @retval 00388 */ 00389 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length) 00390 { 00391 return publish(topic, payload, length, false); 00392 } 00393 00394 /** 00395 * @brief 00396 * @note 00397 * @param 00398 * @retval 00399 */ 00400 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained) 00401 { 00402 if (connected()) { 00403 // Leave room in the buffer for header and variable length field 00404 uint16_t length = 5; 00405 length = _writeString(topic, _buffer, length); 00406 00407 uint16_t i; 00408 for (i = 0; i < plength; i++) { 00409 _buffer[length++] = payload[i]; 00410 } 00411 00412 uint8_t header = MQTTPUBLISH; 00413 if (retained) { 00414 header |= 1; 00415 } 00416 00417 return _write(header, _buffer, length - 5); 00418 } 00419 00420 return false; 00421 } 00422 00423 /** 00424 * @brief 00425 * @note 00426 * @param 00427 * @retval 00428 */ 00429 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length) 00430 { 00431 uint8_t digitBuf[4]; 00432 uint8_t digitLen = 0; 00433 uint8_t digit; 00434 uint8_t pos = 0; 00435 uint8_t rc; 00436 uint8_t len = length; 00437 do { 00438 digit = len % 128; 00439 len = len / 128; 00440 if (len > 0) { 00441 digit |= 0x80; 00442 } 00443 00444 digitBuf[pos++] = digit; 00445 digitLen++; 00446 } while (len > 0); 00447 00448 buf[4 - digitLen] = header; 00449 for (int i = 0; i < digitLen; i++) { 00450 buf[5 - digitLen + i] = digitBuf[i]; 00451 } 00452 00453 rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen); 00454 00455 _lastOutActivity = time(NULL); 00456 return(rc == 1 + digitLen + length); 00457 } 00458 00459 /** 00460 * @brief 00461 * @note 00462 * @param 00463 * @retval 00464 */ 00465 bool MQTTClient::subscribe(const char* topic) 00466 { 00467 bool result = subscribe(topic, 0); 00468 #if MBED_MAJOR_VERSION == 2 00469 wait_ms(50); 00470 #else 00471 thread_sleep_for(50); 00472 #endif 00473 00474 return result; 00475 } 00476 00477 /** 00478 * @brief 00479 * @note 00480 * @param 00481 * @retval 00482 */ 00483 bool MQTTClient::subscribe(const char* topic, uint8_t qos) 00484 { 00485 if (qos > 1) 00486 return false; 00487 00488 if (connected()) { 00489 // Leave room in the buffer for header and variable length field 00490 uint16_t length = 5; 00491 _nextMsgId++; 00492 if (_nextMsgId == 0) { 00493 _nextMsgId = 1; 00494 } 00495 00496 _buffer[length++] = (_nextMsgId >> 8); 00497 _buffer[length++] = (_nextMsgId & 0xFF); 00498 length = _writeString(topic, _buffer, length); 00499 _buffer[length++] = qos; 00500 return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5); 00501 } 00502 00503 return false; 00504 } 00505 00506 /** 00507 * @brief 00508 * @note 00509 * @param 00510 * @retval 00511 */ 00512 bool MQTTClient::unsubscribe(const char* topic) 00513 { 00514 if (connected()) { 00515 uint16_t length = 5; 00516 _nextMsgId++; 00517 if (_nextMsgId == 0) { 00518 _nextMsgId = 1; 00519 } 00520 00521 _buffer[length++] = (_nextMsgId >> 8); 00522 _buffer[length++] = (_nextMsgId & 0xFF); 00523 length = _writeString(topic, _buffer, length); 00524 return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5); 00525 } 00526 00527 return false; 00528 } 00529 00530 /** 00531 * @brief 00532 * @note 00533 * @param 00534 * @retval 00535 */ 00536 void MQTTClient::disconnect() 00537 { 00538 _buffer[0] = MQTTDISCONNECT; 00539 _buffer[1] = 0; 00540 _client.send(_buffer, 2); 00541 _client.stop(); 00542 _lastInActivity = _lastOutActivity = time(NULL); 00543 } 00544 00545 /** 00546 * @brief 00547 * @note 00548 * @param 00549 * @retval 00550 */ 00551 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length) 00552 { 00553 char* idp = (char*)string; 00554 uint16_t i = 0; 00555 00556 length += 2; 00557 while (*idp) { 00558 buf[length++] = *idp++; 00559 i++; 00560 } 00561 00562 buf[length - i - 2] = (i >> 8); 00563 buf[length - i - 1] = (i & 0xFF); 00564 return length; 00565 } 00566 00567 /** 00568 * @brief 00569 * @note 00570 * @param 00571 * @retval 00572 */ 00573 bool MQTTClient::connected() 00574 { 00575 bool rc = (int)_client.connected(); 00576 00577 if (!rc) 00578 _client.stop(); 00579 00580 return rc; 00581 } 00582 00583 /** 00584 * @brief 00585 * @note 00586 * @param 00587 * @retval 00588 */ 00589 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc) 00590 { 00591 _onMessage = fnc; 00592 }
Generated on Thu Jul 14 2022 11:00:49 by
1.7.2