not running

Dependencies:   TextLCD MQTT

Committer:
hyan99
Date:
Wed Dec 11 20:20:12 2019 +0000
Revision:
2:16b3bd337db2
Parent:
1:19c3299ea83a
testing

Who changed what in which revision?

User | Revision | Line number | New contents of line
hyan99 0:3b4906b8a747 1 #include "Communication.h"
hyan99 0:3b4906b8a747 2
hyan99 0:3b4906b8a747 3 Communication* Communication::singleton = NULL;
hyan99 2:16b3bd337db2 4 Thread* Communication::thread = NULL;
hyan99 0:3b4906b8a747 5 int Communication::speeds[5] = {0, 0, 0, 0, 0};
hyan99 0:3b4906b8a747 6 int Communication::sync = 0;
hyan99 0:3b4906b8a747 7 int Communication::flags[5] = {0x01, 0x02, 0x04, 0x08, 0x10};
hyan99 0:3b4906b8a747 8 EventFlags Communication::control_flags;
hyan99 0:3b4906b8a747 9 EventFlags Communication::sync_flags;
hyan99 0:3b4906b8a747 10
hyan99 0:3b4906b8a747 11 Serial pc_comm(USBTX, USBRX);
hyan99 2:16b3bd337db2 12 //Mutex mutex;
hyan99 2:16b3bd337db2 13 //std::vector<message_t> v;
hyan99 2:16b3bd337db2 14 //std::queue<message_t> q;
hyan99 2:16b3bd337db2 15 //Queue<message_t, 16> q_car;
hyan99 2:16b3bd337db2 16 //MemoryPool<message_t, 16> mpool;
hyan99 0:3b4906b8a747 17
hyan99 2:16b3bd337db2 18 Communication* Communication::getInstance(char* t1, char* t2, char* t3, char* t4) {
hyan99 0:3b4906b8a747 19 if (singleton == NULL) {
hyan99 2:16b3bd337db2 20 singleton = new Communication(t1, t2, t3, t4);
hyan99 2:16b3bd337db2 21 // Communication();
hyan99 0:3b4906b8a747 22 }
hyan99 0:3b4906b8a747 23 return singleton;
hyan99 0:3b4906b8a747 24 }
hyan99 0:3b4906b8a747 25
hyan99 2:16b3bd337db2 26 Communication::Communication(char* t1, char* t2, char* t3, char* t4)
hyan99 0:3b4906b8a747 27 {
hyan99 1:19c3299ea83a 28 // singleton = this;
hyan99 2:16b3bd337db2 29 mutex = new Mutex();
hyan99 2:16b3bd337db2 30 q = new std::queue<message_t>();
hyan99 0:3b4906b8a747 31 setup_wifi();
hyan99 0:3b4906b8a747 32 if (wifi == NULL) {
hyan99 0:3b4906b8a747 33 pc_comm.printf("Failed to set up Wifi.");
hyan99 0:3b4906b8a747 34 }
hyan99 2:16b3bd337db2 35 network = new MQTTNetwork(wifi);
hyan99 0:3b4906b8a747 36 setup_mqtt(network);
hyan99 2:16b3bd337db2 37 if (this->client == NULL) {
hyan99 0:3b4906b8a747 38 pc_comm.printf("Failed to set up Client.");
hyan99 0:3b4906b8a747 39 }
hyan99 2:16b3bd337db2 40 topic_pub_position = t1;
hyan99 2:16b3bd337db2 41 topic_sub_control = t2;
hyan99 2:16b3bd337db2 42 topic_pub_receive = t3;
hyan99 2:16b3bd337db2 43 topic_sub_send = t4;
hyan99 2:16b3bd337db2 44 subscribe_to_topic_control(topic_sub_control);
hyan99 2:16b3bd337db2 45 subscribe_to_topic_sync(topic_sub_send);
hyan99 0:3b4906b8a747 46 }
hyan99 0:3b4906b8a747 47
hyan99 0:3b4906b8a747 48 void Communication::setup_wifi() {
hyan99 0:3b4906b8a747 49 // Get a handle to the WiFi module
hyan99 2:16b3bd337db2 50 this->wifi = WiFiInterface::get_default_instance();
hyan99 0:3b4906b8a747 51
hyan99 0:3b4906b8a747 52 // Connect the module to the wifi, based on the SSID and password
hyan99 0:3b4906b8a747 53 // specified in the mbed_app.json configuration file
hyan99 0:3b4906b8a747 54 // If you are using AirPennNet-Device, this will not succeed until the MAC
hyan99 0:3b4906b8a747 55 // address (printed shortly after this) is registered
hyan99 0:3b4906b8a747 56 pc_comm.printf("Connecting to wifi\r\n");
hyan99 0:3b4906b8a747 57 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
hyan99 1:19c3299ea83a 58 pc_comm.printf("Finished connected to wifi\r\n");
hyan99 0:3b4906b8a747 59 // Print out the MAC address of the wifi module. The MAC address is
hyan99 0:3b4906b8a747 60 // needed to register the device with AirPennNet-Device, so that you
hyan99 0:3b4906b8a747 61 // can use the campus wifi
hyan99 0:3b4906b8a747 62 pc_comm.printf("MAC Address: ");
hyan99 2:16b3bd337db2 63 pc_comm.printf(this->wifi->get_mac_address());
hyan99 0:3b4906b8a747 64 pc_comm.printf("\r\n");
hyan99 0:3b4906b8a747 65
hyan99 0:3b4906b8a747 66 if (rc != 0) {
hyan99 0:3b4906b8a747 67 pc_comm.printf("Problem connecting to wifi\r\n");
hyan99 0:3b4906b8a747 68 return;
hyan99 0:3b4906b8a747 69 } else {
hyan99 0:3b4906b8a747 70 pc_comm.printf("Wifi connected\r\n");
hyan99 0:3b4906b8a747 71 }
hyan99 0:3b4906b8a747 72 }
hyan99 0:3b4906b8a747 73
hyan99 0:3b4906b8a747 74
hyan99 2:16b3bd337db2 75 void Communication::setup_mqtt(MQTTNetwork* network) {
hyan99 0:3b4906b8a747 76 // the hostname and port point to a Google Cloud MQTT server we setup for
hyan99 0:3b4906b8a747 77 // this project
hyan99 0:3b4906b8a747 78 const char* hostname = "34.68.206.11";
hyan99 0:3b4906b8a747 79 int port = 1883;
hyan99 0:3b4906b8a747 80
hyan99 0:3b4906b8a747 81 // Create the underlying network connection to the MQTT server
hyan99 0:3b4906b8a747 82 pc_comm.printf("Connecting to %s:%d\r\n", hostname, port);
hyan99 2:16b3bd337db2 83 int rc = network->connect(hostname, port);
hyan99 0:3b4906b8a747 84 if (rc != 0) {
hyan99 0:3b4906b8a747 85 pc_comm.printf("There was an error with the TCP connect: %d\r\n", rc);
hyan99 0:3b4906b8a747 86 return;
hyan99 0:3b4906b8a747 87 }
hyan99 0:3b4906b8a747 88
hyan99 0:3b4906b8a747 89 pc_comm.printf("Connected to %s:%d\r\n", hostname, port);
hyan99 0:3b4906b8a747 90
hyan99 0:3b4906b8a747 91 // Connect the MQTT client to the server
hyan99 2:16b3bd337db2 92 this->client = new MQTT::Client<MQTTNetwork, Countdown>(*network);
hyan99 2:16b3bd337db2 93 rc = this->client->connect();
hyan99 0:3b4906b8a747 94 if (rc != 0) {
hyan99 0:3b4906b8a747 95 pc_comm.printf("There was an error with the MQTT connect: %d\r\n", rc);
hyan99 0:3b4906b8a747 96 return;
hyan99 0:3b4906b8a747 97 }
hyan99 0:3b4906b8a747 98
hyan99 0:3b4906b8a747 99 pc_comm.printf("MQTT connect successful!\r\n");
hyan99 0:3b4906b8a747 100 }
hyan99 0:3b4906b8a747 101
hyan99 0:3b4906b8a747 102 void Communication::message_arrived(MQTT::MessageData& md)
hyan99 0:3b4906b8a747 103 {
hyan99 0:3b4906b8a747 104 MQTT::Message &message = md.message;
hyan99 0:3b4906b8a747 105 pc_comm.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
hyan99 0:3b4906b8a747 106 pc_comm.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
hyan99 0:3b4906b8a747 107 }
hyan99 0:3b4906b8a747 108
hyan99 0:3b4906b8a747 109 void Communication::control_arrived(MQTT::MessageData& md) {
hyan99 0:3b4906b8a747 110 MQTT::Message &message = md.message;
hyan99 0:3b4906b8a747 111
hyan99 0:3b4906b8a747 112 char* payload = (char*) message.payload;
hyan99 0:3b4906b8a747 113
hyan99 0:3b4906b8a747 114 int id = (int) payload[0];
hyan99 0:3b4906b8a747 115 int speed = (int) payload[1];
hyan99 2:16b3bd337db2 116 pc_comm.printf("Control arrived for id %d, speed %d\r\n", id, speed);
hyan99 0:3b4906b8a747 117
hyan99 0:3b4906b8a747 118 speeds[id-1] = speed;
hyan99 0:3b4906b8a747 119 control_flags.set(flags[id-1]);
hyan99 0:3b4906b8a747 120 }
hyan99 0:3b4906b8a747 121
hyan99 0:3b4906b8a747 122 void Communication::sync_arrived(MQTT::MessageData& md) {
hyan99 0:3b4906b8a747 123 MQTT::Message &message = md.message;
hyan99 0:3b4906b8a747 124
hyan99 0:3b4906b8a747 125 char* payload = (char*) message.payload;
hyan99 0:3b4906b8a747 126 sync = (int) payload[0];
hyan99 2:16b3bd337db2 127 pc_comm.printf("Sync arrived with number %d\r\n", sync);
hyan99 0:3b4906b8a747 128 sync_flags.set(0x01);
hyan99 0:3b4906b8a747 129 }
hyan99 0:3b4906b8a747 130
hyan99 0:3b4906b8a747 131 int Communication::send_message(char* topic) {
hyan99 0:3b4906b8a747 132 pc_comm.printf("Sending a message!\r\n");
hyan99 0:3b4906b8a747 133 MQTT::Message message;
hyan99 0:3b4906b8a747 134
hyan99 0:3b4906b8a747 135 char buf[100];
hyan99 0:3b4906b8a747 136 sprintf(buf, "Hello World! This is a test message!\r\n");
hyan99 2:16b3bd337db2 137 if (this->client == NULL) {
hyan99 2:16b3bd337db2 138 pc_comm.printf("CLIENT IS NULL!!!\r\n");
hyan99 2:16b3bd337db2 139 }
hyan99 2:16b3bd337db2 140 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
hyan99 0:3b4906b8a747 141
hyan99 0:3b4906b8a747 142 if (rc != 0) {
hyan99 2:16b3bd337db2 143 pc_comm.printf("Message failed!\r\n");
hyan99 0:3b4906b8a747 144 return -1;
hyan99 0:3b4906b8a747 145 } else {
hyan99 0:3b4906b8a747 146 pc_comm.printf("Message sent!\r\n");
hyan99 0:3b4906b8a747 147 return 0;
hyan99 0:3b4906b8a747 148 }
hyan99 0:3b4906b8a747 149 }
hyan99 0:3b4906b8a747 150
hyan99 0:3b4906b8a747 151
hyan99 0:3b4906b8a747 152 int Communication::subscribe_to_topic_control(char* topic) {
hyan99 1:19c3299ea83a 153 pc_comm.printf("subcribing: %s\r\n", topic);
hyan99 2:16b3bd337db2 154 int rc = this->client->subscribe(topic, MQTT::QOS1, control_arrived);
hyan99 0:3b4906b8a747 155 if (rc != 0) {
hyan99 0:3b4906b8a747 156 pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
hyan99 0:3b4906b8a747 157
hyan99 0:3b4906b8a747 158 // client->disconnect();
hyan99 0:3b4906b8a747 159 return -1;
hyan99 0:3b4906b8a747 160 } else {
hyan99 0:3b4906b8a747 161 pc_comm.printf("Subscribed!\r\n");
hyan99 0:3b4906b8a747 162 }
hyan99 0:3b4906b8a747 163 return rc;
hyan99 0:3b4906b8a747 164 }
hyan99 0:3b4906b8a747 165
hyan99 0:3b4906b8a747 166 int Communication::subscribe_to_topic_sync(char* topic) {
hyan99 1:19c3299ea83a 167 pc_comm.printf("subcribing: %s\r\n", topic);
hyan99 2:16b3bd337db2 168 int rc = this->client->subscribe(topic, MQTT::QOS1, sync_arrived);
hyan99 0:3b4906b8a747 169 if (rc != 0) {
hyan99 0:3b4906b8a747 170 pc_comm.printf("There was a problem subscribing: %d\r\n", rc);
hyan99 0:3b4906b8a747 171
hyan99 0:3b4906b8a747 172 // client->disconnect();
hyan99 0:3b4906b8a747 173 return -1;
hyan99 0:3b4906b8a747 174 } else {
hyan99 0:3b4906b8a747 175 pc_comm.printf("Subscribed!\r\n");
hyan99 0:3b4906b8a747 176 }
hyan99 0:3b4906b8a747 177 return rc;
hyan99 0:3b4906b8a747 178 }
hyan99 0:3b4906b8a747 179
hyan99 2:16b3bd337db2 180 void Communication::publish_car(int id, int speed, int position) {
hyan99 2:16b3bd337db2 181 message_t message;
hyan99 2:16b3bd337db2 182 message.id = (char) id;
hyan99 2:16b3bd337db2 183 message.speed = (char) speed;
hyan99 2:16b3bd337db2 184 message.position = (char) position;
hyan99 2:16b3bd337db2 185
hyan99 2:16b3bd337db2 186 pc_comm.printf("publish_car:%d,%d,%d\r\n",id, speed, position);
hyan99 2:16b3bd337db2 187 mutex->lock();
hyan99 2:16b3bd337db2 188 q->push(message);
hyan99 2:16b3bd337db2 189 mutex->unlock();
hyan99 0:3b4906b8a747 190 }
hyan99 0:3b4906b8a747 191
hyan99 2:16b3bd337db2 192 void Communication::publish_road(int num) {
hyan99 2:16b3bd337db2 193 message_t message;
hyan99 2:16b3bd337db2 194 message.id = -1; // indicate this is for sync
hyan99 2:16b3bd337db2 195 message.position = (char) num; // use position as road update number
hyan99 2:16b3bd337db2 196
hyan99 2:16b3bd337db2 197 mutex->lock();
hyan99 2:16b3bd337db2 198 q->push(message);
hyan99 2:16b3bd337db2 199 mutex->unlock();
hyan99 0:3b4906b8a747 200 }
hyan99 0:3b4906b8a747 201
hyan99 0:3b4906b8a747 202 void Communication::start() {
hyan99 2:16b3bd337db2 203 while (true) {
hyan99 2:16b3bd337db2 204 mutex->lock();
hyan99 2:16b3bd337db2 205 // pc_comm.printf("queue size = %d\r\n", q.size());
hyan99 2:16b3bd337db2 206 if (q->size() > 0) {
hyan99 2:16b3bd337db2 207 message_t &message = q->front();
hyan99 2:16b3bd337db2 208 q->pop();
hyan99 2:16b3bd337db2 209 mutex->unlock();
hyan99 2:16b3bd337db2 210 if (message.id == 255 || message.id == -1) {
hyan99 2:16b3bd337db2 211 char buf[2];
hyan99 2:16b3bd337db2 212 buf[0] = (char) message.position; // update number
hyan99 2:16b3bd337db2 213 buf[1] = '\0';
hyan99 2:16b3bd337db2 214
hyan99 2:16b3bd337db2 215 int rc = this->client->publish(topic_pub_receive, (char*) buf, strlen(buf)+1, MQTT::QOS1);
hyan99 2:16b3bd337db2 216 if (rc != 0) {
hyan99 2:16b3bd337db2 217 pc_comm.printf("Failed to publish Road info to: %s", topic_pub_receive);
hyan99 2:16b3bd337db2 218 } else {
hyan99 2:16b3bd337db2 219 pc_comm.printf("Message sent! %s(%d)\r\n", topic_pub_receive, (int)buf[0]);
hyan99 2:16b3bd337db2 220 }
hyan99 2:16b3bd337db2 221 } else {
hyan99 2:16b3bd337db2 222 char buf[4];
hyan99 2:16b3bd337db2 223 buf[0] = message.id;
hyan99 2:16b3bd337db2 224 buf[1] = message.position;
hyan99 2:16b3bd337db2 225 buf[2] = message.speed;
hyan99 2:16b3bd337db2 226 buf[3] = '\0';
hyan99 2:16b3bd337db2 227
hyan99 2:16b3bd337db2 228 int rc = this->client->publish(topic_pub_position, (char*) buf, strlen(buf)+1, MQTT::QOS1);
hyan99 2:16b3bd337db2 229 if (rc != 0) {
hyan99 2:16b3bd337db2 230 pc_comm.printf("Failed to publish AccCar info to: %s", topic_pub_position);
hyan99 2:16b3bd337db2 231 } else {
hyan99 2:16b3bd337db2 232 pc_comm.printf("Message sent! (%d, %d, %d)\r\n", (int)buf[0], (int)buf[1], (int)buf[2]);
hyan99 2:16b3bd337db2 233 }
hyan99 2:16b3bd337db2 234 }
hyan99 2:16b3bd337db2 235 // q->pop();
hyan99 2:16b3bd337db2 236 // mutex->unlock();
hyan99 2:16b3bd337db2 237 this->client->yield(500); // wait for 100ms
hyan99 2:16b3bd337db2 238 } else {
hyan99 2:16b3bd337db2 239 mutex->unlock();
hyan99 2:16b3bd337db2 240 ThisThread::sleep_for(500);
hyan99 2:16b3bd337db2 241 }
hyan99 2:16b3bd337db2 242 }
hyan99 2:16b3bd337db2 243 /*
hyan99 2:16b3bd337db2 244 while(true) {
hyan99 2:16b3bd337db2 245 pc_comm.printf("queue size = %d\r\n", q_car.count());
hyan99 0:3b4906b8a747 246 osEvent evt = q_car.get();
hyan99 0:3b4906b8a747 247 if (evt.status == osEventMessage) {
hyan99 2:16b3bd337db2 248 pc_comm.printf("Gets one message.\r\n");
hyan99 0:3b4906b8a747 249 message_t *message = (message_t*)evt.value.p;
hyan99 2:16b3bd337db2 250 // char* topic = message->topic;
hyan99 2:16b3bd337db2 251 char* topic = "Rahman/Sync/Send/1";
hyan99 0:3b4906b8a747 252
hyan99 0:3b4906b8a747 253 if (message->speed == -1) {
hyan99 0:3b4906b8a747 254 char buf[1];
hyan99 0:3b4906b8a747 255 buf[0] = (char) message->position; // update number
hyan99 0:3b4906b8a747 256
hyan99 2:16b3bd337db2 257 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
hyan99 0:3b4906b8a747 258 if (rc != 0) {
hyan99 0:3b4906b8a747 259 pc_comm.printf("Failed to publish Road info: %s", topic);
hyan99 0:3b4906b8a747 260 } else {
hyan99 0:3b4906b8a747 261 pc_comm.printf("Message sent!\r\n");
hyan99 0:3b4906b8a747 262 }
hyan99 0:3b4906b8a747 263 } else {
hyan99 0:3b4906b8a747 264 char buf[2];
hyan99 0:3b4906b8a747 265 buf[0] = (char) message->position;
hyan99 0:3b4906b8a747 266 buf[1] = (char) message->speed;
hyan99 0:3b4906b8a747 267
hyan99 2:16b3bd337db2 268 int rc = this->client->publish(topic, (char*) buf, strlen(buf)+1, MQTT::QOS1);
hyan99 0:3b4906b8a747 269 if (rc != 0) {
hyan99 0:3b4906b8a747 270 pc_comm.printf("Failed to publish AccCar info: %s", topic);
hyan99 0:3b4906b8a747 271 } else {
hyan99 0:3b4906b8a747 272 pc_comm.printf("Message sent!\r\n");
hyan99 0:3b4906b8a747 273 }
hyan99 0:3b4906b8a747 274 }
hyan99 0:3b4906b8a747 275 mpool.free(message);
hyan99 2:16b3bd337db2 276 this->client->yield(100); // wait for 100ms
hyan99 0:3b4906b8a747 277 }
hyan99 0:3b4906b8a747 278 }
hyan99 2:16b3bd337db2 279 */
hyan99 0:3b4906b8a747 280 }
hyan99 0:3b4906b8a747 281
hyan99 0:3b4906b8a747 282 void Communication::reset() {
hyan99 0:3b4906b8a747 283 control_flags.clear();
hyan99 2:16b3bd337db2 284 sync_flags.clear();
hyan99 0:3b4906b8a747 285
hyan99 0:3b4906b8a747 286 if (thread != NULL) {
hyan99 0:3b4906b8a747 287 thread->terminate();
hyan99 0:3b4906b8a747 288 }
hyan99 0:3b4906b8a747 289
hyan99 0:3b4906b8a747 290 thread = new Thread();
hyan99 0:3b4906b8a747 291 thread->start( callback(this, &Communication::start) );
hyan99 0:3b4906b8a747 292 }
hyan99 0:3b4906b8a747 293
hyan99 0:3b4906b8a747 294 void Communication::stop() {
hyan99 0:3b4906b8a747 295 if (thread != NULL) {
hyan99 0:3b4906b8a747 296 thread->terminate();
hyan99 0:3b4906b8a747 297 }
hyan99 0:3b4906b8a747 298 }