Initial port of the Arduino MQTT Client: http://knolleary.net/arduino-client-for-mqtt/
Dependents: MQTTTest 2lemetryNXP MyTempuratureMqtt MqttClientEthInt ... more
Revision 0:ca855d29545b, committed 2013-05-26
- Comitter:
- jwende
- Date:
- Sun May 26 16:52:01 2013 +0000
- Commit message:
- PubSubClient Library developed for Arduino - http://knolleary.net/arduino-client-for-mqtt/; Initial port to mbed using EthernetInterface
Changed in this revision
PubSubClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
PubSubClient.h | Show annotated file Show diff for this revision Revisions of this file |
diff -r 000000000000 -r ca855d29545b PubSubClient.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubSubClient.cpp Sun May 26 16:52:01 2013 +0000 @@ -0,0 +1,323 @@ +/* +PubSubClient.cpp - A simple client for MQTT. +Nicholas O'Leary +http://knolleary.net + +initial port for mbed +Joerg Wende +https://twitter.com/joerg_wende +*/ + +#include "PubSubClient.h" + +Timer t; +//Serial pc1(USBTX, USBRX); + + +int millis() +{ + return t.read_ms(); +} + +PubSubClient::PubSubClient() +{ +} + +PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) +{ + this->callback = callback; + this->ip = ip; + this->port = port; + t.start(); +} + + +bool PubSubClient::connect(char *id) +{ + return connect(id,NULL,NULL,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char *user, char *pass) +{ + return connect(id,user,pass,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + if (!connected()) { + int result = 0; + result = _client.connect(this->ip, this->port); + _client.set_blocking(false, 1); + //pc1.printf("IP: %s\r\n",this->ip); + //pc1.printf("Port: %i\r\n",this->port); + //pc1.printf("Result: %i \r\n", result); + + if (result==0) { + nextMsgId = 1; + char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + // Leave room in the buffer for header and variable length field + int length = 5; + unsigned int j; + for (j = 0; j<9; j++) { + buffer[length++] = d[j]; + } + + char v; + if (willTopic) { + v = 0x06|(willQos<<3)|(willRetain<<5); + } else { + v = 0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + //pc1.printf("Before MQTT Connect ... \r\n"); + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + int llen=128; + int len =0; + + while ((len=readPacket(llen))==0) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _client.close(true); + return false; + } + } + //pc1.printf("after MQTT Connect ... %i\r\n",len); + if (len == 4 && buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + return true; + } + } + _client.close(true); + } + return false; +} + + +int PubSubClient::readPacket(int lengthLength) +{ + int len = 0; + len = _client.receive_all(buffer,lengthLength); + return len; +} + +bool PubSubClient::loop() +{ + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + _client.close(true); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client.send(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + int len; + int llen= 128; + if (!((len=readPacket(llen))==0)) { + if (len > 0) { + lastInActivity = t; + char type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + //pc1.printf("MQTTPUBLISH received ... %i\r\n",len); + int tl = (buffer[2]<<8)+buffer[3]; + //pc1.printf("t1 ... %i\r\n",tl); + char topic[tl+1]; + for (int i=0; i<tl; i++) { + topic[i] = buffer[4+i]; + } + topic[tl] = 0; + //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic); + // ignore msgID - only support QoS 0 subs + int t2 = len-4-tl; + //pc1.printf("t2 ... %i\r\n",t2); + char payload[t2+1]; + for (int i=0; i<t2; i++) { + payload[i] = buffer[4+i+tl]; + } + payload[t2] = 0; + //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload); + callback(topic,payload,t2); + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client.send(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +bool PubSubClient::publish(char* topic, char* payload) +{ + return publish(topic,payload,strlen(payload),false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) +{ + return publish(topic, payload, plength, false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) +{ + if (connected()) { + // Leave room in the buffer for header and variable length field + //pc1.printf("in publish ... %s\r\n",topic); + //pc1.printf("in publish ... %s\r\n",payload); + int length = 5; + length = writeString(topic,buffer,length); + int i; + for (i=0; i<plength; i++) { + buffer[length++] = payload[i]; + } + short header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(header,buffer,length-5); + } + return false; +} + + + +bool PubSubClient::write(short header, char* buf, int length) +{ + short lenBuf[4]; + short llen = 0; + short digit; + short pos = 0; + short rc; + short len = length; + //pc1.printf("in write ... %d\r\n",length); + //pc1.printf("in write ... %s\r\n",buf); + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0; i<llen; i++) { + buf[5-llen+i] = lenBuf[i]; + } + rc = _client.send(buf+(4-llen),length+1+llen); + + lastOutActivity = millis(); + return (rc == 1+llen+length); +} + +bool PubSubClient::subscribe(char* topic) +{ + //pc1.printf("in subscribe ... %s\r\n",topic); + if (connected()) { + // Leave room in the buffer for header and variable length field + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + buffer[length++] = 0; // Only do QoS 0 subs + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +bool PubSubClient::unsubscribe(char* topic) +{ + if (connected()) { + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() +{ + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client.send(buffer,2); + _client.close(true); + lastInActivity = lastOutActivity = millis(); +} + +int PubSubClient::writeString(char* string, char* buf, int pos) +{ + char* idp = string; + int i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +bool PubSubClient::connected() +{ + bool rc; + rc = (int)_client.is_connected(); + if (!rc) _client.close(true); + return rc; +} \ No newline at end of file
diff -r 000000000000 -r ca855d29545b PubSubClient.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubSubClient.h Sun May 26 16:52:01 2013 +0000 @@ -0,0 +1,76 @@ +/* +PubSubClient.h - A simple client for MQTT. +Nicholas O'Leary +http://knolleary.net +*/ +#include "mbed.h" +#include "EthernetInterface.h" +#include "TCPSocketConnection.h" + +#ifndef PubSubClient_h +#define PubSubClient_h + + + +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#define MQTT_MAX_PACKET_SIZE 128 + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#define MQTT_KEEPALIVE 15 + +#define MQTTPROTOCOLVERSION 3 +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +class PubSubClient { +private: + TCPSocketConnection _client; + char buffer[MQTT_MAX_PACKET_SIZE]; + int nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + void (*callback)(char*,char*,unsigned int); + int readPacket(int); + char readByte(); + bool write(short header, char* buf, int length); + int writeString(char* string, char* buf, int pos); + char* ip; + int port; +public: + PubSubClient(); + PubSubClient(char*, int, void(*)(char*,char*,unsigned int)); + bool connect(char *); + bool connect(char *, char *, char *); + bool connect(char *, char *, short, short, char *); + bool connect(char *, char *, char *, char *, short, short, char*); + void disconnect(); + bool publish(char *, char *); + bool publish(char *, char *, unsigned int); + bool publish(char *, char *, unsigned int, bool); +// bool publish_P(char *, short PROGMEM *, unsigned int, bool); + bool subscribe(char *); + bool unsubscribe(char *); + bool loop(); + bool connected(); +}; + + +#endif \ No newline at end of file