not running

Dependencies:   TextLCD MQTT

Files at this revision

API Documentation at this revision

Comitter:
hyan99
Date:
Wed Dec 11 20:20:12 2019 +0000
Parent:
1:19c3299ea83a
Commit message:
testing

Changed in this revision

AccCar.cpp Show annotated file Show diff for this revision Revisions of this file
AccCar.h Show annotated file Show diff for this revision Revisions of this file
Communication.cpp Show annotated file Show diff for this revision Revisions of this file
Communication.h Show annotated file Show diff for this revision Revisions of this file
Road.cpp Show annotated file Show diff for this revision Revisions of this file
Road.h Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed_app.json Show annotated file Show diff for this revision Revisions of this file
--- a/AccCar.cpp	Tue Dec 10 23:40:25 2019 +0000
+++ b/AccCar.cpp	Wed Dec 11 20:20:12 2019 +0000
@@ -11,7 +11,7 @@
 
 Serial pc_acc(USBTX, USBRX);
 
-AccCar::AccCar(int id, Road* road, int flag, char* t1, char* t2) {
+AccCar::AccCar(int id, Road* road, int flag, Communication* c) {
     this->id = id;
     this->road = road;
     this->flag = flag;
@@ -20,11 +20,7 @@
     this->forward_car = NULL;
 
     this->cycle = 0;
-    this->comm = Communication::getInstance();
-    this->topic_position = t1;
-    this->topic_control = t2;
-//    communication->subscribe_to_position(topic_position);
-    comm->subscribe_to_topic_control(topic_control);
+    this->comm = c;
 }
 
 void AccCar::set_forward_car(AccCar* car) {
@@ -51,9 +47,12 @@
             speed = target_speed;
         }
         // TODO: publish Position and Acc-speed
-        comm->publish_car(topic_position, speed, position);
-        Communication::control_flags.wait_all(flag); // wait for speed instruction from controller
-        speed = Communication::speeds[id - 1];
+        if (position < 255) {
+            comm->publish_car(id, speed, position);
+            Communication::control_flags.wait_all(flag); // wait for speed instruction from controller
+            speed = Communication::speeds[id - 1];
+            pc_acc.printf("Suggested speed: %d\r\n", speed);
+        }
 //        // Crossing
 //        if (position < STOP) {
 //            speed = std::min(speed, STOP - position);
--- a/AccCar.h	Tue Dec 10 23:40:25 2019 +0000
+++ b/AccCar.h	Wed Dec 11 20:20:12 2019 +0000
@@ -16,7 +16,7 @@
     int speed;
     int flag;
     
-    AccCar(int id, Road* road, int flag, char* t1, char* t2);
+    AccCar(int id, Road* road, int flag, Communication* c);
     void set_forward_car(AccCar* car);
     void update();
     void reset(int speed);
--- a/Communication.cpp	Tue Dec 10 23:40:25 2019 +0000
+++ b/Communication.cpp	Wed Dec 11 20:20:12 2019 +0000
@@ -1,6 +1,7 @@
 #include "Communication.h"
 
 Communication* Communication::singleton = NULL;
+Thread* Communication::thread = NULL;
 int Communication::speeds[5] = {0, 0, 0, 0, 0};
 int Communication::sync = 0;
 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
@@ -8,32 +9,45 @@
 EventFlags Communication::sync_flags;
 
 Serial pc_comm(USBTX, USBRX);
+//Mutex mutex;
+//std::vector<message_t> v;
+//std::queue<message_t> q;
+//Queue<message_t, 16> q_car;
+//MemoryPool<message_t, 16> mpool;
 
-Communication* Communication::getInstance() {
+Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
     if (singleton == NULL) {
-        singleton = new Communication();
+        singleton = new Communication(t1, t2, t3, t4);
+//        Communication();
     }
     return singleton;
 }
 
-Communication::Communication() 
+Communication::Communication(char* t1, char* t2, char* t3, char* t4)
 {
 //    singleton = this;
+    mutex = new Mutex();
+    q = new std::queue<message_t>();
     setup_wifi();
     if (wifi == NULL) {
         pc_comm.printf("Failed to set up Wifi.");
     }
-    MQTTNetwork network(wifi);
+    network = new MQTTNetwork(wifi);
     setup_mqtt(network);
-    if (client == NULL) {
+    if (this->client == NULL) {
         pc_comm.printf("Failed to set up Client.");
     }
-    subscribe_to_topic_sync("Rahman/Sync/Send/1");
+    topic_pub_position = t1;
+    topic_sub_control = t2;
+    topic_pub_receive = t3;
+    topic_sub_send = t4;
+    subscribe_to_topic_control(topic_sub_control);
+    subscribe_to_topic_sync(topic_sub_send);
 }
 
 void Communication::setup_wifi() {
     // Get a handle to the WiFi module
-    wifi = WiFiInterface::get_default_instance();
+    this->wifi = WiFiInterface::get_default_instance();
     
     // Connect the module to the wifi, based on the SSID and password 
     // specified in the mbed_app.json configuration file
@@ -46,7 +60,7 @@
     // needed to register the device with AirPennNet-Device, so that you
     // can use the campus wifi
     pc_comm.printf("MAC Address: ");
-    pc_comm.printf(wifi->get_mac_address());
+    pc_comm.printf(this->wifi->get_mac_address());
     pc_comm.printf("\r\n");
     
     if (rc != 0) {
@@ -58,7 +72,7 @@
 }
 
 
-void Communication::setup_mqtt(MQTTNetwork &network) {
+void Communication::setup_mqtt(MQTTNetwork* network) {
     // the hostname and port point to a Google Cloud MQTT server we setup for
     // this project
     const char* hostname = "34.68.206.11";
@@ -66,7 +80,7 @@
     
     // Create the underlying network connection to the MQTT server
     pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
-    int rc = network.connect(hostname, port);
+    int rc = network->connect(hostname, port);
     if (rc != 0) {
         pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
         return;
@@ -75,8 +89,8 @@
     pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
         
     // Connect the MQTT client to the server
-    client = new MQTT::Client<MQTTNetwork, Countdown>(network);
-    rc = client->connect();
+    this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
+    rc = this->client->connect();
     if (rc != 0) {
         pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
         return;
@@ -99,6 +113,7 @@
     
     int id = (int) payload[0];
     int speed = (int) payload[1];
+    pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
     
     speeds[id-1] = speed;
     control_flags.set(flags[id-1]);
@@ -109,6 +124,7 @@
     
     char* payload = (char*) message.payload;
     sync = (int) payload[0];
+    pc_comm.printf("Sync arrived with number %d\r\n", sync);
     sync_flags.set(0x01);
 }
 
@@ -118,9 +134,13 @@
  
     char buf[100];
     sprintf(buf, "Hello World!  This is a test message!\r\n");
-    int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+    if (this->client == NULL) {
+        pc_comm.printf("CLIENT IS NULL!!!\r\n");
+    }
+    int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
     
     if (rc != 0) {
+        pc_comm.printf("Message failed!\r\n");
         return -1;   
     } else {
         pc_comm.printf("Message sent!\r\n");
@@ -128,23 +148,10 @@
     }
 }
 
-int Communication::subscribe_to_topic_position(char* topic) {
-    pc_comm.printf("subcribing: %s", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, message_arrived);
-    if (rc != 0) {
-        pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
-        
-//        client->disconnect();
-        return -1;
-    } else {
-        pc_comm.printf("Subscribed!\r\n");
-    }
-    return rc;
-}
 
 int Communication::subscribe_to_topic_control(char* topic) {
     pc_comm.printf("subcribing: %s\r\n", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, control_arrived);
+    int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
     if (rc != 0) {
         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
         
@@ -158,7 +165,7 @@
 
 int Communication::subscribe_to_topic_sync(char* topic) {
     pc_comm.printf("subcribing: %s\r\n", topic);
-    int rc = client->subscribe(topic, MQTT::QOS1, sync_arrived);
+    int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
     if (rc != 0) {
         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
         
@@ -170,34 +177,84 @@
     return rc;
 }
     
-void Communication::publish_car(char* topic, int speed, int position) {
-    message_t *message = mpool.alloc();
-    message->topic = topic;
-    message->speed = speed;
-    message->position = position;
-    q_car.put(message);
+void Communication::publish_car(int id, int speed, int position) {
+    message_t message;
+    message.id = (char) id;
+    message.speed = (char) speed;
+    message.position = (char) position;
+    
+    pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
+    mutex->lock();
+    q->push(message);
+    mutex->unlock();
 }
 
-void Communication::publish_road(char* topic, int num) {
-    message_t *message = mpool.alloc();
-    message->topic = topic;
-    message->speed = -1; // indicate this is for sync
-    message->position = num;
-    q_car.put(message);
+void Communication::publish_road(int num) {
+    message_t message;
+    message.id = -1; // indicate this is for sync
+    message.position = (char) num; // use position as road update number
+    
+    mutex->lock();
+    q->push(message);
+    mutex->unlock();
 }
 
 void Communication::start() {
-    while(1) {
+    while (true) {
+        mutex->lock();
+//        pc_comm.printf("queue size = %d\r\n", q.size());
+        if (q->size() > 0) {
+            message_t &message = q->front();
+            q->pop();
+            mutex->unlock();
+            if (message.id == 255 || message.id == -1) {
+                char buf[2];
+                buf[0] = (char) message.position; // update number
+                buf[1] = '\0';
+                
+                int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                if (rc != 0) {
+                    pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
+                } else {
+                    pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
+                }
+            } else {
+                char buf[4];
+                buf[0] = message.id;
+                buf[1] = message.position;
+                buf[2] = message.speed;
+                buf[3] = '\0';
+                
+                int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                if (rc != 0) {
+                    pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
+                } else {
+                    pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
+                }
+            }
+//            q->pop();
+//            mutex->unlock();
+            this->client->yield(500); // wait for 100ms
+        } else {
+            mutex->unlock();
+            ThisThread::sleep_for(500);
+        }
+    }
+    /*
+    while(true) {
+        pc_comm.printf("queue size = %d\r\n", q_car.count());
         osEvent evt = q_car.get();
         if (evt.status == osEventMessage) {
+            pc_comm.printf("Gets one message.\r\n");
             message_t *message = (message_t*)evt.value.p;
-            char* topic = message->topic;
+//            char* topic = message->topic;
+            char* topic = "Rahman/Sync/Send/1";
             
             if (message->speed == -1) {
                 char buf[1];
                 buf[0] = (char) message->position; // update number
                 
-                int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                 if (rc != 0) {
                     pc_comm.printf("Failed to publish Road info: %s", topic);
                 } else {
@@ -208,7 +265,7 @@
                 buf[0] = (char) message->position;
                 buf[1] = (char) message->speed;
                 
-                int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                 if (rc != 0) {
                     pc_comm.printf("Failed to publish AccCar info: %s", topic);
                 } else {
@@ -216,13 +273,15 @@
                 }
             }
             mpool.free(message);
-            client->yield(100); // wait for 100ms
+            this->client->yield(100); // wait for 100ms
         }
     }
+    */
 }
 
 void Communication::reset() {
     control_flags.clear();
+    sync_flags.clear();
     
     if (thread != NULL) {
         thread->terminate();
--- a/Communication.h	Tue Dec 10 23:40:25 2019 +0000
+++ b/Communication.h	Wed Dec 11 20:20:12 2019 +0000
@@ -2,15 +2,19 @@
 #define _COMMUNICATION_H_
 
 #include "mbed.h"
+#include "rtos.h"
 #include "MQTTNetwork.h"
 #include "MQTTClient.h"
 #include "MQTTmbed.h"
+#include <string.h>
+#include <queue>
+#include <mutex>
 
 
 typedef struct {
-    int    speed; // speed for AccCar, -1 for Road
-    int    position; // position for AccCar, update number for Road
-    char*  topic;
+    char id;
+    char speed; // speed for AccCar, -1 for Road
+    char position; // position for AccCar, update number for Road
 } message_t;
 
 class Communication {
@@ -22,35 +26,40 @@
     static int flags[5];
     
     // Singleton
-    static Communication *getInstance();
+    static Communication *getInstance(char* t1, char* t2, char* t3, char* t4);
     
     // MQTT Setup
     void setup_wifi();
-    void setup_mqtt(MQTTNetwork &network);
+    void setup_mqtt(MQTTNetwork* network);
     static void message_arrived(MQTT::MessageData& md);
     static void control_arrived(MQTT::MessageData& md);
     static void sync_arrived(MQTT::MessageData& md);
     int send_message(char* topic);
-    int subscribe_to_topic_position(char* topic);
     int subscribe_to_topic_control(char* topic);
     int subscribe_to_topic_sync(char* topic);
-    void publish_car(char* topic, int speed, int position);
-    void publish_road(char* topic, int num);
+    void publish_car(int id, int speed, int position);
+    void publish_road(int num);
     void start(); // function for serving requests (single thread)
     void reset(); // function to create and start the singleton thread
     void stop();
     
-private:
+//private:
     static Communication *singleton;
     
     WiFiInterface *wifi;
+    MQTTNetwork* network;
     MQTT::Client<MQTTNetwork, Countdown> *client;
-    Thread* thread;
+    static Thread* thread;
+    
+    Mutex* mutex;
+    std::queue<message_t>* q;
     
-    MemoryPool<message_t, 10> mpool;
-    Queue<message_t, 10> q_car;
+    char* topic_pub_position;
+    char* topic_sub_control;
+    char* topic_pub_receive;
+    char* topic_sub_send;
     
-    Communication();
+    Communication(char* t1, char* t2, char* t3, char* t4);
     
 };
 
--- a/Road.cpp	Tue Dec 10 23:40:25 2019 +0000
+++ b/Road.cpp	Wed Dec 11 20:20:12 2019 +0000
@@ -2,13 +2,10 @@
 
 Serial pc_road(USBTX, USBRX);
 
-Road::Road(char* t1, char* t2) {
+Road::Road(Communication* c) {
     active_cars = 0x00;
-    topic_send = t1; // road publish, controller receive
-    topic_receive = t2; // road receive, controller publish
-    comm = Communication::getInstance();
-    comm->subscribe_to_topic_sync(topic_receive); // for receiving sync messages
-    n = 0;
+    comm = c;
+    n = 1;
 }
 
 void Road::add_car(Car* car) {
@@ -50,8 +47,8 @@
 
 void Road::wait_for_car_update() {
     done_flags.wait_all(active_cars);
-    n = n + 1;
-    comm->publish_road(topic_send, n);
+    comm->publish_road(n);
     Communication::sync_flags.wait_all(0x01);
     n = Communication::sync;
+    pc_road.printf("Update number %d\r\n", n);
 }
--- a/Road.h	Tue Dec 10 23:40:25 2019 +0000
+++ b/Road.h	Wed Dec 11 20:20:12 2019 +0000
@@ -17,7 +17,7 @@
     EventFlags done_flags;
     Intersection *intersection;
 
-    Road(char* t1, char* t2);
+    Road(Communication* c);
     void add_car(Car* car);
     void add_acc_car(AccCar* car, int id);
     void let_cars_update();
--- a/main.cpp	Tue Dec 10 23:40:25 2019 +0000
+++ b/main.cpp	Wed Dec 11 20:20:12 2019 +0000
@@ -14,6 +14,7 @@
 
 #include <algorithm>
 #include <vector>
+#include <queue>
 #include <string>
 #include <stdlib.h>
 
@@ -62,34 +63,25 @@
     Intersection intersection;
 
     // Initialize Communication
-    Communication* communication = Communication::getInstance();
-    communication->send_message("Rahman/Sync/Send/1");
+    
+    char buf1[30] = "Rahman/Position/2";
+    char buf2[30] = "Rahman/Control/2";
+    char buf3[50] = "Rahman/Sync/Receive/2";
+    char buf4[50] = "Rahman/Sync/Send/2";
+    Communication* c = Communication::getInstance(buf1, buf2, buf3, buf4);
+    
     // Initialize 5 AccCars and the Road
-    char buf1[50] = "Rahman/Sync/Receive/1";
-    char buf2[50] = "Rahman/Sync/Send/1";
-    Road road1(buf1, buf2);
+    
+    Road road1(c);
     intersection.road1 = &road1;
     road1.intersection = &intersection;
     
-    char buf11[30] = "Rahman/Position/1/1";
-    char buf12[30] = "Rahman/Control/1/1";
-    AccCar car11(1, &road1, 0x01, buf11, buf12);
     
-    char buf21[30] = "Rahman/Position/1/2";
-    char buf22[30] = "Rahman/Control/1/2";
-    AccCar car12(2, &road1, 0x02, buf21, buf22);
-    
-    char buf31[30] = "Rahman/Position/1/3";
-    char buf32[30] = "Rahman/Control/1/3";
-    AccCar car13(3, &road1, 0x04, buf31, buf32);
-    
-    char buf41[30] = "Rahman/Position/1/4";
-    char buf42[30] = "Rahman/Control/1/4";
-    AccCar car14(4, &road1, 0x08, buf41, buf42);
-    
-    char buf51[30] = "Rahman/Position/1/5";
-    char buf52[30] = "Rahman/Control/1/5";
-    AccCar car15(5, &road1, 0x10, buf51, buf52);
+    AccCar car11(1, &road1, 0x01, c);
+    AccCar car12(2, &road1, 0x02, c);
+    AccCar car13(3, &road1, 0x04, c);
+    AccCar car14(4, &road1, 0x08, c);
+    AccCar car15(5, &road1, 0x10, c);
 
     std::vector<AccCar*> q1;
     q1.push_back(&car15);
@@ -106,8 +98,10 @@
 
     stopwatch.start();
     
-    communication->reset(); // start thread
-
+    pc.printf("Dispatching communication thread.\r\n");
+    
+    c->reset();
+    
     int interval = MAX_SPEED - MIN_SPEED + 1;
     car11.reset(rand() % interval + MIN_SPEED); // set random speed [5, 15]
     car12.reset(rand() % interval + MIN_SPEED);
@@ -115,6 +109,7 @@
     car14.reset(rand() % interval + MIN_SPEED);
     car15.reset(rand() % interval + MIN_SPEED);
 
+//    c->reset();
     stopwatch.reset();
 
     int waitTime1 = 0;
@@ -190,7 +185,7 @@
     car13.stop();
     car14.stop();
     car15.stop();
-    communication->stop();
+    c->stop();
 
             // ----------------------------------------------------------------------
     // Timing statistics printout, do not modify
--- a/mbed_app.json	Tue Dec 10 23:40:25 2019 +0000
+++ b/mbed_app.json	Wed Dec 11 20:20:12 2019 +0000
@@ -24,7 +24,7 @@
             "esp8266.provide-default" : true,
             "esp8266.tx" : "p28",
             "esp8266.rx" : "p27",
-            "rtos.thread-stack-size": 512   
+            "rtos.thread-stack-size": 1024   
         }   
     }  
 }
\ No newline at end of file