Ivo Noorhoff / MQTTClient
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 /*
00002  MQTTClient.cpp - A simple client for MQTT.
00003   Nicholas O'Leary
00004   http://knolleary.net
00005 
00006   Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
00007 */
00008 #include "MQTTClient.h"
00009 #include <string.h>
00010 #include <time.h>
00011 
00012 #define UIPETHERNET_DEBUG 1
00013 
00014 /**
00015  * @brief
00016  * @note
00017  * @param
00018  * @retval
00019  */
00020 MQTTClient::MQTTClient() :
00021     _client(NULL),
00022     _stream(NULL),
00023     _onMessage(NULL)
00024 { }
00025 
00026 /**
00027  * @brief
00028  * @note
00029  * @param
00030  * @retval
00031  */
00032 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) :
00033     _ip(ip),
00034     _domain(NULL),
00035     _port(port),
00036     _stream(NULL),
00037     _onMessage(NULL)
00038 { }
00039 
00040 /**
00041  * @brief
00042  * @note
00043  * @param
00044  * @retval
00045  */
00046 MQTTClient::MQTTClient(const char* domain, uint16_t port) :
00047     _domain((char*)domain),
00048     _port(port),
00049     _stream(NULL),
00050     _onMessage(NULL)
00051 { }
00052 
00053 /**
00054  * @brief
00055  * @note
00056  * @param
00057  * @retval
00058  */
00059 MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) :
00060     _ip(ip),
00061     _domain(NULL),
00062     _port(port),
00063     _stream(&stream),
00064     _onMessage(NULL)
00065 { }
00066 
00067 /**
00068  * @brief
00069  * @note
00070  * @param
00071  * @retval
00072  */
00073 MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) :
00074     _domain((char*)domain),
00075     _port(port),
00076     _stream(&stream),
00077     _onMessage(NULL)
00078 { }
00079 
00080 /**
00081  * @brief
00082  * @note
00083  * @param
00084  * @retval
00085  */
00086 bool MQTTClient::connect(const char* id)
00087 {
00088     return connect(id, NULL, NULL, 0, 0, 0, 0);
00089 }
00090 
00091 /**
00092  * @brief
00093  * @note
00094  * @param
00095  * @retval
00096  */
00097 bool MQTTClient::connect(const char* id, const char* user, const char* pass)
00098 {
00099     return connect(id, user, pass, 0, 0, 0, 0);
00100 }
00101 
00102 /**
00103  * @brief
00104  * @note
00105  * @param
00106  * @retval
00107  */
00108 bool MQTTClient::connect
00109 (
00110     const char*     id,
00111     const char*     willTopic,
00112     uint8_t         willQos,
00113     uint8_t         willRetain,
00114     const char*     willMessage
00115 )
00116 {
00117     return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
00118 }
00119 
00120 /**
00121  * @brief
00122  * @note
00123  * @param
00124  * @retval
00125  */
00126 bool MQTTClient::connect
00127 (
00128     const char*     id,
00129     const char*     user,
00130     const char*     pass,
00131     const char*     willTopic,
00132     uint8_t         willQos,
00133     uint8_t         willRetain,
00134     const char*     willMessage
00135 )
00136 {
00137     if (!connected()) {
00138 #ifdef UIPETHERNET_DEBUG
00139         printf("MQTTClient::connect \r\n");
00140 #endif        
00141         int result = 0;
00142 
00143         if (_domain != NULL) {
00144             result = !_client.connect(this->_domain, this->_port);
00145         }
00146         else {
00147             result = _client.connect(this->_ip, this->_port);
00148         }
00149 
00150         if (result) {
00151             _nextMsgId = 1;
00152 
00153             uint8_t     d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
00154 
00155             // Leave room in the buffer for header and variable length field
00156             uint16_t    pos = 5;
00157             uint16_t    j;
00158             for (j = 0; j < sizeof(d); j++) {
00159                 _buffer[pos++] = d[j];
00160             }
00161 
00162             uint8_t v;
00163             if (willTopic) {
00164                 v = 0x06 | (willQos << 3) | (willRetain << 5);
00165             }
00166             else {
00167                 v = 0x02;
00168             }
00169 
00170             if (user != NULL) {
00171                 v = v | 0x80;
00172 
00173                 if (pass != NULL) {
00174                     v = v | (0x80 >> 1);
00175                 }
00176             }
00177 
00178             _buffer[pos++] = v;
00179 
00180             _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
00181             _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
00182             pos = _writeString(id, _buffer, pos);
00183             if (willTopic) {
00184                 pos = _writeString(willTopic, _buffer, pos);
00185                 pos = _writeString(willMessage, _buffer, pos);
00186             }
00187 
00188             if (user != NULL) {
00189                 pos = _writeString(user, _buffer, pos);
00190                 if (pass != NULL) {
00191                     pos = _writeString(pass, _buffer, pos);
00192                 }
00193             }
00194 
00195             _write(MQTTCONNECT, _buffer, pos - 5);
00196 
00197             _lastInActivity = _lastOutActivity = time(NULL);
00198 
00199             while (!_client.available()) {
00200                 unsigned long   t = time(NULL);
00201                 if (t - _lastInActivity > MQTT_KEEPALIVE) {
00202                     _client.stop();
00203 #ifdef UIPETHERNET_DEBUG
00204         printf("MQTTClient::connect available \r\n");
00205 #endif                            
00206                     return false;
00207                 }
00208             }
00209 
00210             uint8_t len;
00211 
00212             if (_readPacket(&len) == 4 && _buffer[3] == 0) {
00213                 _lastInActivity = time(NULL);
00214                 _pingOutstanding = false;
00215 #ifdef UIPETHERNET_DEBUG
00216         printf("MQTTClient::connect 4 \r\n");
00217 #endif                        
00218                 return true;
00219             }
00220         }
00221 
00222         _client.stop();
00223     }
00224 #ifdef UIPETHERNET_DEBUG
00225         printf("MQTTClient::connect End \r\n");
00226 #endif        
00227 
00228     return false;
00229 }
00230 
00231 /**
00232  * @brief
00233  * @note
00234  * @param
00235  * @retval
00236  */
00237 uint8_t MQTTClient::_readByte()
00238 {
00239     while (!_client.available()) { }
00240 
00241     return _client.recv();
00242 }
00243 
00244 /**
00245  * @brief
00246  * @note
00247  * @param
00248  * @retval
00249  */
00250 uint16_t MQTTClient::_readPacket(uint8_t* length)
00251 {
00252 #ifdef UIPETHERNET_DEBUG
00253     printf("MQTTClient::_readPacket \r\n");
00254 #endif        
00255 
00256     
00257     uint16_t    len = 0;
00258     _buffer[len++] = _readByte();
00259 
00260     bool        isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
00261     uint32_t    multiplier = 1;
00262     uint16_t    pos = 0;
00263     uint8_t     digit = 0;
00264     uint16_t    skip = 0;
00265     uint8_t     start = 0;
00266 
00267     do {
00268         digit = _readByte();
00269         _buffer[len++] = digit;
00270         pos += (digit & 127) * multiplier;
00271         multiplier *= 128;
00272     } while ((digit & 128) != 0);
00273     *length = len - 1;
00274 
00275     if (isPublish) {
00276         // Read in topic length to calculate bytes to skip over for Stream writing
00277         _buffer[len++] = _readByte();
00278         _buffer[len++] = _readByte();
00279         skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
00280         start = 2;
00281         if (_buffer[0] & MQTTQOS1) {
00282             // skip message id
00283             skip += 2;
00284         }
00285     }
00286 
00287     for (uint16_t i = start; i < pos; i++) {
00288         digit = _readByte();
00289         if (this->_stream) {
00290             if (isPublish && len -*length - 2 > skip) {
00291                 this->_stream->putc(digit);
00292             }
00293         }
00294 
00295         if (len < MQTT_MAX_PACKET_SIZE) {
00296             _buffer[len] = digit;
00297         }
00298 
00299         len++;
00300     }
00301 
00302     if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
00303         len = 0;    // This will cause the packet to be ignored.
00304     }
00305 
00306 #ifdef UIPETHERNET_DEBUG
00307     printf("MQTTClient::_readPacket End \r\n");
00308 #endif        
00309 
00310     return len;
00311 }
00312 
00313 /**
00314  * @brief
00315  * @note
00316  * @param
00317  * @retval
00318  */
00319 bool MQTTClient::poll()
00320 {
00321     
00322 #ifdef UIPETHERNET_DEBUG
00323 //    printf("bool MQTTClient::poll() \r\n");
00324 #endif        
00325         
00326     if (connected()) {
00327         time_t  now = time(NULL);
00328         if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
00329             if (_pingOutstanding) {
00330                 _client.stop();
00331                 return false;
00332             }
00333             else {
00334                 _buffer[0] = MQTTPINGREQ;
00335                 _buffer[1] = 0;
00336                 _client.send(_buffer, 2);
00337                 _lastOutActivity = now;
00338                 _lastInActivity = now;
00339                 _pingOutstanding = true;
00340             }
00341         }
00342 
00343         if (_client.available()) {
00344             uint8_t     len;
00345             uint16_t    length = _readPacket(&len);
00346             uint16_t    msgId = 0;
00347             uint8_t*    payload;
00348             if (length > 0) {
00349                 _lastInActivity = now;
00350 
00351                 uint8_t type = _buffer[0] & 0xF0;
00352                 if (type == MQTTPUBLISH) {
00353                     if (_onMessage) {
00354                         uint16_t    topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
00355                         char        topic[topicLen + 1];
00356                         for (uint16_t i = 0; i < topicLen; i++) {
00357                             topic[i] = _buffer[len + 3 + i];
00358                         }
00359 
00360                         topic[topicLen] = '\0';
00361 
00362                         // msgId only present for QOS>0
00363                         if ((_buffer[0] & 0x06) == MQTTQOS1) {
00364                             msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
00365                             payload = _buffer + len + 3 + topicLen + 2;
00366                             _onMessage(topic, payload, length - len - 3 - topicLen - 2);
00367 
00368                             _buffer[0] = MQTTPUBACK;
00369                             _buffer[1] = 2;
00370                             _buffer[2] = (msgId >> 8);
00371                             _buffer[3] = (msgId & 0xFF);
00372                             _client.send(_buffer, 4);
00373                             _lastOutActivity = now;
00374                         }
00375                         else {
00376                             payload = _buffer + len + 3 + topicLen;
00377                             _onMessage(topic, payload, length - len - 3 - topicLen);
00378                         }
00379                     }
00380                 }
00381                 else
00382                 if (type == MQTTPINGREQ) {
00383                     _buffer[0] = MQTTPINGRESP;
00384                     _buffer[1] = 0;
00385                     _client.send(_buffer, 2);
00386                 }
00387                 else
00388                 if (type == MQTTPINGRESP) {
00389                     _pingOutstanding = false;
00390                 }
00391             }
00392         }
00393 
00394         return true;
00395     }
00396 
00397 #ifdef UIPETHERNET_DEBUG
00398     printf("bool MQTTClient::poll() End \r\n");
00399 #endif        
00400 
00401     return false;
00402 }
00403 
00404 /**
00405  * @brief
00406  * @note
00407  * @param
00408  * @retval
00409  */
00410 bool MQTTClient::publish(const char* topic, const char* payload)
00411 {
00412     return publish(topic, (uint8_t*)payload, strlen(payload), false);
00413 }
00414 
00415 /**
00416  * @brief
00417  * @note
00418  * @param
00419  * @retval
00420  */
00421 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
00422 {
00423     return publish(topic, payload, length, false);
00424 }
00425 
00426 /**
00427  * @brief
00428  * @note
00429  * @param
00430  * @retval
00431  */
00432 bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
00433 {
00434 
00435 #ifdef UIPETHERNET_DEBUG
00436     printf("MQTTClient::publish \r\n");
00437 #endif        
00438         
00439     if (connected()) {
00440         // Leave room in the buffer for header and variable length field
00441         uint16_t    length = 5;
00442         length = _writeString(topic, _buffer, length);
00443 
00444         uint16_t    i;
00445         for (i = 0; i < plength; i++) {
00446             _buffer[length++] = payload[i];
00447         }
00448 
00449         uint8_t header = MQTTPUBLISH;
00450         if (retained) {
00451             header |= 1;
00452         }
00453 
00454         return _write(header, _buffer, length - 5);
00455     }
00456 
00457 #ifdef UIPETHERNET_DEBUG
00458     printf("MQTTClient::publish End \r\n");
00459 #endif        
00460 
00461     return false;
00462 }
00463 
00464 /**
00465  * @brief
00466  * @note
00467  * @param
00468  * @retval
00469  */
00470 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
00471 {
00472     uint8_t digitBuf[4];
00473     uint8_t digitLen = 0;
00474     uint8_t digit;
00475     uint8_t pos = 0;
00476     uint8_t rc;
00477     uint8_t len = length;
00478     do {
00479         digit = len % 128;
00480         len = len / 128;
00481         if (len > 0) {
00482             digit |= 0x80;
00483         }
00484 
00485         digitBuf[pos++] = digit;
00486         digitLen++;
00487     } while (len > 0);
00488 
00489     buf[4 - digitLen] = header;
00490     for (int i = 0; i < digitLen; i++) {
00491         buf[5 - digitLen + i] = digitBuf[i];
00492     }
00493 
00494     rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
00495 
00496     _lastOutActivity = time(NULL);
00497     return(rc == 1 + digitLen + length);
00498 }
00499 
00500 /**
00501  * @brief
00502  * @note
00503  * @param
00504  * @retval
00505  */
00506 bool MQTTClient::subscribe(const char* topic)
00507 {
00508     bool    result = subscribe(topic, 0);
00509 #if MBED_MAJOR_VERSION == 2
00510     wait_ms(50);
00511 #else
00512     thread_sleep_for(50);
00513 #endif
00514     
00515     return result;
00516 }
00517 
00518 /**
00519  * @brief
00520  * @note
00521  * @param
00522  * @retval
00523  */
00524 bool MQTTClient::subscribe(const char* topic, uint8_t qos)
00525 {
00526 #ifdef UIPETHERNET_DEBUG
00527     printf("MQTTClient::subscribe \r\n");
00528 #endif        
00529     
00530     if (qos > 1)
00531         return false;
00532 
00533     if (connected()) {
00534         // Leave room in the buffer for header and variable length field
00535         uint16_t    length = 5;
00536         _nextMsgId++;
00537         if (_nextMsgId == 0) {
00538             _nextMsgId = 1;
00539         }
00540 
00541         _buffer[length++] = (_nextMsgId >> 8);
00542         _buffer[length++] = (_nextMsgId & 0xFF);
00543         length = _writeString(topic, _buffer, length);
00544         _buffer[length++] = qos;
00545         return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
00546     }
00547 
00548 #ifdef UIPETHERNET_DEBUG
00549     printf("MQTTClient::subscribe End \r\n");
00550 #endif        
00551 
00552     return false;
00553 }
00554 
00555 /**
00556  * @brief
00557  * @note
00558  * @param
00559  * @retval
00560  */
00561 bool MQTTClient::unsubscribe(const char* topic)
00562 {
00563 #ifdef UIPETHERNET_DEBUG
00564     printf("MQTTClient::unsubscribe \r\n");
00565 #endif        
00566     
00567     if (connected()) {
00568         uint16_t    length = 5;
00569         _nextMsgId++;
00570         if (_nextMsgId == 0) {
00571             _nextMsgId = 1;
00572         }
00573 
00574         _buffer[length++] = (_nextMsgId >> 8);
00575         _buffer[length++] = (_nextMsgId & 0xFF);
00576         length = _writeString(topic, _buffer, length);
00577         return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
00578     }
00579 
00580 #ifdef UIPETHERNET_DEBUG
00581     printf("MQTTClient::unsubscribe End \r\n");
00582 #endif        
00583 
00584     return false;
00585 }
00586 
00587 /**
00588  * @brief
00589  * @note
00590  * @param
00591  * @retval
00592  */
00593 void MQTTClient::disconnect()
00594 {
00595 #ifdef UIPETHERNET_DEBUG
00596     printf("MQTTClient::disconnect() \r\n");
00597 #endif        
00598         
00599     _buffer[0] = MQTTDISCONNECT;
00600     _buffer[1] = 0;
00601     _client.send(_buffer, 2);
00602     _client.close();
00603     _client.stop();
00604     _lastInActivity = _lastOutActivity = time(NULL);
00605 
00606 #ifdef UIPETHERNET_DEBUG
00607     printf("MQTTClient::disconnect() End \r\n");
00608 #endif        
00609         
00610 }
00611 
00612 /**
00613  * @brief
00614  * @note
00615  * @param
00616  * @retval
00617  */
00618 uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
00619 {
00620     char*       idp = (char*)string;
00621     uint16_t    i = 0;
00622 
00623     length += 2;
00624     while (*idp) {
00625         buf[length++] = *idp++;
00626         i++;
00627     }
00628 
00629     buf[length - i - 2] = (i >> 8);
00630     buf[length - i - 1] = (i & 0xFF);
00631     return length;
00632 }
00633 
00634 /**
00635  * @brief
00636  * @note
00637  * @param
00638  * @retval
00639  */
00640 bool MQTTClient::connected()
00641 {
00642     bool    rc = (int)_client.connected();
00643 
00644     if (!rc)
00645         _client.stop();
00646 
00647     return rc;
00648 }
00649 
00650 /**
00651  * @brief
00652  * @note
00653  * @param
00654  * @retval
00655  */
00656 void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc)
00657 {
00658     _onMessage = fnc;
00659 }