not running

Dependencies:   TextLCD MQTT

Communication.cpp

Committer:
hyan99
Date:
2019-12-11
Revision:
2:16b3bd337db2
Parent:
1:19c3299ea83a

File content as of revision 2:16b3bd337db2:

#include "Communication.h"

Communication* Communication::singleton = NULL;
Thread* Communication::thread = NULL;
int Communication::speeds[5] = {0, 0, 0, 0, 0};
int Communication::sync = 0;
int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
EventFlags Communication::control_flags;
EventFlags Communication::sync_flags;

Serial pc_comm(USBTX, USBRX);
//Mutex mutex;
//std::vector<message_t> v;
//std::queue<message_t> q;
//Queue<message_t, 16> q_car;
//MemoryPool<message_t, 16> mpool;

Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
    if (singleton == NULL) {
        singleton = new Communication(t1, t2, t3, t4);
//        Communication();
    }
    return singleton;
}

Communication::Communication(char* t1, char* t2, char* t3, char* t4)
{
//    singleton = this;
    mutex = new Mutex();
    q = new std::queue<message_t>();
    setup_wifi();
    if (wifi == NULL) {
        pc_comm.printf("Failed to set up Wifi.");
    }
    network = new MQTTNetwork(wifi);
    setup_mqtt(network);
    if (this->client == NULL) {
        pc_comm.printf("Failed to set up Client.");
    }
    topic_pub_position = t1;
    topic_sub_control = t2;
    topic_pub_receive = t3;
    topic_sub_send = t4;
    subscribe_to_topic_control(topic_sub_control);
    subscribe_to_topic_sync(topic_sub_send);
}

void Communication::setup_wifi() {
    // Get a handle to the WiFi module
    this->wifi = WiFiInterface::get_default_instance();
    
    // Connect the module to the wifi, based on the SSID and password 
    // specified in the mbed_app.json configuration file
    // If you are using AirPennNet-Device, this will not succeed until the MAC
    // address (printed shortly after this) is registered
    pc_comm.printf("Connecting to wifi\r\n");
    int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
    pc_comm.printf("Finished connected to wifi\r\n");
    // Print out the MAC address of the wifi module. The MAC address is 
    // needed to register the device with AirPennNet-Device, so that you
    // can use the campus wifi
    pc_comm.printf("MAC Address: ");
    pc_comm.printf(this->wifi->get_mac_address());
    pc_comm.printf("\r\n");
    
    if (rc != 0) {
        pc_comm.printf("Problem connecting to wifi\r\n");   
        return;
    } else {
        pc_comm.printf("Wifi connected\r\n");  
    }
}


void Communication::setup_mqtt(MQTTNetwork* network) {
    // the hostname and port point to a Google Cloud MQTT server we setup for
    // this project
    const char* hostname = "34.68.206.11";
    int port = 1883;
    
    // Create the underlying network connection to the MQTT server
    pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
    int rc = network->connect(hostname, port);
    if (rc != 0) {
        pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
        return;
    }
    
    pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
        
    // Connect the MQTT client to the server
    this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
    rc = this->client->connect();
    if (rc != 0) {
        pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
        return;
    }
    
    pc_comm.printf("MQTT connect successful!\r\n");
}

void Communication::message_arrived(MQTT::MessageData& md)
{
    MQTT::Message &message = md.message;
    pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
    pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
}

void Communication::control_arrived(MQTT::MessageData& md) {
    MQTT::Message &message = md.message;
    
    char* payload = (char*) message.payload;
    
    int id = (int) payload[0];
    int speed = (int) payload[1];
    pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
    
    speeds[id-1] = speed;
    control_flags.set(flags[id-1]);
}

void Communication::sync_arrived(MQTT::MessageData& md) {
    MQTT::Message &message = md.message;
    
    char* payload = (char*) message.payload;
    sync = (int) payload[0];
    pc_comm.printf("Sync arrived with number %d\r\n", sync);
    sync_flags.set(0x01);
}

