MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.
Dependents: mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60
MQTT library for ENC28J60 Ethernet modules.
Depends on the UIPEthernet library.
Example program:
Import programMQTT_Hello_ENC28J60
MQTT Client example program. Ethernet connection is via an ENC28J60 module.
Diff: MQTTClient.cpp
- Revision:
- 4:8620de6d1696
- Parent:
- 3:5e31b4687aad
- Child:
- 5:361a6987739b
diff -r 5e31b4687aad -r 8620de6d1696 MQTTClient.cpp --- a/MQTTClient.cpp Tue Sep 03 13:41:16 2019 +0000 +++ b/MQTTClient.cpp Thu Sep 12 20:26:40 2019 +0000 @@ -15,7 +15,7 @@ * @param * @retval */ -MQTTClient::MQTTClient(void) : +MQTTClient::MQTTClient() : _client(NULL), _stream(NULL) { } @@ -30,10 +30,8 @@ ( IpAddress& ip, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - TcpClient& client + Callback<void (char*, uint8_t*, uint16_t)> onMessage ) : - _client(&client), _ip(ip), _domain(NULL), _port(port), @@ -49,12 +47,10 @@ */ MQTTClient::MQTTClient ( - const char* domain, + const char* domain, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - TcpClient& client + Callback<void (char*, uint8_t*, uint16_t)> onMessage ) : - _client(&client), _domain((char*)domain), _port(port), _stream(NULL), @@ -71,11 +67,9 @@ ( IpAddress& ip, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - TcpClient& client, + Callback<void (char*, uint8_t*, uint16_t)> onMessage, Stream& stream ) : - _client(&client), _ip(ip), _domain(NULL), _port(port), @@ -93,11 +87,9 @@ ( const char* domain, uint16_t port, - void (*onMessage) (char*, uint8_t*, unsigned int), - TcpClient& client, + Callback<void (char*, uint8_t*, uint16_t)> onMessage, Stream& stream ) : - _client(&client), _domain((char*)domain), _port(port), _stream(&stream), @@ -132,7 +124,14 @@ * @param * @retval */ -bool MQTTClient::connect(const char* id, const char* willTopic, uint8_t willQos, uint8_t willRetain, const char* willMessage) +bool MQTTClient::connect +( + const char* id, + const char* willTopic, + uint8_t willQos, + uint8_t willRetain, + const char* willMessage +) { return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage); } @@ -145,35 +144,35 @@ */ bool MQTTClient::connect ( - const char* id, - const char* user, - const char* pass, - const char* willTopic, - uint8_t willQos, - uint8_t willRetain, - const char* willMessage + const char* id, + const char* user, + const char* pass, + const char* willTopic, + uint8_t willQos, + uint8_t willRetain, + const char* willMessage ) { if (!connected()) { int result = 0; if (_domain != NULL) { - result = !_client->connect(this->_domain, this->_port); + result = !_client.connect(this->_domain, this->_port); } else { - result = _client->connect(this->_ip, this->_port); + result = _client.connect(this->_ip, this->_port); } if (result) { _nextMsgId = 1; - uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; + uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; // Leave room in the buffer for header and variable length field - uint16_t length = 5; - unsigned int j; - for (j = 0; j < 9; j++) { - _buffer[length++] = d[j]; + uint16_t pos = 5; + uint16_t j; + for (j = 0; j < sizeof(d); j++) { + _buffer[pos++] = d[j]; } uint8_t v; @@ -192,46 +191,45 @@ } } - _buffer[length++] = v; + _buffer[pos++] = v; - _buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - _buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = _writeString(id, _buffer, length); + _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8); + _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF); + pos = _writeString(id, _buffer, pos); if (willTopic) { - length = _writeString(willTopic, _buffer, length); - length = _writeString(willMessage, _buffer, length); + pos = _writeString(willTopic, _buffer, pos); + pos = _writeString(willMessage, _buffer, pos); } if (user != NULL) { - length = _writeString(user, _buffer, length); + pos = _writeString(user, _buffer, pos); if (pass != NULL) { - length = _writeString(pass, _buffer, length); + pos = _writeString(pass, _buffer, pos); } } - _write(MQTTCONNECT, _buffer, length - 5); + _write(MQTTCONNECT, _buffer, pos - 5); _lastInActivity = _lastOutActivity = time(NULL); - while (!_client->available()) { + while (!_client.available()) { unsigned long t = time(NULL); if (t - _lastInActivity > MQTT_KEEPALIVE) { - _client->stop(); + _client.stop(); return false; } } - uint8_t llen; - uint16_t len = _readPacket(&llen); + uint8_t len; - if (len == 4 && _buffer[3] == 0) { + if (_readPacket(&len) == 4 && _buffer[3] == 0) { _lastInActivity = time(NULL); _pingOutstanding = false; return true; } } - _client->stop(); + _client.stop(); } return false; @@ -243,11 +241,11 @@ * @param * @retval */ -uint8_t MQTTClient::_readByte(void) +uint8_t MQTTClient::_readByte() { - while (!_client->available()) { } + while (!_client.available()) { } - return _client->recv(); + return _client.recv(); } /** @@ -256,14 +254,14 @@ * @param * @retval */ -uint16_t MQTTClient::_readPacket(uint8_t* lengthLength) +uint16_t MQTTClient::_readPacket(uint8_t* length) { uint16_t len = 0; _buffer[len++] = _readByte(); bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; - uint16_t length = 0; + uint16_t pos = 0; uint8_t digit = 0; uint16_t skip = 0; uint8_t start = 0; @@ -271,16 +269,16 @@ do { digit = _readByte(); _buffer[len++] = digit; - length += (digit & 127) * multiplier; + pos += (digit & 127) * multiplier; multiplier *= 128; } while ((digit & 128) != 0); - *lengthLength = len - 1; + *length = len - 1; if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing _buffer[len++] = _readByte(); _buffer[len++] = _readByte(); - skip = (_buffer[*lengthLength + 1] << 8) + _buffer[*lengthLength + 2]; + skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2]; start = 2; if (_buffer[0] & MQTTQOS1) { // skip message id @@ -288,10 +286,10 @@ } } - for (uint16_t i = start; i < length; i++) { + for (uint16_t i = start; i < pos; i++) { digit = _readByte(); if (this->_stream) { - if (isPublish && len -*lengthLength - 2 > skip) { + if (isPublish && len -*length - 2 > skip) { this->_stream->putc(digit); } } @@ -316,61 +314,61 @@ * @param * @retval */ -bool MQTTClient::loop(void) +bool MQTTClient::process() { if (connected()) { - unsigned long t = time(NULL); - if ((t - _lastInActivity > MQTT_KEEPALIVE) || (t - _lastOutActivity > MQTT_KEEPALIVE)) { + time_t now = time(NULL); + if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) { if (_pingOutstanding) { - _client->stop(); + _client.stop(); return false; } else { _buffer[0] = MQTTPINGREQ; _buffer[1] = 0; - _client->send(_buffer, 2); - _lastOutActivity = t; - _lastInActivity = t; + _client.send(_buffer, 2); + _lastOutActivity = now; + _lastInActivity = now; _pingOutstanding = true; } } - if (_client->available()) { - uint8_t llen; - uint16_t len = _readPacket(&llen); + if (_client.available()) { + uint8_t len; + uint16_t length = _readPacket(&len); uint16_t msgId = 0; uint8_t* payload; - if (len > 0) { - _lastInActivity = t; + if (length > 0) { + _lastInActivity = now; //printf("Here I am\r\n"); uint8_t type = _buffer[0] & 0xF0; if (type == MQTTPUBLISH) { if (_onMessage) { - uint16_t tl = (_buffer[llen + 1] << 8) + _buffer[llen + 2]; - char topic[tl + 1]; - for (uint16_t i = 0; i < tl; i++) { - topic[i] = _buffer[llen + 3 + i]; + uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2]; + char topic[topicLen + 1]; + for (uint16_t i = 0; i < topicLen; i++) { + topic[i] = _buffer[len + 3 + i]; } - topic[tl] = 0; + topic[topicLen] = '\0'; // msgId only present for QOS>0 if ((_buffer[0] & 0x06) == MQTTQOS1) { - msgId = (_buffer[llen + 3 + tl] << 8) + _buffer[llen + 3 + tl + 1]; - payload = _buffer + llen + 3 + tl + 2; - _onMessage(topic, payload, len - llen - 3 - tl - 2); + msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1]; + payload = _buffer + len + 3 + topicLen + 2; + _onMessage(topic, payload, length - len - 3 - topicLen - 2); _buffer[0] = MQTTPUBACK; _buffer[1] = 2; _buffer[2] = (msgId >> 8); _buffer[3] = (msgId & 0xFF); - _client->send(_buffer, 4); - _lastOutActivity = t; + _client.send(_buffer, 4); + _lastOutActivity = now; } else { - payload = _buffer + llen + 3 + tl; - _onMessage(topic, payload, len - llen - 3 - tl); + payload = _buffer + len + 3 + topicLen; + _onMessage(topic, payload, length - len - 3 - topicLen); } } } @@ -378,7 +376,7 @@ if (type == MQTTPINGREQ) { _buffer[0] = MQTTPINGRESP; _buffer[1] = 0; - _client->send(_buffer, 2); + _client.send(_buffer, 2); } else if (type == MQTTPINGRESP) { @@ -410,9 +408,9 @@ * @param * @retval */ -bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength) +bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length) { - return publish(topic, payload, plength, false); + return publish(topic, payload, length, false); } /** @@ -421,7 +419,7 @@ * @param * @retval */ -bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength, bool retained) +bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained) { if (connected()) { // Leave room in the buffer for header and variable length field @@ -452,8 +450,8 @@ */ bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length) { - uint8_t lenBuf[4]; - uint8_t llen = 0; + uint8_t digitBuf[4]; + uint8_t digitLen = 0; uint8_t digit; uint8_t pos = 0; uint8_t rc; @@ -465,19 +463,19 @@ digit |= 0x80; } - lenBuf[pos++] = digit; - llen++; + digitBuf[pos++] = digit; + digitLen++; } while (len > 0); - buf[4 - llen] = header; - for (int i = 0; i < llen; i++) { - buf[5 - llen + i] = lenBuf[i]; + buf[4 - digitLen] = header; + for (int i = 0; i < digitLen; i++) { + buf[5 - digitLen + i] = digitBuf[i]; } - rc = _client->send(buf + (4 - llen), length + 1 + llen); + rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen); _lastOutActivity = time(NULL); - return(rc == 1 + llen + length); + return(rc == 1 + digitLen + length); } /** @@ -552,12 +550,12 @@ * @param * @retval */ -void MQTTClient::disconnect(void) +void MQTTClient::disconnect() { _buffer[0] = MQTTDISCONNECT; _buffer[1] = 0; - _client->send(_buffer, 2); - _client->stop(); + _client.send(_buffer, 2); + _client.stop(); _lastInActivity = _lastOutActivity = time(NULL); } @@ -567,20 +565,20 @@ * @param * @retval */ -uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t pos) +uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length) { char* idp = (char*)string; uint16_t i = 0; - pos += 2; + length += 2; while (*idp) { - buf[pos++] = *idp++; + buf[length++] = *idp++; i++; } - buf[pos - i - 2] = (i >> 8); - buf[pos - i - 1] = (i & 0xFF); - return pos; + buf[length - i - 2] = (i >> 8); + buf[length - i - 1] = (i & 0xFF); + return length; } /** @@ -589,17 +587,12 @@ * @param * @retval */ -bool MQTTClient::connected(void) +bool MQTTClient::connected() { - bool rc; - if (_client == NULL) { - rc = false; - } - else { - rc = (int)_client->connected(); - if (!rc) - _client->stop(); - } + bool rc = (int)_client.connected(); + + if (!rc) + _client.stop(); return rc; }