
not running
Diff: Communication.cpp
- Revision:
- 0:3b4906b8a747
- Child:
- 1:19c3299ea83a
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Communication.cpp Tue Dec 10 22:29:09 2019 +0000 @@ -0,0 +1,235 @@ +#include "Communication.h" + +Communication* Communication::singleton = NULL; +int Communication::speeds[5] = {0, 0, 0, 0, 0}; +int Communication::sync = 0; +int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10}; +EventFlags Communication::control_flags; +EventFlags Communication::sync_flags; + +Serial pc_comm(USBTX, USBRX); + +Communication* Communication::getInstance() { + if (singleton == NULL) { + Communication(); + } + return singleton; +} + +Communication::Communication() +{ + singleton = this; + setup_wifi(); + if (wifi == NULL) { + pc_comm.printf("Failed to set up Wifi."); + } + MQTTNetwork network(wifi); + setup_mqtt(network); + if (client == NULL) { + pc_comm.printf("Failed to set up Client."); + } +} + +void Communication::setup_wifi() { + // Get a handle to the WiFi module + wifi = WiFiInterface::get_default_instance(); + + // Connect the module to the wifi, based on the SSID and password + // specified in the mbed_app.json configuration file + // If you are using AirPennNet-Device, this will not succeed until the MAC + // address (printed shortly after this) is registered + pc_comm.printf("Connecting to wifi\r\n"); + int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); + + // Print out the MAC address of the wifi module. The MAC address is + // needed to register the device with AirPennNet-Device, so that you + // can use the campus wifi + pc_comm.printf("MAC Address: "); + pc_comm.printf(wifi->get_mac_address()); + pc_comm.printf("\r\n"); + + if (rc != 0) { + pc_comm.printf("Problem connecting to wifi\r\n"); + return; + } else { + pc_comm.printf("Wifi connected\r\n"); + } +} + + +void Communication::setup_mqtt(MQTTNetwork &network) { + // the hostname and port point to a Google Cloud MQTT server we setup for + // this project + const char* hostname = "34.68.206.11"; + int port = 1883; + + // Create the underlying network connection to the MQTT server + pc_comm.printf("Connecting to %s:%d\r\n", hostname, port); + int rc = network.connect(hostname, port); + if (rc != 0) { + pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc); + return; + } + + pc_comm.printf("Connected to %s:%d\r\n", hostname, port); + + // Connect the MQTT client to the server + client = new MQTT::Client<MQTTNetwork, Countdown>(network); + rc = client->connect(); + if (rc != 0) { + pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc); + return; + } + + pc_comm.printf("MQTT connect successful!\r\n"); +} + +void Communication::message_arrived(MQTT::MessageData& md) +{ + MQTT::Message &message = md.message; + pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); + pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); +} + +void Communication::control_arrived(MQTT::MessageData& md) { + MQTT::Message &message = md.message; + + char* payload = (char*) message.payload; + + int id = (int) payload[0]; + int speed = (int) payload[1]; + + speeds[id-1] = speed; + control_flags.set(flags[id-1]); +} + +void Communication::sync_arrived(MQTT::MessageData& md) { + MQTT::Message &message = md.message; + + char* payload = (char*) message.payload; + sync = (int) payload[0]; + sync_flags.set(0x01); +} + +int Communication::send_message(char* topic) { + pc_comm.printf("Sending a message!\r\n"); + MQTT::Message message; + + char buf[100]; + sprintf(buf, "Hello World! This is a test message!\r\n"); + int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + + if (rc != 0) { + return -1; + } else { + pc_comm.printf("Message sent!\r\n"); + return 0; + } +} + +int Communication::subscribe_to_topic_position(char* topic) { + int rc = client->subscribe(topic, MQTT::QOS1, message_arrived); + if (rc != 0) { + pc_comm.printf("There was a problem subscribing: %d\r\n", rc); + +// client->disconnect(); + return -1; + } else { + pc_comm.printf("Subscribed!\r\n"); + } + return rc; +} + +int Communication::subscribe_to_topic_control(char* topic) { + int rc = client->subscribe(topic, MQTT::QOS1, control_arrived); + if (rc != 0) { + pc_comm.printf("There was a problem subscribing: %d\r\n", rc); + +// client->disconnect(); + return -1; + } else { + pc_comm.printf("Subscribed!\r\n"); + } + return rc; +} + +int Communication::subscribe_to_topic_sync(char* topic) { + int rc = client->subscribe(topic, MQTT::QOS1, message_arrived); + if (rc != 0) { + pc_comm.printf("There was a problem subscribing: %d\r\n", rc); + +// client->disconnect(); + return -1; + } else { + pc_comm.printf("Subscribed!\r\n"); + } + return rc; +} + +void Communication::publish_car(char* topic, int speed, int position) { + message_t *message = mpool.alloc(); + message->topic = topic; + message->speed = speed; + message->position = position; + q_car.put(message); +} + +void Communication::publish_road(char* topic, int num) { + message_t *message = mpool.alloc(); + message->topic = topic; + message->speed = -1; // indicate this is for sync + message->position = num; + q_car.put(message); +} + +void Communication::start() { + while(1) { + osEvent evt = q_car.get(); + if (evt.status == osEventMessage) { + message_t *message = (message_t*)evt.value.p; + char* topic = message->topic; + + if (message->speed == -1) { + char buf[1]; + buf[0] = (char) message->position; // update number + + int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + if (rc != 0) { + pc_comm.printf("Failed to publish Road info: %s", topic); + } else { + pc_comm.printf("Message sent!\r\n"); + } + } else { + char buf[2]; + buf[0] = (char) message->position; + buf[1] = (char) message->speed; + + int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); + if (rc != 0) { + pc_comm.printf("Failed to publish AccCar info: %s", topic); + } else { + pc_comm.printf("Message sent!\r\n"); + } + } + mpool.free(message); + client->yield(100); // wait for 100ms + } + } +} + +void Communication::reset() { + control_flags.clear(); + + if (thread != NULL) { + thread->terminate(); + } + + thread = new Thread(); + thread->start( callback(this, &Communication::start) ); +} + +void Communication::stop() { + if (thread != NULL) { + thread->terminate(); + } +} \ No newline at end of file