MQTTClient

Dependents:   IoTGateway_Basic test

Revision:
0:260fb10c0755
Child:
1:a50b6c866a3b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.cpp	Tue Mar 20 20:53:22 2012 +0000
@@ -0,0 +1,352 @@
+/*
+MQTTClient.cpp
+Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
+A simple MQTT client for mbed, version 2.0
+By Yilun FAN, @CEIT, @JAN 2011
+
+Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+#include "MQTTClient.h"
+
+MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
+    this->port = port;
+    callback_server = callback;
+    serverIp = server;
+    connected = false;
+    sessionOpened = false;
+    timer.start();
+}
+
+MQTTClient::MQTTClient() {}
+
+MQTTClient::~MQTTClient() {}
+
+
+void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) {
+    this->port = port;
+    callback_server = callback;
+    serverIp = *server;
+    this->userName = NULL;
+    this->password = NULL;
+    connected = false;
+    sessionOpened = false;
+    timer.start();
+}
+
+void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) {
+    this->port = port;
+    callback_server = callback;
+    serverIp = *server;
+    this->userName = userName;
+    this->password = password;
+    connected = false;
+    sessionOpened = false;
+    timer.start();
+}
+
+int MQTTClient::send_data(const char* msg, int size) {
+    int transLen = pTCPSocket->send(msg, size);
+
+    /*Check send length matches the message length*/
+    if (transLen != size) {
+        for (int i = 0; i < size; i++) {
+            printf("%x, ", msg[i]);
+        }
+        printf("Error on send.\r\n");
+        return -1;
+    }
+    return 1;
+}
+
+int MQTTClient::open_session(char* id) {
+    /*variable header*/
+    char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
+
+    /*fixed header: 2 bytes, big endian*/
+    char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
+    short usernameLen = strlen( userName );
+    short passwordLen = strlen( password );
+    var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
+    var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
+
+//    printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen );
+    char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
+
+    memset(packet,0,sizeof(packet));
+    memcpy(packet,fixed_header,sizeof(fixed_header));
+    memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
+    memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
+    if ( usernameLen > 0 ) {
+        packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00;
+        packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen;
+        memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen);
+        packet[1] += (usernameLen + 2);
+    }
+    if ( passwordLen > 0 ) {
+        packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00;
+        packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen;
+        memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen);
+        packet[1] += (passwordLen + 2);
+    }
+
+    /*    printf("Packet length %d\n", sizeof(packet));
+        for (int i = 0; i < sizeof(packet); i++) {
+            printf("%x, ", packet[i]);
+        }
+        printf("\r\n");
+    */
+    if (!send_data(packet, sizeof(packet))) {
+        return -1;
+    }
+//    printf("Sent\n");
+    return 1;
+}
+
+int MQTTClient::connect(char* id) {
+    clientId = id;
+
+    /*Initial TCP socket*/
+    pTCPSocket = new TCPSocket;
+    pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
+
+    host.setPort(port);
+    host.setIp(serverIp);
+    host.setName("localhost");
+
+    /*Trying to connect to host*/
+    printf("Trying to connect to host..\r\n\r\n");
+    TCPSocketErr err = pTCPSocket->connect(host);
+
+    Net::poll();
+    if (err) {
+        printf("Error connecting to host [%d]\r\n", (int) err);
+        return -1;
+    }
+    printf("Connect to host sucessed..\r\n\r\n");
+
+    /*Wait TCP connection with server to be established*/
+    int i = 0;
+    while (!connected) {
+        Net::poll();
+        wait(1);
+        i++;
+        printf("Wait for connections %d..\r\n", i);
+        if (i == 35) {//If wait too long, give up.
+            return -1;
+        }
+    }
+
+    /*Send open session message to server*/
+    open_session(id);
+
+    /*Wait server notice of open sesion*/
+    while (!sessionOpened) {
+        Net::poll();
+        wait(1);
+        if (!connected) {
+            break;
+        }
+        printf("Wait for session..\r\n");
+    }
+    if (!connected) {
+        return -2;
+    }
+    lastActivity = timer.read_ms();
+    return 1;
+}
+
+int MQTTClient::publish(char* pub_topic, char* msg) {
+    uint8_t var_header_pub[strlen(pub_topic)+3];
+    strcpy((char *)&var_header_pub[2], pub_topic);
+    var_header_pub[0] = 0;
+    var_header_pub[1] = strlen(pub_topic);
+    var_header_pub[sizeof(var_header_pub)-1] = 0;
+
+    uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
+
+    uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
+    memset(packet_pub,0,sizeof(packet_pub));
+    memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
+    memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
+    memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
+
+    if (!send_data((char*)packet_pub, sizeof(packet_pub))) {
+        return -1;
+    }
+    return 1;
+}
+
+
+void MQTTClient::disconnect() {
+    char packet_224[] = {0xe0, 0x00};
+    send_data((char*)packet_224, 2);
+
+    connected = false;
+}
+
+void MQTTClient::read_data() {
+    char buffer[1024];
+    int len = 0, readLen;
+
+    while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) {
+        len += readLen;
+    }
+
+    buffer[len] = '\0';
+
+
+    printf("Read length: %d %d\r\n", len, readLen);
+
+    for (int i = 0; i < len; i++) {
+        printf("%2X ", buffer[i]);
+    }
+    printf("\r\n");
+
+    char type = buffer[0]>>4;
+    if ( type == 2 ) { // CONNACK
+        printf("CONNACK\n");
+
+    } else if (type == 3) { // PUBLISH
+        if (callback_server) {
+            short index = 1;
+            short multiplier = 1;
+            short value = 0; 
+            uint8_t  digit;
+            do {
+                digit = buffer[index++]; 
+                value += (digit & 127) * multiplier; 
+                multiplier *= 128;
+            } while ((digit & 128) != 0);
+            printf( "variable length %d, index %d\n", value, index );
+            
+//            uint8_t tl = (buffer[2]<<3)+buffer[3];
+            uint8_t tl = (buffer[index]<<3)+buffer[index+1];
+            printf("Topic len %d\n",tl);
+            char topic[tl+1];
+            for (int i=0; i<tl; i++) {
+                topic[i] = buffer[index+2+i];
+            }
+            topic[tl] = 0;
+            // ignore msgID - only support QoS 0 subs
+            char *payload = buffer+index+2+tl;
+            callback_server(topic,(char*)payload);
+        }
+    } else if (type == 12) { // PINGREG -- Ask for alive
+        char packet_208[] = {0xd0, 0x00};
+        send_data((char*)packet_208, 2);
+        lastActivity = timer.read_ms();
+    }
+}
+
+void MQTTClient::read_open_session() {
+    char buffer[32];
+    int len = 0, readLen;
+
+    while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) {
+        len += readLen;
+    }
+
+    if (len == 4 && buffer[3] == 0) {
+        printf("Session opened\r\n");
+        sessionOpened = true;
+    }
+}
+
+int MQTTClient::subscribe(char* topic) {
+
+    if (connected) {
+        uint8_t var_header_topic[] = {0,10};
+        uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
+
+        //utf topic
+        uint8_t utf_topic[strlen(topic)+3];
+        strcpy((char *)&utf_topic[2], topic);
+
+        utf_topic[0] = 0;
+        utf_topic[1] = strlen(topic);
+        utf_topic[sizeof(utf_topic)-1] = 0;
+
+        char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
+        memset(packet_topic,0,sizeof(packet_topic));
+        memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
+        memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
+        memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
+
+        if (!send_data(packet_topic, sizeof(packet_topic))) {
+            return -1;
+        }
+        return 1;
+    }
+    return -1;
+}
+
+void MQTTClient::live() {
+    if (connected) {
+        int t = timer.read_ms();
+        if (t - lastActivity > KEEPALIVE) {
+            //Send 192 0 to broker
+            printf("Send 192\r\n");
+            char packet_192[] = {0xc0, 0x00};
+            send_data((char*)packet_192, 2);
+            lastActivity = t;
+        }
+    }
+}
+
+void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) {
+    switch (e) {
+        case TCPSOCKET_ACCEPT:
+            printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
+            break;
+        case TCPSOCKET_CONNECTED:
+            printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
+            connected = true;
+            break;
+        case TCPSOCKET_WRITEABLE:
+            printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
+            break;
+        case TCPSOCKET_READABLE:
+            printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
+            if (!sessionOpened) {
+                read_open_session();
+            }
+            read_data();
+            break;
+        case TCPSOCKET_CONTIMEOUT:
+            printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
+            break;
+        case TCPSOCKET_CONRST:
+            printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
+            break;
+        case TCPSOCKET_CONABRT:
+            printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
+            break;
+        case TCPSOCKET_ERROR:
+            printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
+            break;
+        case TCPSOCKET_DISCONNECTED:
+            printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
+            pTCPSocket->close();
+            connected = false;
+            break;
+    }
+}