123
MQTTClient.cpp
- Committer:
- hudakz
- Date:
- 2014-09-15
- Revision:
- 1:87da395325fc
- Parent:
- 0:83c732b10e95
- Child:
- 2:ca17c5d846e7
File content as of revision 1:87da395325fc:
/* MQTTClient.cpp - A simple client for MQTT. Nicholas O'Leary http://knolleary.net Ported to mbed by Zoltan Hudak <hudakz@inbox.com> */ #include "MQTTClient.h" #include <string.h> #include <time.h> MQTTClient::MQTTClient() { this->_client = NULL; this->stream = NULL; } MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client) { this->_client = &client; this->onMessage = onMessage; this->ip = ip; this->port = port; this->domain = NULL; this->stream = NULL; } MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client) { this->_client = &client; this->onMessage = onMessage; this->domain = domain; this->port = port; this->stream = NULL; } MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) { this->_client = &client; this->onMessage = onMessage; this->ip = ip; this->port = port; this->domain = NULL; this->stream = &stream; } MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) { this->_client = &client; this->onMessage = onMessage; this->domain = domain; this->port = port; this->stream = &stream; } bool MQTTClient::connect(char *id) { return connect(id,NULL,NULL,0,0,0,0); } bool MQTTClient::connect(char *id, char *user, char *pass) { return connect(id,user,pass,0,0,0,0); } bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); } bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { if (!connected()) { int result = 0; if (domain != NULL) { result = _client->connect(this->domain, this->port); } else { result = _client->connect(this->ip, this->port); } if (result) { nextMsgId = 1; uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; // Leave room in the buffer for header and variable length field uint16_t length = 5; unsigned int j; for (j = 0; j<9; j++) { buffer[length++] = d[j]; } uint8_t v; if (willTopic) { v = 0x06|(willQos<<3)|(willRetain<<5); } else { v = 0x02; } if(user != NULL) { v = v|0x80; if(pass != NULL) { v = v|(0x80>>1); } } buffer[length++] = v; buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); length = writeString(id,buffer,length); if (willTopic) { length = writeString(willTopic,buffer,length); length = writeString(willMessage,buffer,length); } if(user != NULL) { length = writeString(user,buffer,length); if(pass != NULL) { length = writeString(pass,buffer,length); } } write(MQTTCONNECT,buffer,length-5); lastInActivity = lastOutActivity = time(NULL); while (!_client->available()) { time_t t = time(NULL); if (t-lastInActivity > MQTT_KEEPALIVE) { _client->stop(); return false; } } uint8_t llen; uint16_t len = readPacket(&llen); if (len == 4 && buffer[3] == 0) { lastInActivity = time(NULL); pingOutstanding = false; return true; } } _client->stop(); } return false; } uint8_t MQTTClient::readByte() { while(!_client->available()) {} return _client->read(); } uint16_t MQTTClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; buffer[len++] = readByte(); bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; uint16_t length = 0; uint8_t digit = 0; uint16_t skip = 0; uint8_t start = 0; do { digit = readByte(); buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier *= 128; } while ((digit & 128) != 0); *lengthLength = len-1; if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing buffer[len++] = readByte(); buffer[len++] = readByte(); skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; start = 2; if (buffer[0]&MQTTQOS1) { // skip message id skip += 2; } } for (uint16_t i = start; i<length; i++) { digit = readByte(); if (this->stream) { if (isPublish && len-*lengthLength-2>skip) { this->stream->putc(digit); } } if (len < MQTT_MAX_PACKET_SIZE) { buffer[len] = digit; } len++; } if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { len = 0; // This will cause the packet to be ignored. } return len; } bool MQTTClient::loop() { if (connected()) { time_t t = time(NULL); if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) { if (pingOutstanding) { _client->stop(); return false; } else { buffer[0] = MQTTPINGREQ; buffer[1] = 0; _client->write(buffer,2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; } } if (_client->available()) { uint8_t llen; uint16_t len = readPacket(&llen); uint16_t msgId = 0; uint8_t *payload; if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; if (type == MQTTPUBLISH) { if (onMessage) { uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; char topic[tl+1]; for (uint16_t i=0; i<tl; i++) { topic[i] = buffer[llen+3+i]; } topic[tl] = 0; // msgId only present for QOS>0 if ((buffer[0]&0x06) == MQTTQOS1) { msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; payload = buffer+llen+3+tl+2; onMessage(topic,payload,len-llen-3-tl-2); buffer[0] = MQTTPUBACK; buffer[1] = 2; buffer[2] = (msgId >> 8); buffer[3] = (msgId & 0xFF); _client->write(buffer,4); lastOutActivity = t; } else { payload = buffer+llen+3+tl; onMessage(topic,payload,len-llen-3-tl); } } } else if (type == MQTTPINGREQ) { buffer[0] = MQTTPINGRESP; buffer[1] = 0; _client->write(buffer,2); } else if (type == MQTTPINGRESP) { pingOutstanding = false; } } } return true; } return false; } bool MQTTClient::publish(char* topic, char* payload) { return publish(topic,(uint8_t*)payload,strlen(payload),false); } bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength) { return publish(topic, payload, plength, false); } bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) { if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; length = writeString(topic,buffer,length); uint16_t i; for (i=0; i<plength; i++) { buffer[length++] = payload[i]; } uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } return write(header,buffer,length-5); } return false; } bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; uint8_t rc; uint8_t len = length; do { digit = len % 128; len = len / 128; if (len > 0) { digit |= 0x80; } lenBuf[pos++] = digit; llen++; } while(len>0); buf[4-llen] = header; for (int i=0; i<llen; i++) { buf[5-llen+i] = lenBuf[i]; } rc = _client->write(buf+(4-llen),length+1+llen); lastOutActivity = time(NULL); return (rc == 1+llen+length); } bool MQTTClient::subscribe(char* topic) { return subscribe(topic, 0); } bool MQTTClient::subscribe(char* topic, uint8_t qos) { if (qos > 1) return false; if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); buffer[length++] = qos; return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); } return false; } bool MQTTClient::unsubscribe(char* topic) { if (connected()) { uint16_t length = 5; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); } return false; } void MQTTClient::disconnect() { buffer[0] = MQTTDISCONNECT; buffer[1] = 0; _client->write(buffer,2); _client->stop(); lastInActivity = lastOutActivity = time(NULL); } uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos) { char* idp = string; uint16_t i = 0; pos += 2; while (*idp) { buf[pos++] = *idp++; i++; } buf[pos-i-2] = (i >> 8); buf[pos-i-1] = (i & 0xFF); return pos; } bool MQTTClient::connected() { bool rc; if (_client == NULL ) { rc = false; } else { rc = (int)_client->connected(); if (!rc) _client->stop(); } return rc; }