Initial port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/ Updated with larger timeout and buffer sizes
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
Fork of MQTT by
Diff: PubSubClient.cpp
- Revision:
- 0:ca855d29545b
- Child:
- 7:9e005abd6846
diff -r 000000000000 -r ca855d29545b PubSubClient.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubSubClient.cpp Sun May 26 16:52:01 2013 +0000 @@ -0,0 +1,323 @@ +/* +PubSubClient.cpp - A simple client for MQTT. +Nicholas O'Leary +http://knolleary.net + +initial port for mbed +Joerg Wende +https://twitter.com/joerg_wende +*/ + +#include "PubSubClient.h" + +Timer t; +//Serial pc1(USBTX, USBRX); + + +int millis() +{ + return t.read_ms(); +} + +PubSubClient::PubSubClient() +{ +} + +PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) +{ + this->callback = callback; + this->ip = ip; + this->port = port; + t.start(); +} + + +bool PubSubClient::connect(char *id) +{ + return connect(id,NULL,NULL,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char *user, char *pass) +{ + return connect(id,user,pass,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + if (!connected()) { + int result = 0; + result = _client.connect(this->ip, this->port); + _client.set_blocking(false, 1); + //pc1.printf("IP: %s\r\n",this->ip); + //pc1.printf("Port: %i\r\n",this->port); + //pc1.printf("Result: %i \r\n", result); + + if (result==0) { + nextMsgId = 1; + char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + // Leave room in the buffer for header and variable length field + int length = 5; + unsigned int j; + for (j = 0; j<9; j++) { + buffer[length++] = d[j]; + } + + char v; + if (willTopic) { + v = 0x06|(willQos<<3)|(willRetain<<5); + } else { + v = 0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + //pc1.printf("Before MQTT Connect ... \r\n"); + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + int llen=128; + int len =0; + + while ((len=readPacket(llen))==0) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _client.close(true); + return false; + } + } + //pc1.printf("after MQTT Connect ... %i\r\n",len); + if (len == 4 && buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + return true; + } + } + _client.close(true); + } + return false; +} + + +int PubSubClient::readPacket(int lengthLength) +{ + int len = 0; + len = _client.receive_all(buffer,lengthLength); + return len; +} + +bool PubSubClient::loop() +{ + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + _client.close(true); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client.send(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + int len; + int llen= 128; + if (!((len=readPacket(llen))==0)) { + if (len > 0) { + lastInActivity = t; + char type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); + int tl = (buffer[2]<<8)+buffer[3]; + //pc1.printf("t1 ... %i\r\n",tl); + char topic[tl+1]; + for (int i=0; i<tl; i++) { + topic[i] = buffer[4+i]; + } + topic[tl] = 0; + //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic); + // ignore msgID - only support QoS 0 subs + int t2 = len-4-tl; + //pc1.printf("t2 ... %i\r\n",t2); + char payload[t2+1]; + for (int i=0; i<t2; i++) { + payload[i] = buffer[4+i+tl]; + } + payload[t2] = 0; + //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload); + callback(topic,payload,t2); + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client.send(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +bool PubSubClient::publish(char* topic, char* payload) +{ + return publish(topic,payload,strlen(payload),false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) +{ + return publish(topic, payload, plength, false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) +{ + if (connected()) { + // Leave room in the buffer for header and variable length field + //pc1.printf("in publish ... %s\r\n",topic); + //pc1.printf("in publish ... %s\r\n",payload); + int length = 5; + length = writeString(topic,buffer,length); + int i; + for (i=0; i<plength; i++) { + buffer[length++] = payload[i]; + } + short header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(header,buffer,length-5); + } + return false; +} + + + +bool PubSubClient::write(short header, char* buf, int length) +{ + short lenBuf[4]; + short llen = 0; + short digit; + short pos = 0; + short rc; + short len = length; + //pc1.printf("in write ... %d\r\n",length); + //pc1.printf("in write ... %s\r\n",buf); + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0; i<llen; i++) { + buf[5-llen+i] = lenBuf[i]; + } + rc = _client.send(buf+(4-llen),length+1+llen); + + lastOutActivity = millis(); + return (rc == 1+llen+length); +} + +bool PubSubClient::subscribe(char* topic) +{ + //pc1.printf("in subscribe ... %s\r\n",topic); + if (connected()) { + // Leave room in the buffer for header and variable length field + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + buffer[length++] = 0; // Only do QoS 0 subs + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +bool PubSubClient::unsubscribe(char* topic) +{ + if (connected()) { + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() +{ + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client.send(buffer,2); + _client.close(true); + lastInActivity = lastOutActivity = millis(); +} + +int PubSubClient::writeString(char* string, char* buf, int pos) +{ + char* idp = string; + int i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +bool PubSubClient::connected() +{ + bool rc; + rc = (int)_client.is_connected(); + if (!rc) _client.close(true); + return rc; +} \ No newline at end of file