Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Communication.cpp
00001 #include "Communication.h" 00002 00003 Communication* Communication::singleton = NULL; 00004 Thread* Communication::thread = NULL; 00005 int Communication::speeds[5] = {0, 0, 0, 0, 0}; 00006 int Communication::sync = 0; 00007 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10}; 00008 EventFlags Communication::control_flags; 00009 EventFlags Communication::sync_flags; 00010 00011 Serial pc_comm(USBTX, USBRX); 00012 //Mutex mutex; 00013 //std::vector<message_t> v; 00014 //std::queue<message_t> q; 00015 //Queue<message_t, 16> q_car; 00016 //MemoryPool<message_t, 16> mpool; 00017 00018 Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) { 00019 if (singleton == NULL) { 00020 singleton = new Communication(t1, t2, t3, t4); 00021 // Communication(); 00022 } 00023 return singleton; 00024 } 00025 00026 Communication::Communication(char* t1, char* t2, char* t3, char* t4) 00027 { 00028 // singleton = this; 00029 mutex = new Mutex(); 00030 q = new std::queue<message_t>(); 00031 setup_wifi(); 00032 if (wifi == NULL) { 00033 pc_comm.printf("Failed to set up Wifi."); 00034 } 00035 network = new MQTTNetwork(wifi); 00036 setup_mqtt(network); 00037 if (this->client == NULL) { 00038 pc_comm.printf("Failed to set up Client."); 00039 } 00040 topic_pub_position = t1; 00041 topic_sub_control = t2; 00042 topic_pub_receive = t3; 00043 topic_sub_send = t4; 00044 subscribe_to_topic_control(topic_sub_control); 00045 subscribe_to_topic_sync(topic_sub_send); 00046 } 00047 00048 void Communication::setup_wifi() { 00049 // Get a handle to the WiFi module 00050 this->wifi = WiFiInterface::get_default_instance(); 00051 00052 // Connect the module to the wifi, based on the SSID and password 00053 // specified in the mbed_app.json configuration file 00054 // If you are using AirPennNet-Device, this will not succeed until the MAC 00055 // address (printed shortly after this) is registered 00056 pc_comm.printf("Connecting to wifi\r\n"); 00057 int rc = 
wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); 00058 pc_comm.printf("Finished connected to wifi\r\n"); 00059 // Print out the MAC address of the wifi module. The MAC address is 00060 // needed to register the device with AirPennNet-Device, so that you 00061 // can use the campus wifi 00062 pc_comm.printf("MAC Address: "); 00063 pc_comm.printf(this->wifi->get_mac_address()); 00064 pc_comm.printf("\r\n"); 00065 00066 if (rc != 0) { 00067 pc_comm.printf("Problem connecting to wifi\r\n"); 00068 return; 00069 } else { 00070 pc_comm.printf("Wifi connected\r\n"); 00071 } 00072 } 00073 00074 00075 void Communication::setup_mqtt(MQTTNetwork* network) { 00076 // the hostname and port point to a Google Cloud MQTT server we setup for 00077 // this project 00078 const char* hostname = "34.68.206.11"; 00079 int port = 1883; 00080 00081 // Create the underlying network connection to the MQTT server 00082 pc_comm.printf("Connecting to %s:%d\r\n", hostname, port); 00083 int rc = network->connect(hostname, port); 00084 if (rc != 0) { 00085 pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc); 00086 return; 00087 } 00088 00089 pc_comm.printf("Connected to %s:%d\r\n", hostname, port); 00090 00091 // Connect the MQTT client to the server 00092 this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network); 00093 rc = this->client->connect(); 00094 if (rc != 0) { 00095 pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc); 00096 return; 00097 } 00098 00099 pc_comm.printf("MQTT connect successful!\r\n"); 00100 } 00101 00102 void Communication::message_arrived(MQTT::MessageData& md) 00103 { 00104 MQTT::Message &message = md.message; 00105 pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); 00106 pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); 00107 } 00108 00109 void 
Communication::control_arrived(MQTT::MessageData& md) { 00110 MQTT::Message &message = md.message; 00111 00112 char* payload = (char*) message.payload; 00113 00114 int id = (int) payload[0]; 00115 int speed = (int) payload[1]; 00116 pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed); 00117 00118 speeds[id-1] = speed; 00119 control_flags.set(flags[id-1]); 00120 } 00121 00122 void Communication::sync_arrived(MQTT::MessageData& md) { 00123 MQTT::Message &message = md.message; 00124 00125 char* payload = (char*) message.payload; 00126 sync = (int) payload[0]; 00127 pc_comm.printf("Sync arrived with number %d\r\n", sync); 00128 sync_flags.set(0x01); 00129 } 00130 00131 int Communication::send_message(char* topic) { 00132 pc_comm.printf("Sending a message!\r\n"); 00133 MQTT::Message message; 00134 00135 char buf[100]; 00136 sprintf(buf, "Hello World! This is a test message!\r\n"); 00137 if (this->client == NULL) { 00138 pc_comm.printf("CLIENT IS NULL!!!\r\n"); 00139 } 00140 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00141 00142 if (rc != 0) { 00143 pc_comm.printf("Message failed!\r\n"); 00144 return -1; 00145 } else { 00146 pc_comm.printf("Message sent!\r\n"); 00147 return 0; 00148 } 00149 } 00150 00151 00152 int Communication::subscribe_to_topic_control(char* topic) { 00153 pc_comm.printf("subcribing: %s\r\n", topic); 00154 int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived); 00155 if (rc != 0) { 00156 pc_comm.printf("There was a problem subscribing: %d\r\n", rc); 00157 00158 // client->disconnect(); 00159 return -1; 00160 } else { 00161 pc_comm.printf("Subscribed!\r\n"); 00162 } 00163 return rc; 00164 } 00165 00166 int Communication::subscribe_to_topic_sync(char* topic) { 00167 pc_comm.printf("subcribing: %s\r\n", topic); 00168 int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived); 00169 if (rc != 0) { 00170 pc_comm.printf("There was a problem subscribing: %d\r\n", rc); 00171 00172 // 
client->disconnect(); 00173 return -1; 00174 } else { 00175 pc_comm.printf("Subscribed!\r\n"); 00176 } 00177 return rc; 00178 } 00179 00180 void Communication::publish_car(int id, int speed, int position) { 00181 message_t message; 00182 message.id = (char) id; 00183 message.speed = (char) speed; 00184 message.position = (char) position; 00185 00186 pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position); 00187 mutex->lock(); 00188 q->push(message); 00189 mutex->unlock(); 00190 } 00191 00192 void Communication::publish_road(int num) { 00193 message_t message; 00194 message.id = -1; // indicate this is for sync 00195 message.position = (char) num; // use position as road update number 00196 00197 mutex->lock(); 00198 q->push(message); 00199 mutex->unlock(); 00200 } 00201 00202 void Communication::start() { 00203 while (true) { 00204 mutex->lock(); 00205 // pc_comm.printf("queue size = %d\r\n", q.size()); 00206 if (q->size() > 0) { 00207 message_t &message = q->front(); 00208 q->pop(); 00209 mutex->unlock(); 00210 if (message.id == 255 || message.id == -1) { 00211 char buf[2]; 00212 buf[0] = (char) message.position; // update number 00213 buf[1] = '\0'; 00214 00215 int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00216 if (rc != 0) { 00217 pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive); 00218 } else { 00219 pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]); 00220 } 00221 } else { 00222 char buf[4]; 00223 buf[0] = message.id; 00224 buf[1] = message.position; 00225 buf[2] = message.speed; 00226 buf[3] = '\0'; 00227 00228 int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00229 if (rc != 0) { 00230 pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position); 00231 } else { 00232 pc_comm.printf("Message sent! 
(%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]); 00233 } 00234 } 00235 // q->pop(); 00236 // mutex->unlock(); 00237 this->client->yield(500); // wait for 100ms 00238 } else { 00239 mutex->unlock(); 00240 ThisThread::sleep_for(500); 00241 } 00242 } 00243 /* 00244 while(true) { 00245 pc_comm.printf("queue size = %d\r\n", q_car.count()); 00246 osEvent evt = q_car.get(); 00247 if (evt.status == osEventMessage) { 00248 pc_comm.printf("Gets one message.\r\n"); 00249 message_t *message = (message_t*)evt.value.p; 00250 // char* topic = message->topic; 00251 char* topic = "Rahman/Sync/Send/1"; 00252 00253 if (message->speed == -1) { 00254 char buf[1]; 00255 buf[0] = (char) message->position; // update number 00256 00257 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00258 if (rc != 0) { 00259 pc_comm.printf("Failed to publish Road info: %s", topic); 00260 } else { 00261 pc_comm.printf("Message sent!\r\n"); 00262 } 00263 } else { 00264 char buf[2]; 00265 buf[0] = (char) message->position; 00266 buf[1] = (char) message->speed; 00267 00268 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1); 00269 if (rc != 0) { 00270 pc_comm.printf("Failed to publish AccCar info: %s", topic); 00271 } else { 00272 pc_comm.printf("Message sent!\r\n"); 00273 } 00274 } 00275 mpool.free(message); 00276 this->client->yield(100); // wait for 100ms 00277 } 00278 } 00279 */ 00280 } 00281 00282 void Communication::reset() { 00283 control_flags.clear(); 00284 sync_flags.clear(); 00285 00286 if (thread != NULL) { 00287 thread->terminate(); 00288 } 00289 00290 thread = new Thread(); 00291 thread->start( callback(this, &Communication::start) ); 00292 } 00293 00294 void Communication::stop() { 00295 if (thread != NULL) { 00296 thread->terminate(); 00297 } 00298 }
Generated on Fri Jul 22 2022 13:26:47 by
1.7.2