not running
Embed:
(wiki syntax)
Show/hide line numbers
Communication.cpp
00001 #include "Communication.h" 00002 00003 Communication* Communication::singleton = NULL; 00004 Thread* Communication::thread = NULL; 00005 int Communication::speeds[5] = {0, 0, 0, 0, 0}; 00006 int Communication::sync = 0; 00007 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10}; 00008 EventFlags Communication::control_flags; 00009 EventFlags Communication::sync_flags; 00010 00011 Serial pc_comm(USBTX, USBRX); 00012 //Mutex mutex; 00013 //std::vector<message_t> v; 00014 //std::queue<message_t> q; 00015 //Queue<message_t, 16> q_car; 00016 //MemoryPool<message_t, 16> mpool; 00017 00018 Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) { 00019 if (singleton == NULL) { 00020 singleton = new Communication(t1, t2, t3, t4); 00021 // Communication(); 00022 } 00023 return singleton; 00024 } 00025 00026 Communication::Communication(char* t1, char* t2, char* t3, char* t4) 00027 { 00028 // singleton = this; 00029 mutex = new Mutex(); 00030 q = new std::queue<message_t>(); 00031 setup_wifi(); 00032 if (wifi == NULL) { 00033 pc_comm.printf("Failed to set up Wifi."); 00034 } 00035 network = new MQTTNetwork(wifi); 00036 setup_mqtt(network); 00037 if (this->client == NULL) { 00038 pc_comm.printf("Failed to set up Client."); 00039 } 00040 topic_pub_position = t1; 00041 topic_sub_control = t2; 00042 topic_pub_receive = t3; 00043 topic_sub_send = t4; 00044 subscribe_to_topic_control(topic_sub_control); 00045 subscribe_to_topic_sync(topic_sub_send); 00046 } 00047 00048 void Communication::setup_wifi() { 00049 // Get a handle to the WiFi module 00050 this->wifi = WiFiInterface::get_default_instance(); 00051 00052 // Connect the module to the wifi, based on the SSID and password 00053 // specified in the mbed_app.json configuration file 00054 // If you are using AirPennNet-Device, this will not succeed until the MAC 00055 // address (printed shortly after this) is registered 00056 pc_comm.printf("Connecting to wifi\r\n"); 00057 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); 00058 pc_comm.printf("Finished connected to wifi\r\n"); 00059 // Print out the MAC address of the wifi module. The MAC address is 00060 // needed to register the device with AirPennNet-Device, so that you 00061 // can use the campus wifi 00062 pc_comm.printf("MAC Address: "); 00063 pc_comm.printf(this->wifi->get_mac_address()); 00064 pc_comm.printf("\r\n"); 00065 00066 if (rc != 0) { 00067 pc_comm.printf("Problem connecting to wifi\r\n"); 00068 return; 00069 } else { 00070 pc_comm.printf("Wifi connected\r\n"); 00071 } 00072 } 00073 00074 00075 void Communication::setup_mqtt(MQTTNetwork* network) { 00076 // the hostname and port point to a Google Cloud MQTT server we setup for 00077 // this project 00078 const char* hostname = "34.68.206.11"; 00079 int port = 1883; 00080 00081 // Create the underlying network connection to the MQTT server 00082 pc_comm.printf("Connecting to %s:%d\r\n", hostname, port); 00083 int rc = network->connect(hostname, port); 00084 if (rc != 0) { 00085 pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc); 00086 return; 00087 } 00088 00089 pc_comm.printf("Connected to %s:%d\r\n", hostname, port); 00090 00091 // Connect the MQTT client to the server 00092 this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network); 00093 rc = this->client->connect(); 00094 if (rc != 0) { 00095 pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc); 00096 return; 00097 } 00098 00099 pc_comm.printf("MQTT connect successful!\r\n"); 00100 } 00101 00102 void Communication::message_arrived(MQTT::MessageData& md) 00103 { 00104 MQTT::Message &message = md.message; 00105 pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); 00106 pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); 00107 } 00108 00109 void Communication::control_arrived(MQTT::MessageData& md) { 00110 MQTT::Message &message = md.message; 00111 00112 char* payload = (char*) message.payload; 00113 00114 int id = (int) payload[0]; 00115 int speed = (int) payload[1]; 00116 pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed); 00117 00118 speeds[id-1] = speed; 00119 control_flags.set(flags[id-1]); 00120 } 00121 00122 void Communication::sync_arrived(MQTT::MessageData& md) { 00123 MQTT::Message &message = md.message; 00124 00125 char* payload = (char*) message.payload; 00126 sync = (int) payload[0]; 00127 pc_comm.printf("Sync arrived with number %d\r\n", sync); 00128 sync_flags.set(0x01); 00129 } 00130 00131 int Communication::send_message(char* topic) { 00132 pc_comm.printf("Sending a message!\r\n"); 00133 MQTT::Message message; 00134 00135 char buf[100]; 00136 sprintf(buf, "Hello World! This is a test message!\r\n"); 00137 if (this->client == NULL) { 00138 pc_comm.printf("CLIENT IS NULL!!!\r\n"); 00139 } 00140 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00141 00142 if (rc != 0) { 00143 pc_comm.printf("Message failed!\r\n"); 00144 return -1; 00145 } else { 00146 pc_comm.printf("Message sent!\r\n"); 00147 return 0; 00148 } 00149 } 00150 00151 00152 int Communication::subscribe_to_topic_control(char* topic) { 00153 pc_comm.printf("subcribing: %s\r\n", topic); 00154 int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived); 00155 if (rc != 0) { 00156 pc_comm.printf("There was a problem subscribing: %d\r\n", rc); 00157 00158 // client->disconnect(); 00159 return -1; 00160 } else { 00161 pc_comm.printf("Subscribed!\r\n"); 00162 } 00163 return rc; 00164 } 00165 00166 int Communication::subscribe_to_topic_sync(char* topic) { 00167 pc_comm.printf("subcribing: %s\r\n", topic); 00168 int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived); 00169 if (rc != 0) { 00170 pc_comm.printf("There was a problem subscribing: %d\r\n", rc); 00171 00172 // client->disconnect(); 00173 return -1; 00174 } else { 00175 pc_comm.printf("Subscribed!\r\n"); 00176 } 00177 return rc; 00178 } 00179 00180 void Communication::publish_car(int id, int speed, int position) { 00181 message_t message; 00182 message.id = (char) id; 00183 message.speed = (char) speed; 00184 message.position = (char) position; 00185 00186 pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position); 00187 mutex->lock(); 00188 q->push(message); 00189 mutex->unlock(); 00190 } 00191 00192 void Communication::publish_road(int num) { 00193 message_t message; 00194 message.id = -1; // indicate this is for sync 00195 message.position = (char) num; // use position as road update number 00196 00197 mutex->lock(); 00198 q->push(message); 00199 mutex->unlock(); 00200 } 00201 00202 void Communication::start() { 00203 while (true) { 00204 mutex->lock(); 00205 // pc_comm.printf("queue size = %d\r\n", q.size()); 00206 if (q->size() > 0) { 00207 message_t &message = q->front(); 00208 q->pop(); 00209 mutex->unlock(); 00210 if (message.id == 255 || message.id == -1) { 00211 char buf[2]; 00212 buf[0] = (char) message.position; // update number 00213 buf[1] = '\0'; 00214 00215 int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00216 if (rc != 0) { 00217 pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive); 00218 } else { 00219 pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]); 00220 } 00221 } else { 00222 char buf[4]; 00223 buf[0] = message.id; 00224 buf[1] = message.position; 00225 buf[2] = message.speed; 00226 buf[3] = '\0'; 00227 00228 int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00229 if (rc != 0) { 00230 pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position); 00231 } else { 00232 pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]); 00233 } 00234 } 00235 // q->pop(); 00236 // mutex->unlock(); 00237 this->client->yield(500); // wait for 100ms 00238 } else { 00239 mutex->unlock(); 00240 ThisThread::sleep_for(500); 00241 } 00242 } 00243 /* 00244 while(true) { 00245 pc_comm.printf("queue size = %d\r\n", q_car.count()); 00246 osEvent evt = q_car.get(); 00247 if (evt.status == osEventMessage) { 00248 pc_comm.printf("Gets one message.\r\n"); 00249 message_t *message = (message_t*)evt.value.p; 00250 // char* topic = message->topic; 00251 char* topic = "Rahman/Sync/Send/1"; 00252 00253 if (message->speed == -1) { 00254 char buf[1]; 00255 buf[0] = (char) message->position; // update number 00256 00257 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00258 if (rc != 0) { 00259 pc_comm.printf("Failed to publish Road info: %s", topic); 00260 } else { 00261 pc_comm.printf("Message sent!\r\n"); 00262 } 00263 } else { 00264 char buf[2]; 00265 buf[0] = (char) message->position; 00266 buf[1] = (char) message->speed; 00267 00268 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00269 if (rc != 0) { 00270 pc_comm.printf("Failed to publish AccCar info: %s", topic); 00271 } else { 00272 pc_comm.printf("Message sent!\r\n"); 00273 } 00274 } 00275 mpool.free(message); 00276 this->client->yield(100); // wait for 100ms 00277 } 00278 } 00279 */ 00280 } 00281 00282 void Communication::reset() { 00283 control_flags.clear(); 00284 sync_flags.clear(); 00285 00286 if (thread != NULL) { 00287 thread->terminate(); 00288 } 00289 00290 thread = new Thread(); 00291 thread->start( callback(this, &Communication::start) ); 00292 } 00293 00294 void Communication::stop() { 00295 if (thread != NULL) { 00296 thread->terminate(); 00297 } 00298 }
Generated on Fri Jul 22 2022 13:26:47 by
1.7.2