result of success : 1 to 0, max msg size: 127B to 16383B, fixed existing bugs

Fork of MQTTClient by wakwak koba

Revision:
10:6fa55ad359f0
Parent:
9:a0e39cea763a
Child:
11:2a8c86bfdeca
diff -r a0e39cea763a -r 6fa55ad359f0 MQTTClient.cpp
--- a/MQTTClient.cpp	Thu Apr 05 07:50:30 2012 +0000
+++ b/MQTTClient.cpp	Mon Jan 04 07:24:12 2016 +0000
@@ -46,53 +46,35 @@
 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
     this->port = port;
     callback_server = callback;
-    serverIp = server;
+
+    host.setPort(port);
+    host.setIp(server);
+    host.setName("localhost");
+
     this->userName = NULL;
     this->password = NULL;
     connected = false;
     sessionOpened = false;
+    
+    pTCPSocket = NULL;
     timer.start();
 }
 
-
-/** MQTT initialisation method
- *
- * Used when default constructor used and need to specify parameters at runtime
- *
- * @param server The IP address of the server to connect to
- * @param port   The TCP/IP port on the server to connect to
- * @param callback Callback function to handle subscription to topics
- */
-void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) {
+MQTTClient::MQTTClient(IpAddr server, int port, char *id, char *userName, char *password, void (*callback)(char*, char*)) {
     this->port = port;
     callback_server = callback;
-    serverIp = *server;
-    this->userName = NULL;
-    this->password = NULL;
+
+    host.setPort(port);
+    host.setIp(server);
+    host.setName("localhost");
+
+    this->userName = userName;
+    this->password = password;
+    clientId = id;
     connected = false;
     sessionOpened = false;
-    timer.start();
-}
-
-/** A brief description of the function foo
- *
- * More details about the function goes here
- * and here
- *
- * @param server The IP address of the server to connect to
- * @param port   The TCP/IP port on the server to connect to
- * @param userName Pointer to username string, zero terminated. Null for not used
- * @param password Pointer to password string, zero terminated. Null for not used
- * @param callback Callback function to handle subscription to topics
- */
-void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) {
-    this->port = port;
-    callback_server = callback;
-    serverIp = *server;
-    this->userName = userName;
-    this->password = password;
-    connected = false;
-    sessionOpened = false;
+    
+    pTCPSocket = NULL;
     timer.start();
 }
 
@@ -100,11 +82,20 @@
  *
  * @param msg  message string
  * @param size Size of the message to send
- * @returns value to indicate message sent successfully or not, -1 for error, 1 for success.
+ * @returns value to indicate message sent successfully or not, -1 for error, 0 for success.
  */
 int MQTTClient::send_data(const char* msg, int size) {
+    writtable = false;
+
+/*    printf("send_data length %d\n", size);
+    for (int i = 0; i < size; i++) {
+        printf("%x, ", msg[i]);
+    }
+    printf("\r\n"); */
+
     int transLen = pTCPSocket->send(msg, size);
-
+    pool();
+    
     /*Check send length matches the message length*/
     if (transLen != size) {
         for (int i = 0; i < size; i++) {
@@ -113,17 +104,18 @@
         printf("Error on send.\r\n");
         return -1;
     }
-    return 1;
+    
+    return 0;
 }
 
 /** Start a MQTT session, build CONNECT packet
  *
  * @param id The client name shown on MQTT server.
- * @returns -1 for error, 1 for success
+ * @returns -1 for error, 0 for success
  */
 int MQTTClient::open_session(char* id) {
     /*variable header*/
-    char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
+    char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,keepalive/500,0x00,strlen(id)};
 
     /*fixed header: 2 bytes, big endian*/
     char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
@@ -132,8 +124,8 @@
     var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
     var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
 
-//    printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen );
-    char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + 6 + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
+    printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), strlen(id), usernameLen, passwordLen );
+    char packet[sizeof(fixed_header) + sizeof(var_header) + strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
 
     memset(packet,0,sizeof(packet));
     memcpy(packet,fixed_header,sizeof(fixed_header));
@@ -152,34 +144,38 @@
         packet[1] += (passwordLen + 2);
     }
 
-    /*    printf("Packet length %d\n", sizeof(packet));
+/*        printf("Packet length %d\n", sizeof(packet));
         for (int i = 0; i < sizeof(packet); i++) {
             printf("%x, ", packet[i]);
         }
-        printf("\r\n");
-    */
-    if (!send_data(packet, sizeof(packet))) {
+        printf("\r\n");*/
+    
+    if (send_data(packet, sizeof(packet))) {
         return -1;
     }
-//    printf("Sent\n");
-    return 1;
+    printf("Sent\n");
+    return 0;
 }
 
 /** Open TCP port, connect to server on given IP address.
  *
+ * @returns -1: If connect to server failed. -2: Failed to open session on server.  0: Connection accessed.
+ */
+int MQTTClient::connect() {
+    return connect(clientId);
+}    
+
+/** Open TCP port, connect to server on given IP address.
+ *
  * @param id The client name shown on MQTT server.
- * @returns -1: If connect to server failed. -2: Failed to open session on server.  1: Connection accessed.
+ * @returns -1: If connect to server failed. -2: Failed to open session on server.  0: Connection accessed.
  */
 int MQTTClient::connect(char* id) {
-    clientId = id;
-
     /*Initial TCP socket*/
-    pTCPSocket = new TCPSocket;
-    pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
-
-    host.setPort(port);
-    host.setIp(serverIp);
-    host.setName("localhost");
+    if(!pTCPSocket) {
+        pTCPSocket = new TCPSocket;
+        pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
+    }        
 
     /*Trying to connect to host*/
     printf("Trying to connect to host..\r\n\r\n");
@@ -205,7 +201,10 @@
     }
 
     /*Send open session message to server*/
