result of success : 1 to 0, max msg size: 127B to 16383B, fixed existing bugs
Fork of MQTTClient by
Diff: MQTTClient.cpp
- Revision:
- 10:6fa55ad359f0
- Parent:
- 9:a0e39cea763a
- Child:
- 11:2a8c86bfdeca
diff -r a0e39cea763a -r 6fa55ad359f0 MQTTClient.cpp --- a/MQTTClient.cpp Thu Apr 05 07:50:30 2012 +0000 +++ b/MQTTClient.cpp Mon Jan 04 07:24:12 2016 +0000 @@ -46,53 +46,35 @@ MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) { this->port = port; callback_server = callback; - serverIp = server; + + host.setPort(port); + host.setIp(server); + host.setName("localhost"); + this->userName = NULL; this->password = NULL; connected = false; sessionOpened = false; + + pTCPSocket = NULL; timer.start(); } - -/** MQTT initialisation method - * - * Used when default constructor used and need to specify parameters at runtime - * - * @param server The IP address of the server to connect to - * @param port The TCP/IP port on the server to connect to - * @param callback Callback function to handle subscription to topics - */ -void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) { +MQTTClient::MQTTClient(IpAddr server, int port, char *id, char *userName, char *password, void (*callback)(char*, char*)) { this->port = port; callback_server = callback; - serverIp = *server; - this->userName = NULL; - this->password = NULL; + + host.setPort(port); + host.setIp(server); + host.setName("localhost"); + + this->userName = userName; + this->password = password; + clientId = id; connected = false; sessionOpened = false; - timer.start(); -} - -/** A brief description of the function foo - * - * More details about the function goes here - * and here - * - * @param server The IP address of the server to connect to - * @param port The TCP/IP port on the server to connect to - * @param userName Pointer to username string, zero terminated. Null for not used - * @param password Pointer to password string, zero terminated. Null for not used - * @param callback Callback function to handle subscription to topics - */ -void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) { - this->port = port; - callback_server = callback; - serverIp = *server; - this->userName = userName; - this->password = password; - connected = false; - sessionOpened = false; + + pTCPSocket = NULL; timer.start(); } @@ -100,11 +82,20 @@ * * @param msg message string * @param size Size of the message to send - * @returns value to indicate message sent successfully or not, -1 for error, 1 for success. + * @returns value to indicate message sent successfully or not, -1 for error, 0 for success. */ int MQTTClient::send_data(const char* msg, int size) { + writtable = false; + +/* printf("send_data length %d\n", size); + for (int i = 0; i < size; i++) { + printf("%x, ", msg[i]); + } + printf("\r\n"); */ + int transLen = pTCPSocket->send(msg, size); - + pool(); + /*Check send length matches the message length*/ if (transLen != size) { for (int i = 0; i < size; i++) { @@ -113,17 +104,18 @@ printf("Error on send.\r\n"); return -1; } - return 1; + + return 0; } /** Start a MQTT session, build CONNECT packet * * @param id The client name shown on MQTT server. - * @returns -1 for error, 1 for success + * @returns -1 for error, 0 for success */ int MQTTClient::open_session(char* id) { /*variable header*/ - char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)}; + char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,keepalive/500,0x00,strlen(id)}; /*fixed header: 2 bytes, big endian*/ char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2}; @@ -132,8 +124,8 @@ var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 ); var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 ); -// printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen ); - char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + 6 + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ]; + printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), strlen(id), usernameLen, passwordLen ); + char packet[sizeof(fixed_header) + sizeof(var_header) + strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ]; memset(packet,0,sizeof(packet)); memcpy(packet,fixed_header,sizeof(fixed_header)); @@ -152,34 +144,38 @@ packet[1] += (passwordLen + 2); } - /* printf("Packet length %d\n", sizeof(packet)); +/* printf("Packet length %d\n", sizeof(packet)); for (int i = 0; i < sizeof(packet); i++) { printf("%x, ", packet[i]); } - printf("\r\n"); - */ - if (!send_data(packet, sizeof(packet))) { + printf("\r\n");*/ + + if (send_data(packet, sizeof(packet))) { return -1; } -// printf("Sent\n"); - return 1; + printf("Sent\n"); + return 0; } /** Open TCP port, connect to server on given IP address. * + * @returns -1: If connect to server failed. -2: Failed to open session on server. 0: Connection accessed. + */ +int MQTTClient::connect() { + return connect(clientId); +} + +/** Open TCP port, connect to server on given IP address. + * * @param id The client name shown on MQTT server. - * @returns -1: If connect to server failed. -2: Failed to open session on server. 1: Connection accessed. + * @returns -1: If connect to server failed. -2: Failed to open session on server. 0: Connection accessed. */ int MQTTClient::connect(char* id) { - clientId = id; - /*Initial TCP socket*/ - pTCPSocket = new TCPSocket; - pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); - - host.setPort(port); - host.setIp(serverIp); - host.setName("localhost"); + if(!pTCPSocket) { + pTCPSocket = new TCPSocket; + pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); + } /*Trying to connect to host*/ printf("Trying to connect to host..\r\n\r\n"); @@ -205,7 +201,10 @@ } /*Send open session message to server*/ - open_session(id); + if(open_session(id)) { + printf("Error open session [%s]\r\n", id); + return -1; + } /*Wait server notice of open sesion*/ i = 0; @@ -217,38 +216,49 @@ } printf("Wait for session %d..\r\n", i); } - if (!connected) { +// if (!connected) { wakwak_koba + if (!connected || !sessionOpened) { return -2; } lastActivity = timer.read_ms(); - return 1; + return 0; } /** Publish a message on a topic. * * @param pub_topic The topic name the massage will be publish on. * @param msg The massage to be published. - * @returns -1: Failed to publish message. 1: Publish sucessed. + * @returns -1: Failed to publish message. 0: Publish sucessed. */ -int MQTTClient::publish(char* pub_topic, char* msg) { - uint8_t var_header_pub[strlen(pub_topic)+3]; +int MQTTClient::publish(char* pub_topic, char* msg, bool retain) { + uint8_t var_header_pub[strlen(pub_topic)+2]; strcpy((char *)&var_header_pub[2], pub_topic); var_header_pub[0] = 0; var_header_pub[1] = strlen(pub_topic); - var_header_pub[sizeof(var_header_pub)-1] = 0; + +// uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; + int var_len = sizeof(var_header_pub)+strlen(msg); + uint8_t fixed_header_pub[1 + (var_len < 128 ? 1 : 2)]; + fixed_header_pub[0] = MQTTPUBLISH | (retain ? 0x01 : 0x00); - uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; - + if(var_len < 128) + fixed_header_pub[1] = var_len; + else { + fixed_header_pub[1] = (var_len % 0x80) | 0x80; + fixed_header_pub[2] = (var_len / 0x80); + } + uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)]; memset(packet_pub,0,sizeof(packet_pub)); memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub)); memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub)); memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg)); - if (!send_data((char*)packet_pub, sizeof(packet_pub))) { + if (send_data((char*)packet_pub, sizeof(packet_pub))) { return -1; } - return 1; + + return 0; } /** Disconnect from server @@ -257,6 +267,9 @@ char packet_224[] = {0xe0, 0x00}; send_data((char*)packet_224, 2); + pTCPSocket->close(); + delete pTCPSocket; + pTCPSocket = NULL; connected = false; } @@ -274,12 +287,12 @@ buffer[len] = '\0'; - printf("Read length: %d %d\r\n", len, readLen); +/* printf("Read length: %d %d\r\n", len, readLen); for (int i = 0; i < len; i++) { printf("%2X ", buffer[i]); } - printf("\r\n"); + printf("\r\n"); */ char type = buffer[0]>>4; if ( type == 2 ) { // CONNACK @@ -336,7 +349,7 @@ /** Subscribe to a topic * * @param topic The topic name to be subscribed. - * @returns -1: Failed to subscribe to topic. 1: Subscribe sucessed. + * @returns -1: Failed to subscribe to topic. 0: Subscribe sucessed. */ int MQTTClient::subscribe(char* topic) { @@ -358,10 +371,10 @@ memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic)); memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic)); - if (!send_data(packet_topic, sizeof(packet_topic))) { + if (send_data(packet_topic, sizeof(packet_topic))) { return -1; } - return 1; + return 0; } return -1; } @@ -372,9 +385,9 @@ void MQTTClient::live() { if (connected) { int t = timer.read_ms(); - if (t - lastActivity > KEEPALIVE) { + if (t - lastActivity > keepalive) { //Send 192 0 to broker - printf("Send 192\r\n"); + //printf("Send 192\r\n"); char packet_192[] = {0xc0, 0x00}; send_data((char*)packet_192, 2); lastActivity = t; @@ -382,6 +395,10 @@ } } +void MQTTClient::pool() { + Net::poll(); +} + /** TCP Socket event handling * * @param e Event object @@ -396,7 +413,8 @@ connected = true; break; case TCPSOCKET_WRITEABLE: - printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); +// printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); + writtable = true; break; case TCPSOCKET_READABLE: printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");