not running

Dependencies:   TextLCD MQTT

Revision:
2:16b3bd337db2
Parent:
1:19c3299ea83a
--- a/Communication.cpp	Tue Dec 10 23:40:25 2019 +0000
+++ b/Communication.cpp	Wed Dec 11 20:20:12 2019 +0000
@@ -1,6 +1,7 @@
 #include "Communication.h"
 
 Communication* Communication::singleton = NULL;
+Thread* Communication::thread = NULL;
 int Communication::speeds[5] = {0, 0, 0, 0, 0};
 int Communication::sync = 0;
 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
@@ -8,32 +9,45 @@
 EventFlags Communication::sync_flags;
 
 Serial pc_comm(USBTX, USBRX);
+//Mutex mutex;
+//std::vector<message_t> v;
+//std::queue<message_t> q;
+//Queue<message_t, 16> q_car;
+//MemoryPool<message_t, 16> mpool;
 
-Communication* Communication::getInstance() {
+Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
     if (singleton == NULL) {
-        singleton = new Communication();
+        singleton = new Communication(t1, t2, t3, t4);
+//        Communication();
     }
     return singleton;
 }
 
-Communication::Communication() 
+Communication::Communication(char* t1, char* t2, char* t3, char* t4)
 {
 //    singleton = this;
+    mutex = new Mutex();
+    q = new std::queue<message_t>();
     setup_wifi();
     if (wifi == NULL) {
         pc_comm.printf("Failed to set up Wifi.");
     }
-    MQTTNetwork network(wifi);
+    network = new MQTTNetwork(wifi);
     setup_mqtt(network);
-    if (client == NULL) {
+    if (this->client == NULL) {
         pc_comm.printf("Failed to set up Client.");
     }
-    subscribe_to_topic_sync("Rahman/Sync/Send/1");
+    topic_pub_position = t1;
+    topic_sub_control = t2;
+    topic_pub_receive = t3;
+    topic_sub_send = t4;
+    subscribe_to_topic_control(topic_sub_control);
+    subscribe_to_topic_sync(topic_sub_send);
 }
 
 void Communication::setup_wifi() {
     // Get a handle to the WiFi module
-    wifi = WiFiInterface::get_default_instance();
+    this->wifi = WiFiInterface::get_default_instance();
     
     // Connect the module to the wifi, based on the SSID and password 
     // specified in the mbed_app.json configuration file
@@ -46,7 +60,7 @@
     // needed to register the device with AirPennNet-Device, so that you
     // can use the campus wifi
     pc_comm.printf("MAC Address: ");
-    pc_comm.printf(wifi->get_mac_address());
+    pc_comm.printf(this->wifi->get_mac_address());
     pc_comm.printf("\r\n");
     
     if (rc != 0) {
@@ -58,7 +72,7 @@
 }
 
 
-void Communication::setup_mqtt(MQTTNetwork &network) {
+void Communication::setup_mqtt(MQTTNetwork* network) {
     // the hostname and port point to a Google Cloud MQTT server we setup for
     // this project
     const char* hostname = "34.68.206.11";
@@ -66,7 +80,7 @@
     
     // Create the underlying network connection to the MQTT server
     pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
-    int rc = network.connect(hostname, port);
+    int rc = network->connect(hostname, port);
     if (rc != 0) {
         pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
         return;
@@ -75,8 +89,8 @@
     pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
         
     // Connect the MQTT client to the server
-    client = new MQTT::Client<MQTTNetwork, Countdown>(network);
-    rc = client->connect();
+    this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
+    rc = this->client->connect();
     if (rc != 0) {
         pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
         return;
@@ -99,6 +113,7 @@
     
     int id = (int) payload[0];
     int speed = (int) payload[1];
+    pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
     
     speeds[id-1] = speed;
     control_flags.set(flags[id-1]);
@@ -109,6 +124,7 @@
     
     char* payload = (char*) message.payload;
     sync = (int) payload[0];
+    pc_comm.printf("Sync arrived with number %d\r\n", sync);
     sync_flags.set(0x01);
 }
 
@@ -118,9 +134,13 @@
  
     char buf[100];
     sprintf(buf, "Hello World!  This is a test message!\r\n");
-    int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+    if (this->client == NULL) {
+        pc_comm.printf("CLIENT IS NULL!!!\r\n");
+    }
+    int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
     
     if (rc != 0) {
+        pc_comm.printf("Message failed!\r\n");
         return -1;   
     } else {
         pc_comm.printf("Message sent!\r\n");
@@ -128,23 +148,10 @@
     }
 }
 
-int Communication::subscribe_to_topic_position(char* topic) {
-    pc_comm.printf("subcribing: %s", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, message_arrived);
-    if (rc != 0) {
-        pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
-        
-//        client->disconnect();
-        return -1;
-    } else {
-        pc_comm.printf("Subscribed!\r\n");
-    }
-    return rc;
-}
 
 int Communication::subscribe_to_topic_control(char* topic) {
     pc_comm.printf("subcribing: %s\r\n", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, control_arrived);
+    int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
     if (rc != 0) {
         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
         
@@ -158,7 +165,7 @@
 
 int Communication::subscribe_to_topic_sync(char* topic) {
     pc_comm.printf("subcribing: %s\r\n", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, sync_arrived);
+    int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
     if (rc != 0) {
         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
         
@@ -170,34 +177,84 @@
     return rc;
 }
     
-void Communication::publish_car(char* topic, int speed, int position) {
-    message_t *message = mpool.alloc();
-    message->topic = topic;
-    message->speed = speed;
-    message->position = position;
-    q_car.put(message);
+void Communication::publish_car(int id, int speed, int position) {
+    message_t message;
+    message.id = (char) id;
+    message.speed = (char) speed;
+    message.position = (char) position;
+    
+    pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
+    mutex->lock();
+    q->push(message);
+    mutex->unlock();
 }
 
-void Communication::publish_road(char* topic, int num) {
-    message_t *message = mpool.alloc();
-    message->topic = topic;
-    message->speed = -1; // indicate this is for sync
-    message->position = num;
-    q_car.put(message);
+void Communication::publish_road(int num) {
+    message_t message;
+    message.id = -1; // indicate this is for sync
+    message.position = (char) num; // use position as road update number
+    
+    mutex->lock();
+    q->push(message);
+    mutex->unlock();
 }
 
 void Communication::start() {
-    while(1) {
+    while (true) {
+        mutex->lock();
+//        pc_comm.printf("queue size = %d\r\n", q.size());
+        if (q->size() > 0) {
+            message_t &message = q->front();
+            q->pop();
+            mutex->unlock();
+            if (message.id == 255 || message.id == -1) {
+                char buf[2];
+                buf[0] = (char) message.position; // update number
+                buf[1] = '\0';
+                
+                int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                if (rc != 0) {
+                    pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
+                } else {
+                    pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
+                }
+            } else {
+                char buf[4];
+                buf[0] = message.id;
+                buf[1] = message.position;
+                buf[2] = message.speed;
+                buf[3] = '\0';
+                
+                int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                if (rc != 0) {
+                    pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
+                } else {
+                    pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
+                }
+            }
+//            q->pop();
+//            mutex->unlock();
+            this->client->yield(500); // wait for 100ms
+        } else {
+            mutex->unlock();
+            ThisThread::sleep_for(500);
+        }
+    }
+    /*
+    while(true) {
+        pc_comm.printf("queue size = %d\r\n", q_car.count());
         osEvent evt = q_car.get();
         if (evt.status == osEventMessage) {
+            pc_comm.printf("Gets one message.\r\n");
             message_t *message = (message_t*)evt.value.p;
-            char* topic = message->topic;
+//            char* topic = message->topic;
+            char* topic = "Rahman/Sync/Send/1";
             
             if (message->speed == -1) {
                 char buf[1];
                 buf[0] = (char) message->position; // update number
                 
-                int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                 if (rc != 0) {
                     pc_comm.printf("Failed to publish Road info: %s", topic);
                 } else {
@@ -208,7 +265,7 @@
                 buf[0] = (char) message->position;
                 buf[1] = (char) message->speed;
                 
-                int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                 if (rc != 0) {
                     pc_comm.printf("Failed to publish AccCar info: %s", topic);
                 } else {
@@ -216,13 +273,15 @@
                 }
             }
             mpool.free(message);
-            client->yield(100); // wait for 100ms
+            this->client->yield(100); // wait for 100ms
         }
     }
+    */
 }
 
 void Communication::reset() {
     control_flags.clear();
+    sync_flags.clear();
     
     if (thread != NULL) {
         thread->terminate();