Local fixes
MQTTClient.cpp
- Committer:
- ivo_n
- Date:
- 2020-09-24
- Revision:
- 7:0f12a3d0bd10
- Parent:
- 6:1ce536bf461b
File content as of revision 7:0f12a3d0bd10:
/*
MQTTClient.cpp - A simple client for MQTT.
Nicholas O'Leary
http://knolleary.net
Ported to mbed by Zoltan Hudak <hudakz@outlook.com>
*/
#include "MQTTClient.h"
#include <string.h>
#include <time.h>
#define UIPETHERNET_DEBUG 1
/**
* @brief
* @note
* @param
* @retval
*/
MQTTClient::MQTTClient() :
_client(NULL),
_stream(NULL),
_onMessage(NULL)
{ }
/**
* @brief
* @note
* @param
* @retval
*/
MQTTClient::MQTTClient(IpAddress& ip, uint16_t port) :
_ip(ip),
_domain(NULL),
_port(port),
_stream(NULL),
_onMessage(NULL)
{ }
/**
* @brief
* @note
* @param
* @retval
*/
MQTTClient::MQTTClient(const char* domain, uint16_t port) :
_domain((char*)domain),
_port(port),
_stream(NULL),
_onMessage(NULL)
{ }
/**
* @brief
* @note
* @param
* @retval
*/
MQTTClient::MQTTClient(IpAddress& ip, uint16_t port, Stream& stream) :
_ip(ip),
_domain(NULL),
_port(port),
_stream(&stream),
_onMessage(NULL)
{ }
/**
* @brief
* @note
* @param
* @retval
*/
MQTTClient::MQTTClient(const char* domain, uint16_t port, Stream& stream) :
_domain((char*)domain),
_port(port),
_stream(&stream),
_onMessage(NULL)
{ }
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::connect(const char* id)
{
return connect(id, NULL, NULL, 0, 0, 0, 0);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::connect(const char* id, const char* user, const char* pass)
{
return connect(id, user, pass, 0, 0, 0, 0);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::connect
(
const char* id,
const char* willTopic,
uint8_t willQos,
uint8_t willRetain,
const char* willMessage
)
{
return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::connect
(
const char* id,
const char* user,
const char* pass,
const char* willTopic,
uint8_t willQos,
uint8_t willRetain,
const char* willMessage
)
{
if (!connected()) {
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::connect \r\n");
#endif
int result = 0;
if (_domain != NULL) {
result = !_client.connect(this->_domain, this->_port);
}
else {
result = _client.connect(this->_ip, this->_port);
}
if (result) {
_nextMsgId = 1;
uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
// Leave room in the buffer for header and variable length field
uint16_t pos = 5;
uint16_t j;
for (j = 0; j < sizeof(d); j++) {
_buffer[pos++] = d[j];
}
uint8_t v;
if (willTopic) {
v = 0x06 | (willQos << 3) | (willRetain << 5);
}
else {
v = 0x02;
}
if (user != NULL) {
v = v | 0x80;
if (pass != NULL) {
v = v | (0x80 >> 1);
}
}
_buffer[pos++] = v;
_buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
_buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
pos = _writeString(id, _buffer, pos);
if (willTopic) {
pos = _writeString(willTopic, _buffer, pos);
pos = _writeString(willMessage, _buffer, pos);
}
if (user != NULL) {
pos = _writeString(user, _buffer, pos);
if (pass != NULL) {
pos = _writeString(pass, _buffer, pos);
}
}
_write(MQTTCONNECT, _buffer, pos - 5);
_lastInActivity = _lastOutActivity = time(NULL);
while (!_client.available()) {
unsigned long t = time(NULL);
if (t - _lastInActivity > MQTT_KEEPALIVE) {
_client.stop();
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::connect available \r\n");
#endif
return false;
}
}
uint8_t len;
if (_readPacket(&len) == 4 && _buffer[3] == 0) {
_lastInActivity = time(NULL);
_pingOutstanding = false;
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::connect 4 \r\n");
#endif
return true;
}
}
_client.stop();
}
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::connect End \r\n");
#endif
return false;
}
/**
* @brief
* @note
* @param
* @retval
*/
uint8_t MQTTClient::_readByte()
{
while (!_client.available()) { }
return _client.recv();
}
/**
* @brief
* @note
* @param
* @retval
*/
uint16_t MQTTClient::_readPacket(uint8_t* length)
{
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::_readPacket \r\n");
#endif
uint16_t len = 0;
_buffer[len++] = _readByte();
bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint16_t pos = 0;
uint8_t digit = 0;
uint16_t skip = 0;
uint8_t start = 0;
do {
digit = _readByte();
_buffer[len++] = digit;
pos += (digit & 127) * multiplier;
multiplier *= 128;
} while ((digit & 128) != 0);
*length = len - 1;
if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
_buffer[len++] = _readByte();
_buffer[len++] = _readByte();
skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
start = 2;
if (_buffer[0] & MQTTQOS1) {
// skip message id
skip += 2;
}
}
for (uint16_t i = start; i < pos; i++) {
digit = _readByte();
if (this->_stream) {
if (isPublish && len -*length - 2 > skip) {
this->_stream->putc(digit);
}
}
if (len < MQTT_MAX_PACKET_SIZE) {
_buffer[len] = digit;
}
len++;
}
if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) {
len = 0; // This will cause the packet to be ignored.
}
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::_readPacket End \r\n");
#endif
return len;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::poll()
{
#ifdef UIPETHERNET_DEBUG
// printf("bool MQTTClient::poll() \r\n");
#endif
if (connected()) {
time_t now = time(NULL);
if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
if (_pingOutstanding) {
_client.stop();
return false;
}
else {
_buffer[0] = MQTTPINGREQ;
_buffer[1] = 0;
_client.send(_buffer, 2);
_lastOutActivity = now;
_lastInActivity = now;
_pingOutstanding = true;
}
}
if (_client.available()) {
uint8_t len;
uint16_t length = _readPacket(&len);
uint16_t msgId = 0;
uint8_t* payload;
if (length > 0) {
_lastInActivity = now;
uint8_t type = _buffer[0] & 0xF0;
if (type == MQTTPUBLISH) {
if (_onMessage) {
uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
char topic[topicLen + 1];
for (uint16_t i = 0; i < topicLen; i++) {
topic[i] = _buffer[len + 3 + i];
}
topic[topicLen] = '\0';
// msgId only present for QOS>0
if ((_buffer[0] & 0x06) == MQTTQOS1) {
msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
payload = _buffer + len + 3 + topicLen + 2;
_onMessage(topic, payload, length - len - 3 - topicLen - 2);
_buffer[0] = MQTTPUBACK;
_buffer[1] = 2;
_buffer[2] = (msgId >> 8);
_buffer[3] = (msgId & 0xFF);
_client.send(_buffer, 4);
_lastOutActivity = now;
}
else {
payload = _buffer + len + 3 + topicLen;
_onMessage(topic, payload, length - len - 3 - topicLen);
}
}
}
else
if (type == MQTTPINGREQ) {
_buffer[0] = MQTTPINGRESP;
_buffer[1] = 0;
_client.send(_buffer, 2);
}
else
if (type == MQTTPINGRESP) {
_pingOutstanding = false;
}
}
}
return true;
}
#ifdef UIPETHERNET_DEBUG
printf("bool MQTTClient::poll() End \r\n");
#endif
return false;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::publish(const char* topic, const char* payload)
{
return publish(topic, (uint8_t*)payload, strlen(payload), false);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
{
return publish(topic, payload, length, false);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
{
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::publish \r\n");
#endif
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
length = _writeString(topic, _buffer, length);
uint16_t i;
for (i = 0; i < plength; i++) {
_buffer[length++] = payload[i];
}
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
return _write(header, _buffer, length - 5);
}
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::publish End \r\n");
#endif
return false;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
{
uint8_t digitBuf[4];
uint8_t digitLen = 0;
uint8_t digit;
uint8_t pos = 0;
uint8_t rc;
uint8_t len = length;
do {
digit = len % 128;
len = len / 128;
if (len > 0) {
digit |= 0x80;
}
digitBuf[pos++] = digit;
digitLen++;
} while (len > 0);
buf[4 - digitLen] = header;
for (int i = 0; i < digitLen; i++) {
buf[5 - digitLen + i] = digitBuf[i];
}
rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
_lastOutActivity = time(NULL);
return(rc == 1 + digitLen + length);
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::subscribe(const char* topic)
{
bool result = subscribe(topic, 0);
#if MBED_MAJOR_VERSION == 2
wait_ms(50);
#else
thread_sleep_for(50);
#endif
return result;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::subscribe(const char* topic, uint8_t qos)
{
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::subscribe \r\n");
#endif
if (qos > 1)
return false;
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
_nextMsgId++;
if (_nextMsgId == 0) {
_nextMsgId = 1;
}
_buffer[length++] = (_nextMsgId >> 8);
_buffer[length++] = (_nextMsgId & 0xFF);
length = _writeString(topic, _buffer, length);
_buffer[length++] = qos;
return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
}
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::subscribe End \r\n");
#endif
return false;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::unsubscribe(const char* topic)
{
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::unsubscribe \r\n");
#endif
if (connected()) {
uint16_t length = 5;
_nextMsgId++;
if (_nextMsgId == 0) {
_nextMsgId = 1;
}
_buffer[length++] = (_nextMsgId >> 8);
_buffer[length++] = (_nextMsgId & 0xFF);
length = _writeString(topic, _buffer, length);
return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5);
}
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::unsubscribe End \r\n");
#endif
return false;
}
/**
* @brief
* @note
* @param
* @retval
*/
void MQTTClient::disconnect()
{
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::disconnect() \r\n");
#endif
_buffer[0] = MQTTDISCONNECT;
_buffer[1] = 0;
_client.send(_buffer, 2);
_client.close();
_client.stop();
_lastInActivity = _lastOutActivity = time(NULL);
#ifdef UIPETHERNET_DEBUG
printf("MQTTClient::disconnect() End \r\n");
#endif
}
/**
* @brief
* @note
* @param
* @retval
*/
uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
{
char* idp = (char*)string;
uint16_t i = 0;
length += 2;
while (*idp) {
buf[length++] = *idp++;
i++;
}
buf[length - i - 2] = (i >> 8);
buf[length - i - 1] = (i & 0xFF);
return length;
}
/**
* @brief
* @note
* @param
* @retval
*/
bool MQTTClient::connected()
{
bool rc = (int)_client.connected();
if (!rc)
_client.stop();
return rc;
}
/**
* @brief
* @note
* @param
* @retval
*/
void MQTTClient::attach(Callback<void (char *, uint8_t *, uint16_t)> fnc)
{
_onMessage = fnc;
}