Local fixes

MQTTClient.cpp

Committer:
hudakz
Date:
2014-09-15
Revision:
0:83c732b10e95
Child:
1:87da395325fc

File content as of revision 0:83c732b10e95:

/*
 MQTTClient.cpp - A simple client for MQTT.
  Nicholas O'Leary
  http://knolleary.net
*/

#include "MQTTClient.h"
#include <string.h>
#include <time.h>

MQTTClient::MQTTClient()
{
    this->_client = NULL;
    this->stream = NULL;
}

MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
{
    this->_client = &client;
    this->onMessage = onMessage;
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    this->stream = NULL;
}

MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
{
    this->_client = &client;
    this->onMessage = onMessage;
    this->domain = domain;
    this->port = port;
    this->stream = NULL;
}

MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
{
    this->_client = &client;
    this->onMessage = onMessage;
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    this->stream = &stream;
}

MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
{
    this->_client = &client;
    this->onMessage = onMessage;
    this->domain = domain;
    this->port = port;
    this->stream = &stream;
}

bool MQTTClient::connect(char *id)
{
    return connect(id,NULL,NULL,0,0,0,0);
}

bool MQTTClient::connect(char *id, char *user, char *pass)
{
    return connect(id,user,pass,0,0,0,0);
}

bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
{
    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
}

bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
{
    if (!connected()) {
        int result = 0;

        if (domain != NULL) {
            result = _client->connect(this->domain, this->port);
        } else {
            result = _client->connect(this->ip, this->port);
        }

        if (result) {
            nextMsgId = 1;
            uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
            // Leave room in the buffer for header and variable length field
            uint16_t length = 5;
            unsigned int j;
            for (j = 0; j<9; j++) {
                buffer[length++] = d[j];
            }

            uint8_t v;
            if (willTopic) {
                v = 0x06|(willQos<<3)|(willRetain<<5);
            } else {
                v = 0x02;
            }

            if(user != NULL) {
                v = v|0x80;

                if(pass != NULL) {
                    v = v|(0x80>>1);
                }
            }

            buffer[length++] = v;

            buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
            buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
            length = writeString(id,buffer,length);
            if (willTopic) {
                length = writeString(willTopic,buffer,length);
                length = writeString(willMessage,buffer,length);
            }

            if(user != NULL) {
                length = writeString(user,buffer,length);
                if(pass != NULL) {
                    length = writeString(pass,buffer,length);
                }
            }

            write(MQTTCONNECT,buffer,length-5);

            lastInActivity = lastOutActivity = time(NULL);

            while (!_client->available()) {
                time_t t = time(NULL);
                if (t-lastInActivity > MQTT_KEEPALIVE) {
                    _client->stop();
                    return false;
                }
            }
            uint8_t llen;
            uint16_t len = readPacket(&llen);

            if (len == 4 && buffer[3] == 0) {
                lastInActivity = time(NULL);
                pingOutstanding = false;
                return true;
            }
        }
        _client->stop();
    }
    return false;
}

uint8_t MQTTClient::readByte()
{
    while(!_client->available()) {}
    return _client->read();
}

uint16_t MQTTClient::readPacket(uint8_t* lengthLength)
{
    uint16_t len = 0;
    buffer[len++] = readByte();
    bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
    uint32_t multiplier = 1;
    uint16_t length = 0;
    uint8_t digit = 0;
    uint16_t skip = 0;
    uint8_t start = 0;

    do {
        digit = readByte();
        buffer[len++] = digit;
        length += (digit & 127) * multiplier;
        multiplier *= 128;
    } while ((digit & 128) != 0);
    *lengthLength = len-1;

    if (isPublish) {
        // Read in topic length to calculate bytes to skip over for Stream writing
        buffer[len++] = readByte();
        buffer[len++] = readByte();
        skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
        start = 2;
        if (buffer[0]&MQTTQOS1) {
            // skip message id
            skip += 2;
        }
    }

    for (uint16_t i = start; i<length; i++) {
        digit = readByte();
        if (this->stream) {
            if (isPublish && len-*lengthLength-2>skip) {
                this->stream->putc(digit);
            }
        }
        if (len < MQTT_MAX_PACKET_SIZE) {
            buffer[len] = digit;
        }
        len++;
    }

    if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
        len = 0; // This will cause the packet to be ignored.
    }

    return len;
}

