MQTTClient
Dependents: IoTGateway_Basic test
Diff: MQTTClient.cpp
- Revision:
- 0:260fb10c0755
- Child:
- 1:a50b6c866a3b
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTClient.cpp Tue Mar 20 20:53:22 2012 +0000 @@ -0,0 +1,352 @@ +/* +MQTTClient.cpp +Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20 +A simple MQTT client for mbed, version 2.0 +By Yilun FAN, @CEIT, @JAN 2011 + +Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "MQTTClient.h" + +MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) { + this->port = port; + callback_server = callback; + serverIp = server; + connected = false; + sessionOpened = false; + timer.start(); +} + +MQTTClient::MQTTClient() {} + +MQTTClient::~MQTTClient() {} + + +void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) { + this->port = port; + callback_server = callback; + serverIp = *server; + this->userName = NULL; + this->password = NULL; + connected = false; + sessionOpened = false; + timer.start(); +} + +void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) { + this->port = port; + callback_server = callback; + serverIp = *server; + this->userName = userName; + this->password = password; + connected = false; + sessionOpened = false; + timer.start(); +} + +int MQTTClient::send_data(const char* msg, int size) { + int transLen = pTCPSocket->send(msg, size); + + /*Check send length matches the message length*/ + if (transLen != size) { + for (int i = 0; i < size; i++) { + printf("%x, ", msg[i]); + } + printf("Error on send.\r\n"); + return -1; + } + return 1; +} + +int MQTTClient::open_session(char* id) { + /*variable header*/ + char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)}; + + /*fixed header: 2 bytes, big endian*/ + char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2}; + short usernameLen = strlen( userName ); + short passwordLen = strlen( password ); + var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 ); + var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 ); + +// printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen ); + char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ]; + + memset(packet,0,sizeof(packet)); + memcpy(packet,fixed_header,sizeof(fixed_header)); + memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header)); + memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id)); + if ( usernameLen > 0 ) { + packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00; + packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen; + memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen); + packet[1] += (usernameLen + 2); + } + if ( passwordLen > 0 ) { + packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00; + packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen; + memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen); + packet[1] += (passwordLen + 2); + } + + /* printf("Packet length %d\n", sizeof(packet)); + for (int i = 0; i < sizeof(packet); i++) { + printf("%x, ", packet[i]); + } + printf("\r\n"); + */ + if (!send_data(packet, sizeof(packet))) { + return -1; + } +// printf("Sent\n"); + return 1; +} + +int MQTTClient::connect(char* id) { + clientId = id; + + /*Initial TCP socket*/ + pTCPSocket = new TCPSocket; + pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); + + host.setPort(port); + host.setIp(serverIp); + host.setName("localhost"); + + /*Trying to connect to host*/ + printf("Trying to connect to host..\r\n\r\n"); + TCPSocketErr err = pTCPSocket->connect(host); + + Net::poll(); + if (err) { + printf("Error connecting to host [%d]\r\n", (int) err); + return -1; + } + printf("Connect to host sucessed..\r\n\r\n"); + + /*Wait TCP connection with server to be established*/ + int i = 0; + while (!connected) { + Net::poll(); + wait(1); + i++; + printf("Wait for connections %d..\r\n", i); + if (i == 35) {//If wait too long, give up. + return -1; + } + } + + /*Send open session message to server*/ + open_session(id); + + /*Wait server notice of open sesion*/ + while (!sessionOpened) { + Net::poll(); + wait(1); + if (!connected) { + break; + } + printf("Wait for session..\r\n"); + } + if (!connected) { + return -2; + } + lastActivity = timer.read_ms(); + return 1; +} + +int MQTTClient::publish(char* pub_topic, char* msg) { + uint8_t var_header_pub[strlen(pub_topic)+3]; + strcpy((char *)&var_header_pub[2], pub_topic); + var_header_pub[0] = 0; + var_header_pub[1] = strlen(pub_topic); + var_header_pub[sizeof(var_header_pub)-1] = 0; + + uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; + + uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)]; + memset(packet_pub,0,sizeof(packet_pub)); + memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub)); + memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub)); + memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg)); + + if (!send_data((char*)packet_pub, sizeof(packet_pub))) { + return -1; + } + return 1; +} + + +void MQTTClient::disconnect() { + char packet_224[] = {0xe0, 0x00}; + send_data((char*)packet_224, 2); + + connected = false; +} + +void MQTTClient::read_data() { + char buffer[1024]; + int len = 0, readLen; + + while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) { + len += readLen; + } + + buffer[len] = '\0'; + + + printf("Read length: %d %d\r\n", len, readLen); + + for (int i = 0; i < len; i++) { + printf("%2X ", buffer[i]); + } + printf("\r\n"); + + char type = buffer[0]>>4; + if ( type == 2 ) { // CONNACK + printf("CONNACK\n"); + + } else if (type == 3) { // PUBLISH + if (callback_server) { + short index = 1; + short multiplier = 1; + short value = 0; + uint8_t digit; + do { + digit = buffer[index++]; + value += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + printf( "variable length %d, index %d\n", value, index ); + +// uint8_t tl = (buffer[2]<<3)+buffer[3]; + uint8_t tl = (buffer[index]<<3)+buffer[index+1]; + printf("Topic len %d\n",tl); + char topic[tl+1]; + for (int i=0; i<tl; i++) { + topic[i] = buffer[index+2+i]; + } + topic[tl] = 0; + // ignore msgID - only support QoS 0 subs + char *payload = buffer+index+2+tl; + callback_server(topic,(char*)payload); + } + } else if (type == 12) { // PINGREG -- Ask for alive + char packet_208[] = {0xd0, 0x00}; + send_data((char*)packet_208, 2); + lastActivity = timer.read_ms(); + } +} + +void MQTTClient::read_open_session() { + char buffer[32]; + int len = 0, readLen; + + while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) { + len += readLen; + } + + if (len == 4 && buffer[3] == 0) { + printf("Session opened\r\n"); + sessionOpened = true; + } +} + +int MQTTClient::subscribe(char* topic) { + + if (connected) { + uint8_t var_header_topic[] = {0,10}; + uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3}; + + //utf topic + uint8_t utf_topic[strlen(topic)+3]; + strcpy((char *)&utf_topic[2], topic); + + utf_topic[0] = 0; + utf_topic[1] = strlen(topic); + utf_topic[sizeof(utf_topic)-1] = 0; + + char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3]; + memset(packet_topic,0,sizeof(packet_topic)); + memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic)); + memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic)); + memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic)); + + if (!send_data(packet_topic, sizeof(packet_topic))) { + return -1; + } + return 1; + } + return -1; +} + +void MQTTClient::live() { + if (connected) { + int t = timer.read_ms(); + if (t - lastActivity > KEEPALIVE) { + //Send 192 0 to broker + printf("Send 192\r\n"); + char packet_192[] = {0xc0, 0x00}; + send_data((char*)packet_192, 2); + lastActivity = t; + } + } +} + +void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) { + switch (e) { + case TCPSOCKET_ACCEPT: + printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n"); + break; + case TCPSOCKET_CONNECTED: + printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n"); + connected = true; + break; + case TCPSOCKET_WRITEABLE: + printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); + break; + case TCPSOCKET_READABLE: + printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n"); + if (!sessionOpened) { + read_open_session(); + } + read_data(); + break; + case TCPSOCKET_CONTIMEOUT: + printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n"); + break; + case TCPSOCKET_CONRST: + printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n"); + break; + case TCPSOCKET_CONABRT: + printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n"); + break; + case TCPSOCKET_ERROR: + printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n"); + break; + case TCPSOCKET_DISCONNECTED: + printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n"); + pTCPSocket->close(); + connected = false; + break; + } +}