Emanuel Kuflik
/
smat_controller
mqtt.cpp@4:64c6fc70ddb7, 2019-12-06 (annotated)
- Committer:
- MannyK
- Date:
- Fri Dec 06 20:34:52 2019 +0000
- Revision:
- 4:64c6fc70ddb7
- Parent:
- 2:f10d6fecb345
- Child:
- 5:e0d8e5e922f1
Road sync
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
micallef25 | 2:f10d6fecb345 | 1 | /* mbed Microcontroller Library |
micallef25 | 2:f10d6fecb345 | 2 | * Copyright (c) 2018 ARM Limited |
micallef25 | 2:f10d6fecb345 | 3 | * SPDX-License-Identifier: Apache-2.0 |
micallef25 | 2:f10d6fecb345 | 4 | */ |
micallef25 | 2:f10d6fecb345 | 5 | |
micallef25 | 2:f10d6fecb345 | 6 | #include "mbed.h" |
micallef25 | 2:f10d6fecb345 | 7 | #include "MQTTNetwork.h" |
micallef25 | 2:f10d6fecb345 | 8 | #include "MQTTClient.h" |
micallef25 | 2:f10d6fecb345 | 9 | #include "MQTTmbed.h" |
micallef25 | 2:f10d6fecb345 | 10 | #include "mqtt.h" |
micallef25 | 2:f10d6fecb345 | 11 | #include <assert.h> |
micallef25 | 2:f10d6fecb345 | 12 | #include "AccCar.h" |
micallef25 | 2:f10d6fecb345 | 13 | |
// MQTT topic names used by this module. The "0x25" suffix is part of the
// literal topic string.
// POSITION_TOPIC: published to by send_position_msg().
#define POSITION_TOPIC "MQTT_Position0x25"
// ROAD_TOPIC: published to by send_road_msg().
#define ROAD_TOPIC "MQTT_Road0x25"
// CONTROL_TOPIC: subscribed to in bringup_network().
#define CONTROL_TOPIC "MQTT_Control0x25"

// When defined, enables the extra serial trace output that is guarded by
// #ifdef DEBUG_MQTT further down in this file.
#define DEBUG_MQTT