int Communication::send_message(char* topic) {
    pc_comm.printf("Sending a message!\r\n");
    MQTT::Message message;
 
    char buf[100];
    sprintf(buf, "Hello World!  This is a test message!\r\n");
    if (this->client == NULL) {
        pc_comm.printf("CLIENT IS NULL!!!\r\n");
    }
    int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
    
    if (rc != 0) {
        pc_comm.printf("Message failed!\r\n");
        return -1;   
    } else {
        pc_comm.printf("Message sent!\r\n");
        return 0;
    }
}


int Communication::subscribe_to_topic_control(char* topic) {
    pc_comm.printf("subcribing: %s\r\n", topic);
    int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
    if (rc != 0) {
        pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
        
//        client->disconnect();
        return -1;
    } else {
        pc_comm.printf("Subscribed!\r\n");
    }
    return rc;
}

int Communication::subscribe_to_topic_sync(char* topic) {
    pc_comm.printf("subcribing: %s\r\n", topic);
    int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
    if (rc != 0) {
        pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
        
//        client->disconnect();
        return -1;
    } else {
        pc_comm.printf("Subscribed!\r\n");
    }
    return rc;
}
    
void Communication::publish_car(int id, int speed, int position) {
    message_t message;
    message.id = (char) id;
    message.speed = (char) speed;
    message.position = (char) position;
    
    pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
    mutex->lock();
    q->push(message);
    mutex->unlock();
}

void Communication::publish_road(int num) {
    message_t message;
    message.id = -1; // indicate this is for sync
    message.position = (char) num; // use position as road update number
    
    mutex->lock();
    q->push(message);
    mutex->unlock();
}

void Communication::start() {
    while (true) {
        mutex->lock();
//        pc_comm.printf("queue size = %d\r\n", q.size());
        if (q->size() > 0) {
            message_t &message = q->front();
            q->pop();
            mutex->unlock();
            if (message.id == 255 || message.id == -1) {
                char buf[2];
                buf[0] = (char) message.position; // update number
                buf[1] = '\0';
                
                int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                if (rc != 0) {
                    pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
                } else {
                    pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
                }
            } else {
                char buf[4];
                buf[0] = message.id;
                buf[1] = message.position;
                buf[2] = message.speed;
                buf[3] = '\0';
                
                int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                if (rc != 0) {
                    pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
                } else {
                    pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
                }
            }
//            q->pop();
//            mutex->unlock();
            this->client->yield(500); // wait for 100ms
        } else {
            mutex->unlock();
            ThisThread::sleep_for(500);
        }
    }
    /*
    while(true) {
        pc_comm.printf("queue size = %d\r\n", q_car.count());
        osEvent evt = q_car.get();
        if (evt.status == osEventMessage) {
            pc_comm.printf("Gets one message.\r\n");
            message_t *message = (message_t*)evt.value.p;
//            char* topic = message->topic;
            char* topic = "Rahman/Sync/Send/1";
            
            if (message->speed == -1) {
                char buf[1];
                buf[0] = (char) message->position; // update number
                
                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                if (rc != 0) {
                    pc_comm.printf("Failed to publish Road info: %s", topic);
                } else {
                    pc_comm.printf("Message sent!\r\n");
                }
            } else {
                char buf[2];
                buf[0] = (char) message->position;
                buf[1] = (char) message->speed;
                
                int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
                if (rc != 0) {
                    pc_comm.printf("Failed to publish AccCar info: %s", topic);
                } else {
                    pc_comm.printf("Message sent!\r\n");
                }
            }
            mpool.free(message);
            this->client->yield(100); // wait for 100ms
        }
    }
    */
}

void Communication::reset() {
    control_flags.clear();
    sync_flags.clear();
    
    if (thread != NULL) {
        thread->terminate();
    }

    thread = new Thread();
    thread->start( callback(this, &Communication::start) );
}

void Communication::stop() {
    if (thread != NULL) {
        thread->terminate();
    }
}