not running
Revision 2:16b3bd337db2, committed 2019-12-11
- Comitter:
- hyan99
- Date:
- Wed Dec 11 20:20:12 2019 +0000
- Parent:
- 1:19c3299ea83a
- Commit message:
- testing
Changed in this revision
--- a/AccCar.cpp Tue Dec 10 23:40:25 2019 +0000
+++ b/AccCar.cpp Wed Dec 11 20:20:12 2019 +0000
@@ -11,7 +11,7 @@
Serial pc_acc(USBTX, USBRX);
-AccCar::AccCar(int id, Road* road, int flag, char* t1, char* t2) {
+AccCar::AccCar(int id, Road* road, int flag, Communication* c) {
this->id = id;
this->road = road;
this->flag = flag;
@@ -20,11 +20,7 @@
this->forward_car = NULL;
this->cycle = 0;
- this->comm = Communication::getInstance();
- this->topic_position = t1;
- this->topic_control = t2;
-// communication->subscribe_to_position(topic_position);
- comm->subscribe_to_topic_control(topic_control);
+ this->comm = c;
}
void AccCar::set_forward_car(AccCar* car) {
@@ -51,9 +47,12 @@
speed = target_speed;
}
// TODO: publish Position and Acc-speed
- comm->publish_car(topic_position, speed, position);
- Communication::control_flags.wait_all(flag); // wait for speed instruction from controller
- speed = Communication::speeds[id - 1];
+ if (position < 255) {
+ comm->publish_car(id, speed, position);
+ Communication::control_flags.wait_all(flag); // wait for speed instruction from controller
+ speed = Communication::speeds[id - 1];
+ pc_acc.printf("Suggested speed: %d\r\n", speed);
+ }
// // Crossing
// if (position < STOP) {
// speed = std::min(speed, STOP - position);
--- a/AccCar.h Tue Dec 10 23:40:25 2019 +0000
+++ b/AccCar.h Wed Dec 11 20:20:12 2019 +0000
@@ -16,7 +16,7 @@
int speed;
int flag;
- AccCar(int id, Road* road, int flag, char* t1, char* t2);
+ AccCar(int id, Road* road, int flag, Communication* c);
void set_forward_car(AccCar* car);
void update();
void reset(int speed);
--- a/Communication.cpp Tue Dec 10 23:40:25 2019 +0000
+++ b/Communication.cpp Wed Dec 11 20:20:12 2019 +0000
@@ -1,6 +1,7 @@
#include "Communication.h"
Communication* Communication::singleton = NULL;
+Thread* Communication::thread = NULL;
int Communication::speeds[5] = {0, 0, 0, 0, 0};
int Communication::sync = 0;
int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
@@ -8,32 +9,45 @@
EventFlags Communication::sync_flags;
Serial pc_comm(USBTX, USBRX);
+//Mutex mutex;
+//std::vector<message_t> v;
+//std::queue<message_t> q;
+//Queue<message_t, 16> q_car;
+//MemoryPool<message_t, 16> mpool;
-Communication* Communication::getInstance() {
+Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
if (singleton == NULL) {
- singleton = new Communication();
+ singleton = new Communication(t1, t2, t3, t4);
+// Communication();
}
return singleton;
}
-Communication::Communication()
+Communication::Communication(char* t1, char* t2, char* t3, char* t4)
{
// singleton = this;
+ mutex = new Mutex();
+ q = new std::queue<message_t>();
setup_wifi();
if (wifi == NULL) {
pc_comm.printf("Failed to set up Wifi.");
}
- MQTTNetwork network(wifi);
+ network = new MQTTNetwork(wifi);
setup_mqtt(network);
- if (client == NULL) {
+ if (this->client == NULL) {
pc_comm.printf("Failed to set up Client.");
}
- subscribe_to_topic_sync("Rahman/Sync/Send/1");
+ topic_pub_position = t1;
+ topic_sub_control = t2;
+ topic_pub_receive = t3;
+ topic_sub_send = t4;
+ subscribe_to_topic_control(topic_sub_control);
+ subscribe_to_topic_sync(topic_sub_send);
}
void Communication::setup_wifi() {
// Get a handle to the WiFi module
- wifi = WiFiInterface::get_default_instance();
+ this->wifi = WiFiInterface::get_default_instance();
// Connect the module to the wifi, based on the SSID and password
// specified in the mbed_app.json configuration file
@@ -46,7 +60,7 @@
// needed to register the device with AirPennNet-Device, so that you
// can use the campus wifi
pc_comm.printf("MAC Address: ");
- pc_comm.printf(wifi->get_mac_address());
+ pc_comm.printf(this->wifi->get_mac_address());
pc_comm.printf("\r\n");
if (rc != 0) {
@@ -58,7 +72,7 @@
}
-void Communication::setup_mqtt(MQTTNetwork &network) {
+void Communication::setup_mqtt(MQTTNetwork* network) {
// the hostname and port point to a Google Cloud MQTT server we setup for
// this project
const char* hostname = "34.68.206.11";
@@ -66,7 +80,7 @@
// Create the underlying network connection to the MQTT server
pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
- int rc = network.connect(hostname, port);
+ int rc = network->connect(hostname, port);
if (rc != 0) {
pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
return;
@@ -75,8 +89,8 @@
pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
// Connect the MQTT client to the server
- client = new MQTT::Client<MQTTNetwork, Countdown>(network);
- rc = client->connect();
+ this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
+ rc = this->client->connect();
if (rc != 0) {
pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
return;
@@ -99,6 +113,7 @@
int id = (int) payload[0];
int speed = (int) payload[1];
+ pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
speeds[id-1] = speed;
control_flags.set(flags[id-1]);
@@ -109,6 +124,7 @@
char* payload = (char*) message.payload;
sync = (int) payload[0];
+ pc_comm.printf("Sync arrived with number %d\r\n", sync);
sync_flags.set(0x01);
}
@@ -118,9 +134,13 @@
char buf[100];
sprintf(buf, "Hello World! This is a test message!\r\n");
- int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+ if (this->client == NULL) {
+ pc_comm.printf("CLIENT IS NULL!!!\r\n");
+ }
+ int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
if (rc != 0) {
+ pc_comm.printf("Message failed!\r\n");
return -1;
} else {
pc_comm.printf("Message sent!\r\n");
@@ -128,23 +148,10 @@
}
}
-int Communication::subscribe_to_topic_position(char* topic) {
- pc_comm.printf("subcribing: %s", topic);
- int rc = client->subscribe(topic, MQTT::QOS1, message_arrived);
- if (rc != 0) {
- pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
-
-// client->disconnect();
- return -1;
- } else {
- pc_comm.printf("Subscribed!\r\n");
- }
- return rc;
-}
int Communication::subscribe_to_topic_control(char* topic) {
pc_comm.printf("subcribing: %s\r\n", topic);
- int rc = client->subscribe(topic, MQTT::QOS1, control_arrived);
+ int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
if (rc != 0) {
pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
@@ -158,7 +165,7 @@
int Communication::subscribe_to_topic_sync(char* topic) {
pc_comm.printf("subcribing: %s\r\n", topic);
- int rc = client->subscribe(topic, MQTT::QOS1, sync_arrived);
+ int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
if (rc != 0) {
pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
@@ -170,34 +177,84 @@
return rc;
}
-void Communication::publish_car(char* topic, int speed, int position) {
- message_t *message = mpool.alloc();
- message->topic = topic;
- message->speed = speed;
- message->position = position;
- q_car.put(message);
+void Communication::publish_car(int id, int speed, int position) {
+ message_t message;
+ message.id = (char) id;
+ message.speed = (char) speed;
+ message.position = (char) position;
+
+ pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
+ mutex->lock();
+ q->push(message);
+ mutex->unlock();
}
-void Communication::publish_road(char* topic, int num) {
- message_t *message = mpool.alloc();
- message->topic = topic;
- message->speed = -1; // indicate this is for sync
- message->position = num;
- q_car.put(message);
+void Communication::publish_road(int num) {
+ message_t message;
+ message.id = -1; // indicate this is for sync
+ message.position = (char) num; // use position as road update number
+
+ mutex->lock();
+ q->push(message);
+ mutex->unlock();
}
void Communication::start() {
- while(1) {
+ while (true) {
+ mutex->lock();
+// pc_comm.printf("queue size = %d\r\n", q.size());
+ if (q->size() > 0) {
+ message_t &message = q->front();
+ q->pop();
+ mutex->unlock();
+ if (message.id == 255 || message.id == -1) {
+ char buf[2];
+ buf[0] = (char) message.position; // update number
+ buf[1] = '\0';
+
+ int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+ if (rc != 0) {
+ pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
+ } else {
+ pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
+ }
+ } else {
+ char buf[4];
+ buf[0] = message.id;
+ buf[1] = message.position;
+ buf[2] = message.speed;
+ buf[3] = '\0';
+
+ int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+ if (rc != 0) {
+ pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
+ } else {
+ pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
+ }
+ }
+// q->pop();
+// mutex->unlock();
+ this->client->yield(500); // wait for 100ms
+ } else {
+ mutex->unlock();
+ ThisThread::sleep_for(500);
+ }
+ }
+ /*
+ while(true) {
+ pc_comm.printf("queue size = %d\r\n", q_car.count());
osEvent evt = q_car.get();
if (evt.status == osEventMessage) {
+ pc_comm.printf("Gets one message.\r\n");
message_t *message = (message_t*)evt.value.p;
- char* topic = message->topic;
+// char* topic = message->topic;
+ char* topic = "Rahman/Sync/Send/1";
if (message->speed == -1) {
char buf[1];
buf[0] = (char) message->position; // update number
- int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+ int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
if (rc != 0) {
pc_comm.printf("Failed to publish Road info: %s", topic);
} else {
@@ -208,7 +265,7 @@
buf[0] = (char) message->position;
buf[1] = (char) message->speed;
- int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
+ int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
if (rc != 0) {
pc_comm.printf("Failed to publish AccCar info: %s", topic);
} else {
@@ -216,13 +273,15 @@
}
}
mpool.free(message);
- client->yield(100); // wait for 100ms
+ this->client->yield(100); // wait for 100ms
}
}
+ */
}
void Communication::reset() {
control_flags.clear();
+ sync_flags.clear();
if (thread != NULL) {
thread->terminate();
--- a/Communication.h Tue Dec 10 23:40:25 2019 +0000
+++ b/Communication.h Wed Dec 11 20:20:12 2019 +0000
@@ -2,15 +2,19 @@
#define _COMMUNICATION_H_
#include "mbed.h"
+#include "rtos.h"
#include "MQTTNetwork.h"
#include "MQTTClient.h"
#include "MQTTmbed.h"
+#include <string.h>
+#include <queue>
+#include <mutex>
typedef struct {
- int speed; // speed for AccCar, -1 for Road
- int position; // position for AccCar, update number for Road
- char* topic;
+ char id;
+ char speed; // speed for AccCar, -1 for Road
+ char position; // position for AccCar, update number for Road
} message_t;
class Communication {
@@ -22,35 +26,40 @@
static int flags[5];
// Singleton
- static Communication *getInstance();
+ static Communication *getInstance(char* t1, char* t2, char* t3, char* t4);
// MQTT Setup
void setup_wifi();
- void setup_mqtt(MQTTNetwork &network);
+ void setup_mqtt(MQTTNetwork* network);
static void message_arrived(MQTT::MessageData& md);
static void control_arrived(MQTT::MessageData& md);
static void sync_arrived(MQTT::MessageData& md);
int send_message(char* topic);
- int subscribe_to_topic_position(char* topic);
int subscribe_to_topic_control(char* topic);
int subscribe_to_topic_sync(char* topic);
- void publish_car(char* topic, int speed, int position);
- void publish_road(char* topic, int num);
+ void publish_car(int id, int speed, int position);
+ void publish_road(int num);
void start(); // function for serving requests (single thread)
void reset(); // function to create and start the singleton thread
void stop();
-private:
+//private:
static Communication *singleton;
WiFiInterface *wifi;
+ MQTTNetwork* network;
MQTT::Client<MQTTNetwork, Countdown> *client;
- Thread* thread;
+ static Thread* thread;
+
+ Mutex* mutex;
+ std::queue<message_t>* q;
- MemoryPool<message_t, 10> mpool;
- Queue<message_t, 10> q_car;
+ char* topic_pub_position;
+ char* topic_sub_control;
+ char* topic_pub_receive;
+ char* topic_sub_send;
- Communication();
+ Communication(char* t1, char* t2, char* t3, char* t4);
};
--- a/Road.cpp Tue Dec 10 23:40:25 2019 +0000
+++ b/Road.cpp Wed Dec 11 20:20:12 2019 +0000
@@ -2,13 +2,10 @@
Serial pc_road(USBTX, USBRX);
-Road::Road(char* t1, char* t2) {
+Road::Road(Communication* c) {
active_cars = 0x00;
- topic_send = t1; // road publish, controller receive
- topic_receive = t2; // road receive, controller publish
- comm = Communication::getInstance();
- comm->subscribe_to_topic_sync(topic_receive); // for receiving sync messages
- n = 0;
+ comm = c;
+ n = 1;
}
void Road::add_car(Car* car) {
@@ -50,8 +47,8 @@
void Road::wait_for_car_update() {
done_flags.wait_all(active_cars);
- n = n + 1;
- comm->publish_road(topic_send, n);
+ comm->publish_road(n);
Communication::sync_flags.wait_all(0x01);
n = Communication::sync;
+ pc_road.printf("Update number %d\r\n", n);
}
--- a/Road.h Tue Dec 10 23:40:25 2019 +0000
+++ b/Road.h Wed Dec 11 20:20:12 2019 +0000
@@ -17,7 +17,7 @@
EventFlags done_flags;
Intersection *intersection;
- Road(char* t1, char* t2);
+ Road(Communication* c);
void add_car(Car* car);
void add_acc_car(AccCar* car, int id);
void let_cars_update();
--- a/main.cpp Tue Dec 10 23:40:25 2019 +0000
+++ b/main.cpp Wed Dec 11 20:20:12 2019 +0000
@@ -14,6 +14,7 @@
#include <algorithm>
#include <vector>
+#include <queue>
#include <string>
#include <stdlib.h>
@@ -62,34 +63,25 @@
Intersection intersection;
// Initialize Communication
- Communication* communication = Communication::getInstance();
- communication->send_message("Rahman/Sync/Send/1");
+
+ char buf1[30] = "Rahman/Position/2";
+ char buf2[30] = "Rahman/Control/2";
+ char buf3[50] = "Rahman/Sync/Receive/2";
+ char buf4[50] = "Rahman/Sync/Send/2";
+ Communication* c = Communication::getInstance(buf1, buf2, buf3, buf4);
+
// Initialize 5 AccCars and the Road
- char buf1[50] = "Rahman/Sync/Receive/1";
- char buf2[50] = "Rahman/Sync/Send/1";
- Road road1(buf1, buf2);
+
+ Road road1(c);
intersection.road1 = &road1;
road1.intersection = &intersection;
- char buf11[30] = "Rahman/Position/1/1";
- char buf12[30] = "Rahman/Control/1/1";
- AccCar car11(1, &road1, 0x01, buf11, buf12);
- char buf21[30] = "Rahman/Position/1/2";
- char buf22[30] = "Rahman/Control/1/2";
- AccCar car12(2, &road1, 0x02, buf21, buf22);
-
- char buf31[30] = "Rahman/Position/1/3";
- char buf32[30] = "Rahman/Control/1/3";
- AccCar car13(3, &road1, 0x04, buf31, buf32);
-
- char buf41[30] = "Rahman/Position/1/4";
- char buf42[30] = "Rahman/Control/1/4";
- AccCar car14(4, &road1, 0x08, buf41, buf42);
-
- char buf51[30] = "Rahman/Position/1/5";
- char buf52[30] = "Rahman/Control/1/5";
- AccCar car15(5, &road1, 0x10, buf51, buf52);
+ AccCar car11(1, &road1, 0x01, c);
+ AccCar car12(2, &road1, 0x02, c);
+ AccCar car13(3, &road1, 0x04, c);
+ AccCar car14(4, &road1, 0x08, c);
+ AccCar car15(5, &road1, 0x10, c);
std::vector<AccCar*> q1;
q1.push_back(&car15);
@@ -106,8 +98,10 @@
stopwatch.start();
- communication->reset(); // start thread
-
+ pc.printf("Dispatching communication thread.\r\n");
+
+ c->reset();
+
int interval = MAX_SPEED - MIN_SPEED + 1;
car11.reset(rand() % interval + MIN_SPEED); // set random speed [5, 15]
car12.reset(rand() % interval + MIN_SPEED);
@@ -115,6 +109,7 @@
car14.reset(rand() % interval + MIN_SPEED);
car15.reset(rand() % interval + MIN_SPEED);
+// c->reset();
stopwatch.reset();
int waitTime1 = 0;
@@ -190,7 +185,7 @@
car13.stop();
car14.stop();
car15.stop();
- communication->stop();
+ c->stop();
// ----------------------------------------------------------------------
// Timing statistics printout, do not modify
--- a/mbed_app.json Tue Dec 10 23:40:25 2019 +0000
+++ b/mbed_app.json Wed Dec 11 20:20:12 2019 +0000
@@ -24,7 +24,7 @@
"esp8266.provide-default" : true,
"esp8266.tx" : "p28",
"esp8266.rx" : "p27",
- "rtos.thread-stack-size": 512
+ "rtos.thread-stack-size": 1024
}
}
}
\ No newline at end of file