// Serial port on the USBTX/USBRX pins; every status and debug printf in this
// file is written through it.
Serial mqtt_pc (USBTX, USBRX);
micallef25 | 2:f10d6fecb345 | 22 | |
micallef25 | 2:f10d6fecb345 | 23 | |
micallef25 | 2:f10d6fecb345 | 24 | // empty constructor |
micallef25 | 2:f10d6fecb345 | 25 | mqtt::mqtt() |
micallef25 | 2:f10d6fecb345 | 26 | { |
micallef25 | 2:f10d6fecb345 | 27 | |
micallef25 | 2:f10d6fecb345 | 28 | } |
micallef25 | 2:f10d6fecb345 | 29 | |
micallef25 | 2:f10d6fecb345 | 30 | /* |
micallef25 | 2:f10d6fecb345 | 31 | This function sets up the wifi module and connects it to the SSID |
micallef25 | 2:f10d6fecb345 | 32 | configured in the configuration file. It also prints out the MAC address |
micallef25 | 2:f10d6fecb345 | 33 | of the module, which is needed if you are trying to use campus wifi. |
micallef25 | 2:f10d6fecb345 | 34 | This function returns NULL if there are any issues. |
micallef25 | 2:f10d6fecb345 | 35 | */ |
micallef25 | 2:f10d6fecb345 | 36 | WiFiInterface* mqtt::setup_wifi() { |
micallef25 | 2:f10d6fecb345 | 37 | // Get a handle to the WiFi module |
micallef25 | 2:f10d6fecb345 | 38 | WiFiInterface* wifi = WiFiInterface::get_default_instance(); |
micallef25 | 2:f10d6fecb345 | 39 | |
micallef25 | 2:f10d6fecb345 | 40 | // Connect the module to the wifi, based on the SSID and password |
micallef25 | 2:f10d6fecb345 | 41 | // specified in the mbed_app.json configuration file |
micallef25 | 2:f10d6fecb345 | 42 | // If you are using AirPennNet-Device, this will not succeed until the MAC |
micallef25 | 2:f10d6fecb345 | 43 | // address (printed shortly after this) is registered |
micallef25 | 2:f10d6fecb345 | 44 | mqtt_pc.printf("Connecting to wifi\r\n"); |
micallef25 | 2:f10d6fecb345 | 45 | int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); |
micallef25 | 2:f10d6fecb345 | 46 | |
micallef25 | 2:f10d6fecb345 | 47 | // Print out the MAC address of the wifi module. The MAC address is |
micallef25 | 2:f10d6fecb345 | 48 | // needed to register the device with AirPennNet-Device, so that you |
micallef25 | 2:f10d6fecb345 | 49 | // can use the campus wifi |
micallef25 | 2:f10d6fecb345 | 50 | mqtt_pc.printf("MAC Address: "); |
micallef25 | 2:f10d6fecb345 | 51 | mqtt_pc.printf(wifi->get_mac_address()); |
micallef25 | 2:f10d6fecb345 | 52 | mqtt_pc.printf("\r\n"); |
micallef25 | 2:f10d6fecb345 | 53 | |
micallef25 | 2:f10d6fecb345 | 54 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 55 | mqtt_pc.printf("Problem connecting to wifi\r\n"); |
micallef25 | 2:f10d6fecb345 | 56 | return NULL; |
micallef25 | 2:f10d6fecb345 | 57 | } else { |
micallef25 | 2:f10d6fecb345 | 58 | mqtt_pc.printf("Wifi connected\r\n"); |
micallef25 | 2:f10d6fecb345 | 59 | } |
micallef25 | 2:f10d6fecb345 | 60 | |
micallef25 | 2:f10d6fecb345 | 61 | return wifi; |
micallef25 | 2:f10d6fecb345 | 62 | } |
micallef25 | 2:f10d6fecb345 | 63 | |
micallef25 | 2:f10d6fecb345 | 64 | /* |
micallef25 | 2:f10d6fecb345 | 65 | This function creates the MQTT client and connects it to the MQTT broker |
micallef25 | 2:f10d6fecb345 | 66 | that we have setup for the course. If there are any errors with the |
micallef25 | 2:f10d6fecb345 | 67 | connection, it will return NULL |
micallef25 | 2:f10d6fecb345 | 68 | */ |
micallef25 | 2:f10d6fecb345 | 69 | MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) { |
micallef25 | 2:f10d6fecb345 | 70 | // the hostname and port point to a Google Cloud MQTT server we setup for |
micallef25 | 2:f10d6fecb345 | 71 | // this project |
micallef25 | 2:f10d6fecb345 | 72 | const char* hostname = "34.68.206.11"; |
micallef25 | 2:f10d6fecb345 | 73 | int port = 1883; |
micallef25 | 2:f10d6fecb345 | 74 | |
micallef25 | 2:f10d6fecb345 | 75 | // Create the underlying network connection to the MQTT server |
micallef25 | 2:f10d6fecb345 | 76 | mqtt_pc.printf("Connecting to %s:%d\r\n", hostname, port); |
micallef25 | 2:f10d6fecb345 | 77 | int rc = network.connect(hostname, port); |
micallef25 | 2:f10d6fecb345 | 78 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 79 | mqtt_pc.printf("There was an error with the TCP connect: %d\r\n", rc); |
micallef25 | 2:f10d6fecb345 | 80 | return NULL; |
micallef25 | 2:f10d6fecb345 | 81 | } |
micallef25 | 2:f10d6fecb345 | 82 | |
micallef25 | 2:f10d6fecb345 | 83 | mqtt_pc.printf("Connected to %s:%d\r\n", hostname, port); |
micallef25 | 2:f10d6fecb345 | 84 | |
micallef25 | 2:f10d6fecb345 | 85 | // Connect the MQTT client to the server |
micallef25 | 2:f10d6fecb345 | 86 | MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network); |
micallef25 | 2:f10d6fecb345 | 87 | rc = temp_client->connect(); |
micallef25 | 2:f10d6fecb345 | 88 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 89 | mqtt_pc.printf("There was an error with the MQTT connect: %d\r\n", rc); |
micallef25 | 2:f10d6fecb345 | 90 | return NULL; |
micallef25 | 2:f10d6fecb345 | 91 | } |
micallef25 | 2:f10d6fecb345 | 92 | |
micallef25 | 2:f10d6fecb345 | 93 | mqtt_pc.printf("MQTT connect successful!\r\n"); |
micallef25 | 2:f10d6fecb345 | 94 | |
micallef25 | 2:f10d6fecb345 | 95 | return temp_client; |
micallef25 | 2:f10d6fecb345 | 96 | } |
micallef25 | 2:f10d6fecb345 | 97 | |
micallef25 | 2:f10d6fecb345 | 98 | /* |
micallef25 | 2:f10d6fecb345 | 99 | This function is the callback for when a message is received from the |
micallef25 | 2:f10d6fecb345 | 100 | MQTT broker. You register different callback functions with different |
micallef25 | 2:f10d6fecb345 | 101 | topic subscriptions |
micallef25 | 2:f10d6fecb345 | 102 | */ |
micallef25 | 2:f10d6fecb345 | 103 | void mqtt::control_message_arrived(MQTT::MessageData& md) |
micallef25 | 2:f10d6fecb345 | 104 | { |
micallef25 | 2:f10d6fecb345 | 105 | MQTT::Message &message = md.message; |
micallef25 | 2:f10d6fecb345 | 106 | // make message |
micallef25 | 2:f10d6fecb345 | 107 | control_msg_t* msg = new control_msg_t; |
micallef25 | 2:f10d6fecb345 | 108 | assert(msg != NULL && message.payloadlen == sizeof(control_msg_t)); |
micallef25 | 2:f10d6fecb345 | 109 | |
micallef25 | 2:f10d6fecb345 | 110 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 2:f10d6fecb345 | 111 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 2:f10d6fecb345 | 112 | |
micallef25 | 2:f10d6fecb345 | 113 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 114 | //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); |
micallef25 | 2:f10d6fecb345 | 115 | mqtt_pc.printf("speed %d id %d \r\n", msg->speed, msg->car_id); |
micallef25 | 2:f10d6fecb345 | 116 | #endif |
micallef25 | 2:f10d6fecb345 | 117 | |
micallef25 | 2:f10d6fecb345 | 118 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 2:f10d6fecb345 | 119 | // the message memory |
micallef25 | 2:f10d6fecb345 | 120 | mqtt::instance()->add_to_control_queue(msg->car_id,msg); |
micallef25 | 2:f10d6fecb345 | 121 | |
micallef25 | 2:f10d6fecb345 | 122 | // free message |
micallef25 | 2:f10d6fecb345 | 123 | delete msg; |
micallef25 | 2:f10d6fecb345 | 124 | } |
micallef25 | 2:f10d6fecb345 | 125 | |
micallef25 | 2:f10d6fecb345 | 126 | /* |
micallef25 | 2:f10d6fecb345 | 127 | This function sends a message to the test topic. |
micallef25 | 2:f10d6fecb345 | 128 | */ |
micallef25 | 2:f10d6fecb345 | 129 | int mqtt::send_position_msg( position_msg_t* msg ) |
micallef25 | 2:f10d6fecb345 | 130 | { |
micallef25 | 2:f10d6fecb345 | 131 | assert(msg != NULL && client != NULL); |
micallef25 | 2:f10d6fecb345 | 132 | |
micallef25 | 2:f10d6fecb345 | 133 | MQTT::Message message; |
micallef25 | 2:f10d6fecb345 | 134 | |
micallef25 | 2:f10d6fecb345 | 135 | // might be safest to memcopy thius seems to work |
micallef25 | 2:f10d6fecb345 | 136 | //memcpy(message.payload,msg,sizeof(position_msg_t)); |
micallef25 | 2:f10d6fecb345 | 137 | message.payload = (void*)msg; |
micallef25 | 2:f10d6fecb345 | 138 | message.payloadlen = sizeof(position_msg_t); |
micallef25 | 2:f10d6fecb345 | 139 | message.qos = MQTT::QOS1; |
micallef25 | 2:f10d6fecb345 | 140 | |
micallef25 | 2:f10d6fecb345 | 141 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 142 | mqtt_pc.printf("Sending a message!\r\n"); |
micallef25 | 2:f10d6fecb345 | 143 | #endif |
micallef25 | 2:f10d6fecb345 | 144 | |
micallef25 | 2:f10d6fecb345 | 145 | int rc = client->publish(POSITION_TOPIC,message); |
micallef25 | 2:f10d6fecb345 | 146 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 147 | |
micallef25 | 2:f10d6fecb345 | 148 | client->yield(1000); |
micallef25 | 2:f10d6fecb345 | 149 | return 0; |
micallef25 | 2:f10d6fecb345 | 150 | } |
micallef25 | 2:f10d6fecb345 | 151 | |
MannyK | 4:64c6fc70ddb7 | 152 | int mqtt::send_road_msg( road_msg_t* msg ) |
MannyK | 4:64c6fc70ddb7 | 153 | { |
MannyK | 4:64c6fc70ddb7 | 154 | assert(msg != NULL && client != NULL); |
MannyK | 4:64c6fc70ddb7 | 155 | |
MannyK | 4:64c6fc70ddb7 | 156 | MQTT::Message message; |
MannyK | 4:64c6fc70ddb7 | 157 | |
MannyK | 4:64c6fc70ddb7 | 158 | // might be safest to memcopy thius seems to work |
MannyK | 4:64c6fc70ddb7 | 159 | //memcpy(message.payload,msg,sizeof(position_msg_t)); |
MannyK | 4:64c6fc70ddb7 | 160 | message.payload = (void*)msg; |
MannyK | 4:64c6fc70ddb7 | 161 | message.payloadlen = sizeof(position_msg_t); |
MannyK | 4:64c6fc70ddb7 | 162 | message.qos = MQTT::QOS1; |
MannyK | 4:64c6fc70ddb7 | 163 | |
MannyK | 4:64c6fc70ddb7 | 164 | #ifdef DEBUG_MQTT |
MannyK | 4:64c6fc70ddb7 | 165 | mqtt_pc.printf("Sending a message!\r\n"); |
MannyK | 4:64c6fc70ddb7 | 166 | #endif |
MannyK | 4:64c6fc70ddb7 | 167 | |
MannyK | 4:64c6fc70ddb7 | 168 | int rc = client->publish(ROAD_TOPIC,message); |
MannyK | 4:64c6fc70ddb7 | 169 | assert(rc == 0); |
MannyK | 4:64c6fc70ddb7 | 170 | |
MannyK | 4:64c6fc70ddb7 | 171 | client->yield(1000); |
MannyK | 4:64c6fc70ddb7 | 172 | return 0; |
MannyK | 4:64c6fc70ddb7 | 173 | } |
MannyK | 4:64c6fc70ddb7 | 174 | |
micallef25 | 2:f10d6fecb345 | 175 | void mqtt::bringup_network() { |
micallef25 | 2:f10d6fecb345 | 176 | |
micallef25 | 2:f10d6fecb345 | 177 | // |
micallef25 | 2:f10d6fecb345 | 178 | WiFiInterface* wifi = setup_wifi(); |
micallef25 | 2:f10d6fecb345 | 179 | assert(wifi != NULL); |
micallef25 | 2:f10d6fecb345 | 180 | |
micallef25 | 2:f10d6fecb345 | 181 | // Create the network object needed by the MQTT client |
micallef25 | 2:f10d6fecb345 | 182 | new_network = new MQTTNetwork(wifi); |
micallef25 | 2:f10d6fecb345 | 183 | |
micallef25 | 2:f10d6fecb345 | 184 | // get client |
micallef25 | 2:f10d6fecb345 | 185 | client = setup_mqtt(*new_network); |
micallef25 | 2:f10d6fecb345 | 186 | assert(client != NULL); |
micallef25 | 2:f10d6fecb345 | 187 | |
micallef25 | 2:f10d6fecb345 | 188 | // Subscribe to a topic / register a callback |
micallef25 | 2:f10d6fecb345 | 189 | mqtt_pc.printf("Subscribing to topic %s\r\n", CONTROL_TOPIC); |
micallef25 | 2:f10d6fecb345 | 190 | int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived); |
micallef25 | 2:f10d6fecb345 | 191 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 192 | |
micallef25 | 2:f10d6fecb345 | 193 | // make a road based of mqtt id |
micallef25 | 2:f10d6fecb345 | 194 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 195 | if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){ |
micallef25 | 2:f10d6fecb345 | 196 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 197 | } |
micallef25 | 2:f10d6fecb345 | 198 | else{ |
micallef25 | 2:f10d6fecb345 | 199 | mqtt_id = 1; |
micallef25 | 2:f10d6fecb345 | 200 | } |
micallef25 | 2:f10d6fecb345 | 201 | |
micallef25 | 2:f10d6fecb345 | 202 | mqtt_pc.printf("Subscribed, Setup complete!\r\n"); |
micallef25 | 2:f10d6fecb345 | 203 | } |
micallef25 | 2:f10d6fecb345 | 204 | |
micallef25 | 2:f10d6fecb345 | 205 | |
micallef25 | 2:f10d6fecb345 | 206 | // manage callbacks from mqtt |
micallef25 | 2:f10d6fecb345 | 207 | void mqtt::manage_network() |
micallef25 | 2:f10d6fecb345 | 208 | { |
micallef25 | 2:f10d6fecb345 | 209 | while(true) |
micallef25 | 2:f10d6fecb345 | 210 | { |
micallef25 | 2:f10d6fecb345 | 211 | // |
micallef25 | 2:f10d6fecb345 | 212 | osEvent evt = position_queue.get(); |
micallef25 | 2:f10d6fecb345 | 213 | assert(evt.status == osEventMessage); |
micallef25 | 2:f10d6fecb345 | 214 | |
micallef25 | 2:f10d6fecb345 | 215 | // |
micallef25 | 2:f10d6fecb345 | 216 | position_msg_t *message = (position_msg_t*)evt.value.p; |
micallef25 | 2:f10d6fecb345 | 217 | send_position_msg(message); |
MannyK | 4:64c6fc70ddb7 | 218 | |
MannyK | 4:64c6fc70ddb7 | 219 | if (!road_queue.empty()){ |
MannyK | 4:64c6fc70ddb7 | 220 | osEvent revt = road_queue.get(); |
MannyK | 4:64c6fc70ddb7 | 221 | assert(revt.status == osEventMessage); |
MannyK | 4:64c6fc70ddb7 | 222 | // |
MannyK | 4:64c6fc70ddb7 | 223 | road_msg_t *road_msg = (road_msg_t*)revt.value.p; |
MannyK | 4:64c6fc70ddb7 | 224 | send_road_msg(road_msg); |
MannyK | 4:64c6fc70ddb7 | 225 | } |
micallef25 | 2:f10d6fecb345 | 226 | // client->yield(100); |
micallef25 | 2:f10d6fecb345 | 227 | } |
micallef25 | 2:f10d6fecb345 | 228 | } |
micallef25 | 2:f10d6fecb345 | 229 | |
micallef25 | 2:f10d6fecb345 | 230 | // |
micallef25 | 2:f10d6fecb345 | 231 | // clean up goes here |
micallef25 | 2:f10d6fecb345 | 232 | void mqtt::shutdown_network() |
micallef25 | 2:f10d6fecb345 | 233 | { |
micallef25 | 2:f10d6fecb345 | 234 | // |
micallef25 | 2:f10d6fecb345 | 235 | mqtt_pc.printf("shutting down mbed\r\n"); |
micallef25 | 2:f10d6fecb345 | 236 | |
micallef25 | 2:f10d6fecb345 | 237 | // |
micallef25 | 2:f10d6fecb345 | 238 | client->disconnect(); |
micallef25 | 2:f10d6fecb345 | 239 | delete new_network; |
micallef25 | 2:f10d6fecb345 | 240 | |
micallef25 | 2:f10d6fecb345 | 241 | // |
micallef25 | 2:f10d6fecb345 | 242 | thread->terminate(); |
micallef25 | 2:f10d6fecb345 | 243 | delete thread; |
micallef25 | 2:f10d6fecb345 | 244 | } |
micallef25 | 2:f10d6fecb345 | 245 | |
micallef25 | 2:f10d6fecb345 | 246 | // launch network manager thread |
micallef25 | 2:f10d6fecb345 | 247 | // responsible for pub sub |
micallef25 | 2:f10d6fecb345 | 248 | void mqtt::setup_network() |
micallef25 | 2:f10d6fecb345 | 249 | { |
micallef25 | 2:f10d6fecb345 | 250 | // bring up network if anything bad happens we will assert and crash |
micallef25 | 2:f10d6fecb345 | 251 | bringup_network(); |
micallef25 | 2:f10d6fecb345 | 252 | |
micallef25 | 2:f10d6fecb345 | 253 | // create a new thread thats sole purpose in life is to call client yield |
micallef25 | 2:f10d6fecb345 | 254 | // what a sad life for a thread |
micallef25 | 2:f10d6fecb345 | 255 | // calling yield is necessary as we will not get any messages for the mqtt |
micallef25 | 2:f10d6fecb345 | 256 | // serverwithout it |
micallef25 | 2:f10d6fecb345 | 257 | thread = new Thread(); |
micallef25 | 2:f10d6fecb345 | 258 | assert(thread != NULL); |
micallef25 | 2:f10d6fecb345 | 259 | thread->start( callback(this,&mqtt::manage_network) ); |
micallef25 | 2:f10d6fecb345 | 260 | |
micallef25 | 2:f10d6fecb345 | 261 | } |