-    open_session(id);
+    if(open_session(id))    {
+        printf("Error open session [%s]\r\n", id);
+        return -1;            
+    }
 
     /*Wait server notice of open sesion*/
     i = 0;
@@ -217,38 +216,49 @@
         }
         printf("Wait for session %d..\r\n", i);
     }
-    if (!connected) {
+//  if (!connected) {       wakwak_koba
+    if (!connected || !sessionOpened)   {
         return -2;
     }
     lastActivity = timer.read_ms();
-    return 1;
+    return 0;
 }
 
 /** Publish a message on a topic.
  *
  * @param pub_topic  The topic name the massage will be publish on.
  * @param msg        The massage to be published.
- * @returns  -1: Failed to publish message. 1: Publish sucessed.
+ * @returns  -1: Failed to publish message. 0: Publish sucessed.
  */
-int MQTTClient::publish(char* pub_topic, char* msg) {
-    uint8_t var_header_pub[strlen(pub_topic)+3];
+int MQTTClient::publish(char* pub_topic, char* msg, bool retain) {
+    uint8_t var_header_pub[strlen(pub_topic)+2];
     strcpy((char *)&var_header_pub[2], pub_topic);
     var_header_pub[0] = 0;
     var_header_pub[1] = strlen(pub_topic);
-    var_header_pub[sizeof(var_header_pub)-1] = 0;
+
+//  uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
+    int  var_len = sizeof(var_header_pub)+strlen(msg);
+    uint8_t fixed_header_pub[1 + (var_len < 128 ? 1 : 2)];
+    fixed_header_pub[0] = MQTTPUBLISH | (retain ? 0x01 : 0x00);
 
-    uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
-
+    if(var_len < 128)
+        fixed_header_pub[1] = var_len;
+    else    {
+        fixed_header_pub[1] = (var_len % 0x80) | 0x80;
+        fixed_header_pub[2] = (var_len / 0x80);
+    }
+    
     uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
     memset(packet_pub,0,sizeof(packet_pub));
     memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
     memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
     memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
 
-    if (!send_data((char*)packet_pub, sizeof(packet_pub))) {
+    if (send_data((char*)packet_pub, sizeof(packet_pub))) {
         return -1;
     }
-    return 1;
+    
+    return 0;
 }
 
 /** Disconnect from server
@@ -257,6 +267,9 @@
     char packet_224[] = {0xe0, 0x00};
     send_data((char*)packet_224, 2);
 
+    pTCPSocket->close();
+    delete pTCPSocket;
+    pTCPSocket = NULL;
     connected = false;
 }
 
@@ -274,12 +287,12 @@
     buffer[len] = '\0';
 
 
-    printf("Read length: %d %d\r\n", len, readLen);
+/*    printf("Read length: %d %d\r\n", len, readLen);
 
     for (int i = 0; i < len; i++) {
         printf("%2X ", buffer[i]);
     }
-    printf("\r\n");
+    printf("\r\n"); */
 
     char type = buffer[0]>>4;
     if ( type == 2 ) { // CONNACK
@@ -336,7 +349,7 @@
 /** Subscribe to a topic
  *
  * @param topic The topic name to be subscribed.
- * @returns  -1: Failed to subscribe to topic. 1: Subscribe sucessed.
+ * @returns  -1: Failed to subscribe to topic. 0: Subscribe sucessed.
  */
 int MQTTClient::subscribe(char* topic) {
 
@@ -358,10 +371,10 @@
         memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
         memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
 
-        if (!send_data(packet_topic, sizeof(packet_topic))) {
+        if (send_data(packet_topic, sizeof(packet_topic))) {
             return -1;
         }
-        return 1;
+        return 0;
     }
     return -1;
 }
@@ -372,9 +385,9 @@
 void MQTTClient::live() {
     if (connected) {
         int t = timer.read_ms();
-        if (t - lastActivity > KEEPALIVE) {
+        if (t - lastActivity > keepalive) {
             //Send 192 0 to broker
-            printf("Send 192\r\n");
+            //printf("Send 192\r\n");
             char packet_192[] = {0xc0, 0x00};
             send_data((char*)packet_192, 2);
             lastActivity = t;
@@ -382,6 +395,10 @@
     }
 }
 
+void MQTTClient::pool()  {
+    Net::poll();
+}
+
 /** TCP Socket event handling
  *
  * @param e  Event object
@@ -396,7 +413,8 @@
             connected = true;
             break;
         case TCPSOCKET_WRITEABLE:
-            printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
+//            printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
+            writtable = true;
             break;
         case TCPSOCKET_READABLE:
             printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");