
not running
Communication.cpp
- Committer:
- hyan99
- Date:
- 2019-12-10
- Revision:
- 1:19c3299ea83a
- Parent:
- 0:3b4906b8a747
- Child:
- 2:16b3bd337db2
File content as of revision 1:19c3299ea83a:
#include "Communication.h" Communication* Communication::singleton = NULL; int Communication::speeds[5] = {0, 0, 0, 0, 0}; int Communication::sync = 0; int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10}; EventFlags Communication::control_flags; EventFlags Communication::sync_flags; Serial pc_comm(USBTX, USBRX); Communication* Communication::getInstance() { if (singleton == NULL) { singleton = new Communication(); } return singleton; } Communication::Communication() { // singleton = this; setup_wifi(); if (wifi == NULL) { pc_comm.printf("Failed to set up Wifi."); } MQTTNetwork network(wifi); setup_mqtt(network); if (client == NULL) { pc_comm.printf("Failed to set up Client."); } subscribe_to_topic_sync("Rahman/Sync/Send/1"); } void Communication::setup_wifi() { // Get a handle to the WiFi module wifi = WiFiInterface::get_default_instance(); // Connect the module to the wifi, based on the SSID and password // specified in the mbed_app.json configuration file // If you are using AirPennNet-Device, this will not succeed until the MAC // address (printed shortly after this) is registered pc_comm.printf("Connecting to wifi\r\n"); int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); pc_comm.printf("Finished connected to wifi\r\n"); // Print out the MAC address of the wifi module. The MAC address is // needed to register the device with AirPennNet-Device, so that you // can use the campus wifi pc_comm.printf("MAC Address: "); pc_comm.printf(wifi->get_mac_address()); pc_comm.printf("\r\n"); if (rc != 0) { pc_comm.printf("Problem connecting to wifi\r\n"); return; } else { pc_comm.printf("Wifi connected\r\n"); } } void Communication::setup_mqtt(MQTTNetwork &network) { // the hostname and port point to a Google Cloud MQTT server we setup for // this project const char* hostname = "34.68.206.11"; int port = 1883; // Create the underlying network connection to the MQTT server pc_comm.printf("Connecting to %s:%d\r\n", hostname, port); int rc = network.connect(hostname, port); if (rc != 0) { pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc); return; } pc_comm.printf("Connected to %s:%d\r\n", hostname, port); // Connect the MQTT client to the server client = new MQTT::Client<MQTTNetwork, Countdown>(network); rc = client->connect(); if (rc != 0) { pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc); return; } pc_comm.printf("MQTT connect successful!\r\n"); } void Communication::message_arrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); } void Communication::control_arrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; char* payload = (char*) message.payload; int id = (int) payload[0]; int speed = (int) payload[1]; speeds[id-1] = speed; control_flags.set(flags[id-1]); } void Communication::sync_arrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; char* payload = (char*) message.payload; sync = (int) payload[0]; sync_flags.set(0x01); } int Communication::send_message(char* topic) { pc_comm.printf("Sending a message!\r\n"); MQTT::Message message; char buf[100]; sprintf(buf, "Hello World! This is a test message!\r\n"); int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { return -1; } else { pc_comm.printf("Message sent!\r\n"); return 0; } } int Communication::subscribe_to_topic_position(char* topic) { pc_comm.printf("subcribing: %s", topic); int rc = client->subscribe(topic, MQTT::QOS1, message_arrived); if (rc != 0) { pc_comm.printf("There was a problem subscribing: %d\r\n", rc); // client->disconnect(); return -1; } else { pc_comm.printf("Subscribed!\r\n"); } return rc; } int Communication::subscribe_to_topic_control(char* topic) { pc_comm.printf("subcribing: %s\r\n", topic); int rc = client->subscribe(topic, MQTT::QOS1, control_arrived); if (rc != 0) { pc_comm.printf("There was a problem subscribing: %d\r\n", rc); // client->disconnect(); return -1; } else { pc_comm.printf("Subscribed!\r\n"); } return rc; } int Communication::subscribe_to_topic_sync(char* topic) { pc_comm.printf("subcribing: %s\r\n", topic); int rc = client->subscribe(topic, MQTT::QOS1, sync_arrived); if (rc != 0) { pc_comm.printf("There was a problem subscribing: %d\r\n", rc); // client->disconnect(); return -1; } else { pc_comm.printf("Subscribed!\r\n"); } return rc; } void Communication::publish_car(char* topic, int speed, int position) { message_t *message = mpool.alloc(); message->topic = topic; message->speed = speed; message->position = position; q_car.put(message); } void Communication::publish_road(char* topic, int num) { message_t *message = mpool.alloc(); message->topic = topic; message->speed = -1; // indicate this is for sync message->position = num; q_car.put(message); } void Communication::start() { while(1) { osEvent evt = q_car.get(); if (evt.status == osEventMessage) { message_t *message = (message_t*)evt.value.p; char* topic = message->topic; if (message->speed == -1) { char buf[1]; buf[0] = (char) message->position; // update number int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { pc_comm.printf("Failed to publish Road info: %s", topic); } else { pc_comm.printf("Message sent!\r\n"); } } else { char buf[2]; buf[0] = (char) message->position; buf[1] = (char) message->speed; int rc = client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); if (rc != 0) { pc_comm.printf("Failed to publish AccCar info: %s", topic); } else { pc_comm.printf("Message sent!\r\n"); } } mpool.free(message); client->yield(100); // wait for 100ms } } } void Communication::reset() { control_flags.clear(); if (thread != NULL) { thread->terminate(); } thread = new Thread(); thread->start( callback(this, &Communication::start) ); } void Communication::stop() { if (thread != NULL) { thread->terminate(); } }