Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mqtt.cpp@7:fd8e0604faaa, 2019-12-12 (annotated)
- Committer:
- micallef25
- Date:
- Thu Dec 12 17:25:27 2019 +0000
- Revision:
- 7:fd8e0604faaa
- Parent:
- 6:6cb13ac483e0
stable, passed 1000 simulation regression
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
micallef25 | 2:f10d6fecb345 | 1 | /* mbed Microcontroller Library |
micallef25 | 2:f10d6fecb345 | 2 | * Copyright (c) 2018 ARM Limited |
micallef25 | 2:f10d6fecb345 | 3 | * SPDX-License-Identifier: Apache-2.0 |
micallef25 | 2:f10d6fecb345 | 4 | */ |
micallef25 | 2:f10d6fecb345 | 5 | |
micallef25 | 2:f10d6fecb345 | 6 | #include "mbed.h" |
micallef25 | 2:f10d6fecb345 | 7 | #include "MQTTNetwork.h" |
micallef25 | 2:f10d6fecb345 | 8 | #include "MQTTClient.h" |
micallef25 | 2:f10d6fecb345 | 9 | #include "MQTTmbed.h" |
micallef25 | 2:f10d6fecb345 | 10 | #include "mqtt.h" |
micallef25 | 2:f10d6fecb345 | 11 | #include <assert.h> |
micallef25 | 2:f10d6fecb345 | 12 | #include "AccCar.h" |
micallef25 | 2:f10d6fecb345 | 13 | |
micallef25 | 2:f10d6fecb345 | 14 | // topics interested in |
micallef25 | 2:f10d6fecb345 | 15 | #define POSITION_TOPIC "MQTT_Position0x25" |
MannyK | 4:64c6fc70ddb7 | 16 | #define ROAD_TOPIC "MQTT_Road0x25" |
micallef25 | 2:f10d6fecb345 | 17 | #define CONTROL_TOPIC "MQTT_Control0x25" |
micallef25 | 2:f10d6fecb345 | 18 | |
micallef25 | 5:e0d8e5e922f1 | 19 | //#define DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 20 | #define MQTT_TAG "[MQTT] " |
micallef25 | 2:f10d6fecb345 | 21 | |
micallef25 | 2:f10d6fecb345 | 22 | Serial mqtt_pc (USBTX, USBRX); |
micallef25 | 2:f10d6fecb345 | 23 | |
micallef25 | 2:f10d6fecb345 | 24 | |
micallef25 | 2:f10d6fecb345 | 25 | // empty constructor |
micallef25 | 2:f10d6fecb345 | 26 | mqtt::mqtt() |
micallef25 | 2:f10d6fecb345 | 27 | { |
micallef25 | 2:f10d6fecb345 | 28 | |
micallef25 | 2:f10d6fecb345 | 29 | } |
micallef25 | 2:f10d6fecb345 | 30 | |
micallef25 | 2:f10d6fecb345 | 31 | /* |
micallef25 | 2:f10d6fecb345 | 32 | This function sets up the wifi module and connects it to the SSID |
micallef25 | 2:f10d6fecb345 | 33 | configured in the configuration file. It also prints out the MAC address |
micallef25 | 2:f10d6fecb345 | 34 | of the module, which is needed if you are trying to use campus wifi. |
micallef25 | 2:f10d6fecb345 | 35 | This function returns NULL if there are any issues. |
micallef25 | 2:f10d6fecb345 | 36 | */ |
micallef25 | 2:f10d6fecb345 | 37 | WiFiInterface* mqtt::setup_wifi() { |
micallef25 | 2:f10d6fecb345 | 38 | // Get a handle to the WiFi module |
micallef25 | 2:f10d6fecb345 | 39 | WiFiInterface* wifi = WiFiInterface::get_default_instance(); |
micallef25 | 2:f10d6fecb345 | 40 | |
micallef25 | 2:f10d6fecb345 | 41 | // Connect the module to the wifi, based on the SSID and password |
micallef25 | 2:f10d6fecb345 | 42 | // specified in the mbed_app.json configuration file |
micallef25 | 2:f10d6fecb345 | 43 | // If you are using AirPennNet-Device, this will not succeed until the MAC |
micallef25 | 2:f10d6fecb345 | 44 | // address (printed shortly after this) is registered |
micallef25 | 5:e0d8e5e922f1 | 45 | mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM); |
micallef25 | 2:f10d6fecb345 | 46 | int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); |
micallef25 | 2:f10d6fecb345 | 47 | |
micallef25 | 2:f10d6fecb345 | 48 | // Print out the MAC address of the wifi module. The MAC address is |
micallef25 | 2:f10d6fecb345 | 49 | // needed to register the device with AirPennNet-Device, so that you |
micallef25 | 2:f10d6fecb345 | 50 | // can use the campus wifi |
micallef25 | 5:e0d8e5e922f1 | 51 | mqtt_pc.printf(MQTT_TAG"MAC Address: "); |
micallef25 | 2:f10d6fecb345 | 52 | mqtt_pc.printf(wifi->get_mac_address()); |
micallef25 | 2:f10d6fecb345 | 53 | mqtt_pc.printf("\r\n"); |
micallef25 | 2:f10d6fecb345 | 54 | |
micallef25 | 2:f10d6fecb345 | 55 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 56 | mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM); |
micallef25 | 2:f10d6fecb345 | 57 | return NULL; |
micallef25 | 2:f10d6fecb345 | 58 | } else { |
micallef25 | 5:e0d8e5e922f1 | 59 | mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM); |
micallef25 | 2:f10d6fecb345 | 60 | } |
micallef25 | 2:f10d6fecb345 | 61 | |
micallef25 | 2:f10d6fecb345 | 62 | return wifi; |
micallef25 | 2:f10d6fecb345 | 63 | } |
micallef25 | 2:f10d6fecb345 | 64 | |
micallef25 | 2:f10d6fecb345 | 65 | /* |
micallef25 | 2:f10d6fecb345 | 66 | This function creates the MQTT client and connects it to the MQTT broker |
micallef25 | 2:f10d6fecb345 | 67 | that we have setup for the course. If there are any errors with the |
micallef25 | 2:f10d6fecb345 | 68 | connection, it will return NULL |
micallef25 | 2:f10d6fecb345 | 69 | */ |
micallef25 | 2:f10d6fecb345 | 70 | MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) { |
micallef25 | 2:f10d6fecb345 | 71 | // the hostname and port point to a Google Cloud MQTT server we setup for |
micallef25 | 2:f10d6fecb345 | 72 | // this project |
micallef25 | 2:f10d6fecb345 | 73 | const char* hostname = "34.68.206.11"; |
micallef25 | 2:f10d6fecb345 | 74 | int port = 1883; |
micallef25 | 2:f10d6fecb345 | 75 | |
micallef25 | 2:f10d6fecb345 | 76 | // Create the underlying network connection to the MQTT server |
micallef25 | 5:e0d8e5e922f1 | 77 | mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port); |
micallef25 | 2:f10d6fecb345 | 78 | int rc = network.connect(hostname, port); |
micallef25 | 2:f10d6fecb345 | 79 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 80 | mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc); |
micallef25 | 2:f10d6fecb345 | 81 | return NULL; |
micallef25 | 2:f10d6fecb345 | 82 | } |
micallef25 | 2:f10d6fecb345 | 83 | |
micallef25 | 5:e0d8e5e922f1 | 84 | mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port); |
micallef25 | 2:f10d6fecb345 | 85 | |
micallef25 | 2:f10d6fecb345 | 86 | // Connect the MQTT client to the server |
micallef25 | 2:f10d6fecb345 | 87 | MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network); |
micallef25 | 2:f10d6fecb345 | 88 | rc = temp_client->connect(); |
micallef25 | 2:f10d6fecb345 | 89 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 90 | mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc); |
micallef25 | 2:f10d6fecb345 | 91 | return NULL; |
micallef25 | 2:f10d6fecb345 | 92 | } |
micallef25 | 2:f10d6fecb345 | 93 | |
micallef25 | 5:e0d8e5e922f1 | 94 | mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM); |
micallef25 | 2:f10d6fecb345 | 95 | |
micallef25 | 2:f10d6fecb345 | 96 | return temp_client; |
micallef25 | 2:f10d6fecb345 | 97 | } |
micallef25 | 2:f10d6fecb345 | 98 | |
micallef25 | 2:f10d6fecb345 | 99 | /* |
micallef25 | 2:f10d6fecb345 | 100 | This function is the callback for when a message is received from the |
micallef25 | 2:f10d6fecb345 | 101 | MQTT broker. You register different callback functions with different |
micallef25 | 2:f10d6fecb345 | 102 | topic subscriptions |
micallef25 | 2:f10d6fecb345 | 103 | */ |
micallef25 | 2:f10d6fecb345 | 104 | void mqtt::control_message_arrived(MQTT::MessageData& md) |
micallef25 | 2:f10d6fecb345 | 105 | { |
micallef25 | 2:f10d6fecb345 | 106 | MQTT::Message &message = md.message; |
micallef25 | 5:e0d8e5e922f1 | 107 | // make message receiver responsible for freeeing |
micallef25 | 2:f10d6fecb345 | 108 | control_msg_t* msg = new control_msg_t; |
micallef25 | 2:f10d6fecb345 | 109 | assert(msg != NULL && message.payloadlen == sizeof(control_msg_t)); |
micallef25 | 2:f10d6fecb345 | 110 | |
micallef25 | 2:f10d6fecb345 | 111 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 2:f10d6fecb345 | 112 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 5:e0d8e5e922f1 | 113 | |
micallef25 | 5:e0d8e5e922f1 | 114 | if( msg->road_id == mqtt::instance()->mqtt_id ) |
micallef25 | 5:e0d8e5e922f1 | 115 | { |
micallef25 | 2:f10d6fecb345 | 116 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 117 | //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); |
micallef25 | 5:e0d8e5e922f1 | 118 | mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id); |
micallef25 | 2:f10d6fecb345 | 119 | #endif |
micallef25 | 5:e0d8e5e922f1 | 120 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 5:e0d8e5e922f1 | 121 | // the message memory thanks mbed os 5 documentation |
micallef25 | 5:e0d8e5e922f1 | 122 | mqtt::instance()->add_to_control_queue(msg->car_id,msg); |
micallef25 | 5:e0d8e5e922f1 | 123 | } |
micallef25 | 5:e0d8e5e922f1 | 124 | |
micallef25 | 5:e0d8e5e922f1 | 125 | else |
micallef25 | 5:e0d8e5e922f1 | 126 | { |
micallef25 | 5:e0d8e5e922f1 | 127 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 128 | mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 129 | #endif |
micallef25 | 5:e0d8e5e922f1 | 130 | delete msg; |
micallef25 | 5:e0d8e5e922f1 | 131 | } |
micallef25 | 5:e0d8e5e922f1 | 132 | } |
micallef25 | 2:f10d6fecb345 | 133 | |
micallef25 | 5:e0d8e5e922f1 | 134 | void mqtt::road_message_arrived(MQTT::MessageData& md) |
micallef25 | 5:e0d8e5e922f1 | 135 | { |
micallef25 | 5:e0d8e5e922f1 | 136 | MQTT::Message &message = md.message; |
micallef25 | 5:e0d8e5e922f1 | 137 | // make message... I will endeavor to free this one the receiver side |
micallef25 | 5:e0d8e5e922f1 | 138 | // from example sender allocates and receiver frees so well stick |
micallef25 | 5:e0d8e5e922f1 | 139 | // with that paradigm |
micallef25 | 5:e0d8e5e922f1 | 140 | road_msg_t* msg = new road_msg_t; |
micallef25 | 5:e0d8e5e922f1 | 141 | assert(msg != NULL && message.payloadlen == sizeof(road_msg_t)); |
micallef25 | 5:e0d8e5e922f1 | 142 | |
micallef25 | 5:e0d8e5e922f1 | 143 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 5:e0d8e5e922f1 | 144 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 2:f10d6fecb345 | 145 | |
micallef25 | 5:e0d8e5e922f1 | 146 | if( msg->road_id != mqtt::instance()->mqtt_id ) |
micallef25 | 5:e0d8e5e922f1 | 147 | { |
micallef25 | 5:e0d8e5e922f1 | 148 | #ifdef DEBUG_MQTT |
micallef25 | 7:fd8e0604faaa | 149 | mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_clock); |
micallef25 | 5:e0d8e5e922f1 | 150 | #endif |
micallef25 | 5:e0d8e5e922f1 | 151 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 5:e0d8e5e922f1 | 152 | // the message memory thanks mbed os 5 documentation |
micallef25 | 5:e0d8e5e922f1 | 153 | mqtt::instance()->add_to_network_to_road_queue(msg); |
micallef25 | 5:e0d8e5e922f1 | 154 | } |
micallef25 | 5:e0d8e5e922f1 | 155 | |
micallef25 | 5:e0d8e5e922f1 | 156 | else{ |
micallef25 | 5:e0d8e5e922f1 | 157 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 158 | mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 159 | #endif |
micallef25 | 5:e0d8e5e922f1 | 160 | delete msg; |
micallef25 | 5:e0d8e5e922f1 | 161 | } |
micallef25 | 5:e0d8e5e922f1 | 162 | |
micallef25 | 2:f10d6fecb345 | 163 | } |
micallef25 | 2:f10d6fecb345 | 164 | |
micallef25 | 2:f10d6fecb345 | 165 | /* |
micallef25 | 2:f10d6fecb345 | 166 | This function sends a message to the test topic. |
micallef25 | 2:f10d6fecb345 | 167 | */ |
micallef25 | 5:e0d8e5e922f1 | 168 | int mqtt::send_position_msg( MQTT::Message& message, position_msg_t* msg ) |
micallef25 | 2:f10d6fecb345 | 169 | { |
micallef25 | 2:f10d6fecb345 | 170 | assert(msg != NULL && client != NULL); |
micallef25 | 2:f10d6fecb345 | 171 | |
micallef25 | 5:e0d8e5e922f1 | 172 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 173 | mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 174 | #endif |
micallef25 | 7:fd8e0604faaa | 175 | |
micallef25 | 7:fd8e0604faaa | 176 | MQTT::Message tmessage; |
micallef25 | 2:f10d6fecb345 | 177 | |
micallef25 | 7:fd8e0604faaa | 178 | tmessage.payload = (void*)msg; |
micallef25 | 7:fd8e0604faaa | 179 | tmessage.payloadlen = sizeof(position_msg_t); |
micallef25 | 7:fd8e0604faaa | 180 | tmessage.qos = MQTT::QOS1; |
micallef25 | 2:f10d6fecb345 | 181 | |
micallef25 | 7:fd8e0604faaa | 182 | int rc = client->publish(POSITION_TOPIC,tmessage); |
micallef25 | 2:f10d6fecb345 | 183 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 184 | |
micallef25 | 2:f10d6fecb345 | 185 | return 0; |
micallef25 | 2:f10d6fecb345 | 186 | } |
micallef25 | 2:f10d6fecb345 | 187 | |
micallef25 | 5:e0d8e5e922f1 | 188 | int mqtt::send_road_msg( MQTT::Message& message, road_msg_t* msg ) |
MannyK | 4:64c6fc70ddb7 | 189 | { |
MannyK | 4:64c6fc70ddb7 | 190 | assert(msg != NULL && client != NULL); |
MannyK | 4:64c6fc70ddb7 | 191 | |
micallef25 | 5:e0d8e5e922f1 | 192 | #ifdef DEBUG_MQTT |
micallef25 | 7:fd8e0604faaa | 193 | mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_clock); |
micallef25 | 5:e0d8e5e922f1 | 194 | #endif |
micallef25 | 7:fd8e0604faaa | 195 | MQTT::Message tmessage; |
MannyK | 4:64c6fc70ddb7 | 196 | |
micallef25 | 7:fd8e0604faaa | 197 | tmessage.payload = (void*)msg; |
micallef25 | 7:fd8e0604faaa | 198 | tmessage.payloadlen = sizeof(road_msg_t); |
micallef25 | 7:fd8e0604faaa | 199 | tmessage.qos = MQTT::QOS1; |
MannyK | 4:64c6fc70ddb7 | 200 | |
micallef25 | 7:fd8e0604faaa | 201 | int rc = client->publish(ROAD_TOPIC,tmessage); |
MannyK | 4:64c6fc70ddb7 | 202 | assert(rc == 0); |
micallef25 | 6:6cb13ac483e0 | 203 | |
MannyK | 4:64c6fc70ddb7 | 204 | return 0; |
MannyK | 4:64c6fc70ddb7 | 205 | } |
MannyK | 4:64c6fc70ddb7 | 206 | |
micallef25 | 2:f10d6fecb345 | 207 | void mqtt::bringup_network() { |
micallef25 | 2:f10d6fecb345 | 208 | |
micallef25 | 2:f10d6fecb345 | 209 | // |
micallef25 | 2:f10d6fecb345 | 210 | WiFiInterface* wifi = setup_wifi(); |
micallef25 | 2:f10d6fecb345 | 211 | assert(wifi != NULL); |
micallef25 | 2:f10d6fecb345 | 212 | |
micallef25 | 2:f10d6fecb345 | 213 | // Create the network object needed by the MQTT client |
micallef25 | 2:f10d6fecb345 | 214 | new_network = new MQTTNetwork(wifi); |
micallef25 | 2:f10d6fecb345 | 215 | |
micallef25 | 2:f10d6fecb345 | 216 | // get client |
micallef25 | 2:f10d6fecb345 | 217 | client = setup_mqtt(*new_network); |
micallef25 | 2:f10d6fecb345 | 218 | assert(client != NULL); |
micallef25 | 2:f10d6fecb345 | 219 | |
micallef25 | 2:f10d6fecb345 | 220 | // Subscribe to a topic / register a callback |
micallef25 | 5:e0d8e5e922f1 | 221 | mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC); |
micallef25 | 2:f10d6fecb345 | 222 | int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived); |
micallef25 | 2:f10d6fecb345 | 223 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 224 | |
micallef25 | 5:e0d8e5e922f1 | 225 | // Subscribe to a topic / register a callback |
micallef25 | 5:e0d8e5e922f1 | 226 | mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC); |
micallef25 | 5:e0d8e5e922f1 | 227 | rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived); |
micallef25 | 5:e0d8e5e922f1 | 228 | assert(rc == 0); |
micallef25 | 5:e0d8e5e922f1 | 229 | |
micallef25 | 2:f10d6fecb345 | 230 | // make a road based of mqtt id |
micallef25 | 2:f10d6fecb345 | 231 | mqtt_id = 0; |
micallef25 | 7:fd8e0604faaa | 232 | if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:8e:77") == 0){ |
micallef25 | 2:f10d6fecb345 | 233 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 234 | } |
micallef25 | 2:f10d6fecb345 | 235 | else{ |
micallef25 | 2:f10d6fecb345 | 236 | mqtt_id = 1; |
micallef25 | 2:f10d6fecb345 | 237 | } |
micallef25 | 2:f10d6fecb345 | 238 | |
micallef25 | 5:e0d8e5e922f1 | 239 | mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id); |
micallef25 | 2:f10d6fecb345 | 240 | } |
micallef25 | 2:f10d6fecb345 | 241 | |
micallef25 | 2:f10d6fecb345 | 242 | |
micallef25 | 2:f10d6fecb345 | 243 | // manage callbacks from mqtt |
micallef25 | 2:f10d6fecb345 | 244 | void mqtt::manage_network() |
micallef25 | 2:f10d6fecb345 | 245 | { |
micallef25 | 5:e0d8e5e922f1 | 246 | MQTT::Message mqtt_message; |
micallef25 | 2:f10d6fecb345 | 247 | while(true) |
micallef25 | 2:f10d6fecb345 | 248 | { |
micallef25 | 5:e0d8e5e922f1 | 249 | while(!position_queue.empty()) |
micallef25 | 5:e0d8e5e922f1 | 250 | { |
micallef25 | 5:e0d8e5e922f1 | 251 | |
micallef25 | 5:e0d8e5e922f1 | 252 | //#ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 253 | // mqtt_pc.printf(MQTT_TAG"getting position msg\r\n"); |
micallef25 | 5:e0d8e5e922f1 | 254 | //#endif |
micallef25 | 5:e0d8e5e922f1 | 255 | // |
micallef25 | 5:e0d8e5e922f1 | 256 | osEvent evt = position_queue.get(); |
micallef25 | 5:e0d8e5e922f1 | 257 | assert(evt.status == osEventMessage); |
micallef25 | 2:f10d6fecb345 | 258 | |
micallef25 | 5:e0d8e5e922f1 | 259 | // |
micallef25 | 5:e0d8e5e922f1 | 260 | position_msg_t *message = (position_msg_t*)evt.value.p; |
micallef25 | 5:e0d8e5e922f1 | 261 | assert(message != NULL); |
micallef25 | 5:e0d8e5e922f1 | 262 | send_position_msg(mqtt_message,message); |
micallef25 | 5:e0d8e5e922f1 | 263 | } |
MannyK | 4:64c6fc70ddb7 | 264 | |
micallef25 | 5:e0d8e5e922f1 | 265 | if (!road_to_network_queue.empty()) |
micallef25 | 5:e0d8e5e922f1 | 266 | { |
micallef25 | 5:e0d8e5e922f1 | 267 | |
micallef25 | 5:e0d8e5e922f1 | 268 | //#ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 269 | // mqtt_pc.printf(MQTT_TAG"getting road info\r\n"); |
micallef25 | 5:e0d8e5e922f1 | 270 | //#endif |
micallef25 | 5:e0d8e5e922f1 | 271 | osEvent revt = road_to_network_queue.get(); |
MannyK | 4:64c6fc70ddb7 | 272 | assert(revt.status == osEventMessage); |
micallef25 | 5:e0d8e5e922f1 | 273 | |
micallef25 | 5:e0d8e5e922f1 | 274 | // |
MannyK | 4:64c6fc70ddb7 | 275 | road_msg_t *road_msg = (road_msg_t*)revt.value.p; |
micallef25 | 5:e0d8e5e922f1 | 276 | assert(road_msg != NULL); |
micallef25 | 5:e0d8e5e922f1 | 277 | send_road_msg(mqtt_message,road_msg); |
micallef25 | 7:fd8e0604faaa | 278 | delete road_msg; |
MannyK | 4:64c6fc70ddb7 | 279 | } |
micallef25 | 5:e0d8e5e922f1 | 280 | |
micallef25 | 5:e0d8e5e922f1 | 281 | client->yield(10); |
micallef25 | 2:f10d6fecb345 | 282 | } |
micallef25 | 2:f10d6fecb345 | 283 | } |
micallef25 | 2:f10d6fecb345 | 284 | |
micallef25 | 2:f10d6fecb345 | 285 | // |
micallef25 | 2:f10d6fecb345 | 286 | // clean up goes here |
micallef25 | 2:f10d6fecb345 | 287 | void mqtt::shutdown_network() |
micallef25 | 2:f10d6fecb345 | 288 | { |
micallef25 | 2:f10d6fecb345 | 289 | // |
micallef25 | 5:e0d8e5e922f1 | 290 | mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM); |
micallef25 | 2:f10d6fecb345 | 291 | |
micallef25 | 2:f10d6fecb345 | 292 | // |
micallef25 | 2:f10d6fecb345 | 293 | client->disconnect(); |
micallef25 | 2:f10d6fecb345 | 294 | delete new_network; |
micallef25 | 2:f10d6fecb345 | 295 | |
micallef25 | 2:f10d6fecb345 | 296 | // |
micallef25 | 2:f10d6fecb345 | 297 | thread->terminate(); |
micallef25 | 2:f10d6fecb345 | 298 | delete thread; |
micallef25 | 2:f10d6fecb345 | 299 | } |
micallef25 | 2:f10d6fecb345 | 300 | |
micallef25 | 2:f10d6fecb345 | 301 | // launch network manager thread |
micallef25 | 2:f10d6fecb345 | 302 | // responsible for pub sub |
micallef25 | 2:f10d6fecb345 | 303 | void mqtt::setup_network() |
micallef25 | 2:f10d6fecb345 | 304 | { |
micallef25 | 2:f10d6fecb345 | 305 | // bring up network if anything bad happens we will assert and crash |
micallef25 | 2:f10d6fecb345 | 306 | bringup_network(); |
micallef25 | 2:f10d6fecb345 | 307 | |
micallef25 | 2:f10d6fecb345 | 308 | // create a new thread thats sole purpose in life is to call client yield |
micallef25 | 2:f10d6fecb345 | 309 | // what a sad life for a thread |
micallef25 | 2:f10d6fecb345 | 310 | // calling yield is necessary as we will not get any messages for the mqtt |
micallef25 | 2:f10d6fecb345 | 311 | // serverwithout it |
micallef25 | 2:f10d6fecb345 | 312 | thread = new Thread(); |
micallef25 | 2:f10d6fecb345 | 313 | assert(thread != NULL); |
micallef25 | 2:f10d6fecb345 | 314 | thread->start( callback(this,&mqtt::manage_network) ); |
micallef25 | 2:f10d6fecb345 | 315 | |
micallef25 | 7:fd8e0604faaa | 316 | } |
micallef25 | 7:fd8e0604faaa | 317 | |
micallef25 | 7:fd8e0604faaa | 318 | // before the next run make sure |
micallef25 | 7:fd8e0604faaa | 319 | // that everything is cleaned up properly |
micallef25 | 7:fd8e0604faaa | 320 | // the control queue may have one entry due to |
micallef25 | 7:fd8e0604faaa | 321 | // the last iteration ending |
micallef25 | 7:fd8e0604faaa | 322 | void mqtt::clear_queues() |
micallef25 | 7:fd8e0604faaa | 323 | { |
micallef25 | 7:fd8e0604faaa | 324 | while(!position_queue.empty()) |
micallef25 | 7:fd8e0604faaa | 325 | { |
micallef25 | 7:fd8e0604faaa | 326 | osEvent revt = position_queue.get(); |
micallef25 | 7:fd8e0604faaa | 327 | assert(revt.status == osEventMessage); |
micallef25 | 7:fd8e0604faaa | 328 | //printf("cleaned up queue\n"); |
micallef25 | 7:fd8e0604faaa | 329 | // |
micallef25 | 7:fd8e0604faaa | 330 | position_msg_t *msg = (position_msg_t*)revt.value.p; |
micallef25 | 7:fd8e0604faaa | 331 | delete msg; |
micallef25 | 7:fd8e0604faaa | 332 | } |
micallef25 | 7:fd8e0604faaa | 333 | for(int i = 0; i < 5; i++) |
micallef25 | 7:fd8e0604faaa | 334 | { |
micallef25 | 7:fd8e0604faaa | 335 | while(!control_queue[i].empty()) |
micallef25 | 7:fd8e0604faaa | 336 | { |
micallef25 | 7:fd8e0604faaa | 337 | osEvent revt = control_queue[i].get(); |
micallef25 | 7:fd8e0604faaa | 338 | assert(revt.status == osEventMessage); |
micallef25 | 7:fd8e0604faaa | 339 | //printf("cleaned up control queue\n"); |
micallef25 | 7:fd8e0604faaa | 340 | // |
micallef25 | 7:fd8e0604faaa | 341 | control_msg_t *msg = (control_msg_t*)revt.value.p; |
micallef25 | 7:fd8e0604faaa | 342 | delete msg; |
micallef25 | 7:fd8e0604faaa | 343 | } |
micallef25 | 7:fd8e0604faaa | 344 | } |
micallef25 | 7:fd8e0604faaa | 345 | |
micallef25 | 7:fd8e0604faaa | 346 | while(!network_to_road_queue.empty()) |
micallef25 | 7:fd8e0604faaa | 347 | { |
micallef25 | 7:fd8e0604faaa | 348 | osEvent evt = network_to_road_queue.get(); |
micallef25 | 7:fd8e0604faaa | 349 | assert(evt.status == osEventMessage); |
micallef25 | 7:fd8e0604faaa | 350 | road_msg_t *message = (road_msg_t*)evt.value.p; |
micallef25 | 7:fd8e0604faaa | 351 | delete message; |
micallef25 | 7:fd8e0604faaa | 352 | } |
micallef25 | 7:fd8e0604faaa | 353 | while(!road_to_network_queue.empty()) |
micallef25 | 7:fd8e0604faaa | 354 | { |
micallef25 | 7:fd8e0604faaa | 355 | osEvent evt = road_to_network_queue.get(); |
micallef25 | 7:fd8e0604faaa | 356 | assert(evt.status == osEventMessage); |
micallef25 | 7:fd8e0604faaa | 357 | road_msg_t *message = (road_msg_t*)evt.value.p; |
micallef25 | 7:fd8e0604faaa | 358 | delete message; |
micallef25 | 7:fd8e0604faaa | 359 | } |
micallef25 | 2:f10d6fecb345 | 360 | } |