Zoltan Hudak / MQTTClient

Dependents:   mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 /*
00002  MQTTClient.cpp - A simple client for MQTT.
00003   Nicholas O'Leary
00004   http://knolleary.net
00005 
00006   Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
00007 */
00008 #include "MQTTClient.h"
00009 #include <string.h>
00010 #include <time.h>
00011 
00012 /**
00013  * @brief
00014  * @note
00015  * @param
00016  * @retval
00017  */
00018 MQTTClient::MQTTClient() :
00019     _client(NULL),
00020     _stream(NULL),
00021     _onMessage(NULL)
00022 { }
00023 
00024 /**
00025  * @brief
00026  * @note
00027  * @param
00028  * @retval
00029  */
00030 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) :
00031     _ip(ip),
00032     _domain(NULL),
00033     _port(port),
00034     _stream(NULL),
00035     _onMessage(NULL)
00036 { }
00037 
00038 /**
00039  * @brief
00040  * @note
00041  * @param
00042  * @retval
00043  */
00044 MQTTClient::MQTTClient(const char* domain, uint16_t port) :
00045     _domain((char*)domain),
00046     _port(port),
00047     _stream(NULL),
00048     _onMessage(NULL)
00049 { }
00050 
00051 /**
00052  * @brief
00053  * @note
00054  * @param
00055  * @retval
00056  */
00057 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) :
00058     _ip(ip),
00059     _domain(NULL),
00060     _port(port),
00061     _stream(&stream),
00062     _onMessage(NULL)
00063 { }
00064 
00065 /**
00066  * @brief
00067  * @note
00068  * @param
00069  * @retval
00070  */
00071 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) :
00072     _domain((char*)domain),
00073     _port(port),
00074     _stream(&stream),
00075     _onMessage(NULL)
00076 { }
00077 
00078 /**
00079  * @brief
00080  * @note
00081  * @param
00082  * @retval
00083  */
00084 bool MQTTClient::connect(const char* id)
00085 {
00086     return connect(id, NULL, NULL, 0, 0, 0, 0);
00087 }
00088 
00089 /**
00090  * @brief
00091  * @note
00092  * @param
00093  * @retval
00094  */
00095 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
00096 {
00097     return connect(id, user, pass, 0, 0, 0, 0);
00098 }
00099 
00100 /**
00101  * @brief
00102  * @note
00103  * @param
00104  * @retval
00105  */
00106 bool MQTTClient::connect
00107 (
00108     const char*     id,
00109     const char*     willTopic,
00110     uint8_t         willQos,
00111     uint8_t         willRetain,
00112     const char*     willMessage
00113 )
00114 {
00115     return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
00116 }
00117 
00118 /**
00119  * @brief
00120  * @note
00121  * @param
00122  * @retval
00123  */
00124 bool MQTTClient::connect
00125 (
00126     const char*     id,
00127     const char*     user,
00128     const char*     pass,
00129     const char*     willTopic,
00130     uint8_t         willQos,
00131     uint8_t         willRetain,
00132     const char*     willMessage
00133 )
00134 {
00135     if (!connected()) {
00136         int result = 0;
00137 
00138         if (_domain != NULL) {
00139             result = !_client.connect(this->_domain, this->_port);
00140         }
00141         else {
00142             result = _client.connect(this->_ip, this->_port);
00143         }
00144 
00145         if (result) {
00146             _nextMsgId = 1;
00147 
00148             uint8_t     d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
00149 
00150             // Leave room in the buffer for header and variable length field
00151             uint16_t    pos = 5;
00152             uint16_t    j;
00153             for (j = 0; j < sizeof(d); j++) {
00154                 _buffer[pos++] = d[j];
00155             }
00156 
00157             uint8_t v;
00158             if (willTopic) {
00159                 v = 0x06 | (willQos << 3) | (willRetain << 5);
00160             }
00161             else {
00162                 v = 0x02;
00163             }
00164 
00165             if (user != NULL) {
00166                 v = v | 0x80;
00167 
00168                 if (pass != NULL) {
00169                     v = v | (0x80 >> 1);
00170                 }
00171             }
00172 
00173             _buffer[pos++] = v;
00174 
00175             _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
00176             _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
00177             pos = _writeString(id, _buffer, pos);
00178             if (willTopic) {
00179                 pos = _writeString(willTopic, _buffer, pos);
00180                 pos = _writeString(willMessage, _buffer, pos);
00181             }
00182 
00183             if (user != NULL) {
00184                 pos = _writeString(user, _buffer, pos);
00185                 if (pass != NULL) {
00186                     pos = _writeString(pass, _buffer, pos);
00187                 }
00188             }
00189 
00190             _write(MQTTCONNECT, _buffer, pos - 5);
00191 
00192             _lastInActivity = _lastOutActivity = time(NULL);
00193 
00194             while (!_client.available()) {
00195                 unsigned long   t = time(NULL);
00196                 if (t - _lastInActivity > MQTT_KEEPALIVE) {
00197                     _client.stop();
00198                     return false;
00199                 }
00200             }
00201 
00202             uint8_t len;
00203 
00204             if (_readPacket(&len) == 4 && _buffer[3] == 0) {
00205                 _lastInActivity = time(NULL);
00206                 _pingOutstanding = false;
00207                 return true;
00208             }
00209         }
00210 
00211         _client.stop();
00212     }
00213 
00214     return false;
00215 }
00216 
00217 /**
00218  * @brief
00219  * @note
00220  * @param
00221  * @retval
00222  */
00223 uint8_t MQTTClient::_readByte()
00224 {
00225     while (!_client.available()) { }
00226 
00227     return _client.recv();
00228 }
00229 
00230 /**
00231  * @brief
00232  * @note
00233  * @param
00234  * @retval
00235  */
00236 uint16_t MQTTClient::_readPacket(uint8_t* length)
00237 {
00238     uint16_t    len = 0;
00239     _buffer[len++] = _readByte();
00240 
00241     bool        isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
00242     uint32_t    multiplier = 1;
00243     uint16_t    pos = 0;
00244     uint8_t     digit = 0;
00245     uint16_t    skip = 0;
00246     uint8_t     start = 0;
00247 
00248     do {
00249         digit = _readByte();
00250         _buffer[len++] = digit;
00251         pos += (digit & 127) * multiplier;
00252         multiplier *= 128;
00253     } while ((digit & 128) != 0);
00254     *length = len - 1;
00255 
00256     if (isPublish) {
00257         // Read in topic length to calculate bytes to skip over for Stream writing
00258         _buffer[len++] = _readByte();
00259         _buffer[len++] = _readByte();
00260         skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
00261         start = 2;
00262         if (_buffer[0] & MQTTQOS1) {
00263             // skip message id
00264             skip += 2;
00265         }
00266     }
00267 
00268     for (uint16_t i = start; i < pos; i++) {
00269         digit = _readByte();
00270         if (this->_stream) {
00271             if (isPublish && len -*length - 2 > skip) {
00272                 this->_stream->putc(digit);
00273             }
00274         }
00275 
00276         if (len < MQTT_MAX_PACKET_SIZE) {
00277             _buffer[len] = digit;
00278         }
00279 
00280         len++;
00281     }
00282 
00283     if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
00284         len = 0;    // This will cause the packet to be ignored.
00285     }
00286 
00287     return len;
00288 }
00289 
00290 /**
00291  * @brief
00292  * @note
00293  * @param
00294  * @retval
00295  */
00296 bool MQTTClient::poll()
00297 {
00298     if (connected()) {
00299         time_t  now = time(NULL);
00300         if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
00301             if (_pingOutstanding) {
00302                 _client.stop();
00303                 return false;
00304             }
00305             else {
00306                 _buffer[0] = MQTTPINGREQ;
00307                 _buffer[1] = 0;
00308                 _client.send(_buffer, 2);
00309                 _lastOutActivity = now;
00310                 _lastInActivity = now;
00311                 _pingOutstanding = true;
00312             }
00313         }
00314 
00315         if (_client.available()) {
00316             uint8_t     len;
00317             uint16_t    length = _readPacket(&len);
00318             uint16_t    msgId = 0;
00319             uint8_t*    payload;
00320             if (length > 0) {
00321                 _lastInActivity = now;
00322 
00323                 uint8_t type = _buffer[0] & 0xF0;
00324                 if (type == MQTTPUBLISH) {
00325                     if (_onMessage) {
00326                         uint16_t    topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
00327                         char        topic[topicLen + 1];
00328                         for (uint16_t i = 0; i < topicLen; i++) {
00329                             topic[i] = _buffer[len + 3 + i];
00330                         }
00331 
00332                         topic[topicLen] = '\0';
00333 
00334                         // msgId only present for QOS>0
00335                         if ((_buffer[0] & 0x06) == MQTTQOS1) {
00336                             msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
00337                             payload = _buffer + len + 3 + topicLen + 2;
00338                             _onMessage(topic, payload, length - len - 3 - topicLen - 2);
00339 
00340                             _buffer[0] = MQTTPUBACK;
00341                             _buffer[1] = 2;
00342                             _buffer[2] = (msgId >> 8);
00343                             _buffer[3] = (msgId & 0xFF);
00344                             _client.send(_buffer, 4);
00345                             _lastOutActivity = now;
00346                         }
00347                         else {
00348                             payload = _buffer + len + 3 + topicLen;
00349                             _onMessage(topic, payload, length - len - 3 - topicLen);
00350                         }
00351                     }
00352                 }
00353                 else
00354                 if (type == MQTTPINGREQ) {
00355                     _buffer[0] = MQTTPINGRESP;
00356                     _buffer[1] = 0;
00357                     _client.send(_buffer, 2);
00358                 }
00359                 else
00360                 if (type == MQTTPINGRESP) {
00361                     _pingOutstanding = false;
00362                 }
00363             }
00364         }
00365 
00366         return true;
00367     }
00368 
00369     return false;
00370 }
00371 
00372 /**
00373  * @brief
00374  * @note
00375  * @param
00376  * @retval
00377  */
00378 bool MQTTClient::publish(const char* topic, const char* payload)
00379 {
00380     return publish(topic, (uint8_t*)payload, strlen(payload), false);
00381 }
00382 
00383 /**
00384  * @brief
00385  * @note
00386  * @param
00387  * @retval
00388  */
00389 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
00390 {
00391     return publish(topic, payload, length, false);
00392 }
00393 
00394 /**
00395  * @brief
00396  * @note
00397  * @param
00398  * @retval
00399  */
00400 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
00401 {
00402     if (connected()) {
00403         // Leave room in the buffer for header and variable length field
00404         uint16_t    length = 5;
00405         length = _writeString(topic, _buffer, length);
00406 
00407         uint16_t    i;
00408         for (i = 0; i < plength; i++) {
00409             _buffer[length++] = payload[i];
00410         }
00411 
00412         uint8_t header = MQTTPUBLISH;
00413         if (retained) {
00414             header |= 1;
00415         }
00416 
00417         return _write(header, _buffer, length - 5);
00418     }
00419 
00420     return false;
00421 }
00422 
00423 /**
00424  * @brief
00425  * @note
00426  * @param
00427  * @retval
00428  */
00429 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
00430 {
00431     uint8_t digitBuf[4];
00432     uint8_t digitLen = 0;
00433     uint8_t digit;
00434     uint8_t pos = 0;
00435     uint8_t rc;
00436     uint8_t len = length;
00437     do {
00438         digit = len % 128;
00439         len = len / 128;
00440         if (len > 0) {
00441             digit |= 0x80;
00442         }
00443 
00444         digitBuf[pos++] = digit;
00445         digitLen++;
00446     } while (len > 0);
00447 
00448     buf[4 - digitLen] = header;
00449     for (int i = 0; i < digitLen; i++) {
00450         buf[5 - digitLen + i] = digitBuf[i];
00451     }
00452 
00453     rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
00454 
00455     _lastOutActivity = time(NULL);
00456     return(rc == 1 + digitLen + length);
00457 }
00458 
00459 /**
00460  * @brief
00461  * @note
00462  * @param
00463  * @retval
00464  */
00465 bool MQTTClient::subscribe(const char* topic)
00466 {
00467     bool    result = subscribe(topic, 0);
00468 #if MBED_MAJOR_VERSION == 2
00469     wait_ms(50);
00470 #else
00471     thread_sleep_for(50);
00472 #endif
00473     
00474     return result;
00475 }
00476 
00477 /**
00478  * @brief
00479  * @note
00480  * @param
00481  * @retval
00482  */
00483 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
00484 {
00485     if (qos > 1)
00486         return false;
00487 
00488     if (connected()) {
00489         // Leave room in the buffer for header and variable length field
00490         uint16_t    length = 5;
00491         _nextMsgId++;
00492         if (_nextMsgId == 0) {
00493             _nextMsgId = 1;
00494         }
00495 
00496         _buffer[length++] = (_nextMsgId >> 8);
00497         _buffer[length++] = (_nextMsgId & 0xFF);
00498         length = _writeString(topic, _buffer, length);
00499         _buffer[length++] = qos;
00500         return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
00501     }
00502 
00503     return false;
00504 }
00505 
00506 /**
00507  * @brief
00508  * @note
00509  * @param
00510  * @retval
00511  */
00512 bool MQTTClient::unsubscribe(const char* topic)
00513 {
00514     if (connected()) {
00515         uint16_t    length = 5;
00516         _nextMsgId++;
00517         if (_nextMsgId == 0) {
00518             _nextMsgId = 1;
00519         }
00520 
00521         _buffer[length++] = (_nextMsgId >> 8);
00522         _buffer[length++] = (_nextMsgId & 0xFF);
00523         length = _writeString(topic, _buffer, length);
00524         return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
00525     }
00526 
00527     return false;
00528 }
00529 
00530 /**
00531  * @brief
00532  * @note
00533  * @param
00534  * @retval
00535  */
00536 void MQTTClient::disconnect()
00537 {
00538     _buffer[0] = MQTTDISCONNECT;
00539     _buffer[1] = 0;
00540     _client.send(_buffer, 2);
00541     _client.stop();
00542     _lastInActivity = _lastOutActivity = time(NULL);
00543 }
00544 
00545 /**
00546  * @brief
00547  * @note
00548  * @param
00549  * @retval
00550  */
00551 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
00552 {
00553     char*       idp = (char*)string;
00554     uint16_t    i = 0;
00555 
00556     length += 2;
00557     while (*idp) {
00558         buf[length++] = *idp++;
00559         i++;
00560     }
00561 
00562     buf[length - i - 2] = (i >> 8);
00563     buf[length - i - 1] = (i & 0xFF);
00564     return length;
00565 }
00566 
00567 /**
00568  * @brief
00569  * @note
00570  * @param
00571  * @retval
00572  */
00573 bool MQTTClient::connected()
00574 {
00575     bool    rc = (int)_client.connected();
00576 
00577     if (!rc)
00578         _client.stop();
00579 
00580     return rc;
00581 }
00582 
00583 /**
00584  * @brief
00585  * @note
00586  * @param
00587  * @retval
00588  */
00589 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc)
00590 {
00591     _onMessage = fnc;
00592 }