Emanuel Kuflik
/
smat_controller
mqtt.cpp@2:f10d6fecb345, 2019-12-06 (annotated)
- Committer:
- micallef25
- Date:
- Fri Dec 06 15:47:37 2019 +0000
- Revision:
- 2:f10d6fecb345
- Child:
- 4:64c6fc70ddb7
basic communication with mqtt and car threads
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
micallef25 | 2:f10d6fecb345 | 1 | /* mbed Microcontroller Library |
micallef25 | 2:f10d6fecb345 | 2 | * Copyright (c) 2018 ARM Limited |
micallef25 | 2:f10d6fecb345 | 3 | * SPDX-License-Identifier: Apache-2.0 |
micallef25 | 2:f10d6fecb345 | 4 | */ |
micallef25 | 2:f10d6fecb345 | 5 | |
micallef25 | 2:f10d6fecb345 | 6 | #include "mbed.h" |
micallef25 | 2:f10d6fecb345 | 7 | #include "MQTTNetwork.h" |
micallef25 | 2:f10d6fecb345 | 8 | #include "MQTTClient.h" |
micallef25 | 2:f10d6fecb345 | 9 | #include "MQTTmbed.h" |
micallef25 | 2:f10d6fecb345 | 10 | #include "mqtt.h" |
micallef25 | 2:f10d6fecb345 | 11 | #include <assert.h> |
micallef25 | 2:f10d6fecb345 | 12 | #include "AccCar.h" |
micallef25 | 2:f10d6fecb345 | 13 | |
// MQTT topics used by this file: position messages are published to
// POSITION_TOPIC and control messages are received on CONTROL_TOPIC.
#define POSITION_TOPIC "MQTT_Position0x25"
#define CONTROL_TOPIC "MQTT_Control0x25"

// When defined, the message handlers below print extra trace output.
#define DEBUG_MQTT

