123
Diff: MQTTClient.cpp
- Revision:
- 2:ca17c5d846e7
- Parent:
- 1:87da395325fc
- Child:
- 3:5e31b4687aad
diff -r 87da395325fc -r ca17c5d846e7 MQTTClient.cpp --- a/MQTTClient.cpp Mon Sep 15 12:48:55 2014 +0000 +++ b/MQTTClient.cpp Sun Nov 29 18:12:22 2015 +0000 @@ -2,22 +2,37 @@ MQTTClient.cpp - A simple client for MQTT. Nicholas O'Leary http://knolleary.net - + Ported to mbed by Zoltan Hudak <hudakz@inbox.com> */ - #include "MQTTClient.h" #include <string.h> #include <time.h> -MQTTClient::MQTTClient() -{ +/** + * @brief + * @note + * @param + * @retval + */ +MQTTClient::MQTTClient(void) { this->_client = NULL; this->stream = NULL; } -MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client) -{ +/** + * @brief + * @note + * @param + * @retval + */ +MQTTClient::MQTTClient +( + IPAddress& ip, + uint16_t port, + void (*onMessage) (char*, uint8_t*, unsigned int), + Client& client +) { this->_client = &client; this->onMessage = onMessage; this->ip = ip; @@ -26,8 +41,13 @@ this->stream = NULL; } -MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client) -{ +/** + * @brief + * @note + * @param + * @retval + */ +MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage) (char*, uint8_t*, unsigned int), Client& client) { this->_client = &client; this->onMessage = onMessage; this->domain = domain; @@ -35,8 +55,20 @@ this->stream = NULL; } -MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) -{ +/** + * @brief + * @note + * @param + * @retval + */ +MQTTClient::MQTTClient +( + IPAddress& ip, + uint16_t port, + void (*onMessage) (char*, uint8_t*, unsigned int), + Client& client, + Stream& stream +) { this->_client = &client; this->onMessage = onMessage; this->ip = ip; @@ -45,8 +77,20 @@ this->stream = &stream; } -MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) -{ +/** + * @brief + * @note + * @param + * @retval + */ +MQTTClient::MQTTClient +( + char* domain, + uint16_t port, + void (*onMessage) (char*, uint8_t*, unsigned int), + Client& client, + Stream& stream +) { this->_client = &client; this->onMessage = onMessage; this->domain = domain; @@ -54,54 +98,87 @@ this->stream = &stream; } -bool MQTTClient::connect(char *id) -{ - return connect(id,NULL,NULL,0,0,0,0); +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::connect(char* id) { + return connect(id, NULL, NULL, 0, 0, 0, 0); } -bool MQTTClient::connect(char *id, char *user, char *pass) -{ - return connect(id,user,pass,0,0,0,0); +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::connect(char* id, char* user, char* pass) { + return connect(id, user, pass, 0, 0, 0, 0); } -bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) -{ - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::connect(char* id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { + return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage); } -bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) -{ - if (!connected()) { +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::connect +( + char* id, + char* user, + char* pass, + char* willTopic, + uint8_t willQos, + uint8_t willRetain, + char* willMessage +) { + if(!connected()) { int result = 0; - if (domain != NULL) { + if(domain != NULL) { result = _client->connect(this->domain, this->port); - } else { + } + else { result = _client->connect(this->ip, this->port); } - if (result) { + if(result) { nextMsgId = 1; - uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + + uint8_t d[9] = { 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTTPROTOCOLVERSION }; + // Leave room in the buffer for header and variable length field - uint16_t length = 5; - unsigned int j; - for (j = 0; j<9; j++) { + uint16_t length = 5; + unsigned int j; + for(j = 0; j < 9; j++) { buffer[length++] = d[j]; } uint8_t v; - if (willTopic) { - v = 0x06|(willQos<<3)|(willRetain<<5); - } else { + if(willTopic) { + v = 0x06 | (willQos << 3) | (willRetain << 5); + } + else { v = 0x02; } if(user != NULL) { - v = v|0x80; + v = v | 0x80; if(pass != NULL) { - v = v|(0x80>>1); + v = v | (0x80 >> 1); } } @@ -109,301 +186,402 @@ buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id,buffer,length); - if (willTopic) { - length = writeString(willTopic,buffer,length); - length = writeString(willMessage,buffer,length); + length = writeString(id, buffer, length); + if(willTopic) { + length = writeString(willTopic, buffer, length); + length = writeString(willMessage, buffer, length); } if(user != NULL) { - length = writeString(user,buffer,length); + length = writeString(user, buffer, length); if(pass != NULL) { - length = writeString(pass,buffer,length); + length = writeString(pass, buffer, length); } } - write(MQTTCONNECT,buffer,length-5); + write(MQTTCONNECT, buffer, length - 5); lastInActivity = lastOutActivity = time(NULL); - while (!_client->available()) { - time_t t = time(NULL); - if (t-lastInActivity > MQTT_KEEPALIVE) { + while(!_client->available()) { + unsigned long t = time(NULL); + if(t - lastInActivity > MQTT_KEEPALIVE) { _client->stop(); return false; } } - uint8_t llen; - uint16_t len = readPacket(&llen); - if (len == 4 && buffer[3] == 0) { + uint8_t llen; + uint16_t len = readPacket(&llen); + + if(len == 4 && buffer[3] == 0) { lastInActivity = time(NULL); pingOutstanding = false; return true; } } + _client->stop(); } + return false; } -uint8_t MQTTClient::readByte() -{ - while(!_client->available()) {} +/** + * @brief + * @note + * @param + * @retval + */ +uint8_t MQTTClient::readByte(void) { + while(!_client->available()) { } + return _client->read(); } -uint16_t MQTTClient::readPacket(uint8_t* lengthLength) -{ - uint16_t len = 0; +/** + * @brief + * @note + * @param + * @retval + */ +uint16_t MQTTClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; buffer[len++] = readByte(); - bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; - uint32_t multiplier = 1; - uint16_t length = 0; - uint8_t digit = 0; - uint16_t skip = 0; - uint8_t start = 0; - do { + bool isPublish = (buffer[0] & 0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint8_t start = 0; + + do + { digit = readByte(); buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier *= 128; - } while ((digit & 128) != 0); - *lengthLength = len-1; + } while((digit & 128) != 0); + *lengthLength = len - 1; - if (isPublish) { + if(isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing buffer[len++] = readByte(); buffer[len++] = readByte(); - skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; + skip = (buffer[*lengthLength + 1] << 8) + buffer[*lengthLength + 2]; start = 2; - if (buffer[0]&MQTTQOS1) { + if(buffer[0] & MQTTQOS1) { + // skip message id skip += 2; } } - for (uint16_t i = start; i<length; i++) { + for(uint16_t i = start; i < length; i++) { digit = readByte(); - if (this->stream) { - if (isPublish && len-*lengthLength-2>skip) { + if(this->stream) { + if(isPublish && len -*lengthLength - 2 > skip) { this->stream->putc(digit); } } - if (len < MQTT_MAX_PACKET_SIZE) { + + if(len < MQTT_MAX_PACKET_SIZE) { buffer[len] = digit; } + len++; } - if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { - len = 0; // This will cause the packet to be ignored. + if(!this->stream && len > MQTT_MAX_PACKET_SIZE) { + len = 0; // This will cause the packet to be ignored. } return len; } -bool MQTTClient::loop() -{ - if (connected()) { - time_t t = time(NULL); - if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) { - if (pingOutstanding) { +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::loop(void) { + if(connected()) { + unsigned long t = time(NULL); + if((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) { + if(pingOutstanding) { _client->stop(); return false; - } else { + } + else { buffer[0] = MQTTPINGREQ; buffer[1] = 0; - _client->write(buffer,2); + _client->write(buffer, 2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; } } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { + + if(_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t* payload; + if(len > 0) { lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (onMessage) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; - char topic[tl+1]; - for (uint16_t i=0; i<tl; i++) { - topic[i] = buffer[llen+3+i]; + + uint8_t type = buffer[0] & 0xF0; + if(type == MQTTPUBLISH) { + if(onMessage) { + uint16_t tl = (buffer[llen + 1] << 8) + buffer[llen + 2]; + char topic[tl + 1]; + for(uint16_t i = 0; i < tl; i++) { + topic[i] = buffer[llen + 3 + i]; } + topic[tl] = 0; + // msgId only present for QOS>0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - onMessage(topic,payload,len-llen-3-tl-2); + if((buffer[0] & 0x06) == MQTTQOS1) { + msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1]; + payload = buffer + llen + 3 + tl + 2; + onMessage(topic, payload, len - llen - 3 - tl - 2); buffer[0] = MQTTPUBACK; buffer[1] = 2; buffer[2] = (msgId >> 8); buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); + _client->write(buffer, 4); lastOutActivity = t; - - } else { - payload = buffer+llen+3+tl; - onMessage(topic,payload,len-llen-3-tl); + } + else { + payload = buffer + llen + 3 + tl; + onMessage(topic, payload, len - llen - 3 - tl); } } - } else if (type == MQTTPINGREQ) { + } + else + if(type == MQTTPINGREQ) { buffer[0] = MQTTPINGRESP; buffer[1] = 0; - _client->write(buffer,2); - } else if (type == MQTTPINGRESP) { + _client->write(buffer, 2); + } + else + if(type == MQTTPINGRESP) { pingOutstanding = false; } } } + return true; } + return false; } -bool MQTTClient::publish(char* topic, char* payload) -{ - return publish(topic,(uint8_t*)payload,strlen(payload),false); +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::publish(char* topic, char* payload) { + return publish(topic, (uint8_t*)payload, strlen(payload), false); } -bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) -{ +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) { return publish(topic, payload, plength, false); } -bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) -{ - if (connected()) { +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) { + if(connected()) { + // Leave room in the buffer for header and variable length field - uint16_t length = 5; - length = writeString(topic,buffer,length); - uint16_t i; - for (i=0; i<plength; i++) { + uint16_t length = 5; + length = writeString(topic, buffer, length); + + uint16_t i; + for(i = 0; i < plength; i++) { buffer[length++] = payload[i]; } + uint8_t header = MQTTPUBLISH; - if (retained) { + if(retained) { header |= 1; } - return write(header,buffer,length-5); + + return write(header, buffer, length - 5); } + return false; } -bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) -{ +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; uint8_t rc; uint8_t len = length; - do { + do + { digit = len % 128; len = len / 128; - if (len > 0) { + if(len > 0) { digit |= 0x80; } + lenBuf[pos++] = digit; llen++; - } while(len>0); + } while(len > 0); - buf[4-llen] = header; - for (int i=0; i<llen; i++) { - buf[5-llen+i] = lenBuf[i]; + buf[4 - llen] = header; + for(int i = 0; i < llen; i++) { + buf[5 - llen + i] = lenBuf[i]; } - rc = _client->write(buf+(4-llen),length+1+llen); + + rc = _client->write(buf + (4 - llen), length + 1 + llen); lastOutActivity = time(NULL); - return (rc == 1+llen+length); + return(rc == 1 + llen + length); } -bool MQTTClient::subscribe(char* topic) -{ - return subscribe(topic, 0); +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::subscribe(char* topic) { + bool result = subscribe(topic, 0); + wait_ms(50); + return result; } -bool MQTTClient::subscribe(char* topic, uint8_t qos) -{ - if (qos > 1) +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::subscribe(char* topic, uint8_t qos) { + if(qos > 1) return false; - if (connected()) { + if(connected()) { + // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = 5; nextMsgId++; - if (nextMsgId == 0) { + if(nextMsgId == 0) { nextMsgId = 1; } + buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); + length = writeString(topic, buffer, length); buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + return write(MQTTSUBSCRIBE | MQTTQOS1, buffer, length - 5); } + return false; } -bool MQTTClient::unsubscribe(char* topic) -{ - if (connected()) { - uint16_t length = 5; +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::unsubscribe(char* topic) { + if(connected()) { + uint16_t length = 5; nextMsgId++; - if (nextMsgId == 0) { + if(nextMsgId == 0) { nextMsgId = 1; } + buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + length = writeString(topic, buffer, length); + return write(MQTTUNSUBSCRIBE | MQTTQOS1, buffer, length - 5); } + return false; } -void MQTTClient::disconnect() -{ +/** + * @brief + * @note + * @param + * @retval + */ +void MQTTClient::disconnect(void) { buffer[0] = MQTTDISCONNECT; buffer[1] = 0; - _client->write(buffer,2); + _client->write(buffer, 2); _client->stop(); lastInActivity = lastOutActivity = time(NULL); } -uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) -{ - char* idp = string; - uint16_t i = 0; +/** + * @brief + * @note + * @param + * @retval + */ +uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) { + char* idp = string; + uint16_t i = 0; pos += 2; - while (*idp) { + while(*idp) { buf[pos++] = *idp++; i++; } - buf[pos-i-2] = (i >> 8); - buf[pos-i-1] = (i & 0xFF); + + buf[pos - i - 2] = (i >> 8); + buf[pos - i - 1] = (i & 0xFF); return pos; } +/** + * @brief + * @note + * @param + * @retval + */ +bool MQTTClient::connected(void) { + bool rc; + if(_client == NULL) { + rc = false; + } + else { + rc = (int)_client->connected(); + if(!rc) + _client->stop(); + } -bool MQTTClient::connected() -{ - bool rc; - if (_client == NULL ) { - rc = false; - } else { - rc = (int)_client->connected(); - if (!rc) _client->stop(); - } return rc; } - - -