
not running
Revision 2:16b3bd337db2, committed 2019-12-11
- Comitter:
- hyan99
- Date:
- Wed Dec 11 20:20:12 2019 +0000
- Parent:
- 1:19c3299ea83a
- Commit message:
- testing
Changed in this revision
--- a/AccCar.cpp Tue Dec 10 23:40:25 2019 +0000 +++ b/AccCar.cpp Wed Dec 11 20:20:12 2019 +0000 @@ -11,7 +11,7 @@ Serial pc_acc(USBTX, USBRX); -AccCar::AccCar(int id, Road* road, int flag, char* t1, char* t2) { +AccCar::AccCar(int id, Road* road, int flag, Communication* c) { this->id = id; this->road = road; this->flag = flag; @@ -20,11 +20,7 @@ this->forward_car = NULL; this->cycle = 0; - this->comm = Communication::getInstance(); - this->topic_position = t1; - this->topic_control = t2; -// communication->subscribe_to_position(topic_position); - comm->subscribe_to_topic_control(topic_control); + this->comm = c; } void AccCar::set_forward_car(AccCar* car) { @@ -51,9 +47,12 @@ speed = target_speed; } // TODO: publish Position and Acc-speed - comm->publish_car(topic_position, speed, position); - Communication::control_flags.wait_all(flag); // wait for speed instruction from controller - speed = Communication::speeds[id - 1]; + if (position < 255) { + comm->publish_car(id, speed, position); + Communication::control_flags.wait_all(flag); // wait for speed instruction from controller + speed = Communication::speeds[id - 1]; + pc_acc.printf("Suggested speed: %d\r\n", speed); + } // // Crossing // if (position < STOP) { // speed = std::min(speed, STOP - position);
--- a/AccCar.h Tue Dec 10 23:40:25 2019 +0000 +++ b/AccCar.h Wed Dec 11 20:20:12 2019 +0000 @@ -16,7 +16,7 @@ int speed; int flag; - AccCar(int id, Road* road, int flag, char* t1, char* t2); + AccCar(int id, Road* road, int flag, Communication* c); void set_forward_car(AccCar* car); void update(); void reset(int speed);
--- a/Communication.cpp Tue Dec 10 23:40:25 2019 +0000 +++ b/Communication.cpp Wed Dec 11 20:20:12 2019 +0000 @@ -1,6 +1,7 @@ #include "Communication.h" Communication* Communication::singleton = NULL; +Thread* Communication::thread = NULL; int Communication::speeds[5] = {0, 0, 0, 0, 0}; int Communication::sync = 0; int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10}; @@ -8,32 +9,45 @@ EventFlags Communication::sync_flags; Serial pc_comm(USBTX, USBRX); +//Mutex mutex; +//std::vector<message_t> v; +//std::queue<message_t> q; +//Queue<message_t, 16> q_car; +//MemoryPool<message_t, 16> mpool; -Communication* Communication::getInstance() { +Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) { if (singleton == NULL) { - singleton = new Communication(); + singleton = new Communication(t1, t2, t3, t4); +// Communication(); } return singleton; } -Communication::Communication() +Communication::Communication(char* t1, char* t2, char* t3, char* t4) { // singleton = this; + mutex = new Mutex(); + q = new std::queue<message_t>(); setup_wifi(); if (wifi == NULL) { pc_comm.printf("Failed to set up Wifi."); } - MQTTNetwork network(wifi); + network = new MQTTNetwork(wifi); setup_mqtt(network); - if (client == NULL) { + if (this->client == NULL) { pc_comm.printf("Failed to set up Client."); } - subscribe_to_topic_sync("Rahman/Sync/Send/1"); + topic_pub_position = t1; + topic_sub_control = t2; + topic_pub_receive = t3; + topic_sub_send = t4; + subscribe_to_topic_control(topic_sub_control); + subscribe_to_topic_sync(topic_sub_send); } void Communication::setup_wifi() { // Get a handle to the WiFi module - wifi = WiFiInterface::get_default_instance(); + this->wifi = WiFiInterface::get_default_instance(); // Connect the module to the wifi, based on the SSID and password // specified in the mbed_app.json configuration file @@ -46,7 +60,7 @@ // needed to register the device with AirPennNet-Device, so that you // can use the campus wifi pc_comm.printf("MAC Address: "); - pc_comm.printf(wifi->get_mac_address()); + pc_comm.printf(this->wifi->get_mac_address()); pc_comm.printf("\r\n"); if (rc != 0) { @@ -58,7 +72,7 @@ } -void Communication::setup_mqtt(MQTTNetwork &network) { +void Communication::setup_mqtt(MQTTNetwork* network) { // the hostname and port point to a Google Cloud MQTT server we setup for // this project const char* hostname = "34.68.206.11"; @@ -66,7 +80,7 @@ // Create the underlying network connection to the MQTT server pc_comm.printf("Connecting to %s:%d\r\n", hostname, port); - int rc = network.connect(hostname, port); + int rc = network->connect(hostname, port); if (rc != 0) { pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc); return; @@ -75,8 +89,8 @@ pc_comm.printf("Connected to %s:%d\r\n", hostname, port); // Connect the MQTT client to the server - client = new MQTT::Client<MQTTNetwork, Countdown>(network); - rc = client->connect(); + this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network); + rc = this->client->connect(); if (rc != 0) { pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc); return; @@ -99,6 +113,7 @@ int id = (int) payload[0]; int speed = (int) payload[1]; + pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed); speeds[id-1] = speed; control_flags.set(flags[id-1]); @@ -109,6 +124,7 @@ char* payload = (char*) message.payload; sync = (int) payload[0]; + pc_comm.printf("Sync arrived with number %d\r\n", sync); sync_flags.set(0x01); } @@ -118,9 +134,13 @@ char buf[100]; sprintf(buf, "Hello World! This is a test message!\r\n"); - int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + if (this->client == NULL) { + pc_comm.printf("CLIENT IS NULL!!!\r\n"); + } + int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { + pc_comm.printf("Message failed!\r\n"); return -1; } else { pc_comm.printf("Message sent!\r\n"); @@ -128,23 +148,10 @@ } } -int Communication::subscribe_to_topic_position(char* topic) { - pc_comm.printf("subcribing: %s", topic); - int rc = client->subscribe(topic, MQTT::QOS1, message_arrived); - if (rc != 0) { - pc_comm.printf("There was a problem subscribing: %d\r\n", rc); - -// client->disconnect(); - return -1; - } else { - pc_comm.printf("Subscribed!\r\n"); - } - return rc; -} int Communication::subscribe_to_topic_control(char* topic) { pc_comm.printf("subcribing: %s\r\n", topic); - int rc = client->subscribe(topic, MQTT::QOS1, control_arrived); + int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived); if (rc != 0) { pc_comm.printf("There was a problem subscribing: %d\r\n", rc); @@ -158,7 +165,7 @@ int Communication::subscribe_to_topic_sync(char* topic) { pc_comm.printf("subcribing: %s\r\n", topic); - int rc = client->subscribe(topic, MQTT::QOS1, sync_arrived); + int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived); if (rc != 0) { pc_comm.printf("There was a problem subscribing: %d\r\n", rc); @@ -170,34 +177,84 @@ return rc; } -void Communication::publish_car(char* topic, int speed, int position) { - message_t *message = mpool.alloc(); - message->topic = topic; - message->speed = speed; - message->position = position; - q_car.put(message); +void Communication::publish_car(int id, int speed, int position) { + message_t message; + message.id = (char) id; + message.speed = (char) speed; + message.position = (char) position; + + pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position); + mutex->lock(); + q->push(message); + mutex->unlock(); } -void Communication::publish_road(char* topic, int num) { - message_t *message = mpool.alloc(); - message->topic = topic; - message->speed = -1; // indicate this is for sync - message->position = num; - q_car.put(message); +void Communication::publish_road(int num) { + message_t message; + message.id = -1; // indicate this is for sync + message.position = (char) num; // use position as road update number + + mutex->lock(); + q->push(message); + mutex->unlock(); } void Communication::start() { - while(1) { + while (true) { + mutex->lock(); +// pc_comm.printf("queue size = %d\r\n", q.size()); + if (q->size() > 0) { + message_t &message = q->front(); + q->pop(); + mutex->unlock(); + if (message.id == 255 || message.id == -1) { + char buf[2]; + buf[0] = (char) message.position; // update number + buf[1] = '\0'; + + int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1); + if (rc != 0) { + pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive); + } else { + pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]); + } + } else { + char buf[4]; + buf[0] = message.id; + buf[1] = message.position; + buf[2] = message.speed; + buf[3] = '\0'; + + int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1); + if (rc != 0) { + pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position); + } else { + pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]); + } + } +// q->pop(); +// mutex->unlock(); + this->client->yield(500); // wait for 100ms + } else { + mutex->unlock(); + ThisThread::sleep_for(500); + } + } + /* + while(true) { + pc_comm.printf("queue size = %d\r\n", q_car.count()); osEvent evt = q_car.get(); if (evt.status == osEventMessage) { + pc_comm.printf("Gets one message.\r\n"); message_t *message = (message_t*)evt.value.p; - char* topic = message->topic; +// char* topic = message->topic; + char* topic = "Rahman/Sync/Send/1"; if (message->speed == -1) { char buf[1]; buf[0] = (char) message->position; // update number - int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { pc_comm.printf("Failed to publish Road info: %s", topic); } else { @@ -208,7 +265,7 @@ buf[0] = (char) message->position; buf[1] = (char) message->speed; - int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { pc_comm.printf("Failed to publish AccCar info: %s", topic); } else { @@ -216,13 +273,15 @@ } } mpool.free(message); - client->yield(100); // wait for 100ms + this->client->yield(100); // wait for 100ms } } + */ } void Communication::reset() { control_flags.clear(); + sync_flags.clear(); if (thread != NULL) { thread->terminate();
--- a/Communication.h Tue Dec 10 23:40:25 2019 +0000 +++ b/Communication.h Wed Dec 11 20:20:12 2019 +0000 @@ -2,15 +2,19 @@ #define _COMMUNICATION_H_ #include "mbed.h" +#include "rtos.h" #include "MQTTNetwork.h" #include "MQTTClient.h" #include "MQTTmbed.h" +#include <string.h> +#include <queue> +#include <mutex> typedef struct { - int speed; // speed for AccCar, -1 for Road - int position; // position for AccCar, update number for Road - char* topic; + char id; + char speed; // speed for AccCar, -1 for Road + char position; // position for AccCar, update number for Road } message_t; class Communication { @@ -22,35 +26,40 @@ static int flags[5]; // Singleton - static Communication *getInstance(); + static Communication *getInstance(char* t1, char* t2, char* t3, char* t4); // MQTT Setup void setup_wifi(); - void setup_mqtt(MQTTNetwork &network); + void setup_mqtt(MQTTNetwork* network); static void message_arrived(MQTT::MessageData& md); static void control_arrived(MQTT::MessageData& md); static void sync_arrived(MQTT::MessageData& md); int send_message(char* topic); - int subscribe_to_topic_position(char* topic); int subscribe_to_topic_control(char* topic); int subscribe_to_topic_sync(char* topic); - void publish_car(char* topic, int speed, int position); - void publish_road(char* topic, int num); + void publish_car(int id, int speed, int position); + void publish_road(int num); void start(); // function for serving requests (single thread) void reset(); // function to create and start the singleton thread void stop(); -private: +//private: static Communication *singleton; WiFiInterface *wifi; + MQTTNetwork* network; MQTT::Client<MQTTNetwork, Countdown> *client; - Thread* thread; + static Thread* thread; + + Mutex* mutex; + std::queue<message_t>* q; - MemoryPool<message_t, 10> mpool; - Queue<message_t, 10> q_car; + char* topic_pub_position; + char* topic_sub_control; + char* topic_pub_receive; + char* topic_sub_send; - Communication(); + Communication(char* t1, char* t2, char* t3, char* t4); };
--- a/Road.cpp Tue Dec 10 23:40:25 2019 +0000 +++ b/Road.cpp Wed Dec 11 20:20:12 2019 +0000 @@ -2,13 +2,10 @@ Serial pc_road(USBTX, USBRX); -Road::Road(char* t1, char* t2) { +Road::Road(Communication* c) { active_cars = 0x00; - topic_send = t1; // road publish, controller receive - topic_receive = t2; // road receive, controller publish - comm = Communication::getInstance(); - comm->subscribe_to_topic_sync(topic_receive); // for receiving sync messages - n = 0; + comm = c; + n = 1; } void Road::add_car(Car* car) { @@ -50,8 +47,8 @@ void Road::wait_for_car_update() { done_flags.wait_all(active_cars); - n = n + 1; - comm->publish_road(topic_send, n); + comm->publish_road(n); Communication::sync_flags.wait_all(0x01); n = Communication::sync; + pc_road.printf("Update number %d\r\n", n); }
--- a/Road.h Tue Dec 10 23:40:25 2019 +0000 +++ b/Road.h Wed Dec 11 20:20:12 2019 +0000 @@ -17,7 +17,7 @@ EventFlags done_flags; Intersection *intersection; - Road(char* t1, char* t2); + Road(Communication* c); void add_car(Car* car); void add_acc_car(AccCar* car, int id); void let_cars_update();
--- a/main.cpp Tue Dec 10 23:40:25 2019 +0000 +++ b/main.cpp Wed Dec 11 20:20:12 2019 +0000 @@ -14,6 +14,7 @@ #include <algorithm> #include <vector> +#include <queue> #include <string> #include <stdlib.h> @@ -62,34 +63,25 @@ Intersection intersection; // Initialize Communication - Communication* communication = Communication::getInstance(); - communication->send_message("Rahman/Sync/Send/1"); + + char buf1[30] = "Rahman/Position/2"; + char buf2[30] = "Rahman/Control/2"; + char buf3[50] = "Rahman/Sync/Receive/2"; + char buf4[50] = "Rahman/Sync/Send/2"; + Communication* c = Communication::getInstance(buf1, buf2, buf3, buf4); + // Initialize 5 AccCars and the Road - char buf1[50] = "Rahman/Sync/Receive/1"; - char buf2[50] = "Rahman/Sync/Send/1"; - Road road1(buf1, buf2); + + Road road1(c); intersection.road1 = &road1; road1.intersection = &intersection; - char buf11[30] = "Rahman/Position/1/1"; - char buf12[30] = "Rahman/Control/1/1"; - AccCar car11(1, &road1, 0x01, buf11, buf12); - char buf21[30] = "Rahman/Position/1/2"; - char buf22[30] = "Rahman/Control/1/2"; - AccCar car12(2, &road1, 0x02, buf21, buf22); - - char buf31[30] = "Rahman/Position/1/3"; - char buf32[30] = "Rahman/Control/1/3"; - AccCar car13(3, &road1, 0x04, buf31, buf32); - - char buf41[30] = "Rahman/Position/1/4"; - char buf42[30] = "Rahman/Control/1/4"; - AccCar car14(4, &road1, 0x08, buf41, buf42); - - char buf51[30] = "Rahman/Position/1/5"; - char buf52[30] = "Rahman/Control/1/5"; - AccCar car15(5, &road1, 0x10, buf51, buf52); + AccCar car11(1, &road1, 0x01, c); + AccCar car12(2, &road1, 0x02, c); + AccCar car13(3, &road1, 0x04, c); + AccCar car14(4, &road1, 0x08, c); + AccCar car15(5, &road1, 0x10, c); std::vector<AccCar*> q1; q1.push_back(&car15); @@ -106,8 +98,10 @@ stopwatch.start(); - communication->reset(); // start thread - + pc.printf("Dispatching communication thread.\r\n"); + + c->reset(); + int interval = MAX_SPEED - MIN_SPEED + 1; car11.reset(rand() % interval + MIN_SPEED); // set random speed [5, 15] car12.reset(rand() % interval + MIN_SPEED); @@ -115,6 +109,7 @@ car14.reset(rand() % interval + MIN_SPEED); car15.reset(rand() % interval + MIN_SPEED); +// c->reset(); stopwatch.reset(); int waitTime1 = 0; @@ -190,7 +185,7 @@ car13.stop(); car14.stop(); car15.stop(); - communication->stop(); + c->stop(); // ---------------------------------------------------------------------- // Timing statistics printout, do not modify
--- a/mbed_app.json Tue Dec 10 23:40:25 2019 +0000 +++ b/mbed_app.json Wed Dec 11 20:20:12 2019 +0000 @@ -24,7 +24,7 @@ "esp8266.provide-default" : true, "esp8266.tx" : "p28", "esp8266.rx" : "p27", - "rtos.thread-stack-size": 512 + "rtos.thread-stack-size": 1024 } } } \ No newline at end of file