Dependencies:   MQTT

Committer:
MannyK
Date:
Fri Dec 06 20:34:52 2019 +0000
Revision:
4:64c6fc70ddb7
Parent:
2:f10d6fecb345
Child:
5:e0d8e5e922f1
Road sync

Who changed what in which revision?

UserRevisionLine numberNew contents of line
micallef25 2:f10d6fecb345 1 /* mbed Microcontroller Library
micallef25 2:f10d6fecb345 2 * Copyright (c) 2018 ARM Limited
micallef25 2:f10d6fecb345 3 * SPDX-License-Identifier: Apache-2.0
micallef25 2:f10d6fecb345 4 */
micallef25 2:f10d6fecb345 5
micallef25 2:f10d6fecb345 6 #include "mbed.h"
micallef25 2:f10d6fecb345 7 #include "MQTTNetwork.h"
micallef25 2:f10d6fecb345 8 #include "MQTTClient.h"
micallef25 2:f10d6fecb345 9 #include "MQTTmbed.h"
micallef25 2:f10d6fecb345 10 #include "mqtt.h"
micallef25 2:f10d6fecb345 11 #include <assert.h>
micallef25 2:f10d6fecb345 12 #include "AccCar.h"
micallef25 2:f10d6fecb345 13
micallef25 2:f10d6fecb345 14 // topics interested in
micallef25 2:f10d6fecb345 15 #define POSITION_TOPIC "MQTT_Position0x25"
MannyK 4:64c6fc70ddb7 16 #define ROAD_TOPIC "MQTT_Road0x25"
micallef25 2:f10d6fecb345 17 #define CONTROL_TOPIC "MQTT_Control0x25"
micallef25 2:f10d6fecb345 18
micallef25 2:f10d6fecb345 19 #define DEBUG_MQTT
micallef25 2:f10d6fecb345 20
micallef25 2:f10d6fecb345 21 Serial mqtt_pc (USBTX, USBRX);
micallef25 2:f10d6fecb345 22
micallef25 2:f10d6fecb345 23
micallef25 2:f10d6fecb345 24 // empty constructor
micallef25 2:f10d6fecb345 25 mqtt::mqtt()
micallef25 2:f10d6fecb345 26 {
micallef25 2:f10d6fecb345 27
micallef25 2:f10d6fecb345 28 }
micallef25 2:f10d6fecb345 29
micallef25 2:f10d6fecb345 30 /*
micallef25 2:f10d6fecb345 31 This function sets up the wifi module and connects it to the SSID
micallef25 2:f10d6fecb345 32 configured in the configuration file. It also prints out the MAC address
micallef25 2:f10d6fecb345 33 of the module, which is needed if you are trying to use campus wifi.
micallef25 2:f10d6fecb345 34 This function returns NULL if there are any issues.
micallef25 2:f10d6fecb345 35 */
micallef25 2:f10d6fecb345 36 WiFiInterface* mqtt::setup_wifi() {
micallef25 2:f10d6fecb345 37 // Get a handle to the WiFi module
micallef25 2:f10d6fecb345 38 WiFiInterface* wifi = WiFiInterface::get_default_instance();
micallef25 2:f10d6fecb345 39
micallef25 2:f10d6fecb345 40 // Connect the module to the wifi, based on the SSID and password
micallef25 2:f10d6fecb345 41 // specified in the mbed_app.json configuration file
micallef25 2:f10d6fecb345 42 // If you are using AirPennNet-Device, this will not succeed until the MAC
micallef25 2:f10d6fecb345 43 // address (printed shortly after this) is registered
micallef25 2:f10d6fecb345 44 mqtt_pc.printf("Connecting to wifi\r\n");
micallef25 2:f10d6fecb345 45 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
micallef25 2:f10d6fecb345 46
micallef25 2:f10d6fecb345 47 // Print out the MAC address of the wifi module. The MAC address is
micallef25 2:f10d6fecb345 48 // needed to register the device with AirPennNet-Device, so that you
micallef25 2:f10d6fecb345 49 // can use the campus wifi
micallef25 2:f10d6fecb345 50 mqtt_pc.printf("MAC Address: ");
micallef25 2:f10d6fecb345 51 mqtt_pc.printf(wifi->get_mac_address());
micallef25 2:f10d6fecb345 52 mqtt_pc.printf("\r\n");
micallef25 2:f10d6fecb345 53
micallef25 2:f10d6fecb345 54 if (rc != 0) {
micallef25 2:f10d6fecb345 55 mqtt_pc.printf("Problem connecting to wifi\r\n");
micallef25 2:f10d6fecb345 56 return NULL;
micallef25 2:f10d6fecb345 57 } else {
micallef25 2:f10d6fecb345 58 mqtt_pc.printf("Wifi connected\r\n");
micallef25 2:f10d6fecb345 59 }
micallef25 2:f10d6fecb345 60
micallef25 2:f10d6fecb345 61 return wifi;
micallef25 2:f10d6fecb345 62 }
micallef25 2:f10d6fecb345 63
micallef25 2:f10d6fecb345 64 /*
micallef25 2:f10d6fecb345 65 This function creates the MQTT client and connects it to the MQTT broker
micallef25 2:f10d6fecb345 66 that we have setup for the course. If there are any errors with the
micallef25 2:f10d6fecb345 67 connection, it will return NULL
micallef25 2:f10d6fecb345 68 */
micallef25 2:f10d6fecb345 69 MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) {
micallef25 2:f10d6fecb345 70 // the hostname and port point to a Google Cloud MQTT server we setup for
micallef25 2:f10d6fecb345 71 // this project
micallef25 2:f10d6fecb345 72 const char* hostname = "34.68.206.11";
micallef25 2:f10d6fecb345 73 int port = 1883;
micallef25 2:f10d6fecb345 74
micallef25 2:f10d6fecb345 75 // Create the underlying network connection to the MQTT server
micallef25 2:f10d6fecb345 76 mqtt_pc.printf("Connecting to %s:%d\r\n", hostname, port);
micallef25 2:f10d6fecb345 77 int rc = network.connect(hostname, port);
micallef25 2:f10d6fecb345 78 if (rc != 0) {
micallef25 2:f10d6fecb345 79 mqtt_pc.printf("There was an error with the TCP connect: %d\r\n", rc);
micallef25 2:f10d6fecb345 80 return NULL;
micallef25 2:f10d6fecb345 81 }
micallef25 2:f10d6fecb345 82
micallef25 2:f10d6fecb345 83 mqtt_pc.printf("Connected to %s:%d\r\n", hostname, port);
micallef25 2:f10d6fecb345 84
micallef25 2:f10d6fecb345 85 // Connect the MQTT client to the server
micallef25 2:f10d6fecb345 86 MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network);
micallef25 2:f10d6fecb345 87 rc = temp_client->connect();
micallef25 2:f10d6fecb345 88 if (rc != 0) {
micallef25 2:f10d6fecb345 89 mqtt_pc.printf("There was an error with the MQTT connect: %d\r\n", rc);
micallef25 2:f10d6fecb345 90 return NULL;
micallef25 2:f10d6fecb345 91 }
micallef25 2:f10d6fecb345 92
micallef25 2:f10d6fecb345 93 mqtt_pc.printf("MQTT connect successful!\r\n");
micallef25 2:f10d6fecb345 94
micallef25 2:f10d6fecb345 95 return temp_client;
micallef25 2:f10d6fecb345 96 }
micallef25 2:f10d6fecb345 97
micallef25 2:f10d6fecb345 98 /*
micallef25 2:f10d6fecb345 99 This function is the callback for when a message is received from the
micallef25 2:f10d6fecb345 100 MQTT broker. You register different callback functions with different
micallef25 2:f10d6fecb345 101 topic subscriptions
micallef25 2:f10d6fecb345 102 */
micallef25 2:f10d6fecb345 103 void mqtt::control_message_arrived(MQTT::MessageData& md)
micallef25 2:f10d6fecb345 104 {
micallef25 2:f10d6fecb345 105 MQTT::Message &message = md.message;
micallef25 2:f10d6fecb345 106 // make message
micallef25 2:f10d6fecb345 107 control_msg_t* msg = new control_msg_t;
micallef25 2:f10d6fecb345 108 assert(msg != NULL && message.payloadlen == sizeof(control_msg_t));
micallef25 2:f10d6fecb345 109
micallef25 2:f10d6fecb345 110 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
micallef25 2:f10d6fecb345 111 memcpy(msg,message.payload,message.payloadlen);
micallef25 2:f10d6fecb345 112
micallef25 2:f10d6fecb345 113 #ifdef DEBUG_MQTT
micallef25 2:f10d6fecb345 114 //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
micallef25 2:f10d6fecb345 115 mqtt_pc.printf("speed %d id %d \r\n", msg->speed, msg->car_id);
micallef25 2:f10d6fecb345 116 #endif
micallef25 2:f10d6fecb345 117
micallef25 2:f10d6fecb345 118 // add our message to the queue no fucking clue what happens internally to
micallef25 2:f10d6fecb345 119 // the message memory
micallef25 2:f10d6fecb345 120 mqtt::instance()->add_to_control_queue(msg->car_id,msg);
micallef25 2:f10d6fecb345 121
micallef25 2:f10d6fecb345 122 // free message
micallef25 2:f10d6fecb345 123 delete msg;
micallef25 2:f10d6fecb345 124 }
micallef25 2:f10d6fecb345 125
micallef25 2:f10d6fecb345 126 /*
micallef25 2:f10d6fecb345 127 This function sends a message to the test topic.
micallef25 2:f10d6fecb345 128 */
micallef25 2:f10d6fecb345 129 int mqtt::send_position_msg( position_msg_t* msg )
micallef25 2:f10d6fecb345 130 {
micallef25 2:f10d6fecb345 131 assert(msg != NULL && client != NULL);
micallef25 2:f10d6fecb345 132
micallef25 2:f10d6fecb345 133 MQTT::Message message;
micallef25 2:f10d6fecb345 134
micallef25 2:f10d6fecb345 135 // might be safest to memcopy thius seems to work
micallef25 2:f10d6fecb345 136 //memcpy(message.payload,msg,sizeof(position_msg_t));
micallef25 2:f10d6fecb345 137 message.payload = (void*)msg;
micallef25 2:f10d6fecb345 138 message.payloadlen = sizeof(position_msg_t);
micallef25 2:f10d6fecb345 139 message.qos = MQTT::QOS1;
micallef25 2:f10d6fecb345 140
micallef25 2:f10d6fecb345 141 #ifdef DEBUG_MQTT
micallef25 2:f10d6fecb345 142 mqtt_pc.printf("Sending a message!\r\n");
micallef25 2:f10d6fecb345 143 #endif
micallef25 2:f10d6fecb345 144
micallef25 2:f10d6fecb345 145 int rc = client->publish(POSITION_TOPIC,message);
micallef25 2:f10d6fecb345 146 assert(rc == 0);
micallef25 2:f10d6fecb345 147
micallef25 2:f10d6fecb345 148 client->yield(1000);
micallef25 2:f10d6fecb345 149 return 0;
micallef25 2:f10d6fecb345 150 }
micallef25 2:f10d6fecb345 151
MannyK 4:64c6fc70ddb7 152 int mqtt::send_road_msg( road_msg_t* msg )
MannyK 4:64c6fc70ddb7 153 {
MannyK 4:64c6fc70ddb7 154 assert(msg != NULL && client != NULL);
MannyK 4:64c6fc70ddb7 155
MannyK 4:64c6fc70ddb7 156 MQTT::Message message;
MannyK 4:64c6fc70ddb7 157
MannyK 4:64c6fc70ddb7 158 // might be safest to memcopy thius seems to work
MannyK 4:64c6fc70ddb7 159 //memcpy(message.payload,msg,sizeof(position_msg_t));
MannyK 4:64c6fc70ddb7 160 message.payload = (void*)msg;
MannyK 4:64c6fc70ddb7 161 message.payloadlen = sizeof(position_msg_t);
MannyK 4:64c6fc70ddb7 162 message.qos = MQTT::QOS1;
MannyK 4:64c6fc70ddb7 163
MannyK 4:64c6fc70ddb7 164 #ifdef DEBUG_MQTT
MannyK 4:64c6fc70ddb7 165 mqtt_pc.printf("Sending a message!\r\n");
MannyK 4:64c6fc70ddb7 166 #endif
MannyK 4:64c6fc70ddb7 167
MannyK 4:64c6fc70ddb7 168 int rc = client->publish(ROAD_TOPIC,message);
MannyK 4:64c6fc70ddb7 169 assert(rc == 0);
MannyK 4:64c6fc70ddb7 170
MannyK 4:64c6fc70ddb7 171 client->yield(1000);
MannyK 4:64c6fc70ddb7 172 return 0;
MannyK 4:64c6fc70ddb7 173 }
MannyK 4:64c6fc70ddb7 174
micallef25 2:f10d6fecb345 175 void mqtt::bringup_network() {
micallef25 2:f10d6fecb345 176
micallef25 2:f10d6fecb345 177 //
micallef25 2:f10d6fecb345 178 WiFiInterface* wifi = setup_wifi();
micallef25 2:f10d6fecb345 179 assert(wifi != NULL);
micallef25 2:f10d6fecb345 180
micallef25 2:f10d6fecb345 181 // Create the network object needed by the MQTT client
micallef25 2:f10d6fecb345 182 new_network = new MQTTNetwork(wifi);
micallef25 2:f10d6fecb345 183
micallef25 2:f10d6fecb345 184 // get client
micallef25 2:f10d6fecb345 185 client = setup_mqtt(*new_network);
micallef25 2:f10d6fecb345 186 assert(client != NULL);
micallef25 2:f10d6fecb345 187
micallef25 2:f10d6fecb345 188 // Subscribe to a topic / register a callback
micallef25 2:f10d6fecb345 189 mqtt_pc.printf("Subscribing to topic %s\r\n", CONTROL_TOPIC);
micallef25 2:f10d6fecb345 190 int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived);
micallef25 2:f10d6fecb345 191 assert(rc == 0);
micallef25 2:f10d6fecb345 192
micallef25 2:f10d6fecb345 193 // make a road based of mqtt id
micallef25 2:f10d6fecb345 194 mqtt_id = 0;
micallef25 2:f10d6fecb345 195 if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){
micallef25 2:f10d6fecb345 196 mqtt_id = 0;
micallef25 2:f10d6fecb345 197 }
micallef25 2:f10d6fecb345 198 else{
micallef25 2:f10d6fecb345 199 mqtt_id = 1;
micallef25 2:f10d6fecb345 200 }
micallef25 2:f10d6fecb345 201
micallef25 2:f10d6fecb345 202 mqtt_pc.printf("Subscribed, Setup complete!\r\n");
micallef25 2:f10d6fecb345 203 }
micallef25 2:f10d6fecb345 204
micallef25 2:f10d6fecb345 205
micallef25 2:f10d6fecb345 206 // manage callbacks from mqtt
micallef25 2:f10d6fecb345 207 void mqtt::manage_network()
micallef25 2:f10d6fecb345 208 {
micallef25 2:f10d6fecb345 209 while(true)
micallef25 2:f10d6fecb345 210 {
micallef25 2:f10d6fecb345 211 //
micallef25 2:f10d6fecb345 212 osEvent evt = position_queue.get();
micallef25 2:f10d6fecb345 213 assert(evt.status == osEventMessage);
micallef25 2:f10d6fecb345 214
micallef25 2:f10d6fecb345 215 //
micallef25 2:f10d6fecb345 216 position_msg_t *message = (position_msg_t*)evt.value.p;
micallef25 2:f10d6fecb345 217 send_position_msg(message);
MannyK 4:64c6fc70ddb7 218
MannyK 4:64c6fc70ddb7 219 if (!road_queue.empty()){
MannyK 4:64c6fc70ddb7 220 osEvent revt = road_queue.get();
MannyK 4:64c6fc70ddb7 221 assert(revt.status == osEventMessage);
MannyK 4:64c6fc70ddb7 222 //
MannyK 4:64c6fc70ddb7 223 road_msg_t *road_msg = (road_msg_t*)revt.value.p;
MannyK 4:64c6fc70ddb7 224 send_road_msg(road_msg);
MannyK 4:64c6fc70ddb7 225 }
micallef25 2:f10d6fecb345 226 // client->yield(100);
micallef25 2:f10d6fecb345 227 }
micallef25 2:f10d6fecb345 228 }
micallef25 2:f10d6fecb345 229
micallef25 2:f10d6fecb345 230 //
micallef25 2:f10d6fecb345 231 // clean up goes here
micallef25 2:f10d6fecb345 232 void mqtt::shutdown_network()
micallef25 2:f10d6fecb345 233 {
micallef25 2:f10d6fecb345 234 //
micallef25 2:f10d6fecb345 235 mqtt_pc.printf("shutting down mbed\r\n");
micallef25 2:f10d6fecb345 236
micallef25 2:f10d6fecb345 237 //
micallef25 2:f10d6fecb345 238 client->disconnect();
micallef25 2:f10d6fecb345 239 delete new_network;
micallef25 2:f10d6fecb345 240
micallef25 2:f10d6fecb345 241 //
micallef25 2:f10d6fecb345 242 thread->terminate();
micallef25 2:f10d6fecb345 243 delete thread;
micallef25 2:f10d6fecb345 244 }
micallef25 2:f10d6fecb345 245
micallef25 2:f10d6fecb345 246 // launch network manager thread
micallef25 2:f10d6fecb345 247 // responsible for pub sub
micallef25 2:f10d6fecb345 248 void mqtt::setup_network()
micallef25 2:f10d6fecb345 249 {
micallef25 2:f10d6fecb345 250 // bring up network if anything bad happens we will assert and crash
micallef25 2:f10d6fecb345 251 bringup_network();
micallef25 2:f10d6fecb345 252
micallef25 2:f10d6fecb345 253 // create a new thread thats sole purpose in life is to call client yield
micallef25 2:f10d6fecb345 254 // what a sad life for a thread
micallef25 2:f10d6fecb345 255 // calling yield is necessary as we will not get any messages for the mqtt
micallef25 2:f10d6fecb345 256 // serverwithout it
micallef25 2:f10d6fecb345 257 thread = new Thread();
micallef25 2:f10d6fecb345 258 assert(thread != NULL);
micallef25 2:f10d6fecb345 259 thread->start( callback(this,&mqtt::manage_network) );
micallef25 2:f10d6fecb345 260
micallef25 2:f10d6fecb345 261 }