MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.
Dependents: mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60
MQTT library for ENC28J60 Ethernet modules.
Depends on the UIPEthernet library.
Example program:
Import programMQTT_Hello_ENC28J60
MQTT Client example program. Ethernet connection is via an ENC28J60 module.
Diff: MQTTClient.cpp
- Revision:
- 3:5e31b4687aad
- Parent:
- 2:ca17c5d846e7
- Child:
- 4:8620de6d1696
--- a/MQTTClient.cpp Sun Nov 29 18:12:22 2015 +0000 +++ b/MQTTClient.cpp Tue Sep 03 13:41:16 2019 +0000 @@ -3,7 +3,7 @@ Nicholas O'Leary http://knolleary.net - Ported to mbed by Zoltan Hudak <hudakz@inbox.com> + Ported to mbed by Zoltan Hudak <hudakz@outlook.com> */ #include "MQTTClient.h" #include <string.h> @@ -15,10 +15,10 @@ * @param * @retval */ -MQTTClient::MQTTClient(void) { - this->_client = NULL; - this->stream = NULL; -} +MQTTClient::MQTTClient(void) : + _client(NULL), + _stream(NULL) +{ } /** * @brief @@ -28,18 +28,18 @@ */ MQTTClient::MQTTClient ( - IPAddress& ip, + IpAddress& ip, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - Client& client -) { - this->_client = &client; - this->onMessage = onMessage; - this->ip = ip; - this->port = port; - this->domain = NULL; - this->stream = NULL; -} + void (*onMessage) (char*, uint8_t*, unsigned int), + TcpClient& client +) : + _client(&client), + _ip(ip), + _domain(NULL), + _port(port), + _stream(NULL), + _onMessage(onMessage) +{ } /** * @brief @@ -47,13 +47,19 @@ * @param * @retval */ -MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage) (char*, uint8_t*, unsigned int), Client& client) { - this->_client = &client; - this->onMessage = onMessage; - this->domain = domain; - this->port = port; - this->stream = NULL; -} +MQTTClient::MQTTClient +( + const char* domain, + uint16_t port, + void (*onMessage) (char*, uint8_t*, unsigned int), + TcpClient& client +) : + _client(&client), + _domain((char*)domain), + _port(port), + _stream(NULL), + _onMessage(onMessage) +{ } /** * @brief @@ -63,19 +69,19 @@ */ MQTTClient::MQTTClient ( - IPAddress& ip, + IpAddress& ip, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - Client& client, - Stream& stream -) { - this->_client = &client; - this->onMessage = onMessage; - this->ip = ip; - this->port = port; - this->domain = NULL; - this->stream = &stream; -} + void (*onMessage) (char*, uint8_t*, unsigned int), + TcpClient& client, + Stream& stream +) : + _client(&client), + _ip(ip), + _domain(NULL), + _port(port), + _stream(&stream), + _onMessage(onMessage) +{ } /** * @brief @@ -85,18 +91,18 @@ */ MQTTClient::MQTTClient ( - char* domain, + const char* domain, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - Client& client, - Stream& stream -) { - this->_client = &client; - this->onMessage = onMessage; - this->domain = domain; - this->port = port; - this->stream = &stream; -} + void (*onMessage) (char*, uint8_t*, unsigned int), + TcpClient& client, + Stream& stream +) : + _client(&client), + _domain((char*)domain), + _port(port), + _stream(&stream), + _onMessage(onMessage) +{ } /** * @brief @@ -104,7 +110,8 @@ * @param * @retval */ -bool MQTTClient::connect(char* id) { +bool MQTTClient::connect(const char* id) +{ return connect(id, NULL, NULL, 0, 0, 0, 0); } @@ -114,7 +121,8 @@ * @param * @retval */ -bool MQTTClient::connect(char* id, char* user, char* pass) { +bool MQTTClient::connect(const char* id, const char* user, const char* pass) +{ return connect(id, user, pass, 0, 0, 0, 0); } @@ -124,7 +132,8 @@ * @param * @retval */ -bool MQTTClient::connect(char* id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { +bool MQTTClient::connect(const char* id, const char* willTopic, uint8_t willQos, uint8_t willRetain, const char* willMessage) +{ return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage); } @@ -136,87 +145,88 @@ */ bool MQTTClient::connect ( - char* id, - char* user, - char* pass, - char* willTopic, - uint8_t willQos, - uint8_t willRetain, - char* willMessage -) { - if(!connected()) { + const char* id, + const char* user, + const char* pass, + const char* willTopic, + uint8_t willQos, + uint8_t willRetain, + const char* willMessage +) +{ + if (!connected()) { int result = 0; - if(domain != NULL) { - result = _client->connect(this->domain, this->port); + if (_domain != NULL) { + result = !_client->connect(this->_domain, this->_port); } else { - result = _client->connect(this->ip, this->port); + result = _client->connect(this->_ip, this->_port); } - if(result) { - nextMsgId = 1; + if (result) { + _nextMsgId = 1; uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; // Leave room in the buffer for header and variable length field uint16_t length = 5; unsigned int j; - for(j = 0; j < 9; j++) { - buffer[length++] = d[j]; + for (j = 0; j < 9; j++) { + _buffer[length++] = d[j]; } uint8_t v; - if(willTopic) { + if (willTopic) { v = 0x06 | (willQos << 3) | (willRetain << 5); } else { v = 0x02; } - if(user != NULL) { + if (user != NULL) { v = v | 0x80; - if(pass != NULL) { + if (pass != NULL) { v = v | (0x80 >> 1); } } - buffer[length++] = v; + _buffer[length++] = v; - buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id, buffer, length); - if(willTopic) { - length = writeString(willTopic, buffer, length); - length = writeString(willMessage, buffer, length); + _buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + _buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = _writeString(id, _buffer, length); + if (willTopic) { + length = _writeString(willTopic, _buffer, length); + length = _writeString(willMessage, _buffer, length); } - if(user != NULL) { - length = writeString(user, buffer, length); - if(pass != NULL) { - length = writeString(pass, buffer, length); + if (user != NULL) { + length = _writeString(user, _buffer, length); + if (pass != NULL) { + length = _writeString(pass, _buffer, length); } } - write(MQTTCONNECT, buffer, length - 5); + _write(MQTTCONNECT, _buffer, length - 5); - lastInActivity = lastOutActivity = time(NULL); + _lastInActivity = _lastOutActivity = time(NULL); - while(!_client->available()) { + while (!_client->available()) { unsigned long t = time(NULL); - if(t - lastInActivity > MQTT_KEEPALIVE) { + if (t - _lastInActivity > MQTT_KEEPALIVE) { _client->stop(); return false; } } uint8_t llen; - uint16_t len = readPacket(&llen); + uint16_t len = _readPacket(&llen); - if(len == 4 && buffer[3] == 0) { - lastInActivity = time(NULL); - pingOutstanding = false; + if (len == 4 && _buffer[3] == 0) { + _lastInActivity = time(NULL); + _pingOutstanding = false; return true; } } @@ -233,10 +243,11 @@ * @param * @retval */ -uint8_t MQTTClient::readByte(void) { - while(!_client->available()) { } +uint8_t MQTTClient::_readByte(void) +{ + while (!_client->available()) { } - return _client->read(); + return _client->recv(); } /** @@ -245,56 +256,54 @@ * @param * @retval */ -uint16_t MQTTClient::readPacket(uint8_t* lengthLength) { +uint16_t MQTTClient::_readPacket(uint8_t* lengthLength) +{ uint16_t len = 0; - buffer[len++] = readByte(); + _buffer[len++] = _readByte(); - bool isPublish = (buffer[0] & 0xF0) == MQTTPUBLISH; + bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; uint16_t length = 0; uint8_t digit = 0; uint16_t skip = 0; uint8_t start = 0; - do - { - digit = readByte(); - buffer[len++] = digit; + do { + digit = _readByte(); + _buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier *= 128; - } while((digit & 128) != 0); + } while ((digit & 128) != 0); *lengthLength = len - 1; - if(isPublish) { - + if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing - buffer[len++] = readByte(); - buffer[len++] = readByte(); - skip = (buffer[*lengthLength + 1] << 8) + buffer[*lengthLength + 2]; + _buffer[len++] = _readByte(); + _buffer[len++] = _readByte(); + skip = (_buffer[*lengthLength + 1] << 8) + _buffer[*lengthLength + 2]; start = 2; - if(buffer[0] & MQTTQOS1) { - + if (_buffer[0] & MQTTQOS1) { // skip message id skip += 2; } } - for(uint16_t i = start; i < length; i++) { - digit = readByte(); - if(this->stream) { - if(isPublish && len -*lengthLength - 2 > skip) { - this->stream->putc(digit); + for (uint16_t i = start; i < length; i++) { + digit = _readByte(); + if (this->_stream) { + if (isPublish && len -*lengthLength - 2 > skip) { + this->_stream->putc(digit); } } - if(len < MQTT_MAX_PACKET_SIZE) { - buffer[len] = digit; + if (len < MQTT_MAX_PACKET_SIZE) { + _buffer[len] = digit; } len++; } - if(!this->stream && len > MQTT_MAX_PACKET_SIZE) { + if (!this->_stream && len > MQTT_MAX_PACKET_SIZE) { len = 0; // This will cause the packet to be ignored. } @@ -307,71 +316,73 @@ * @param * @retval */ -bool MQTTClient::loop(void) { - if(connected()) { +bool MQTTClient::loop(void) +{ + if (connected()) { unsigned long t = time(NULL); - if((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) { - if(pingOutstanding) { + if ((t - _lastInActivity > MQTT_KEEPALIVE) || (t - _lastOutActivity > MQTT_KEEPALIVE)) { + if (_pingOutstanding) { _client->stop(); return false; } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer, 2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; + _buffer[0] = MQTTPINGREQ; + _buffer[1] = 0; + _client->send(_buffer, 2); + _lastOutActivity = t; + _lastInActivity = t; + _pingOutstanding = true; } } - if(_client->available()) { + if (_client->available()) { uint8_t llen; - uint16_t len = readPacket(&llen); + uint16_t len = _readPacket(&llen); uint16_t msgId = 0; uint8_t* payload; - if(len > 0) { - lastInActivity = t; + if (len > 0) { + _lastInActivity = t; - uint8_t type = buffer[0] & 0xF0; - if(type == MQTTPUBLISH) { - if(onMessage) { - uint16_t tl = (buffer[llen + 1] << 8) + buffer[llen + 2]; + //printf("Here I am\r\n"); + uint8_t type = _buffer[0] & 0xF0; + if (type == MQTTPUBLISH) { + if (_onMessage) { + uint16_t tl = (_buffer[llen + 1] << 8) + _buffer[llen + 2]; char topic[tl + 1]; - for(uint16_t i = 0; i < tl; i++) { - topic[i] = buffer[llen + 3 + i]; + for (uint16_t i = 0; i < tl; i++) { + topic[i] = _buffer[llen + 3 + i]; } topic[tl] = 0; // msgId only present for QOS>0 - if((buffer[0] & 0x06) == MQTTQOS1) { - msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1]; - payload = buffer + llen + 3 + tl + 2; - onMessage(topic, payload, len - llen - 3 - tl - 2); + if ((_buffer[0] & 0x06) == MQTTQOS1) { + msgId = (_buffer[llen + 3 + tl] << 8) + _buffer[llen + 3 + tl + 1]; + payload = _buffer + llen + 3 + tl + 2; + _onMessage(topic, payload, len - llen - 3 - tl - 2); - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer, 4); - lastOutActivity = t; + _buffer[0] = MQTTPUBACK; + _buffer[1] = 2; + _buffer[2] = (msgId >> 8); + _buffer[3] = (msgId & 0xFF); + _client->send(_buffer, 4); + _lastOutActivity = t; } else { - payload = buffer + llen + 3 + tl; - onMessage(topic, payload, len - llen - 3 - tl); + payload = _buffer + llen + 3 + tl; + _onMessage(topic, payload, len - llen - 3 - tl); } } } else - if(type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer, 2); + if (type == MQTTPINGREQ) { + _buffer[0] = MQTTPINGRESP; + _buffer[1] = 0; + _client->send(_buffer, 2); } else - if(type == MQTTPINGRESP) { - pingOutstanding = false; + if (type == MQTTPINGRESP) { + _pingOutstanding = false; } } } @@ -388,7 +399,8 @@ * @param * @retval */ -bool MQTTClient::publish(char* topic, char* payload) { +bool MQTTClient::publish(const char* topic, const char* payload) +{ return publish(topic, (uint8_t*)payload, strlen(payload), false); } @@ -398,7 +410,8 @@ * @param * @retval */ -bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) { +bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength) +{ return publish(topic, payload, plength, false); } @@ -408,24 +421,24 @@ * @param * @retval */ -bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) { - if(connected()) { - +bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength, bool retained) +{ + if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; - length = writeString(topic, buffer, length); + length = _writeString(topic, _buffer, length); uint16_t i; - for(i = 0; i < plength; i++) { - buffer[length++] = payload[i]; + for (i = 0; i < plength; i++) { + _buffer[length++] = payload[i]; } uint8_t header = MQTTPUBLISH; - if(retained) { + if (retained) { header |= 1; } - return write(header, buffer, length - 5); + return _write(header, _buffer, length - 5); } return false; @@ -437,33 +450,33 @@ * @param * @retval */ -bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) { +bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length) +{ uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; uint8_t rc; uint8_t len = length; - do - { + do { digit = len % 128; len = len / 128; - if(len > 0) { + if (len > 0) { digit |= 0x80; } lenBuf[pos++] = digit; llen++; - } while(len > 0); + } while (len > 0); buf[4 - llen] = header; - for(int i = 0; i < llen; i++) { + for (int i = 0; i < llen; i++) { buf[5 - llen + i] = lenBuf[i]; } - rc = _client->write(buf + (4 - llen), length + 1 + llen); + rc = _client->send(buf + (4 - llen), length + 1 + llen); - lastOutActivity = time(NULL); + _lastOutActivity = time(NULL); return(rc == 1 + llen + length); } @@ -473,7 +486,8 @@ * @param * @retval */ -bool MQTTClient::subscribe(char* topic) { +bool MQTTClient::subscribe(const char* topic) +{ bool result = subscribe(topic, 0); wait_ms(50); return result; @@ -485,24 +499,24 @@ * @param * @retval */ -bool MQTTClient::subscribe(char* topic, uint8_t qos) { - if(qos > 1) +bool MQTTClient::subscribe(const char* topic, uint8_t qos) +{ + if (qos > 1) return false; - if(connected()) { - + if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; - nextMsgId++; - if(nextMsgId == 0) { - nextMsgId = 1; + _nextMsgId++; + if (_nextMsgId == 0) { + _nextMsgId = 1; } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer, length); - buffer[length++] = qos; - return write(MQTTSUBSCRIBE | MQTTQOS1, buffer, length - 5); + _buffer[length++] = (_nextMsgId >> 8); + _buffer[length++] = (_nextMsgId & 0xFF); + length = _writeString(topic, _buffer, length); + _buffer[length++] = qos; + return _write(MQTTSUBSCRIBE | MQTTQOS1, _buffer, length - 5); } return false; @@ -514,18 +528,19 @@ * @param * @retval */ -bool MQTTClient::unsubscribe(char* topic) { - if(connected()) { +bool MQTTClient::unsubscribe(const char* topic) +{ + if (connected()) { uint16_t length = 5; - nextMsgId++; - if(nextMsgId == 0) { - nextMsgId = 1; + _nextMsgId++; + if (_nextMsgId == 0) { + _nextMsgId = 1; } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer, length); - return write(MQTTUNSUBSCRIBE | MQTTQOS1, buffer, length - 5); + _buffer[length++] = (_nextMsgId >> 8); + _buffer[length++] = (_nextMsgId & 0xFF); + length = _writeString(topic, _buffer, length); + return _write(MQTTUNSUBSCRIBE | MQTTQOS1, _buffer, length - 5); } return false; @@ -537,12 +552,13 @@ * @param * @retval */ -void MQTTClient::disconnect(void) { - buffer[0] = MQTTDISCONNECT; - buffer[1] = 0; - _client->write(buffer, 2); +void MQTTClient::disconnect(void) +{ + _buffer[0] = MQTTDISCONNECT; + _buffer[1] = 0; + _client->send(_buffer, 2); _client->stop(); - lastInActivity = lastOutActivity = time(NULL); + _lastInActivity = _lastOutActivity = time(NULL); } /** @@ -551,11 +567,13 @@ * @param * @retval */ -uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) { - char* idp = string; +uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t pos) +{ + char* idp = (char*)string; uint16_t i = 0; + pos += 2; - while(*idp) { + while (*idp) { buf[pos++] = *idp++; i++; } @@ -571,17 +589,17 @@ * @param * @retval */ -bool MQTTClient::connected(void) { +bool MQTTClient::connected(void) +{ bool rc; - if(_client == NULL) { + if (_client == NULL) { rc = false; } else { rc = (int)_client->connected(); - if(!rc) + if (!rc) _client->stop(); } return rc; } -