Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: MQTTClient.cpp
- Revision:
- 4:8620de6d1696
- Parent:
- 3:5e31b4687aad
- Child:
- 5:361a6987739b
diff -r 5e31b4687aad -r 8620de6d1696 MQTTClient.cpp
--- a/MQTTClient.cpp Tue Sep 03 13:41:16 2019 +0000
+++ b/MQTTClient.cpp Thu Sep 12 20:26:40 2019 +0000
@@ -15,7 +15,7 @@
* @param
* @retval
*/
-MQTTClient::MQTTClient(void) :
+MQTTClient::MQTTClient() :
_client(NULL),
_stream(NULL)
{ }
@@ -30,10 +30,8 @@
(
IpAddress& ip,
uint16_t port,
- void (*onMessage) (char*, uint8_t*, unsigned int),
- TcpClient& client
+ Callback<void (char*, uint8_t*, uint16_t)> onMessage
) :
- _client(&client),
_ip(ip),
_domain(NULL),
_port(port),
@@ -49,12 +47,10 @@
*/
MQTTClient::MQTTClient
(
- const char* domain,
+ const char* domain,
uint16_t port,
- void (*onMessage) (char*, uint8_t*, unsigned int),
- TcpClient& client
+ Callback<void (char*, uint8_t*, uint16_t)> onMessage
) :
- _client(&client),
_domain((char*)domain),
_port(port),
_stream(NULL),
@@ -71,11 +67,9 @@
(
IpAddress& ip,
uint16_t port,
- void (*onMessage) (char*, uint8_t*, unsigned int),
- TcpClient& client,
+ Callback<void (char*, uint8_t*, uint16_t)> onMessage,
Stream& stream
) :
- _client(&client),
_ip(ip),
_domain(NULL),
_port(port),
@@ -93,11 +87,9 @@
(
const char* domain,
uint16_t port,
- void (*onMessage) (char*, uint8_t*, unsigned int),
- TcpClient& client,
+ Callback<void (char*, uint8_t*, uint16_t)> onMessage,
Stream& stream
) :
- _client(&client),
_domain((char*)domain),
_port(port),
_stream(&stream),
@@ -132,7 +124,14 @@
* @param
* @retval
*/
-bool MQTTClient::connect(const char* id, const char* willTopic, uint8_t willQos, uint8_t willRetain, const char* willMessage)
+bool MQTTClient::connect
+(
+ const char* id,
+ const char* willTopic,
+ uint8_t willQos,
+ uint8_t willRetain,
+ const char* willMessage
+)
{
return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage);
}
@@ -145,35 +144,35 @@
*/
bool MQTTClient::connect
(
- const char* id,
- const char* user,
- const char* pass,
- const char* willTopic,
- uint8_t willQos,
- uint8_t willRetain,
- const char* willMessage
+ const char* id,
+ const char* user,
+ const char* pass,
+ const char* willTopic,
+ uint8_t willQos,
+ uint8_t willRetain,
+ const char* willMessage
)
{
if (!connected()) {
int result = 0;
if (_domain != NULL) {
- result = !_client->connect(this->_domain, this->_port);
+ result = !_client.connect(this->_domain, this->_port);
}
else {
- result = _client->connect(this->_ip, this->_port);
+ result = _client.connect(this->_ip, this->_port);
}
if (result) {
_nextMsgId = 1;
- uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
+ uint8_t d[] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION };
// Leave room in the buffer for header and variable length field
- uint16_t length = 5;
- unsigned int j;
- for (j = 0; j < 9; j++) {
- _buffer[length++] = d[j];
+ uint16_t pos = 5;
+ uint16_t j;
+ for (j = 0; j < sizeof(d); j++) {
+ _buffer[pos++] = d[j];
}
uint8_t v;
@@ -192,46 +191,45 @@
}
}
- _buffer[length++] = v;
+ _buffer[pos++] = v;
- _buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
- _buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
- length = _writeString(id, _buffer, length);
+ _buffer[pos++] = ((MQTT_KEEPALIVE) >> 8);
+ _buffer[pos++] = ((MQTT_KEEPALIVE) & 0xFF);
+ pos = _writeString(id, _buffer, pos);
if (willTopic) {
- length = _writeString(willTopic, _buffer, length);
- length = _writeString(willMessage, _buffer, length);
+ pos = _writeString(willTopic, _buffer, pos);
+ pos = _writeString(willMessage, _buffer, pos);
}
if (user != NULL) {
- length = _writeString(user, _buffer, length);
+ pos = _writeString(user, _buffer, pos);
if (pass != NULL) {
- length = _writeString(pass, _buffer, length);
+ pos = _writeString(pass, _buffer, pos);
}
}
- _write(MQTTCONNECT, _buffer, length - 5);
+ _write(MQTTCONNECT, _buffer, pos - 5);
_lastInActivity = _lastOutActivity = time(NULL);
- while (!_client->available()) {
+ while (!_client.available()) {
unsigned long t = time(NULL);
if (t - _lastInActivity > MQTT_KEEPALIVE) {
- _client->stop();
+ _client.stop();
return false;
}
}
- uint8_t llen;
- uint16_t len = _readPacket(&llen);
+ uint8_t len;
- if (len == 4 && _buffer[3] == 0) {
+ if (_readPacket(&len) == 4 && _buffer[3] == 0) {
_lastInActivity = time(NULL);
_pingOutstanding = false;
return true;
}
}
- _client->stop();
+ _client.stop();
}
return false;
@@ -243,11 +241,11 @@
* @param
* @retval
*/
-uint8_t MQTTClient::_readByte(void)
+uint8_t MQTTClient::_readByte()
{
- while (!_client->available()) { }
+ while (!_client.available()) { }
- return _client->recv();
+ return _client.recv();
}
/**
@@ -256,14 +254,14 @@
* @param
* @retval
*/
-uint16_t MQTTClient::_readPacket(uint8_t* lengthLength)
+uint16_t MQTTClient::_readPacket(uint8_t* length)
{
uint16_t len = 0;
_buffer[len++] = _readByte();
bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
- uint16_t length = 0;
+ uint16_t pos = 0;
uint8_t digit = 0;
uint16_t skip = 0;
uint8_t start = 0;
@@ -271,16 +269,16 @@
do {
digit = _readByte();
_buffer[len++] = digit;
- length += (digit & 127) * multiplier;
+ pos += (digit & 127) * multiplier;
multiplier *= 128;
} while ((digit & 128) != 0);
- *lengthLength = len - 1;
+ *length = len - 1;
if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
_buffer[len++] = _readByte();
_buffer[len++] = _readByte();
- skip = (_buffer[*lengthLength + 1] << 8) + _buffer[*lengthLength + 2];
+ skip = (_buffer[*length + 1] << 8) + _buffer[*length + 2];
start = 2;
if (_buffer[0] & MQTTQOS1) {
// skip message id
@@ -288,10 +286,10 @@
}
}
- for (uint16_t i = start; i < length; i++) {
+ for (uint16_t i = start; i < pos; i++) {
digit = _readByte();
if (this->_stream) {
- if (isPublish && len -*lengthLength - 2 > skip) {
+ if (isPublish && len -*length - 2 > skip) {
this->_stream->putc(digit);
}
}
@@ -316,61 +314,61 @@
* @param
* @retval
*/
-bool MQTTClient::loop(void)
+bool MQTTClient::process()
{
if (connected()) {
- unsigned long t = time(NULL);
- if ((t - _lastInActivity > MQTT_KEEPALIVE) || (t - _lastOutActivity > MQTT_KEEPALIVE)) {
+ time_t now = time(NULL);
+ if ((now - _lastInActivity > MQTT_KEEPALIVE) || (now - _lastOutActivity > MQTT_KEEPALIVE)) {
if (_pingOutstanding) {
- _client->stop();
+ _client.stop();
return false;
}
else {
_buffer[0] = MQTTPINGREQ;
_buffer[1] = 0;
- _client->send(_buffer, 2);
- _lastOutActivity = t;
- _lastInActivity = t;
+ _client.send(_buffer, 2);
+ _lastOutActivity = now;
+ _lastInActivity = now;
_pingOutstanding = true;
}
}
- if (_client->available()) {
- uint8_t llen;
- uint16_t len = _readPacket(&llen);
+ if (_client.available()) {
+ uint8_t len;
+ uint16_t length = _readPacket(&len);
uint16_t msgId = 0;
uint8_t* payload;
- if (len > 0) {
- _lastInActivity = t;
+ if (length > 0) {
+ _lastInActivity = now;
//printf("Here I am\r\n");
uint8_t type = _buffer[0] & 0xF0;
if (type == MQTTPUBLISH) {
if (_onMessage) {
- uint16_t tl = (_buffer[llen + 1] << 8) + _buffer[llen + 2];
- char topic[tl + 1];
- for (uint16_t i = 0; i < tl; i++) {
- topic[i] = _buffer[llen + 3 + i];
+ uint16_t topicLen = (_buffer[len + 1] << 8) + _buffer[len + 2];
+ char topic[topicLen + 1];
+ for (uint16_t i = 0; i < topicLen; i++) {
+ topic[i] = _buffer[len + 3 + i];
}
- topic[tl] = 0;
+ topic[topicLen] = '\0';
// msgId only present for QOS>0
if ((_buffer[0] & 0x06) == MQTTQOS1) {
- msgId = (_buffer[llen + 3 + tl] << 8) + _buffer[llen + 3 + tl + 1];
- payload = _buffer + llen + 3 + tl + 2;
- _onMessage(topic, payload, len - llen - 3 - tl - 2);
+ msgId = (_buffer[len + 3 + topicLen] << 8) + _buffer[len + 3 + topicLen + 1];
+ payload = _buffer + len + 3 + topicLen + 2;
+ _onMessage(topic, payload, length - len - 3 - topicLen - 2);
_buffer[0] = MQTTPUBACK;
_buffer[1] = 2;
_buffer[2] = (msgId >> 8);
_buffer[3] = (msgId & 0xFF);
- _client->send(_buffer, 4);
- _lastOutActivity = t;
+ _client.send(_buffer, 4);
+ _lastOutActivity = now;
}
else {
- payload = _buffer + llen + 3 + tl;
- _onMessage(topic, payload, len - llen - 3 - tl);
+ payload = _buffer + len + 3 + topicLen;
+ _onMessage(topic, payload, length - len - 3 - topicLen);
}
}
}
@@ -378,7 +376,7 @@
if (type == MQTTPINGREQ) {
_buffer[0] = MQTTPINGRESP;
_buffer[1] = 0;
- _client->send(_buffer, 2);
+ _client.send(_buffer, 2);
}
else
if (type == MQTTPINGRESP) {
@@ -410,9 +408,9 @@
* @param
* @retval
*/
-bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength)
+bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t length)
{
- return publish(topic, payload, plength, false);
+ return publish(topic, payload, length, false);
}
/**
@@ -421,7 +419,7 @@
* @param
* @retval
*/
-bool MQTTClient::publish(const char* topic, uint8_t* payload, unsigned int plength, bool retained)
+bool MQTTClient::publish(const char* topic, uint8_t* payload, uint16_t plength, bool retained)
{
if (connected()) {
// Leave room in the buffer for header and variable length field
@@ -452,8 +450,8 @@
*/
bool MQTTClient::_write(uint8_t header, uint8_t* buf, uint16_t length)
{
- uint8_t lenBuf[4];
- uint8_t llen = 0;
+ uint8_t digitBuf[4];
+ uint8_t digitLen = 0;
uint8_t digit;
uint8_t pos = 0;
uint8_t rc;
@@ -465,19 +463,19 @@
digit |= 0x80;
}
- lenBuf[pos++] = digit;
- llen++;
+ digitBuf[pos++] = digit;
+ digitLen++;
} while (len > 0);
- buf[4 - llen] = header;
- for (int i = 0; i < llen; i++) {
- buf[5 - llen + i] = lenBuf[i];
+ buf[4 - digitLen] = header;
+ for (int i = 0; i < digitLen; i++) {
+ buf[5 - digitLen + i] = digitBuf[i];
}
- rc = _client->send(buf + (4 - llen), length + 1 + llen);
+ rc = _client.send(buf + (4 - digitLen), length + 1 + digitLen);
_lastOutActivity = time(NULL);
- return(rc == 1 + llen + length);
+ return(rc == 1 + digitLen + length);
}
/**
@@ -552,12 +550,12 @@
* @param
* @retval
*/
-void MQTTClient::disconnect(void)
+void MQTTClient::disconnect()
{
_buffer[0] = MQTTDISCONNECT;
_buffer[1] = 0;
- _client->send(_buffer, 2);
- _client->stop();
+ _client.send(_buffer, 2);
+ _client.stop();
_lastInActivity = _lastOutActivity = time(NULL);
}
@@ -567,20 +565,20 @@
* @param
* @retval
*/
-uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t pos)
+uint16_t MQTTClient::_writeString(const char* string, uint8_t* buf, uint16_t length)
{
char* idp = (char*)string;
uint16_t i = 0;
- pos += 2;
+ length += 2;
while (*idp) {
- buf[pos++] = *idp++;
+ buf[length++] = *idp++;
i++;
}
- buf[pos - i - 2] = (i >> 8);
- buf[pos - i - 1] = (i & 0xFF);
- return pos;
+ buf[length - i - 2] = (i >> 8);
+ buf[length - i - 1] = (i & 0xFF);
+ return length;
}
/**
@@ -589,17 +587,12 @@
* @param
* @retval
*/
-bool MQTTClient::connected(void)
+bool MQTTClient::connected()
{
- bool rc;
- if (_client == NULL) {
- rc = false;
- }
- else {
- rc = (int)_client->connected();
- if (!rc)
- _client->stop();
- }
+ bool rc = (int)_client.connected();
+
+ if (!rc)
+ _client.stop();
return rc;
}