MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.

Dependents:   mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60

MQTT library for ENC28J60 Ethernet modules.

/media/uploads/hudakz/enc28j60_module01.jpg

Depends on the UIPEthernet library.

Example program:

Import programMQTT_Hello_ENC28J60

MQTT Client example program. Ethernet connection is via an ENC28J60 module.

Revision:
4:8620de6d1696
Parent:
3:5e31b4687aad
Child:
5:361a6987739b
--- a/MQTTClient.cpp	Tue Sep 03 13:41:16 2019 +0000
+++ b/MQTTClient.cpp	Thu Sep 12 20:26:40 2019 +0000
@@ -15,7 +15,7 @@
  * @param
  * @retval
  */
-MQTTClient::MQTTClient(void) :
+MQTTClient::MQTTClient() :
     _client(NULL),
     _stream(NULL)
 { }
@@ -30,10 +30,8 @@
 (
     IpAddress&  ip,
     uint16_t    port,
-    void        (*onMessage) (char*, uint8_t*, unsigned int),
-    TcpClient&  client
+    Callback<void (char*, uint8_t*, uint16_t)> onMessage
 ) :
-    _client(&client),
     _ip(ip),
     _domain(NULL),
     _port(port),
@@ -49,12 +47,10 @@
  */
 MQTTClient::MQTTClient
 (
-    const char* domain,
+    const char* domain, 
     uint16_t    port,
-    void        (*onMessage) (char*, uint8_t*, unsigned int),
-    TcpClient&  client
+    Callback<void (char*, uint8_t*, uint16_t)> onMessage
 ) :
-    _client(&client),
     _domain((char*)domain),
     _port(port),
     _stream(NULL),
@@ -71,11 +67,9 @@
 (
     IpAddress&  ip,
     uint16_t    port,
-    void        (*onMessage) (char*, uint8_t*, unsigned int),
-    TcpClient&  client,
+    Callback<void (char*, uint8_t*, uint16_t)> onMessage,
     Stream&     stream
 ) :
-    _client(&client),
     _ip(ip),
     _domain(NULL),
     _port(port),
@@ -93,11 +87,9 @@
 (
     const char* domain,
     uint16_t    port,
-    void        (*onMessage) (char*, uint8_t*, unsigned int),
-    TcpClient&  client,
+    Callback<void (char*, uint8_t*, uint16_t)> onMessage,
     Stream&     stream
 ) :
-    _client(&client),
     _domain((char*)domain),
     _port(port),
     _stream(&stream),
@@ -132,7 +124,14 @@
  * @param
  * @retval
  */
-bool MQTTClient::connect(const char* id, const char* willTopic, uint8_t willQos, uint8_t willRetain, const char* willMessage)
+bool MQTTClient::connect
+(
+    const char*     id,
+    const char*     willTopic,
+    uint8_t         willQos,
+    uint8_t         willRetain,
+    const char*     willMessage
+)
 {
     return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
 }
