able to subscribe for >10hrs and still running

Dependencies:   ADE7758_v1 Crypto DHT11 MQTT MbedJSONValue SDFileSystem SPI_TFT_ILI9341 SWSPI SetRTC TFT_fonts Touch W5500Interface mbed-rtos mbed-src tuanpm

Fork of PB_emma_controller_mbed_src by Emma

Revision:
28:cd25d46cb141
Parent:
27:259aaa249619
--- a/emmaCode.cpp	Wed Aug 05 10:07:18 2015 +0000
+++ b/emmaCode.cpp	Fri Aug 07 02:56:53 2015 +0000
@@ -330,7 +330,7 @@
                     
                     parse(jsonValue,str.c_str());
                         
-                    char *parameter[2] = {"wifiSSID","wifiPASS"};
+                    const char *parameter[2] = {"wifiSSID","wifiPASS"};
                         
                     for(int i=0; i<2; i++) {
                         if(jsonValue.hasMember(parameter[i])) {
@@ -447,7 +447,7 @@
                             
                                 parse(jsonValue,str.c_str());
                         
-                                char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"};
+                                const char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"};
                         
                                 for(int i=0; i<4; i++) {
                                     if(jsonValue.hasMember(parameter[i])) {
@@ -507,7 +507,7 @@
                             
                     parse(jsonValue,str.c_str());
                         
-                    char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"};
+                    const char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"};
                         
                     for(int i=0; i<4; i++) {
                         if(jsonValue.hasMember(parameter[i])) {
@@ -633,7 +633,7 @@
         
                 parse(jsonValue,str.c_str());
     
-                char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"};
+                const char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"};
     
                 //save platform parameter
                 writeSetting(parameter[0],"()");    //sd card need to be initialized
@@ -771,7 +771,7 @@
         
                     parse(jsonValue,str.c_str());
     
-                    char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"};
+                    const char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"};
     
                     //save platform parameter
                     writeSetting(parameter[0],"()");    //sd card need to be initialized
@@ -882,7 +882,6 @@
 }
 
 void emmaModeOperation(void) {
-    char *parameter[5] = {"\"id\"","\"nType\"","\"nAddr\"","\"dType\"","\"cmd\""};
     char mqttClientId[32];
     char p[64];
     char q[32];
@@ -943,7 +942,7 @@
     //sprintf(s,"emma-%s",emmaUID.c_str());   
     //hmac = calculateMD5(s);
     //DBG.printf("hmac:%s\r\n",hmac.c_str());
-    
+    int count = 0;
     if(ethConnected) {
         DBG.printf("emmaModeOperation - eth\r\n");
         DBG.printf("IP Address:%s\r\n",ipstack.getEth().getIPAddress());
@@ -952,85 +951,19 @@
 //        t.start();
 //        DBG.printf("start\r\n");
         while(true) {
-//            if(!ipstack.getEth().linkstatus()) {
-//                NVIC_SystemReset();  
-//            }
-
-            //check for new command
-            if(newCommand) {
-                string cmd(globalCommand);
-                string cmdBuffer;
-                int keyIndex, key2Index;
-                int valueLength;
-                
-                DBG.printf("Parsing CMD...\r\n");
-                
-                for (int i=0; i<5; i++) {
-                    keyIndex = cmd.find(parameter[i]);
-                    key2Index = cmd.find(parameter[i+1]);
-                    valueLength = key2Index - keyIndex - strlen(parameter[i]) - 2;
-                    DBG.printf("%s index: %d\r\n", parameter[i], keyIndex);
-                    cmdBuffer.assign(cmd, keyIndex+strlen(parameter[i]), valueLength);
-                    DBG.printf("the value: %s\r\n", cmdBuffer);
-                }
-
-                DBG.printf("Checking command validity...\r\n");
-                //check if command is valid
-                bool validCommand = true;
-                DBG.printf("Command validity:%d\r\n",validCommand);
-                                
-//                if(validCommand) {
-//                    string commandId = jsonValue[parameter[0]].get<std::string>();
-//                    string commandNType = jsonValue[parameter[1]].get<std::string>();
-//                    string commandNAddr = jsonValue[parameter[2]].get<std::string>();
-//                    string commandDType = jsonValue[parameter[3]].get<std::string>();
-//                    string commandCmd = jsonValue[parameter[4]].get<std::string>();
-//                    
-//                    if(commandNType == "0") {       //switch on panel controller
-//                        DBG.printf("command for switch\r\n");
-//                    }
-//                    else if(commandNType == "1") {  //node with mac address
-//                        DBG.printf("command for node\r\n");
-//                        //get node ip address based on node mac address
-//                        string nodeIP = readNodeIP(commandNAddr);
-//                        //DBG.printf("nodeIP: %s\r\n",nodeIP.c_str());
-//                        
-//                        //get cmd string based on device type and command number
-//                        string nodeCmd = readNodeCmd(commandDType,commandCmd);
-//                        //DBG.printf("nodeCmd: %s\r\n",nodeCmd.c_str());                        
-//                        //execute command
-//                        //int trial=0;
-//                        trial = 0;
-//                        bool execStatus=false;
-//                    
-//                        while( !execStatus ) {
-//                            sprintf(s,"<?xml version=\"1.0\" encoding=\"utf-8\"?><app_cmd cmd=\"5\" /><app_data code=\"%s\"/>\r\n",nodeCmd.c_str());
-//                            str = s;
-//                            DBG.printf("str value:%s\r\n",str.c_str());                            
-//                            string rcv = ethGET(nodeIP,REMOTE_TCP_PORT,str); // cause mqtt to stop
-//                            DBG.printf("response:%s\r\n",rcv.c_str());
-//                            str = rcv;
-//                            if(str.find("OK") != std::string::npos) {
-//                                DBG.printf("CMD executed successfully\r\n");
-//                                execStatus = true;
-//                            }
-//                            if(trial>0) {   //two times trial
-//                                DBG.printf("cmd is not executed\r\n");
-//                                execStatus = false;
-//                            }
-//                            DBG.printf("Trial++\r\n");
-//                            trial++;
-//                            wait(3);
-//                        }
-//                        DBG.printf("Send execution status\r\n");
-//                        //send execution status
-//                    }
-//                }
-                newCommand = false;    
-                DBG.printf("New command false\r\n");
+            if(!ipstack.getEth().linkstatus()) {
+                NVIC_SystemReset();  
             }
+            
+            if (++count == 200)
+            {               // Publish a message every second
+                if (publish(&client, &ipstack) != 0)
+                    ethMQTTAttemptConnect(&client, &ipstack);   // if we have lost the connection
+                count = 0;
+            }
+            
             // DBG.printf("Yielding...\r\n");            
-            client.yield(10);  //allow MQTT client to receive message
+            client.yield(100);  //allow MQTT client to receive message
             // DBG.printf("Yield finished\r\n");
         }
     } else if(wifiConnected) {
@@ -1122,7 +1055,7 @@
             str.erase(str.begin()+str.rfind("}]")+2,str.end());
             
             parse(jsonValue,str.c_str());
-            char *parameter[2] = {"mac","ip"};
+            const char *parameter[2] = {"mac","ip"};
             
             TFT.locate(0,40);
             TFT.printf("                                        ");
@@ -1455,7 +1388,7 @@
                 TFT.printf("newCommand");
                 
                 parse(jsonValue,globalCommand.c_str());
-                char *parameter[5] = {"name","nType","nAddr","dType","cmd"};
+                const char *parameter[5] = {"name","nType","nAddr","dType","cmd"};
                 
                 //check whether command is valid
                 bool validCommand = true;
@@ -1674,7 +1607,7 @@
             
             parse(jsonValue,str.c_str());
             
-            char *parameter[2] = {"firmwareVer","numPart"};
+            const char *parameter[2] = {"firmwareVer","numPart"};
             
             for(int i=0; i<2; i++) {
                 if(jsonValue.hasMember(parameter[i])) {
@@ -1972,27 +1905,113 @@
 }
 /*end wifi rest*/
 
+int publish(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack)
+{
+    MQTT::Message message;
+    char* topic = new char[strlen(platformDOMAIN.c_str())+strlen(emmaUID.c_str())+9];
+    *topic = '\0';
+    strcat(topic, platformDOMAIN.c_str());
+    strcat(topic, "/");
+    strcat(topic, emmaUID.c_str());
+    strcat(topic, "/dummy");
+
+    const char buf[25] = "{\"d\":{\"myName\":\"EMMA\"}}";
+    
+    message.qos = MQTT::QOS2;
+    message.retained = false;
+    message.dup = false;
+    message.payload = (void*)buf;
+    message.payloadlen = strlen(buf);
+
+    DBG.printf("Publishing %s\r\n", buf);
+    return client->publish(topic, &message);
+}
+
 /*start eth mqtt*/
 void ethMQTTMessageArrived(MQTT::MessageData& md) {
+
     MQTT::Message &message = md.message;
-    DBG.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
-    DBG.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+    char topic[md.topicName.lenstring.len + 1];
+    string str;
+    sprintf(topic, "%.*s", md.topicName.lenstring.len, md.topicName.lenstring.data);
 
-    string sp = string((char*)(message.payload));
-    
+    DBG.printf("Message arrived on topic %s: %.*s\r\n",  topic, message.payloadlen, message.payload);
+    char *s = (char*)message.payload;
+    string sp(s);
     if(sp.find("[") != std::string::npos && sp.find("]") != std::string::npos) {
         sp.erase(sp.begin(),sp.begin()+sp.find("[")+1);
-        sp.erase(sp.begin()+sp.find("]"), sp.end());
-        globalCommand.assign(sp);
-        newCommand = true;
+        sp.erase(sp.begin()+sp.find("]"),sp.end());
+    }
+    else {
+        DBG.printf("Invalid MQTT command");
+        return;
+    }
+
+    MbedJSONValue jsonValue;
+    parse(jsonValue, sp.c_str());
+    const char *parameter[5] = {"name","nType","nAddr","dType","cmd"};
+    
+    //check if command is valid
+    bool validCommand = true;
+    for(int i=0; i<5; i++) {
+        validCommand = validCommand && jsonValue.hasMember(parameter[i]);
     }
-    sp.clear();
+    DBG.printf("command validity:%d\r\n",validCommand);
+    
+    //check for new command
+    if(validCommand) {
+        string commandId = jsonValue[parameter[0]].get<std::string>();
+        string commandNType = jsonValue[parameter[1]].get<std::string>();
+        string commandNAddr = jsonValue[parameter[2]].get<std::string>();
+        string commandDType = jsonValue[parameter[3]].get<std::string>();
+        string commandCmd = jsonValue[parameter[4]].get<std::string>();
+        
+        if(commandNType == "0") {       //switch on panel controller
+            DBG.printf("command for switch\r\n");
+        }
+        else if(commandNType == "1") {  //node with mac address
+            DBG.printf("command for node\r\n");
+            //get node ip address based on node mac address
+            string nodeIP = readNodeIP(commandNAddr);
+            //DBG.printf("nodeIP: %s\r\n",nodeIP.c_str());
+            
+            //get cmd string based on device type and command number
+            string nodeCmd = readNodeCmd(commandDType,commandCmd);
+            //DBG.printf("nodeCmd: %s\r\n",nodeCmd.c_str());                        
+            //execute command
+            int trial=0;
+            trial = 0;
+            bool execStatus=false;
+
+            while( !execStatus ) {
+                sprintf(s,"<?xml version=\"1.0\" encoding=\"utf-8\"?><app_cmd cmd=\"5\" /><app_data code=\"%s\"/>\r\n",nodeCmd.c_str());
+                str.assign(s);
+                DBG.printf("str value:%s\r\n",str.c_str());                            
+                string rcv = ethGET(nodeIP,REMOTE_TCP_PORT,str); // cause mqtt to stop
+                DBG.printf("response:%s\r\n",rcv.c_str());
+                str = rcv;
+                if(str.find("OK") != std::string::npos) {
+                    DBG.printf("CMD executed successfully\r\n");
+                    execStatus = true;
+                    break;
+                }
+                if(trial>5) {   //two times trial
+                    DBG.printf("cmd is not executed\r\n");
+                    execStatus = false;
+                }
+                DBG.printf("Trial++\r\n");
+                trial++;
+                wait(3);
+            }
+            DBG.printf("Send execution status\r\n");
+            //send execution status
+        }
+    }
+    DBG.printf("Finish processing new command\r\n");
 }
 
 int ethMQTTConnect(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack) {
-    //char hostname[] = "q.thingfabric.com";
-    //char hostname[] = "192.168.131.200";
-    //int rc = ipstack->connect(hostname, MQTT_PORT);
+
     int rc = ipstack->connect(MQTT_HOST, MQTT_PORT);
     
     if(rc!=0)
@@ -2000,44 +2019,57 @@
         
     //MQTT Connect
     //char clientId[] = "emma/0674ff575349896767072538";
-    char clientId [32];
-    sprintf(clientId,"emma/%s",emmaUID.c_str());
-    //DBG.printf("clientId:%s\r\n",clientId);
+    char clientId [strlen(emmaUID.c_str())+6];
+    sprintf(clientId,"emma/%s", emmaUID.c_str());
+    DBG.printf("clientId:%s\r\n",clientId);
+    
     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
     data.MQTTVersion = 3;
     data.clientID.cstring = clientId;
     data.username.cstring = "761b233e-a49a-4830-a8ae-87cec3dc1086";
     data.password.cstring = "ef25cf4567fbc07113252f8d72b7faf2";
     
-    if((rc = client->connect(&data)) == 0) {
-        DBG.printf("connected\r\n");
+    if ((rc = client->connect(&data)) == 0)
+    {
+//         connected = true;
+        DBG.printf("MQTT connected\r\n");
     }
     
     //MQTT Subscribe
-    //char* topic = "gaisbwqreqrfxjc/0674ff575349896767072538/command";
-    char s[64];
-    sprintf(s,"%s/%s/command",platformDOMAIN.c_str(),emmaUID.c_str());
-    string topic = s;
-    DBG.printf("topic:%s\r\n",topic.c_str());
-    if((rc=client->subscribe(topic.c_str(),MQTT::QOS2,ethMQTTMessageArrived)) == 0) {
-        DBG.printf("subscribe success!\r\n");    
+//    char* topic = "emma/005300553533510334313732/command";
+    char* topic = new char[strlen(platformDOMAIN.c_str())+strlen(emmaUID.c_str())+9];
+    *topic = '\0';
+    strcat(topic, platformDOMAIN.c_str());
+    strcat(topic, "/");
+    strcat(topic, emmaUID.c_str());
+    strcat(topic, "/command");
+    DBG.printf("MQTT subscription topic: %s\r\n", topic);
+                
+    if ((rc = client->subscribe(topic, MQTT::QOS2, ethMQTTMessageArrived)) == 0) {
+        DBG.printf("Subscribe success\r\n");    
     }
+    
     return rc;
 }
 
+int getConnTimeout(int attemptNumber)
+{  // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
+   // after 20 attempts, retry every 10 minutes
+    return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
+}
+
 void ethMQTTAttemptConnect(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack) {
     int retryAttempt = 0;
     
     while(!ipstack->getEth().linkstatus()) {
+        wait(1.0f);
         DBG.printf("Ethernet link not present. Check cable connection\r\n");
-        wait(1);
     }
     
     while(ethMQTTConnect(client,ipstack) != 0) {
-        int timeout = 3;
+        int timeout = getConnTimeout(++retryAttempt);
         DBG.printf("Retry attempt number %d waiting %d\r\n", retryAttempt, timeout);
         wait(timeout);
-        retryAttempt++;
     }
 }
 
@@ -2094,6 +2126,7 @@
         fclose(fp);
         return true;
     }
+    fclose(fp);
     return false;
 }
 /*end emma settings*/
@@ -2213,9 +2246,9 @@
 
 /*start emma connection function*/
 string ethGET(string host, int port, string url) {
-//    DBG.printf("Entering ethGET with url: %s\r\n", url);
-//    DBG.printf("Host: %s\r\n", host);
-//    DBG.printf("Port: %d\r\n", port);    
+    DBG.printf("Entering ethGET with url: %s\r\n", url);
+    DBG.printf("Host: %s\r\n", host);
+    DBG.printf("Port: %d\r\n", port);    
     char buf[1024];
     char s[256];
     int ret;