// Serial port on the USBTX/USBRX pins; all log output in this file goes here.
Serial mqtt_pc (USBTX, USBRX);
micallef25 | 2:f10d6fecb345 | 21 | |
micallef25 | 2:f10d6fecb345 | 22 | |
micallef25 | 2:f10d6fecb345 | 23 | // empty constructor |
micallef25 | 2:f10d6fecb345 | 24 | mqtt::mqtt() |
micallef25 | 2:f10d6fecb345 | 25 | { |
micallef25 | 2:f10d6fecb345 | 26 | |
micallef25 | 2:f10d6fecb345 | 27 | } |
micallef25 | 2:f10d6fecb345 | 28 | |
micallef25 | 2:f10d6fecb345 | 29 | /* |
micallef25 | 2:f10d6fecb345 | 30 | This function sets up the wifi module and connects it to the SSID |
micallef25 | 2:f10d6fecb345 | 31 | configured in the configuration file. It also prints out the MAC address |
micallef25 | 2:f10d6fecb345 | 32 | of the module, which is needed if you are trying to use campus wifi. |
micallef25 | 2:f10d6fecb345 | 33 | This function returns NULL if there are any issues. |
micallef25 | 2:f10d6fecb345 | 34 | */ |
micallef25 | 2:f10d6fecb345 | 35 | WiFiInterface* mqtt::setup_wifi() { |
micallef25 | 2:f10d6fecb345 | 36 | // Get a handle to the WiFi module |
micallef25 | 2:f10d6fecb345 | 37 | WiFiInterface* wifi = WiFiInterface::get_default_instance(); |
micallef25 | 2:f10d6fecb345 | 38 | |
micallef25 | 2:f10d6fecb345 | 39 | // Connect the module to the wifi, based on the SSID and password |
micallef25 | 2:f10d6fecb345 | 40 | // specified in the mbed_app.json configuration file |
micallef25 | 2:f10d6fecb345 | 41 | // If you are using AirPennNet-Device, this will not succeed until the MAC |
micallef25 | 2:f10d6fecb345 | 42 | // address (printed shortly after this) is registered |
micallef25 | 2:f10d6fecb345 | 43 | mqtt_pc.printf("Connecting to wifi\r\n"); |
micallef25 | 2:f10d6fecb345 | 44 | int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); |
micallef25 | 2:f10d6fecb345 | 45 | |
micallef25 | 2:f10d6fecb345 | 46 | // Print out the MAC address of the wifi module. The MAC address is |
micallef25 | 2:f10d6fecb345 | 47 | // needed to register the device with AirPennNet-Device, so that you |
micallef25 | 2:f10d6fecb345 | 48 | // can use the campus wifi |
micallef25 | 2:f10d6fecb345 | 49 | mqtt_pc.printf("MAC Address: "); |
micallef25 | 2:f10d6fecb345 | 50 | mqtt_pc.printf(wifi->get_mac_address()); |
micallef25 | 2:f10d6fecb345 | 51 | mqtt_pc.printf("\r\n"); |
micallef25 | 2:f10d6fecb345 | 52 | |
micallef25 | 2:f10d6fecb345 | 53 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 54 | mqtt_pc.printf("Problem connecting to wifi\r\n"); |
micallef25 | 2:f10d6fecb345 | 55 | return NULL; |
micallef25 | 2:f10d6fecb345 | 56 | } else { |
micallef25 | 2:f10d6fecb345 | 57 | mqtt_pc.printf("Wifi connected\r\n"); |
micallef25 | 2:f10d6fecb345 | 58 | } |
micallef25 | 2:f10d6fecb345 | 59 | |
micallef25 | 2:f10d6fecb345 | 60 | return wifi; |
micallef25 | 2:f10d6fecb345 | 61 | } |
micallef25 | 2:f10d6fecb345 | 62 | |
micallef25 | 2:f10d6fecb345 | 63 | /* |
micallef25 | 2:f10d6fecb345 | 64 | This function creates the MQTT client and connects it to the MQTT broker |
micallef25 | 2:f10d6fecb345 | 65 | that we have setup for the course. If there are any errors with the |
micallef25 | 2:f10d6fecb345 | 66 | connection, it will return NULL |
micallef25 | 2:f10d6fecb345 | 67 | */ |
micallef25 | 2:f10d6fecb345 | 68 | MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) { |
micallef25 | 2:f10d6fecb345 | 69 | // the hostname and port point to a Google Cloud MQTT server we setup for |
micallef25 | 2:f10d6fecb345 | 70 | // this project |
micallef25 | 2:f10d6fecb345 | 71 | const char* hostname = "34.68.206.11"; |
micallef25 | 2:f10d6fecb345 | 72 | int port = 1883; |
micallef25 | 2:f10d6fecb345 | 73 | |
micallef25 | 2:f10d6fecb345 | 74 | // Create the underlying network connection to the MQTT server |
micallef25 | 2:f10d6fecb345 | 75 | mqtt_pc.printf("Connecting to %s:%d\r\n", hostname, port); |
micallef25 | 2:f10d6fecb345 | 76 | int rc = network.connect(hostname, port); |
micallef25 | 2:f10d6fecb345 | 77 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 78 | mqtt_pc.printf("There was an error with the TCP connect: %d\r\n", rc); |
micallef25 | 2:f10d6fecb345 | 79 | return NULL; |
micallef25 | 2:f10d6fecb345 | 80 | } |
micallef25 | 2:f10d6fecb345 | 81 | |
micallef25 | 2:f10d6fecb345 | 82 | mqtt_pc.printf("Connected to %s:%d\r\n", hostname, port); |
micallef25 | 2:f10d6fecb345 | 83 | |
micallef25 | 2:f10d6fecb345 | 84 | // Connect the MQTT client to the server |
micallef25 | 2:f10d6fecb345 | 85 | MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network); |
micallef25 | 2:f10d6fecb345 | 86 | rc = temp_client->connect(); |
micallef25 | 2:f10d6fecb345 | 87 | if (rc != 0) { |
micallef25 | 2:f10d6fecb345 | 88 | mqtt_pc.printf("There was an error with the MQTT connect: %d\r\n", rc); |
micallef25 | 2:f10d6fecb345 | 89 | return NULL; |
micallef25 | 2:f10d6fecb345 | 90 | } |
micallef25 | 2:f10d6fecb345 | 91 | |
micallef25 | 2:f10d6fecb345 | 92 | mqtt_pc.printf("MQTT connect successful!\r\n"); |
micallef25 | 2:f10d6fecb345 | 93 | |
micallef25 | 2:f10d6fecb345 | 94 | return temp_client; |
micallef25 | 2:f10d6fecb345 | 95 | } |
micallef25 | 2:f10d6fecb345 | 96 | |
micallef25 | 2:f10d6fecb345 | 97 | /* |
micallef25 | 2:f10d6fecb345 | 98 | This function is the callback for when a message is received from the |
micallef25 | 2:f10d6fecb345 | 99 | MQTT broker. You register different callback functions with different |
micallef25 | 2:f10d6fecb345 | 100 | topic subscriptions |
micallef25 | 2:f10d6fecb345 | 101 | */ |
micallef25 | 2:f10d6fecb345 | 102 | void mqtt::control_message_arrived(MQTT::MessageData& md) |
micallef25 | 2:f10d6fecb345 | 103 | { |
micallef25 | 2:f10d6fecb345 | 104 | MQTT::Message &message = md.message; |
micallef25 | 2:f10d6fecb345 | 105 | // make message |
micallef25 | 2:f10d6fecb345 | 106 | control_msg_t* msg = new control_msg_t; |
micallef25 | 2:f10d6fecb345 | 107 | assert(msg != NULL && message.payloadlen == sizeof(control_msg_t)); |
micallef25 | 2:f10d6fecb345 | 108 | |
micallef25 | 2:f10d6fecb345 | 109 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 2:f10d6fecb345 | 110 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 2:f10d6fecb345 | 111 | |
micallef25 | 2:f10d6fecb345 | 112 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 113 | //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); |
micallef25 | 2:f10d6fecb345 | 114 | mqtt_pc.printf("speed %d id %d \r\n", msg->speed, msg->car_id); |
micallef25 | 2:f10d6fecb345 | 115 | #endif |
micallef25 | 2:f10d6fecb345 | 116 | |
micallef25 | 2:f10d6fecb345 | 117 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 2:f10d6fecb345 | 118 | // the message memory |
micallef25 | 2:f10d6fecb345 | 119 | mqtt::instance()->add_to_control_queue(msg->car_id,msg); |
micallef25 | 2:f10d6fecb345 | 120 | |
micallef25 | 2:f10d6fecb345 | 121 | // free message |
micallef25 | 2:f10d6fecb345 | 122 | delete msg; |
micallef25 | 2:f10d6fecb345 | 123 | } |
micallef25 | 2:f10d6fecb345 | 124 | |
micallef25 | 2:f10d6fecb345 | 125 | /* |
micallef25 | 2:f10d6fecb345 | 126 | This function sends a message to the test topic. |
micallef25 | 2:f10d6fecb345 | 127 | */ |
micallef25 | 2:f10d6fecb345 | 128 | int mqtt::send_position_msg( position_msg_t* msg ) |
micallef25 | 2:f10d6fecb345 | 129 | { |
micallef25 | 2:f10d6fecb345 | 130 | assert(msg != NULL && client != NULL); |
micallef25 | 2:f10d6fecb345 | 131 | |
micallef25 | 2:f10d6fecb345 | 132 | MQTT::Message message; |
micallef25 | 2:f10d6fecb345 | 133 | |
micallef25 | 2:f10d6fecb345 | 134 | // might be safest to memcopy thius seems to work |
micallef25 | 2:f10d6fecb345 | 135 | //memcpy(message.payload,msg,sizeof(position_msg_t)); |
micallef25 | 2:f10d6fecb345 | 136 | message.payload = (void*)msg; |
micallef25 | 2:f10d6fecb345 | 137 | message.payloadlen = sizeof(position_msg_t); |
micallef25 | 2:f10d6fecb345 | 138 | message.qos = MQTT::QOS1; |
micallef25 | 2:f10d6fecb345 | 139 | |
micallef25 | 2:f10d6fecb345 | 140 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 141 | mqtt_pc.printf("Sending a message!\r\n"); |
micallef25 | 2:f10d6fecb345 | 142 | #endif |
micallef25 | 2:f10d6fecb345 | 143 | |
micallef25 | 2:f10d6fecb345 | 144 | int rc = client->publish(POSITION_TOPIC,message); |
micallef25 | 2:f10d6fecb345 | 145 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 146 | |
micallef25 | 2:f10d6fecb345 | 147 | client->yield(1000); |
micallef25 | 2:f10d6fecb345 | 148 | return 0; |
micallef25 | 2:f10d6fecb345 | 149 | } |
micallef25 | 2:f10d6fecb345 | 150 | |
micallef25 | 2:f10d6fecb345 | 151 | void mqtt::bringup_network() { |
micallef25 | 2:f10d6fecb345 | 152 | |
micallef25 | 2:f10d6fecb345 | 153 | // |
micallef25 | 2:f10d6fecb345 | 154 | WiFiInterface* wifi = setup_wifi(); |
micallef25 | 2:f10d6fecb345 | 155 | assert(wifi != NULL); |
micallef25 | 2:f10d6fecb345 | 156 | |
micallef25 | 2:f10d6fecb345 | 157 | // Create the network object needed by the MQTT client |
micallef25 | 2:f10d6fecb345 | 158 | new_network = new MQTTNetwork(wifi); |
micallef25 | 2:f10d6fecb345 | 159 | |
micallef25 | 2:f10d6fecb345 | 160 | // get client |
micallef25 | 2:f10d6fecb345 | 161 | client = setup_mqtt(*new_network); |
micallef25 | 2:f10d6fecb345 | 162 | assert(client != NULL); |
micallef25 | 2:f10d6fecb345 | 163 | |
micallef25 | 2:f10d6fecb345 | 164 | // Subscribe to a topic / register a callback |
micallef25 | 2:f10d6fecb345 | 165 | mqtt_pc.printf("Subscribing to topic %s\r\n", CONTROL_TOPIC); |
micallef25 | 2:f10d6fecb345 | 166 | int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived); |
micallef25 | 2:f10d6fecb345 | 167 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 168 | |
micallef25 | 2:f10d6fecb345 | 169 | // make a road based of mqtt id |
micallef25 | 2:f10d6fecb345 | 170 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 171 | if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){ |
micallef25 | 2:f10d6fecb345 | 172 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 173 | } |
micallef25 | 2:f10d6fecb345 | 174 | else{ |
micallef25 | 2:f10d6fecb345 | 175 | mqtt_id = 1; |
micallef25 | 2:f10d6fecb345 | 176 | } |
micallef25 | 2:f10d6fecb345 | 177 | |
micallef25 | 2:f10d6fecb345 | 178 | mqtt_pc.printf("Subscribed, Setup complete!\r\n"); |
micallef25 | 2:f10d6fecb345 | 179 | } |
micallef25 | 2:f10d6fecb345 | 180 | |
micallef25 | 2:f10d6fecb345 | 181 | |
micallef25 | 2:f10d6fecb345 | 182 | // manage callbacks from mqtt |
micallef25 | 2:f10d6fecb345 | 183 | void mqtt::manage_network() |
micallef25 | 2:f10d6fecb345 | 184 | { |
micallef25 | 2:f10d6fecb345 | 185 | while(true) |
micallef25 | 2:f10d6fecb345 | 186 | { |
micallef25 | 2:f10d6fecb345 | 187 | // |
micallef25 | 2:f10d6fecb345 | 188 | osEvent evt = position_queue.get(); |
micallef25 | 2:f10d6fecb345 | 189 | assert(evt.status == osEventMessage); |
micallef25 | 2:f10d6fecb345 | 190 | |
micallef25 | 2:f10d6fecb345 | 191 | // |
micallef25 | 2:f10d6fecb345 | 192 | position_msg_t *message = (position_msg_t*)evt.value.p; |
micallef25 | 2:f10d6fecb345 | 193 | send_position_msg(message); |
micallef25 | 2:f10d6fecb345 | 194 | |
micallef25 | 2:f10d6fecb345 | 195 | // client->yield(100); |
micallef25 | 2:f10d6fecb345 | 196 | } |
micallef25 | 2:f10d6fecb345 | 197 | } |
micallef25 | 2:f10d6fecb345 | 198 | |
micallef25 | 2:f10d6fecb345 | 199 | // |
micallef25 | 2:f10d6fecb345 | 200 | // clean up goes here |
micallef25 | 2:f10d6fecb345 | 201 | void mqtt::shutdown_network() |
micallef25 | 2:f10d6fecb345 | 202 | { |
micallef25 | 2:f10d6fecb345 | 203 | // |
micallef25 | 2:f10d6fecb345 | 204 | mqtt_pc.printf("shutting down mbed\r\n"); |
micallef25 | 2:f10d6fecb345 | 205 | |
micallef25 | 2:f10d6fecb345 | 206 | // |
micallef25 | 2:f10d6fecb345 | 207 | client->disconnect(); |
micallef25 | 2:f10d6fecb345 | 208 | delete new_network; |
micallef25 | 2:f10d6fecb345 | 209 | |
micallef25 | 2:f10d6fecb345 | 210 | // |
micallef25 | 2:f10d6fecb345 | 211 | thread->terminate(); |
micallef25 | 2:f10d6fecb345 | 212 | delete thread; |
micallef25 | 2:f10d6fecb345 | 213 | } |
micallef25 | 2:f10d6fecb345 | 214 | |
micallef25 | 2:f10d6fecb345 | 215 | // launch network manager thread |
micallef25 | 2:f10d6fecb345 | 216 | // responsible for pub sub |
micallef25 | 2:f10d6fecb345 | 217 | void mqtt::setup_network() |
micallef25 | 2:f10d6fecb345 | 218 | { |
micallef25 | 2:f10d6fecb345 | 219 | // bring up network if anything bad happens we will assert and crash |
micallef25 | 2:f10d6fecb345 | 220 | bringup_network(); |
micallef25 | 2:f10d6fecb345 | 221 | |
micallef25 | 2:f10d6fecb345 | 222 | // create a new thread thats sole purpose in life is to call client yield |
micallef25 | 2:f10d6fecb345 | 223 | // what a sad life for a thread |
micallef25 | 2:f10d6fecb345 | 224 | // calling yield is necessary as we will not get any messages for the mqtt |
micallef25 | 2:f10d6fecb345 | 225 | // serverwithout it |
micallef25 | 2:f10d6fecb345 | 226 | thread = new Thread(); |
micallef25 | 2:f10d6fecb345 | 227 | assert(thread != NULL); |
micallef25 | 2:f10d6fecb345 | 228 | thread->start( callback(this,&mqtt::manage_network) ); |
micallef25 | 2:f10d6fecb345 | 229 | |
micallef25 | 2:f10d6fecb345 | 230 | } |