not running

Dependencies:   TextLCD MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers Communication.cpp Source File

Communication.cpp

00001 #include "Communication.h"
00002 
00003 Communication* Communication::singleton = NULL;
00004 Thread* Communication::thread = NULL;
00005 int Communication::speeds[5] = {0, 0, 0, 0, 0};
00006 int Communication::sync = 0;
00007 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
00008 EventFlags Communication::control_flags;
00009 EventFlags Communication::sync_flags;
00010 
00011 Serial pc_comm(USBTX, USBRX);
00012 //Mutex mutex;
00013 //std::vector<message_t> v;
00014 //std::queue<message_t> q;
00015 //Queue<message_t, 16> q_car;
00016 //MemoryPool<message_t, 16> mpool;
00017 
00018 Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
00019     if (singleton == NULL) {
00020         singleton = new Communication(t1, t2, t3, t4);
00021 //        Communication();
00022     }
00023     return singleton;
00024 }
00025 
00026 Communication::Communication(char* t1, char* t2, char* t3, char* t4)
00027 {
00028 //    singleton = this;
00029     mutex = new Mutex();
00030     q = new std::queue<message_t>();
00031     setup_wifi();
00032     if (wifi == NULL) {
00033         pc_comm.printf("Failed to set up Wifi.");
00034     }
00035     network = new MQTTNetwork(wifi);
00036     setup_mqtt(network);
00037     if (this->client == NULL) {
00038         pc_comm.printf("Failed to set up Client.");
00039     }
00040     topic_pub_position = t1;
00041     topic_sub_control = t2;
00042     topic_pub_receive = t3;
00043     topic_sub_send = t4;
00044     subscribe_to_topic_control(topic_sub_control);
00045     subscribe_to_topic_sync(topic_sub_send);
00046 }
00047 
00048 void Communication::setup_wifi() {
00049     // Get a handle to the WiFi module
00050     this->wifi = WiFiInterface::get_default_instance();
00051     
00052     // Connect the module to the wifi, based on the SSID and password 
00053     // specified in the mbed_app.json configuration file
00054     // If you are using AirPennNet-Device, this will not succeed until the MAC
00055     // address (printed shortly after this) is registered
00056     pc_comm.printf("Connecting to wifi\r\n");
00057     int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
00058     pc_comm.printf("Finished connected to wifi\r\n");
00059     // Print out the MAC address of the wifi module. The MAC address is 
00060     // needed to register the device with AirPennNet-Device, so that you
00061     // can use the campus wifi
00062     pc_comm.printf("MAC Address: ");
00063     pc_comm.printf(this->wifi->get_mac_address());
00064     pc_comm.printf("\r\n");
00065     
00066     if (rc != 0) {
00067         pc_comm.printf("Problem connecting to wifi\r\n");   
00068         return;
00069     } else {
00070         pc_comm.printf("Wifi connected\r\n");  
00071     }
00072 }
00073 
00074 
00075 void Communication::setup_mqtt(MQTTNetwork* network) {
00076     // the hostname and port point to a Google Cloud MQTT server we setup for
00077     // this project
00078     const char* hostname = "34.68.206.11";
00079     int port = 1883;
00080     
00081     // Create the underlying network connection to the MQTT server
00082     pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
00083     int rc = network->connect(hostname, port);
00084     if (rc != 0) {
00085         pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
00086         return;
00087     }
00088     
00089     pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
00090         
00091     // Connect the MQTT client to the server
00092     this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
00093     rc = this->client->connect();
00094     if (rc != 0) {
00095         pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
00096         return;
00097     }
00098     
00099     pc_comm.printf("MQTT connect successful!\r\n");
00100 }
00101 
00102 void Communication::message_arrived(MQTT::MessageData& md)
00103 {
00104     MQTT::Message &message = md.message;
00105     pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
00106     pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
00107 }
00108 
00109 void Communication::control_arrived(MQTT::MessageData& md) {
00110     MQTT::Message &message = md.message;
00111     
00112     char* payload = (char*) message.payload;
00113     
00114     int id = (int) payload[0];
00115     int speed = (int) payload[1];
00116     pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
00117     
00118     speeds[id-1] = speed;
00119     control_flags.set(flags[id-1]);
00120 }
00121 
00122 void Communication::sync_arrived(MQTT::MessageData& md) {
00123     MQTT::Message &message = md.message;
00124     
00125     char* payload = (char*) message.payload;
00126     sync = (int) payload[0];
00127     pc_comm.printf("Sync arrived with number %d\r\n", sync);
00128     sync_flags.set(0x01);
00129 }
00130 
00131 int Communication::send_message(char* topic) {
00132     pc_comm.printf("Sending a message!\r\n");
00133     MQTT::Message message;
00134  
00135     char buf[100];
00136     sprintf(buf, "Hello World!  This is a test message!\r\n");
00137     if (this->client == NULL) {
00138         pc_comm.printf("CLIENT IS NULL!!!\r\n");
00139     }
00140     int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
00141     
00142     if (rc != 0) {
00143         pc_comm.printf("Message failed!\r\n");
00144         return -1;   
00145     } else {
00146         pc_comm.printf("Message sent!\r\n");
00147         return 0;
00148     }
00149 }
00150 
00151 
00152 int Communication::subscribe_to_topic_control(char* topic) {
00153     pc_comm.printf("subcribing: %s\r\n", topic);
00154     int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
00155     if (rc != 0) {
00156         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
00157         
00158 //        client->disconnect();
00159         return -1;
00160     } else {
00161         pc_comm.printf("Subscribed!\r\n");
00162     }
00163     return rc;
00164 }
00165 
00166 int Communication::subscribe_to_topic_sync(char* topic) {
00167     pc_comm.printf("subcribing: %s\r\n", topic);
00168     int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
00169     if (rc != 0) {
00170         pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
00171         
00172 //        client->disconnect();
00173         return -1;
00174     } else {
00175         pc_comm.printf("Subscribed!\r\n");
00176     }
00177     return rc;
00178 }
00179     
00180 void Communication::publish_car(int id, int speed, int position) {
00181     message_t message;
00182     message.id = (char) id;
00183     message.speed = (char) speed;
00184     message.position = (char) position;
00185     
00186     pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
00187     mutex->lock();
00188     q->push(message);
00189     mutex->unlock();
00190 }
00191 
00192 void Communication::publish_road(int num) {
00193     message_t message;
00194     message.id = -1; // indicate this is for sync
00195     message.position = (char) num; // use position as road update number
00196     
00197     mutex->lock();
00198     q->push(message);
00199     mutex->unlock();
00200 }
00201 
00202 void Communication::start() {
00203     while (true) {
00204         mutex->lock();
00205 //        pc_comm.printf("queue size = %d\r\n", q.size());
00206         if (q->size() > 0) {
00207             message_t &message = q->front();
00208             q->pop();
00209             mutex->unlock();
00210             if (message.id == 255 || message.id == -1) {
00211                 char buf[2];
00212                 buf[0] = (char) message.position; // update number
00213                 buf[1] = '\0';
00214                 
00215                 int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
00216                 if (rc != 0) {
00217                     pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
00218                 } else {
00219                     pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
00220                 }
00221             } else {
00222                 char buf[4];
00223                 buf[0] = message.id;
00224                 buf[1] = message.position;
00225                 buf[2] = message.speed;
00226                 buf[3] = '\0';
00227                 
00228                 int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
00229                 if (rc != 0) {
00230                     pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
00231                 } else {
00232                     pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
00233                 }
00234             }
00235 //            q->pop();
00236 //            mutex->unlock();
00237             this->client->yield(500); // wait for 100ms
00238         } else {
00239             mutex->unlock();
00240             ThisThread::sleep_for(500);
00241         }
00242     }
00243     /*
00244     while(true) {
00245         pc_comm.printf("queue size = %d\r\n", q_car.count());
00246         osEvent evt = q_car.get();
00247         if (evt.status == osEventMessage) {
00248             pc_comm.printf("Gets one message.\r\n");
00249             message_t *message = (message_t*)evt.value.p;
00250 //            char* topic = message->topic;
00251             char* topic = "Rahman/Sync/Send/1";
00252             
00253             if (message->speed == -1) {
00254                 char buf[1];
00255                 buf[0] = (char) message->position; // update number
00256                 
00257                 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
00258                 if (rc != 0) {
00259                     pc_comm.printf("Failed to publish Road info: %s", topic);
00260                 } else {
00261                     pc_comm.printf("Message sent!\r\n");
00262                 }
00263             } else {
00264                 char buf[2];
00265                 buf[0] = (char) message->position;
00266                 buf[1] = (char) message->speed;
00267                 
00268                 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
00269                 if (rc != 0) {
00270                     pc_comm.printf("Failed to publish AccCar info: %s", topic);
00271                 } else {
00272                     pc_comm.printf("Message sent!\r\n");
00273                 }
00274             }
00275             mpool.free(message);
00276             this->client->yield(100); // wait for 100ms
00277         }
00278     }
00279     */
00280 }
00281 
00282 void Communication::reset() {
00283     control_flags.clear();
00284     sync_flags.clear();
00285     
00286     if (thread != NULL) {
00287         thread->terminate();
00288     }
00289 
00290     thread = new Thread();
00291     thread->start( callback(this, &Communication::start) );
00292 }
00293 
00294 void Communication::stop() {
00295     if (thread != NULL) {
00296         thread->terminate();
00297     }
00298 }