bool MQTTClient::loop()
{
    if (connected()) {
        time_t t = time(NULL);
        if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
            if (pingOutstanding) {
                _client->stop();
                return false;
            } else {
                buffer[0] = MQTTPINGREQ;
                buffer[1] = 0;
                _client->write(buffer,2);
                lastOutActivity = t;
                lastInActivity = t;
                pingOutstanding = true;
            }
        }
        if (_client->available()) {
            uint8_t llen;
            uint16_t len = readPacket(&llen);
            uint16_t msgId = 0;
            uint8_t *payload;
            if (len > 0) {
                lastInActivity = t;
                uint8_t type = buffer[0]&0xF0;
                if (type == MQTTPUBLISH) {
                    if (onMessage) {
                        uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
                        char topic[tl+1];
                        for (uint16_t i=0; i<tl; i++) {
                            topic[i] = buffer[llen+3+i];
                        }
                        topic[tl] = 0;
                        // msgId only present for QOS>0
                        if ((buffer[0]&0x06) == MQTTQOS1) {
                            msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
                            payload = buffer+llen+3+tl+2;
                            onMessage(topic,payload,len-llen-3-tl-2);

                            buffer[0] = MQTTPUBACK;
                            buffer[1] = 2;
                            buffer[2] = (msgId >> 8);
                            buffer[3] = (msgId & 0xFF);
                            _client->write(buffer,4);
                            lastOutActivity = t;

                        } else {
                            payload = buffer+llen+3+tl;
                            onMessage(topic,payload,len-llen-3-tl);
                        }
                    }
                } else if (type == MQTTPINGREQ) {
                    buffer[0] = MQTTPINGRESP;
                    buffer[1] = 0;
                    _client->write(buffer,2);
                } else if (type == MQTTPINGRESP) {
                    pingOutstanding = false;
                }
            }
        }
        return true;
    }
    return false;
}

bool MQTTClient::publish(char* topic, char* payload)
{
    return publish(topic,(uint8_t*)payload,strlen(payload),false);
}

bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength)
{
    return publish(topic, payload, plength, false);
}

bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained)
{
    if (connected()) {
        // Leave room in the buffer for header and variable length field
        uint16_t length = 5;
        length = writeString(topic,buffer,length);
        uint16_t i;
        for (i=0; i<plength; i++) {
            buffer[length++] = payload[i];
        }
        uint8_t header = MQTTPUBLISH;
        if (retained) {
            header |= 1;
        }
        return write(header,buffer,length-5);
    }
    return false;
}

bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length)
{
    uint8_t lenBuf[4];
    uint8_t llen = 0;
    uint8_t digit;
    uint8_t pos = 0;
    uint8_t rc;
    uint8_t len = length;
    do {
        digit = len % 128;
        len = len / 128;
        if (len > 0) {
            digit |= 0x80;
        }
        lenBuf[pos++] = digit;
        llen++;
    } while(len>0);

    buf[4-llen] = header;
    for (int i=0; i<llen; i++) {
        buf[5-llen+i] = lenBuf[i];
    }
    rc = _client->write(buf+(4-llen),length+1+llen);

    lastOutActivity = time(NULL);
    return (rc == 1+llen+length);
}

bool MQTTClient::subscribe(char* topic)
{
    return subscribe(topic, 0);
}

bool MQTTClient::subscribe(char* topic, uint8_t qos)
{
    if (qos > 1)
        return false;

    if (connected()) {
        // Leave room in the buffer for header and variable length field
        uint16_t length = 5;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer,length);
        buffer[length++] = qos;
        return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
    }
    return false;
}

bool MQTTClient::unsubscribe(char* topic)
{
    if (connected()) {
        uint16_t length = 5;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer,length);
        return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
    }
    return false;
}

void MQTTClient::disconnect()
{
    buffer[0] = MQTTDISCONNECT;
    buffer[1] = 0;
    _client->write(buffer,2);
    _client->stop();
    lastInActivity = lastOutActivity = time(NULL);
}

uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos)
{
    char* idp = string;
    uint16_t i = 0;
    pos += 2;
    while (*idp) {
        buf[pos++] = *idp++;
        i++;
    }
    buf[pos-i-2] = (i >> 8);
    buf[pos-i-1] = (i & 0xFF);
    return pos;
}


bool MQTTClient::connected()
{
    bool rc;
    if (_client == NULL ) {
        rc = false;
    } else {
        rc = (int)_client->connected();
        if (!rc) _client->stop();
    }
    return rc;
}