@@ -145,35 +144,35 @@
  */
 bool MQTTClient::connect
 (
-    const char*   id,
-    const char*   user,
-    const char*   pass,
-    const char*   willTopic,
-    uint8_t       willQos,
-    uint8_t       willRetain,
-    const char*   willMessage
+    const char*     id,
+    const char*     user,
+    const char*     pass,
+    const char*     willTopic,
+    uint8_t         willQos,
+    uint8_t         willRetain,
+    const char*     willMessage
 )
 {
     if (!connected()) {
         int result = 0;
 
         if (_domain != NULL) {
-            result = !_client->connect(this->_domain, this->_port);
+            result = !_client.connect(this->_domain, this->_port);
         }
         else {
-            result = _client->connect(this->_ip, this->_port);
+            result = _client.connect(this->_ip, this->_port);
         }
 
         if (result) {
             _nextMsgId = 1;
 
-            uint8_t         d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
+            uint8_t  d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
 
             // Leave room in the buffer for header and variable length field
-            uint16_t        length = 5;
-            unsigned int    j;
-            for (j = 0; j < 9; j++) {
-                _buffer[length++] = d[j];
+            uint16_t    pos = 5;
+            uint16_t    j;
+            for (j = 0; j < sizeof(d); j++) {
+                _buffer[pos++] = d[j];
             }
 
             uint8_t v;
@@ -192,46 +191,45 @@
                 }
             }
 
-            _buffer[length++] = v;
+            _buffer[pos++] = v;
 
-            _buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
-            _buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
-            length = _writeString(id, _buffer, length);
+            _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
+            _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
+            pos = _writeString(id, _buffer, pos);
             if (willTopic) {
-                length = _writeString(willTopic, _buffer, length);
-                length = _writeString(willMessage, _buffer, length);
+                pos = _writeString(willTopic, _buffer, pos);
+                pos = _writeString(willMessage, _buffer, pos);
             }
 
             if (user != NULL) {
-                length = _writeString(user, _buffer, length);
+                pos = _writeString(user, _buffer, pos);
                 if (pass != NULL) {
-                    length = _writeString(pass, _buffer, length);
+                    pos = _writeString(pass, _buffer, pos);
                 }
             }
 
-            _write(MQTTCONNECT, _buffer, length - 5);
+            _write(MQTTCONNECT, _buffer, pos - 5);
 
             _lastInActivity = _lastOutActivity = time(NULL);
 
-            while (!_client->available()) {
+            while (!_client.available()) {
                 unsigned long   t = time(NULL);
                 if (t - _lastInActivity > MQTT_KEEPALIVE) {
-                    _client->stop();
+                    _client.stop();
                     return false;
                 }
             }
 
-            uint8_t     llen;
-            uint16_t    len = _readPacket(&llen);
+            uint8_t len;
 
-            if (len == 4 && _buffer[3] == 0) {
+            if (_readPacket(&len) == 4 && _buffer[3] == 0) {
                 _lastInActivity = time(NULL);
                 _pingOutstanding = false;
                 return true;
             }
         }
 
-        _client->stop();
+        _client.stop();
     }
 
     return false;
@@ -243,11 +241,11 @@
  * @param
  * @retval
  */
-uint8_t MQTTClient::_readByte(void)
+uint8_t MQTTClient::_readByte()
 {
-    while (!_client->available()) { }
+    while (!_client.available()) { }
 
-    return _client->recv();
+    return _client.recv();
 }
 
 /**
@@ -256,14 +254,14 @@
  * @param
  * @retval
  */
-uint16_t MQTTClient::_readPacket(uint8_t* lengthLength)
+uint16_t MQTTClient::_readPacket(uint8_t* length)
 {
     uint16_t    len = 0;
     _buffer[len++] = _readByte();
 
     bool        isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
     uint32_t    multiplier = 1;
-    uint16_t    length = 0;
+    uint16_t    pos = 0;
     uint8_t     digit = 0;
     uint16_t    skip = 0;
     uint8_t     start = 0;
@@ -271,16 +269,16 @@
     do {
         digit = _readByte();
         _buffer[len++] = digit;
-        length += (digit & 127) * multiplier;
+        pos += (digit & 127) * multiplier;
         multiplier *= 128;
     } while ((digit & 128) != 0);
-    *lengthLength = len - 1;
+    *length = len - 1;
 
     if (isPublish) {
         // Read in topic length to calculate bytes to skip over for Stream writing
         _buffer[len++] = _readByte();
         _buffer[len++] = _readByte();
-        skip = (_buffer[*lengthLength + 1] << 8) + _buffer[*lengthLength + 2];
+        skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
         start = 2;
         if (_buffer[0] & MQTTQOS1) {
             // skip message id
@@ -288,10 +286,10 @@
         }
     }
 
-    for (uint16_t i = start; i < length; i++) {
+    for (uint16_t i = start; i < pos; i++) {
         digit = _readByte();
         if (this->_stream) {
-            if (isPublish && len -*lengthLength - 2 > skip) {
+            if (isPublish && len -*length - 2 > skip) {
                 this->_stream->putc(digit);
             }
         }
@@ -316,61 +314,61 @@
  * @param
  * @retval
  */
-bool MQTTClient::loop(void)
+bool MQTTClient::process()
 {
     if (connected()) {
-        unsigned long   t = time(NULL);
-        if ((t - _lastInActivity > MQTT_KEEPALIVE) || (t - _lastOutActivity > MQTT_KEEPALIVE)) {
+        time_t  now = time(NULL);
+        if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
             if (_pingOutstanding) {
-                _client->stop();
+                _client.stop();
                 return false;
             }
             else {
                 _buffer[0] = MQTTPINGREQ;
                 _buffer[1] = 0;
-                _client->send(_buffer, 2);
-                _lastOutActivity = t;
-                _lastInActivity = t;
+                _client.send(_buffer, 2);
+                _lastOutActivity = now;
+                _lastInActivity = now;
                 _pingOutstanding = true;
             }
         }
 
-        if (_client->available()) {
-            uint8_t     llen;
-            uint16_t    len = _readPacket(&llen);
+        if (_client.available()) {
+            uint8_t     len;
+            uint16_t    length = _readPacket(&len);
             uint16_t    msgId = 0;
             uint8_t*    payload;
-            if (len > 0) {
-                _lastInActivity = t;
+            if (length > 0) {
+                _lastInActivity = now;
 
                 //printf("Here I am\r\n");
                 uint8_t type = _buffer[0] & 0xF0;
                 if (type == MQTTPUBLISH) {
                     if (_onMessage) {
-                        uint16_t    tl = (_buffer[llen + 1] << 8) + _buffer[llen + 2];
-                        char        topic[tl + 1];
-                        for (uint16_t i = 0; i < tl; i++) {
-                            topic[i] = _buffer[llen + 3 + i];
+                        uint16_t    topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
+                        char        topic[topicLen + 1];
+                        for (uint16_t i = 0; i < topicLen; i++) {
+                            topic[i] = _buffer[len + 3 + i];
                         }
 
-                        topic[tl] = 0;
+                        topic[topicLen] = '\0';
 
                         // msgId only present for QOS>0
                         if ((_buffer[0] & 0x06) == MQTTQOS1) {
-                            msgId = (_buffer[llen + 3 + tl] << 8) + _buffer[llen + 3 + tl + 1];
-                            payload = _buffer + llen + 3 + tl + 2;
-                            _onMessage(topic, payload, len - llen - 3 - tl - 2);
+                            msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
+                            payload = _buffer + len + 3 + topicLen + 2;
+                            _onMessage(topic, payload, length - len - 3 - topicLen - 2);
 
                             _buffer[0] = MQTTPUBACK;
                             _buffer[1] = 2;
                             _buffer[2] = (msgId >> 8);
                             _buffer[3] = (msgId & 0xFF);
-                            _client->send(_buffer, 4);
-                            _lastOutActivity = t;
+                            _client.send(_buffer, 4);
+                            _lastOutActivity = now;
                         }
                         else {
-                            payload = _buffer + llen + 3 + tl;
-                            _onMessage(topic, payload, len - llen - 3 - tl);
+                            payload = _buffer + len + 3 + topicLen;
+                            _onMessage(topic, payload, length - len - 3 - topicLen);
                         }
                     }
                 }
@@ -378,7 +376,7 @@
                 if (type == MQTTPINGREQ) {
                     _buffer[0] = MQTTPINGRESP;
                     _buffer[1] = 0;
-                    _client->send(_buffer, 2);
+                    _client.send(_buffer, 2);
                 }
                 else
                 if (type == MQTTPINGRESP) {
@@ -410,9 +408,9 @@
  * @param
  * @retval
  */
-bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength)
+bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
 {
-    return publish(topic, payload, plength, false);
+    return publish(topic, payload, length, false);
 }
 
 /**
@@ -421,7 +419,7 @@
  * @param
  * @retval
  */
-bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength, bool retained)
+bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
 {
     if (connected()) {
         // Leave room in the buffer for header and variable length field
@@ -452,8 +450,8 @@
  */
 bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
 {
-    uint8_t lenBuf[4];
-    uint8_t llen = 0;
+    uint8_t digitBuf[4];
+    uint8_t digitLen = 0;
     uint8_t digit;
     uint8_t pos = 0;
     uint8_t rc;
@@ -465,19 +463,19 @@
             digit |= 0x80;
         }
 
-        lenBuf[pos++] = digit;
-        llen++;
+        digitBuf[pos++] = digit;
+        digitLen++;
     } while (len > 0);
 
-    buf[4 - llen] = header;
-    for (int i = 0; i < llen; i++) {
-        buf[5 - llen + i] = lenBuf[i];
+    buf[4 - digitLen] = header;
+    for (int i = 0; i < digitLen; i++) {
+        buf[5 - digitLen + i] = digitBuf[i];
     }
 
-    rc = _client->send(buf + (4 - llen), length + 1 + llen);
+    rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
 
     _lastOutActivity = time(NULL);
-    return(rc == 1 + llen + length);
+    return(rc == 1 + digitLen + length);
 }
 
 /**
@@ -552,12 +550,12 @@
  * @param
  * @retval
  */
-void MQTTClient::disconnect(void)
+void MQTTClient::disconnect()
 {
     _buffer[0] = MQTTDISCONNECT;
     _buffer[1] = 0;
-    _client->send(_buffer, 2);
-    _client->stop();
+    _client.send(_buffer, 2);
+    _client.stop();
     _lastInActivity = _lastOutActivity = time(NULL);
 }
 
@@ -567,20 +565,20 @@
  * @param
  * @retval
  */
-uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t pos)
+uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
 {
     char*       idp = (char*)string;
     uint16_t    i = 0;
 
-    pos += 2;
+    length += 2;
     while (*idp) {
-        buf[pos++] = *idp++;
+        buf[length++] = *idp++;
         i++;
     }
 
-    buf[pos - i - 2] = (i >> 8);
-    buf[pos - i - 1] = (i & 0xFF);
-    return pos;
+    buf[length - i - 2] = (i >> 8);
+    buf[length - i - 1] = (i & 0xFF);
+    return length;
 }
 
 /**
@@ -589,17 +587,12 @@
  * @param
  * @retval
  */
-bool MQTTClient::connected(void)
+bool MQTTClient::connected()
 {
-    bool    rc;
-    if (_client == NULL) {
-        rc = false;
-    }
-    else {
-        rc = (int)_client->connected();
-        if (!rc)
-            _client->stop();
-    }
+    bool    rc = (int)_client.connected();
+
+    if (!rc)
+        _client.stop();
 
     return rc;
 }