MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.

Dependents:   mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60

MQTT library for ENC28J60 Ethernet modules.

/media/uploads/hudakz/enc28j60_module01.jpg

Depends on the UIPEthernet library.

Example program:

Import programMQTT_Hello_ENC28J60

MQTT Client example program. Ethernet connection is via an ENC28J60 module.

MQTTClient.cpp

Committer:
hudakz
Date:
2015-11-29
Revision:
2:ca17c5d846e7
Parent:
1:87da395325fc
Child:
3:5e31b4687aad

File content as of revision 2:ca17c5d846e7:

/*
 MQTTClient.cpp - A simple client for MQTT.
  Nicholas O'Leary
  http://knolleary.net

  Ported to mbed by Zoltan Hudak <hudakz@inbox.com>
*/
#include "MQTTClient.h"
#include <string.h>
#include <time.h>

/**
 * @brief
 * @note
 * @param
 * @retval
 */
MQTTClient::MQTTClient(void) {
    this->_client = NULL;
    this->stream = NULL;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
MQTTClient::MQTTClient
(
    IPAddress&  ip,
    uint16_t    port,
    void (*onMessage) (char*, uint8_t*, unsigned int),
    Client& client
) {
    this->_client = &client;
    this->onMessage = onMessage;
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    this->stream = NULL;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage) (char*, uint8_t*, unsigned int), Client& client) {
    this->_client = &client;
    this->onMessage = onMessage;
    this->domain = domain;
    this->port = port;
    this->stream = NULL;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
MQTTClient::MQTTClient
(
    IPAddress&  ip,
    uint16_t    port,
    void (*onMessage) (char*, uint8_t*, unsigned int),
    Client& client,
    Stream& stream
) {
    this->_client = &client;
    this->onMessage = onMessage;
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    this->stream = &stream;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
MQTTClient::MQTTClient
(
    char*       domain,
    uint16_t    port,
    void (*onMessage) (char*, uint8_t*, unsigned int),
    Client& client,
    Stream& stream
) {
    this->_client = &client;
    this->onMessage = onMessage;
    this->domain = domain;
    this->port = port;
    this->stream = &stream;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::connect(char* id) {
    return connect(id, NULL, NULL, 0, 0, 0, 0);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::connect(char* id, char* user, char* pass) {
    return connect(id, user, pass, 0, 0, 0, 0);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::connect(char* id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
    return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::connect
(
    char*   id,
    char*   user,
    char*   pass,
    char*   willTopic,
    uint8_t willQos,
    uint8_t willRetain,
    char*   willMessage
) {
    if(!connected()) {
        int result = 0;

        if(domain != NULL) {
            result = _client->connect(this->domain, this->port);
        }
        else {
            result = _client->connect(this->ip, this->port);
        }

        if(result) {
            nextMsgId = 1;

            uint8_t         d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };

            // Leave room in the buffer for header and variable length field
            uint16_t        length = 5;
            unsigned int    j;
            for(j = 0; j < 9; j++) {
                buffer[length++] = d[j];
            }

            uint8_t v;
            if(willTopic) {
                v = 0x06 | (willQos << 3) | (willRetain << 5);
            }
            else {
                v = 0x02;
            }

            if(user != NULL) {
                v = v | 0x80;

                if(pass != NULL) {
                    v = v | (0x80 >> 1);
                }
            }

            buffer[length++] = v;

            buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
            buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
            length = writeString(id, buffer, length);
            if(willTopic) {
                length = writeString(willTopic, buffer, length);
                length = writeString(willMessage, buffer, length);
            }

            if(user != NULL) {
                length = writeString(user, buffer, length);
                if(pass != NULL) {
                    length = writeString(pass, buffer, length);
                }
            }

            write(MQTTCONNECT, buffer, length - 5);

            lastInActivity = lastOutActivity = time(NULL);

            while(!_client->available()) {
                unsigned long   t = time(NULL);
                if(t - lastInActivity > MQTT_KEEPALIVE) {
                    _client->stop();
                    return false;
                }
            }

            uint8_t     llen;
            uint16_t    len = readPacket(&llen);

            if(len == 4 && buffer[3] == 0) {
                lastInActivity = time(NULL);
                pingOutstanding = false;
                return true;
            }
        }

        _client->stop();
    }

    return false;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
uint8_t MQTTClient::readByte(void) {
    while(!_client->available()) { }

    return _client->read();
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
uint16_t MQTTClient::readPacket(uint8_t* lengthLength) {
    uint16_t    len = 0;
    buffer[len++] = readByte();

    bool        isPublish = (buffer[0] & 0xF0) == MQTTPUBLISH;
    uint32_t    multiplier = 1;
    uint16_t    length = 0;
    uint8_t     digit = 0;
    uint16_t    skip = 0;
    uint8_t     start = 0;

    do
    {
        digit = readByte();
        buffer[len++] = digit;
        length += (digit & 127) * multiplier;
        multiplier *= 128;
    } while((digit & 128) != 0);
    *lengthLength = len - 1;

    if(isPublish) {

        // Read in topic length to calculate bytes to skip over for Stream writing
        buffer[len++] = readByte();
        buffer[len++] = readByte();
        skip = (buffer[*lengthLength + 1] << 8) + buffer[*lengthLength + 2];
        start = 2;
        if(buffer[0] & MQTTQOS1) {

            // skip message id
            skip += 2;
        }
    }

    for(uint16_t i = start; i < length; i++) {
        digit = readByte();
        if(this->stream) {
            if(isPublish && len -*lengthLength - 2 > skip) {
                this->stream->putc(digit);
            }
        }

        if(len < MQTT_MAX_PACKET_SIZE) {
            buffer[len] = digit;
        }

        len++;
    }

    if(!this->stream && len > MQTT_MAX_PACKET_SIZE) {
        len = 0;    // This will cause the packet to be ignored.
    }

    return len;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::loop(void) {
    if(connected()) {
        unsigned long   t = time(NULL);
        if((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
            if(pingOutstanding) {
                _client->stop();
                return false;
            }
            else {
                buffer[0] = MQTTPINGREQ;
                buffer[1] = 0;
                _client->write(buffer, 2);
                lastOutActivity = t;
                lastInActivity = t;
                pingOutstanding = true;
            }
        }

        if(_client->available()) {
            uint8_t     llen;
            uint16_t    len = readPacket(&llen);
            uint16_t    msgId = 0;
            uint8_t*    payload;
            if(len > 0) {
                lastInActivity = t;

                uint8_t type = buffer[0] & 0xF0;
                if(type == MQTTPUBLISH) {
                    if(onMessage) {
                        uint16_t    tl = (buffer[llen + 1] << 8) + buffer[llen + 2];
                        char        topic[tl + 1];
                        for(uint16_t i = 0; i < tl; i++) {
                            topic[i] = buffer[llen + 3 + i];
                        }

                        topic[tl] = 0;

                        // msgId only present for QOS>0
                        if((buffer[0] & 0x06) == MQTTQOS1) {
                            msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1];
                            payload = buffer + llen + 3 + tl + 2;
                            onMessage(topic, payload, len - llen - 3 - tl - 2);

                            buffer[0] = MQTTPUBACK;
                            buffer[1] = 2;
                            buffer[2] = (msgId >> 8);
                            buffer[3] = (msgId & 0xFF);
                            _client->write(buffer, 4);
                            lastOutActivity = t;
                        }
                        else {
                            payload = buffer + llen + 3 + tl;
                            onMessage(topic, payload, len - llen - 3 - tl);
                        }
                    }
                }
                else
                if(type == MQTTPINGREQ) {
                    buffer[0] = MQTTPINGRESP;
                    buffer[1] = 0;
                    _client->write(buffer, 2);
                }
                else
                if(type == MQTTPINGRESP) {
                    pingOutstanding = false;
                }
            }
        }

        return true;
    }

    return false;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::publish(char* topic, char* payload) {
    return publish(topic, (uint8_t*)payload, strlen(payload), false);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) {
    return publish(topic, payload, plength, false);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
    if(connected()) {

        // Leave room in the buffer for header and variable length field
        uint16_t    length = 5;
        length = writeString(topic, buffer, length);

        uint16_t    i;
        for(i = 0; i < plength; i++) {
            buffer[length++] = payload[i];
        }

        uint8_t header = MQTTPUBLISH;
        if(retained) {
            header |= 1;
        }

        return write(header, buffer, length - 5);
    }

    return false;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
    uint8_t lenBuf[4];
    uint8_t llen = 0;
    uint8_t digit;
    uint8_t pos = 0;
    uint8_t rc;
    uint8_t len = length;
    do
    {
        digit = len % 128;
        len = len / 128;
        if(len > 0) {
            digit |= 0x80;
        }

        lenBuf[pos++] = digit;
        llen++;
    } while(len > 0);

    buf[4 - llen] = header;
    for(int i = 0; i < llen; i++) {
        buf[5 - llen + i] = lenBuf[i];
    }

    rc = _client->write(buf + (4 - llen), length + 1 + llen);

    lastOutActivity = time(NULL);
    return(rc == 1 + llen + length);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::subscribe(char* topic) {
    bool    result = subscribe(topic, 0);
    wait_ms(50);
    return result;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::subscribe(char* topic, uint8_t qos) {
    if(qos > 1)
        return false;

    if(connected()) {

        // Leave room in the buffer for header and variable length field
        uint16_t    length = 5;
        nextMsgId++;
        if(nextMsgId == 0) {
            nextMsgId = 1;
        }

        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer, length);
        buffer[length++] = qos;
        return write(MQTTSUBSCRIBE | MQTTQOS1, buffer, length - 5);
    }

    return false;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::unsubscribe(char* topic) {
    if(connected()) {
        uint16_t    length = 5;
        nextMsgId++;
        if(nextMsgId == 0) {
            nextMsgId = 1;
        }

        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer, length);
        return write(MQTTUNSUBSCRIBE | MQTTQOS1, buffer, length - 5);
    }

    return false;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
void MQTTClient::disconnect(void) {
    buffer[0] = MQTTDISCONNECT;
    buffer[1] = 0;
    _client->write(buffer, 2);
    _client->stop();
    lastInActivity = lastOutActivity = time(NULL);
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
    char*       idp = string;
    uint16_t    i = 0;
    pos += 2;
    while(*idp) {
        buf[pos++] = *idp++;
        i++;
    }

    buf[pos - i - 2] = (i >> 8);
    buf[pos - i - 1] = (i & 0xFF);
    return pos;
}

/**
 * @brief
 * @note
 * @param
 * @retval
 */
bool MQTTClient::connected(void) {
    bool    rc;
    if(_client == NULL) {
        rc = false;
    }
    else {
        rc = (int)_client->connected();
        if(!rc)
            _client->stop();
    }

    return rc;
}