Emanuel Kuflik
/
smat_controller
mqtt.cpp@5:e0d8e5e922f1, 2019-12-10 (annotated)
- Committer:
- micallef25
- Date:
- Tue Dec 10 23:39:25 2019 +0000
- Revision:
- 5:e0d8e5e922f1
- Parent:
- 4:64c6fc70ddb7
- Child:
- 6:6cb13ac483e0
stable
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
micallef25 | 2:f10d6fecb345 | 1 | /* mbed Microcontroller Library |
micallef25 | 2:f10d6fecb345 | 2 | * Copyright (c) 2018 ARM Limited |
micallef25 | 2:f10d6fecb345 | 3 | * SPDX-License-Identifier: Apache-2.0 |
micallef25 | 2:f10d6fecb345 | 4 | */ |
micallef25 | 2:f10d6fecb345 | 5 | |
micallef25 | 2:f10d6fecb345 | 6 | #include "mbed.h" |
micallef25 | 2:f10d6fecb345 | 7 | #include "MQTTNetwork.h" |
micallef25 | 2:f10d6fecb345 | 8 | #include "MQTTClient.h" |
micallef25 | 2:f10d6fecb345 | 9 | #include "MQTTmbed.h" |
micallef25 | 2:f10d6fecb345 | 10 | #include "mqtt.h" |
micallef25 | 2:f10d6fecb345 | 11 | #include <assert.h> |
micallef25 | 2:f10d6fecb345 | 12 | #include "AccCar.h" |
micallef25 | 2:f10d6fecb345 | 13 | |
micallef25 | 2:f10d6fecb345 | 14 | // topics interested in |
micallef25 | 2:f10d6fecb345 | 15 | #define POSITION_TOPIC "MQTT_Position0x25" |
MannyK | 4:64c6fc70ddb7 | 16 | #define ROAD_TOPIC "MQTT_Road0x25" |
micallef25 | 2:f10d6fecb345 | 17 | #define CONTROL_TOPIC "MQTT_Control0x25" |
micallef25 | 2:f10d6fecb345 | 18 | |
micallef25 | 5:e0d8e5e922f1 | 19 | //#define DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 20 | #define MQTT_TAG "[MQTT] " |
micallef25 | 2:f10d6fecb345 | 21 | |
micallef25 | 2:f10d6fecb345 | 22 | Serial mqtt_pc (USBTX, USBRX); |
micallef25 | 2:f10d6fecb345 | 23 | |
micallef25 | 2:f10d6fecb345 | 24 | |
micallef25 | 2:f10d6fecb345 | 25 | // empty constructor |
micallef25 | 2:f10d6fecb345 | 26 | mqtt::mqtt() |
micallef25 | 2:f10d6fecb345 | 27 | { |
micallef25 | 2:f10d6fecb345 | 28 | |
micallef25 | 2:f10d6fecb345 | 29 | } |
micallef25 | 2:f10d6fecb345 | 30 | |
micallef25 | 2:f10d6fecb345 | 31 | /* |
micallef25 | 2:f10d6fecb345 | 32 | This function sets up the wifi module and connects it to the SSID |
micallef25 | 2:f10d6fecb345 | 33 | configured in the configuration file. It also prints out the MAC address |
micallef25 | 2:f10d6fecb345 | 34 | of the module, which is needed if you are trying to use campus wifi. |
micallef25 | 2:f10d6fecb345 | 35 | This function returns NULL if there are any issues. |
micallef25 | 2:f10d6fecb345 | 36 | */ |
micallef25 | 2:f10d6fecb345 | 37 | WiFiInterface* mqtt::setup_wifi() { |
micallef25 | 2:f10d6fecb345 | 38 | // Get a handle to the WiFi module |
micallef25 | 2:f10d6fecb345 | 39 | WiFiInterface* wifi = WiFiInterface::get_default_instance(); |
micallef25 | 2:f10d6fecb345 | 40 | |
micallef25 | 2:f10d6fecb345 | 41 | // Connect the module to the wifi, based on the SSID and password |
micallef25 | 2:f10d6fecb345 | 42 | // specified in the mbed_app.json configuration file |
micallef25 | 2:f10d6fecb345 | 43 | // If you are using AirPennNet-Device, this will not succeed until the MAC |
micallef25 | 2:f10d6fecb345 | 44 | // address (printed shortly after this) is registered |
micallef25 | 5:e0d8e5e922f1 | 45 | mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM); |
micallef25 | 2:f10d6fecb345 | 46 | int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); |
micallef25 | 2:f10d6fecb345 | 47 | |
micallef25 | 2:f10d6fecb345 | 48 | // Print out the MAC address of the wifi module. The MAC address is |
micallef25 | 2:f10d6fecb345 | 49 | // needed to register the device with AirPennNet-Device, so that you |
micallef25 | 2:f10d6fecb345 | 50 | // can use the campus wifi |
micallef25 | 5:e0d8e5e922f1 | 51 | mqtt_pc.printf(MQTT_TAG"MAC Address: "); |
micallef25 | 2:f10d6fecb345 | 52 | mqtt_pc.printf(wifi->get_mac_address()); |
micallef25 | 2:f10d6fecb345 | 53 | mqtt_pc.printf("\r\n"); |
micallef25 | 2:f10d6fecb345 | 54 | |
micallef25 | 2:f10d6fecb345 | 55 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 56 | mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM); |
micallef25 | 2:f10d6fecb345 | 57 | return NULL; |
micallef25 | 2:f10d6fecb345 | 58 | } else { |
micallef25 | 5:e0d8e5e922f1 | 59 | mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM); |
micallef25 | 2:f10d6fecb345 | 60 | } |
micallef25 | 2:f10d6fecb345 | 61 | |
micallef25 | 2:f10d6fecb345 | 62 | return wifi; |
micallef25 | 2:f10d6fecb345 | 63 | } |
micallef25 | 2:f10d6fecb345 | 64 | |
micallef25 | 2:f10d6fecb345 | 65 | /* |
micallef25 | 2:f10d6fecb345 | 66 | This function creates the MQTT client and connects it to the MQTT broker |
micallef25 | 2:f10d6fecb345 | 67 | that we have setup for the course. If there are any errors with the |
micallef25 | 2:f10d6fecb345 | 68 | connection, it will return NULL |
micallef25 | 2:f10d6fecb345 | 69 | */ |
micallef25 | 2:f10d6fecb345 | 70 | MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) { |
micallef25 | 2:f10d6fecb345 | 71 | // the hostname and port point to a Google Cloud MQTT server we setup for |
micallef25 | 2:f10d6fecb345 | 72 | // this project |
micallef25 | 2:f10d6fecb345 | 73 | const char* hostname = "34.68.206.11"; |
micallef25 | 2:f10d6fecb345 | 74 | int port = 1883; |
micallef25 | 2:f10d6fecb345 | 75 | |
micallef25 | 2:f10d6fecb345 | 76 | // Create the underlying network connection to the MQTT server |
micallef25 | 5:e0d8e5e922f1 | 77 | mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port); |
micallef25 | 2:f10d6fecb345 | 78 | int rc = network.connect(hostname, port); |
micallef25 | 2:f10d6fecb345 | 79 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 80 | mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc); |
micallef25 | 2:f10d6fecb345 | 81 | return NULL; |
micallef25 | 2:f10d6fecb345 | 82 | } |
micallef25 | 2:f10d6fecb345 | 83 | |
micallef25 | 5:e0d8e5e922f1 | 84 | mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port); |
micallef25 | 2:f10d6fecb345 | 85 | |
micallef25 | 2:f10d6fecb345 | 86 | // Connect the MQTT client to the server |
micallef25 | 2:f10d6fecb345 | 87 | MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network); |
micallef25 | 2:f10d6fecb345 | 88 | rc = temp_client->connect(); |
micallef25 | 2:f10d6fecb345 | 89 | if (rc != 0) { |
micallef25 | 5:e0d8e5e922f1 | 90 | mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc); |
micallef25 | 2:f10d6fecb345 | 91 | return NULL; |
micallef25 | 2:f10d6fecb345 | 92 | } |
micallef25 | 2:f10d6fecb345 | 93 | |
micallef25 | 5:e0d8e5e922f1 | 94 | mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM); |
micallef25 | 2:f10d6fecb345 | 95 | |
micallef25 | 2:f10d6fecb345 | 96 | return temp_client; |
micallef25 | 2:f10d6fecb345 | 97 | } |
micallef25 | 2:f10d6fecb345 | 98 | |
micallef25 | 2:f10d6fecb345 | 99 | /* |
micallef25 | 2:f10d6fecb345 | 100 | This function is the callback for when a message is received from the |
micallef25 | 2:f10d6fecb345 | 101 | MQTT broker. You register different callback functions with different |
micallef25 | 2:f10d6fecb345 | 102 | topic subscriptions |
micallef25 | 2:f10d6fecb345 | 103 | */ |
micallef25 | 2:f10d6fecb345 | 104 | void mqtt::control_message_arrived(MQTT::MessageData& md) |
micallef25 | 2:f10d6fecb345 | 105 | { |
micallef25 | 2:f10d6fecb345 | 106 | MQTT::Message &message = md.message; |
micallef25 | 5:e0d8e5e922f1 | 107 | // make message receiver responsible for freeeing |
micallef25 | 2:f10d6fecb345 | 108 | control_msg_t* msg = new control_msg_t; |
micallef25 | 2:f10d6fecb345 | 109 | assert(msg != NULL && message.payloadlen == sizeof(control_msg_t)); |
micallef25 | 2:f10d6fecb345 | 110 | |
micallef25 | 2:f10d6fecb345 | 111 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 2:f10d6fecb345 | 112 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 5:e0d8e5e922f1 | 113 | |
micallef25 | 5:e0d8e5e922f1 | 114 | if( msg->road_id == mqtt::instance()->mqtt_id ) |
micallef25 | 5:e0d8e5e922f1 | 115 | { |
micallef25 | 2:f10d6fecb345 | 116 | #ifdef DEBUG_MQTT |
micallef25 | 2:f10d6fecb345 | 117 | //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); |
micallef25 | 5:e0d8e5e922f1 | 118 | mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id); |
micallef25 | 2:f10d6fecb345 | 119 | #endif |
micallef25 | 5:e0d8e5e922f1 | 120 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 5:e0d8e5e922f1 | 121 | // the message memory thanks mbed os 5 documentation |
micallef25 | 5:e0d8e5e922f1 | 122 | mqtt::instance()->add_to_control_queue(msg->car_id,msg); |
micallef25 | 5:e0d8e5e922f1 | 123 | } |
micallef25 | 5:e0d8e5e922f1 | 124 | |
micallef25 | 5:e0d8e5e922f1 | 125 | else |
micallef25 | 5:e0d8e5e922f1 | 126 | { |
micallef25 | 5:e0d8e5e922f1 | 127 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 128 | mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 129 | #endif |
micallef25 | 5:e0d8e5e922f1 | 130 | delete msg; |
micallef25 | 5:e0d8e5e922f1 | 131 | } |
micallef25 | 5:e0d8e5e922f1 | 132 | } |
micallef25 | 2:f10d6fecb345 | 133 | |
micallef25 | 5:e0d8e5e922f1 | 134 | void mqtt::road_message_arrived(MQTT::MessageData& md) |
micallef25 | 5:e0d8e5e922f1 | 135 | { |
micallef25 | 5:e0d8e5e922f1 | 136 | MQTT::Message &message = md.message; |
micallef25 | 5:e0d8e5e922f1 | 137 | // make message... I will endeavor to free this one the receiver side |
micallef25 | 5:e0d8e5e922f1 | 138 | // from example sender allocates and receiver frees so well stick |
micallef25 | 5:e0d8e5e922f1 | 139 | // with that paradigm |
micallef25 | 5:e0d8e5e922f1 | 140 | road_msg_t* msg = new road_msg_t; |
micallef25 | 5:e0d8e5e922f1 | 141 | assert(msg != NULL && message.payloadlen == sizeof(road_msg_t)); |
micallef25 | 5:e0d8e5e922f1 | 142 | |
micallef25 | 5:e0d8e5e922f1 | 143 | // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? |
micallef25 | 5:e0d8e5e922f1 | 144 | memcpy(msg,message.payload,message.payloadlen); |
micallef25 | 2:f10d6fecb345 | 145 | |
micallef25 | 5:e0d8e5e922f1 | 146 | if( msg->road_id != mqtt::instance()->mqtt_id ) |
micallef25 | 5:e0d8e5e922f1 | 147 | { |
micallef25 | 5:e0d8e5e922f1 | 148 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 149 | mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 150 | #endif |
micallef25 | 5:e0d8e5e922f1 | 151 | // add our message to the queue no fucking clue what happens internally to |
micallef25 | 5:e0d8e5e922f1 | 152 | // the message memory thanks mbed os 5 documentation |
micallef25 | 5:e0d8e5e922f1 | 153 | mqtt::instance()->add_to_network_to_road_queue(msg); |
micallef25 | 5:e0d8e5e922f1 | 154 | } |
micallef25 | 5:e0d8e5e922f1 | 155 | |
micallef25 | 5:e0d8e5e922f1 | 156 | else{ |
micallef25 | 5:e0d8e5e922f1 | 157 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 158 | mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 159 | #endif |
micallef25 | 5:e0d8e5e922f1 | 160 | delete msg; |
micallef25 | 5:e0d8e5e922f1 | 161 | } |
micallef25 | 5:e0d8e5e922f1 | 162 | |
micallef25 | 2:f10d6fecb345 | 163 | } |
micallef25 | 2:f10d6fecb345 | 164 | |
micallef25 | 2:f10d6fecb345 | 165 | /* |
micallef25 | 2:f10d6fecb345 | 166 | This function sends a message to the test topic. |
micallef25 | 2:f10d6fecb345 | 167 | */ |
micallef25 | 5:e0d8e5e922f1 | 168 | int mqtt::send_position_msg( MQTT::Message& message, position_msg_t* msg ) |
micallef25 | 2:f10d6fecb345 | 169 | { |
micallef25 | 2:f10d6fecb345 | 170 | assert(msg != NULL && client != NULL); |
micallef25 | 2:f10d6fecb345 | 171 | |
micallef25 | 5:e0d8e5e922f1 | 172 | // MQTT::Message message; |
micallef25 | 5:e0d8e5e922f1 | 173 | |
micallef25 | 5:e0d8e5e922f1 | 174 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 175 | mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 176 | #endif |
micallef25 | 2:f10d6fecb345 | 177 | |
micallef25 | 5:e0d8e5e922f1 | 178 | // might be safest to memcopy this? but well if shit breaks then well fix |
micallef25 | 5:e0d8e5e922f1 | 179 | // memcpy(message.payload,msg,sizeof(position_msg_t)); |
micallef25 | 2:f10d6fecb345 | 180 | message.payload = (void*)msg; |
micallef25 | 2:f10d6fecb345 | 181 | message.payloadlen = sizeof(position_msg_t); |
micallef25 | 2:f10d6fecb345 | 182 | message.qos = MQTT::QOS1; |
micallef25 | 2:f10d6fecb345 | 183 | |
micallef25 | 2:f10d6fecb345 | 184 | int rc = client->publish(POSITION_TOPIC,message); |
micallef25 | 2:f10d6fecb345 | 185 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 186 | |
micallef25 | 5:e0d8e5e922f1 | 187 | // client->yield(1000); |
micallef25 | 2:f10d6fecb345 | 188 | return 0; |
micallef25 | 2:f10d6fecb345 | 189 | } |
micallef25 | 2:f10d6fecb345 | 190 | |
micallef25 | 5:e0d8e5e922f1 | 191 | int mqtt::send_road_msg( MQTT::Message& message, road_msg_t* msg ) |
MannyK | 4:64c6fc70ddb7 | 192 | { |
MannyK | 4:64c6fc70ddb7 | 193 | assert(msg != NULL && client != NULL); |
MannyK | 4:64c6fc70ddb7 | 194 | |
micallef25 | 5:e0d8e5e922f1 | 195 | // MQTT::Message message; |
micallef25 | 5:e0d8e5e922f1 | 196 | |
micallef25 | 5:e0d8e5e922f1 | 197 | #ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 198 | mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_id); |
micallef25 | 5:e0d8e5e922f1 | 199 | #endif |
MannyK | 4:64c6fc70ddb7 | 200 | |
MannyK | 4:64c6fc70ddb7 | 201 | // might be safest to memcopy thius seems to work |
MannyK | 4:64c6fc70ddb7 | 202 | //memcpy(message.payload,msg,sizeof(position_msg_t)); |
MannyK | 4:64c6fc70ddb7 | 203 | message.payload = (void*)msg; |
micallef25 | 5:e0d8e5e922f1 | 204 | message.payloadlen = sizeof(road_msg_t); |
MannyK | 4:64c6fc70ddb7 | 205 | message.qos = MQTT::QOS1; |
MannyK | 4:64c6fc70ddb7 | 206 | |
MannyK | 4:64c6fc70ddb7 | 207 | int rc = client->publish(ROAD_TOPIC,message); |
MannyK | 4:64c6fc70ddb7 | 208 | assert(rc == 0); |
MannyK | 4:64c6fc70ddb7 | 209 | |
micallef25 | 5:e0d8e5e922f1 | 210 | // client->yield(1000); |
MannyK | 4:64c6fc70ddb7 | 211 | return 0; |
MannyK | 4:64c6fc70ddb7 | 212 | } |
MannyK | 4:64c6fc70ddb7 | 213 | |
micallef25 | 2:f10d6fecb345 | 214 | void mqtt::bringup_network() { |
micallef25 | 2:f10d6fecb345 | 215 | |
micallef25 | 2:f10d6fecb345 | 216 | // |
micallef25 | 2:f10d6fecb345 | 217 | WiFiInterface* wifi = setup_wifi(); |
micallef25 | 2:f10d6fecb345 | 218 | assert(wifi != NULL); |
micallef25 | 2:f10d6fecb345 | 219 | |
micallef25 | 2:f10d6fecb345 | 220 | // Create the network object needed by the MQTT client |
micallef25 | 2:f10d6fecb345 | 221 | new_network = new MQTTNetwork(wifi); |
micallef25 | 2:f10d6fecb345 | 222 | |
micallef25 | 2:f10d6fecb345 | 223 | // get client |
micallef25 | 2:f10d6fecb345 | 224 | client = setup_mqtt(*new_network); |
micallef25 | 2:f10d6fecb345 | 225 | assert(client != NULL); |
micallef25 | 2:f10d6fecb345 | 226 | |
micallef25 | 2:f10d6fecb345 | 227 | // Subscribe to a topic / register a callback |
micallef25 | 5:e0d8e5e922f1 | 228 | mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC); |
micallef25 | 2:f10d6fecb345 | 229 | int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived); |
micallef25 | 2:f10d6fecb345 | 230 | assert(rc == 0); |
micallef25 | 2:f10d6fecb345 | 231 | |
micallef25 | 5:e0d8e5e922f1 | 232 | // Subscribe to a topic / register a callback |
micallef25 | 5:e0d8e5e922f1 | 233 | mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC); |
micallef25 | 5:e0d8e5e922f1 | 234 | rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived); |
micallef25 | 5:e0d8e5e922f1 | 235 | assert(rc == 0); |
micallef25 | 5:e0d8e5e922f1 | 236 | |
micallef25 | 2:f10d6fecb345 | 237 | // make a road based of mqtt id |
micallef25 | 2:f10d6fecb345 | 238 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 239 | if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){ |
micallef25 | 2:f10d6fecb345 | 240 | mqtt_id = 0; |
micallef25 | 2:f10d6fecb345 | 241 | } |
micallef25 | 2:f10d6fecb345 | 242 | else{ |
micallef25 | 2:f10d6fecb345 | 243 | mqtt_id = 1; |
micallef25 | 2:f10d6fecb345 | 244 | } |
micallef25 | 2:f10d6fecb345 | 245 | |
micallef25 | 5:e0d8e5e922f1 | 246 | mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id); |
micallef25 | 2:f10d6fecb345 | 247 | } |
micallef25 | 2:f10d6fecb345 | 248 | |
micallef25 | 2:f10d6fecb345 | 249 | |
micallef25 | 2:f10d6fecb345 | 250 | // manage callbacks from mqtt |
micallef25 | 2:f10d6fecb345 | 251 | void mqtt::manage_network() |
micallef25 | 2:f10d6fecb345 | 252 | { |
micallef25 | 5:e0d8e5e922f1 | 253 | MQTT::Message mqtt_message; |
micallef25 | 2:f10d6fecb345 | 254 | while(true) |
micallef25 | 2:f10d6fecb345 | 255 | { |
micallef25 | 5:e0d8e5e922f1 | 256 | while(!position_queue.empty()) |
micallef25 | 5:e0d8e5e922f1 | 257 | { |
micallef25 | 5:e0d8e5e922f1 | 258 | |
micallef25 | 5:e0d8e5e922f1 | 259 | //#ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 260 | // mqtt_pc.printf(MQTT_TAG"getting position msg\r\n"); |
micallef25 | 5:e0d8e5e922f1 | 261 | //#endif |
micallef25 | 5:e0d8e5e922f1 | 262 | // |
micallef25 | 5:e0d8e5e922f1 | 263 | osEvent evt = position_queue.get(); |
micallef25 | 5:e0d8e5e922f1 | 264 | assert(evt.status == osEventMessage); |
micallef25 | 2:f10d6fecb345 | 265 | |
micallef25 | 5:e0d8e5e922f1 | 266 | // |
micallef25 | 5:e0d8e5e922f1 | 267 | position_msg_t *message = (position_msg_t*)evt.value.p; |
micallef25 | 5:e0d8e5e922f1 | 268 | assert(message != NULL); |
micallef25 | 5:e0d8e5e922f1 | 269 | send_position_msg(mqtt_message,message); |
micallef25 | 5:e0d8e5e922f1 | 270 | } |
MannyK | 4:64c6fc70ddb7 | 271 | |
micallef25 | 5:e0d8e5e922f1 | 272 | if (!road_to_network_queue.empty()) |
micallef25 | 5:e0d8e5e922f1 | 273 | { |
micallef25 | 5:e0d8e5e922f1 | 274 | |
micallef25 | 5:e0d8e5e922f1 | 275 | //#ifdef DEBUG_MQTT |
micallef25 | 5:e0d8e5e922f1 | 276 | // mqtt_pc.printf(MQTT_TAG"getting road info\r\n"); |
micallef25 | 5:e0d8e5e922f1 | 277 | //#endif |
micallef25 | 5:e0d8e5e922f1 | 278 | osEvent revt = road_to_network_queue.get(); |
MannyK | 4:64c6fc70ddb7 | 279 | assert(revt.status == osEventMessage); |
micallef25 | 5:e0d8e5e922f1 | 280 | |
micallef25 | 5:e0d8e5e922f1 | 281 | // |
MannyK | 4:64c6fc70ddb7 | 282 | road_msg_t *road_msg = (road_msg_t*)revt.value.p; |
micallef25 | 5:e0d8e5e922f1 | 283 | assert(road_msg != NULL); |
micallef25 | 5:e0d8e5e922f1 | 284 | send_road_msg(mqtt_message,road_msg); |
MannyK | 4:64c6fc70ddb7 | 285 | } |
micallef25 | 5:e0d8e5e922f1 | 286 | |
micallef25 | 5:e0d8e5e922f1 | 287 | client->yield(10); |
micallef25 | 2:f10d6fecb345 | 288 | } |
micallef25 | 2:f10d6fecb345 | 289 | } |
micallef25 | 2:f10d6fecb345 | 290 | |
micallef25 | 2:f10d6fecb345 | 291 | // |
micallef25 | 2:f10d6fecb345 | 292 | // clean up goes here |
micallef25 | 2:f10d6fecb345 | 293 | void mqtt::shutdown_network() |
micallef25 | 2:f10d6fecb345 | 294 | { |
micallef25 | 2:f10d6fecb345 | 295 | // |
micallef25 | 5:e0d8e5e922f1 | 296 | mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM); |
micallef25 | 2:f10d6fecb345 | 297 | |
micallef25 | 2:f10d6fecb345 | 298 | // |
micallef25 | 2:f10d6fecb345 | 299 | client->disconnect(); |
micallef25 | 2:f10d6fecb345 | 300 | delete new_network; |
micallef25 | 2:f10d6fecb345 | 301 | |
micallef25 | 2:f10d6fecb345 | 302 | // |
micallef25 | 2:f10d6fecb345 | 303 | thread->terminate(); |
micallef25 | 2:f10d6fecb345 | 304 | delete thread; |
micallef25 | 2:f10d6fecb345 | 305 | } |
micallef25 | 2:f10d6fecb345 | 306 | |
micallef25 | 2:f10d6fecb345 | 307 | // launch network manager thread |
micallef25 | 2:f10d6fecb345 | 308 | // responsible for pub sub |
micallef25 | 2:f10d6fecb345 | 309 | void mqtt::setup_network() |
micallef25 | 2:f10d6fecb345 | 310 | { |
micallef25 | 2:f10d6fecb345 | 311 | // bring up network if anything bad happens we will assert and crash |
micallef25 | 2:f10d6fecb345 | 312 | bringup_network(); |
micallef25 | 2:f10d6fecb345 | 313 | |
micallef25 | 2:f10d6fecb345 | 314 | // create a new thread thats sole purpose in life is to call client yield |
micallef25 | 2:f10d6fecb345 | 315 | // what a sad life for a thread |
micallef25 | 2:f10d6fecb345 | 316 | // calling yield is necessary as we will not get any messages for the mqtt |
micallef25 | 2:f10d6fecb345 | 317 | // serverwithout it |
micallef25 | 2:f10d6fecb345 | 318 | thread = new Thread(); |
micallef25 | 2:f10d6fecb345 | 319 | assert(thread != NULL); |
micallef25 | 2:f10d6fecb345 | 320 | thread->start( callback(this,&mqtt::manage_network) ); |
micallef25 | 2:f10d6fecb345 | 321 | |
micallef25 | 2:f10d6fecb345 | 322 | } |