MQTTClient
Revision 0:260fb10c0755, committed 2012-03-20
- Comitter:
- SomeRandomBloke
- Date:
- Tue Mar 20 20:53:22 2012 +0000
- Child:
- 1:a50b6c866a3b
- Commit message:
Changed in this revision
| MQTTClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
| MQTTClient.h | Show annotated file Show diff for this revision Revisions of this file |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.cpp Tue Mar 20 20:53:22 2012 +0000
@@ -0,0 +1,352 @@
+/*
+MQTTClient.cpp
+Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
+A simple MQTT client for mbed, version 2.0
+By Yilun FAN, @CEIT, @JAN 2011
+
+Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+#include "MQTTClient.h"
+
+MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
+ this->port = port;
+ callback_server = callback;
+ serverIp = server;
+ connected = false;
+ sessionOpened = false;
+ timer.start();
+}
+
+MQTTClient::MQTTClient() {}
+
+MQTTClient::~MQTTClient() {}
+
+
+void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) {
+ this->port = port;
+ callback_server = callback;
+ serverIp = *server;
+ this->userName = NULL;
+ this->password = NULL;
+ connected = false;
+ sessionOpened = false;
+ timer.start();
+}
+
+void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) {
+ this->port = port;
+ callback_server = callback;
+ serverIp = *server;
+ this->userName = userName;
+ this->password = password;
+ connected = false;
+ sessionOpened = false;
+ timer.start();
+}
+
+int MQTTClient::send_data(const char* msg, int size) {
+ int transLen = pTCPSocket->send(msg, size);
+
+ /*Check send length matches the message length*/
+ if (transLen != size) {
+ for (int i = 0; i < size; i++) {
+ printf("%x, ", msg[i]);
+ }
+ printf("Error on send.\r\n");
+ return -1;
+ }
+ return 1;
+}
+
+int MQTTClient::open_session(char* id) {
+ /*variable header*/
+ char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
+
+ /*fixed header: 2 bytes, big endian*/
+ char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
+ short usernameLen = strlen( userName );
+ short passwordLen = strlen( password );
+ var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
+ var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
+
+// printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen );
+ char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
+
+ memset(packet,0,sizeof(packet));
+ memcpy(packet,fixed_header,sizeof(fixed_header));
+ memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
+ memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
+ if ( usernameLen > 0 ) {
+ packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00;
+ packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen;
+ memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen);
+ packet[1] += (usernameLen + 2);
+ }
+ if ( passwordLen > 0 ) {
+ packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00;
+ packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen;
+ memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen);
+ packet[1] += (passwordLen + 2);
+ }
+
+ /* printf("Packet length %d\n", sizeof(packet));
+ for (int i = 0; i < sizeof(packet); i++) {
+ printf("%x, ", packet[i]);
+ }
+ printf("\r\n");
+ */
+ if (!send_data(packet, sizeof(packet))) {
+ return -1;
+ }
+// printf("Sent\n");
+ return 1;
+}
+
+int MQTTClient::connect(char* id) {
+ clientId = id;
+
+ /*Initial TCP socket*/
+ pTCPSocket = new TCPSocket;
+ pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
+
+ host.setPort(port);
+ host.setIp(serverIp);
+ host.setName("localhost");
+
+ /*Trying to connect to host*/
+ printf("Trying to connect to host..\r\n\r\n");
+ TCPSocketErr err = pTCPSocket->connect(host);
+
+ Net::poll();
+ if (err) {
+ printf("Error connecting to host [%d]\r\n", (int) err);
+ return -1;
+ }
+ printf("Connect to host sucessed..\r\n\r\n");
+
+ /*Wait TCP connection with server to be established*/
+ int i = 0;
+ while (!connected) {
+ Net::poll();
+ wait(1);
+ i++;
+ printf("Wait for connections %d..\r\n", i);
+ if (i == 35) {//If wait too long, give up.
+ return -1;
+ }
+ }
+
+ /*Send open session message to server*/
+ open_session(id);
+
+ /*Wait server notice of open sesion*/
+ while (!sessionOpened) {
+ Net::poll();
+ wait(1);
+ if (!connected) {
+ break;
+ }
+ printf("Wait for session..\r\n");
+ }
+ if (!connected) {
+ return -2;
+ }
+ lastActivity = timer.read_ms();
+ return 1;
+}
+
+int MQTTClient::publish(char* pub_topic, char* msg) {
+ uint8_t var_header_pub[strlen(pub_topic)+3];
+ strcpy((char *)&var_header_pub[2], pub_topic);
+ var_header_pub[0] = 0;
+ var_header_pub[1] = strlen(pub_topic);
+ var_header_pub[sizeof(var_header_pub)-1] = 0;
+
+ uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
+
+ uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
+ memset(packet_pub,0,sizeof(packet_pub));
+ memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
+ memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
+ memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
+
+ if (!send_data((char*)packet_pub, sizeof(packet_pub))) {
+ return -1;
+ }
+ return 1;
+}
+
+
+void MQTTClient::disconnect() {
+ char packet_224[] = {0xe0, 0x00};
+ send_data((char*)packet_224, 2);
+
+ connected = false;
+}
+
+void MQTTClient::read_data() {
+ char buffer[1024];
+ int len = 0, readLen;
+
+ while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) {
+ len += readLen;
+ }
+
+ buffer[len] = '\0';
+
+
+ printf("Read length: %d %d\r\n", len, readLen);
+
+ for (int i = 0; i < len; i++) {
+ printf("%2X ", buffer[i]);
+ }
+ printf("\r\n");
+
+ char type = buffer[0]>>4;
+ if ( type == 2 ) { // CONNACK
+ printf("CONNACK\n");
+
+ } else if (type == 3) { // PUBLISH
+ if (callback_server) {
+ short index = 1;
+ short multiplier = 1;
+ short value = 0;
+ uint8_t digit;
+ do {
+ digit = buffer[index++];
+ value += (digit & 127) * multiplier;
+ multiplier *= 128;
+ } while ((digit & 128) != 0);
+ printf( "variable length %d, index %d\n", value, index );
+
+// uint8_t tl = (buffer[2]<<3)+buffer[3];
+ uint8_t tl = (buffer[index]<<3)+buffer[index+1];
+ printf("Topic len %d\n",tl);
+ char topic[tl+1];
+ for (int i=0; i<tl; i++) {
+ topic[i] = buffer[index+2+i];
+ }
+ topic[tl] = 0;
+ // ignore msgID - only support QoS 0 subs
+ char *payload = buffer+index+2+tl;
+ callback_server(topic,(char*)payload);
+ }
+ } else if (type == 12) { // PINGREG -- Ask for alive
+ char packet_208[] = {0xd0, 0x00};
+ send_data((char*)packet_208, 2);
+ lastActivity = timer.read_ms();
+ }
+}
+
+void MQTTClient::read_open_session() {
+ char buffer[32];
+ int len = 0, readLen;
+
+ while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) {
+ len += readLen;
+ }
+
+ if (len == 4 && buffer[3] == 0) {
+ printf("Session opened\r\n");
+ sessionOpened = true;
+ }
+}
+
+int MQTTClient::subscribe(char* topic) {
+
+ if (connected) {
+ uint8_t var_header_topic[] = {0,10};
+ uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
+
+ //utf topic
+ uint8_t utf_topic[strlen(topic)+3];
+ strcpy((char *)&utf_topic[2], topic);
+
+ utf_topic[0] = 0;
+ utf_topic[1] = strlen(topic);
+ utf_topic[sizeof(utf_topic)-1] = 0;
+
+ char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
+ memset(packet_topic,0,sizeof(packet_topic));
+ memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
+ memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
+ memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
+
+ if (!send_data(packet_topic, sizeof(packet_topic))) {
+ return -1;
+ }
+ return 1;
+ }
+ return -1;
+}
+
+void MQTTClient::live() {
+ if (connected) {
+ int t = timer.read_ms();
+ if (t - lastActivity > KEEPALIVE) {
+ //Send 192 0 to broker
+ printf("Send 192\r\n");
+ char packet_192[] = {0xc0, 0x00};
+ send_data((char*)packet_192, 2);
+ lastActivity = t;
+ }
+ }
+}
+
+void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) {
+ switch (e) {
+ case TCPSOCKET_ACCEPT:
+ printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
+ break;
+ case TCPSOCKET_CONNECTED:
+ printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
+ connected = true;
+ break;
+ case TCPSOCKET_WRITEABLE:
+ printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
+ break;
+ case TCPSOCKET_READABLE:
+ printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
+ if (!sessionOpened) {
+ read_open_session();
+ }
+ read_data();
+ break;
+ case TCPSOCKET_CONTIMEOUT:
+ printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
+ break;
+ case TCPSOCKET_CONRST:
+ printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
+ break;
+ case TCPSOCKET_CONABRT:
+ printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
+ break;
+ case TCPSOCKET_ERROR:
+ printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
+ break;
+ case TCPSOCKET_DISCONNECTED:
+ printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
+ pTCPSocket->close();
+ connected = false;
+ break;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.h Tue Mar 20 20:53:22 2012 +0000
@@ -0,0 +1,79 @@
+/*
+MQTTClient.cpp
+Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
+A simple MQTT client for mbed, version 2.0
+By Yilun FAN, @CEIT, @JAN 2011
+
+Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+#ifndef MQTT_CLIENT_H
+#define MQTT_CLIENT_H
+
+#include "mbed.h"
+#include "TCPSocket.h"
+
+#define MQTTCONNECT 1<<4
+#define MQTTCONNACK 2<<4
+#define MQTTPUBLISH 3<<4
+#define MQTTSUBSCRIBE 8<<4
+
+#define MAX_PACKET_SIZE 128
+#define KEEPALIVE 15000
+
+class MQTTClient
+{
+public:
+ MQTTClient(IpAddr server, int port, void (*callback)(char*, char*));
+ MQTTClient();
+ ~MQTTClient();
+ void init(IpAddr *server, int port, void (*callback)(char*, char*));
+ void init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*));
+ int connect(char *);
+ void disconnect();
+ int publish(char *, char *);
+ int subscribe(char *);
+ void live();
+
+private:
+ int open_session(char* id);
+ void read_open_session();
+ int send_data(const char* msg, int size);
+ void read_data();
+
+ char* clientId;
+ char* userName;
+ char *password;
+ Timer timer;
+ IpAddr serverIp;
+ int port;
+ bool connected;
+ bool sessionOpened;
+
+ void onTCPSocketEvent(TCPSocketEvent e);
+ TCPSocket* pTCPSocket;
+ Host host;
+
+ int lastActivity;
+ void (*callback_server)(char*, char*);
+};
+
+#endif