Dependencies:   MQTT

Committer:
micallef25
Date:
Tue Dec 10 23:39:25 2019 +0000
Revision:
5:e0d8e5e922f1
Parent:
4:64c6fc70ddb7
Child:
6:6cb13ac483e0
stable

Who changed what in which revision?

User | Revision | Line number | New contents of line
micallef25 2:f10d6fecb345 1 /* mbed Microcontroller Library
micallef25 2:f10d6fecb345 2 * Copyright (c) 2018 ARM Limited
micallef25 2:f10d6fecb345 3 * SPDX-License-Identifier: Apache-2.0
micallef25 2:f10d6fecb345 4 */
micallef25 2:f10d6fecb345 5
micallef25 2:f10d6fecb345 6 #include "mbed.h"
micallef25 2:f10d6fecb345 7 #include "MQTTNetwork.h"
micallef25 2:f10d6fecb345 8 #include "MQTTClient.h"
micallef25 2:f10d6fecb345 9 #include "MQTTmbed.h"
micallef25 2:f10d6fecb345 10 #include "mqtt.h"
micallef25 2:f10d6fecb345 11 #include <assert.h>
micallef25 2:f10d6fecb345 12 #include "AccCar.h"
micallef25 2:f10d6fecb345 13
// MQTT topic names used by this module: POSITION_TOPIC and ROAD_TOPIC are
// published to, ROAD_TOPIC and CONTROL_TOPIC are subscribed to.
#define POSITION_TOPIC "MQTT_Position0x25"
#define ROAD_TOPIC "MQTT_Road0x25"
#define CONTROL_TOPIC "MQTT_Control0x25"

// Uncomment DEBUG_MQTT to enable the per-message trace prints in this file.
//#define DEBUG_MQTT
// Prefix placed in front of every log line printed by this module.
#define MQTT_TAG "[MQTT] "

// Serial port (USBTX/USBRX pins) that all log output in this file is written to.
Serial mqtt_pc (USBTX, USBRX);
micallef25 2:f10d6fecb345 23
micallef25 2:f10d6fecb345 24
micallef25 2:f10d6fecb345 25 // empty constructor
micallef25 2:f10d6fecb345 26 mqtt::mqtt()
micallef25 2:f10d6fecb345 27 {
micallef25 2:f10d6fecb345 28
micallef25 2:f10d6fecb345 29 }
micallef25 2:f10d6fecb345 30
micallef25 2:f10d6fecb345 31 /*
micallef25 2:f10d6fecb345 32 This function sets up the wifi module and connects it to the SSID
micallef25 2:f10d6fecb345 33 configured in the configuration file. It also prints out the MAC address
micallef25 2:f10d6fecb345 34 of the module, which is needed if you are trying to use campus wifi.
micallef25 2:f10d6fecb345 35 This function returns NULL if there are any issues.
micallef25 2:f10d6fecb345 36 */
micallef25 2:f10d6fecb345 37 WiFiInterface* mqtt::setup_wifi() {
micallef25 2:f10d6fecb345 38 // Get a handle to the WiFi module
micallef25 2:f10d6fecb345 39 WiFiInterface* wifi = WiFiInterface::get_default_instance();
micallef25 2:f10d6fecb345 40
micallef25 2:f10d6fecb345 41 // Connect the module to the wifi, based on the SSID and password
micallef25 2:f10d6fecb345 42 // specified in the mbed_app.json configuration file
micallef25 2:f10d6fecb345 43 // If you are using AirPennNet-Device, this will not succeed until the MAC
micallef25 2:f10d6fecb345 44 // address (printed shortly after this) is registered
micallef25 5:e0d8e5e922f1 45 mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM);
micallef25 2:f10d6fecb345 46 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
micallef25 2:f10d6fecb345 47
micallef25 2:f10d6fecb345 48 // Print out the MAC address of the wifi module. The MAC address is
micallef25 2:f10d6fecb345 49 // needed to register the device with AirPennNet-Device, so that you
micallef25 2:f10d6fecb345 50 // can use the campus wifi
micallef25 5:e0d8e5e922f1 51 mqtt_pc.printf(MQTT_TAG"MAC Address: ");
micallef25 2:f10d6fecb345 52 mqtt_pc.printf(wifi->get_mac_address());
micallef25 2:f10d6fecb345 53 mqtt_pc.printf("\r\n");
micallef25 2:f10d6fecb345 54
micallef25 2:f10d6fecb345 55 if (rc != 0) {
micallef25 5:e0d8e5e922f1 56 mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM);
micallef25 2:f10d6fecb345 57 return NULL;
micallef25 2:f10d6fecb345 58 } else {
micallef25 5:e0d8e5e922f1 59 mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM);
micallef25 2:f10d6fecb345 60 }
micallef25 2:f10d6fecb345 61
micallef25 2:f10d6fecb345 62 return wifi;
micallef25 2:f10d6fecb345 63 }
micallef25 2:f10d6fecb345 64
micallef25 2:f10d6fecb345 65 /*
micallef25 2:f10d6fecb345 66 This function creates the MQTT client and connects it to the MQTT broker
micallef25 2:f10d6fecb345 67 that we have setup for the course. If there are any errors with the
micallef25 2:f10d6fecb345 68 connection, it will return NULL
micallef25 2:f10d6fecb345 69 */
micallef25 2:f10d6fecb345 70 MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) {
micallef25 2:f10d6fecb345 71 // the hostname and port point to a Google Cloud MQTT server we setup for
micallef25 2:f10d6fecb345 72 // this project
micallef25 2:f10d6fecb345 73 const char* hostname = "34.68.206.11";
micallef25 2:f10d6fecb345 74 int port = 1883;
micallef25 2:f10d6fecb345 75
micallef25 2:f10d6fecb345 76 // Create the underlying network connection to the MQTT server
micallef25 5:e0d8e5e922f1 77 mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port);
micallef25 2:f10d6fecb345 78 int rc = network.connect(hostname, port);
micallef25 2:f10d6fecb345 79 if (rc != 0) {
micallef25 5:e0d8e5e922f1 80 mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc);
micallef25 2:f10d6fecb345 81 return NULL;
micallef25 2:f10d6fecb345 82 }
micallef25 2:f10d6fecb345 83
micallef25 5:e0d8e5e922f1 84 mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port);
micallef25 2:f10d6fecb345 85
micallef25 2:f10d6fecb345 86 // Connect the MQTT client to the server
micallef25 2:f10d6fecb345 87 MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network);
micallef25 2:f10d6fecb345 88 rc = temp_client->connect();
micallef25 2:f10d6fecb345 89 if (rc != 0) {
micallef25 5:e0d8e5e922f1 90 mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc);
micallef25 2:f10d6fecb345 91 return NULL;
micallef25 2:f10d6fecb345 92 }
micallef25 2:f10d6fecb345 93
micallef25 5:e0d8e5e922f1 94 mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM);
micallef25 2:f10d6fecb345 95
micallef25 2:f10d6fecb345 96 return temp_client;
micallef25 2:f10d6fecb345 97 }
micallef25 2:f10d6fecb345 98
micallef25 2:f10d6fecb345 99 /*
micallef25 2:f10d6fecb345 100 This function is the callback for when a message is received from the
micallef25 2:f10d6fecb345 101 MQTT broker. You register different callback functions with different
micallef25 2:f10d6fecb345 102 topic subscriptions
micallef25 2:f10d6fecb345 103 */
micallef25 2:f10d6fecb345 104 void mqtt::control_message_arrived(MQTT::MessageData& md)
micallef25 2:f10d6fecb345 105 {
micallef25 2:f10d6fecb345 106 MQTT::Message &message = md.message;
micallef25 5:e0d8e5e922f1 107 // make message receiver responsible for freeeing
micallef25 2:f10d6fecb345 108 control_msg_t* msg = new control_msg_t;
micallef25 2:f10d6fecb345 109 assert(msg != NULL && message.payloadlen == sizeof(control_msg_t));
micallef25 2:f10d6fecb345 110
micallef25 2:f10d6fecb345 111 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
micallef25 2:f10d6fecb345 112 memcpy(msg,message.payload,message.payloadlen);
micallef25 5:e0d8e5e922f1 113
micallef25 5:e0d8e5e922f1 114 if( msg->road_id == mqtt::instance()->mqtt_id )
micallef25 5:e0d8e5e922f1 115 {
micallef25 2:f10d6fecb345 116 #ifdef DEBUG_MQTT
micallef25 2:f10d6fecb345 117 //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
micallef25 5:e0d8e5e922f1 118 mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id);
micallef25 2:f10d6fecb345 119 #endif
micallef25 5:e0d8e5e922f1 120 // add our message to the queue no fucking clue what happens internally to
micallef25 5:e0d8e5e922f1 121 // the message memory thanks mbed os 5 documentation
micallef25 5:e0d8e5e922f1 122 mqtt::instance()->add_to_control_queue(msg->car_id,msg);
micallef25 5:e0d8e5e922f1 123 }
micallef25 5:e0d8e5e922f1 124
micallef25 5:e0d8e5e922f1 125 else
micallef25 5:e0d8e5e922f1 126 {
micallef25 5:e0d8e5e922f1 127 #ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 128 mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id);
micallef25 5:e0d8e5e922f1 129 #endif
micallef25 5:e0d8e5e922f1 130 delete msg;
micallef25 5:e0d8e5e922f1 131 }
micallef25 5:e0d8e5e922f1 132 }
micallef25 2:f10d6fecb345 133
micallef25 5:e0d8e5e922f1 134 void mqtt::road_message_arrived(MQTT::MessageData& md)
micallef25 5:e0d8e5e922f1 135 {
micallef25 5:e0d8e5e922f1 136 MQTT::Message &message = md.message;
micallef25 5:e0d8e5e922f1 137 // make message... I will endeavor to free this one the receiver side
micallef25 5:e0d8e5e922f1 138 // from example sender allocates and receiver frees so well stick
micallef25 5:e0d8e5e922f1 139 // with that paradigm
micallef25 5:e0d8e5e922f1 140 road_msg_t* msg = new road_msg_t;
micallef25 5:e0d8e5e922f1 141 assert(msg != NULL && message.payloadlen == sizeof(road_msg_t));
micallef25 5:e0d8e5e922f1 142
micallef25 5:e0d8e5e922f1 143 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
micallef25 5:e0d8e5e922f1 144 memcpy(msg,message.payload,message.payloadlen);
micallef25 2:f10d6fecb345 145
micallef25 5:e0d8e5e922f1 146 if( msg->road_id != mqtt::instance()->mqtt_id )
micallef25 5:e0d8e5e922f1 147 {
micallef25 5:e0d8e5e922f1 148 #ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 149 mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_id);
micallef25 5:e0d8e5e922f1 150 #endif
micallef25 5:e0d8e5e922f1 151 // add our message to the queue no fucking clue what happens internally to
micallef25 5:e0d8e5e922f1 152 // the message memory thanks mbed os 5 documentation
micallef25 5:e0d8e5e922f1 153 mqtt::instance()->add_to_network_to_road_queue(msg);
micallef25 5:e0d8e5e922f1 154 }
micallef25 5:e0d8e5e922f1 155
micallef25 5:e0d8e5e922f1 156 else{
micallef25 5:e0d8e5e922f1 157 #ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 158 mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id);
micallef25 5:e0d8e5e922f1 159 #endif
micallef25 5:e0d8e5e922f1 160 delete msg;
micallef25 5:e0d8e5e922f1 161 }
micallef25 5:e0d8e5e922f1 162
micallef25 2:f10d6fecb345 163 }
micallef25 2:f10d6fecb345 164
micallef25 2:f10d6fecb345 165 /*
micallef25 2:f10d6fecb345 166 This function sends a message to the test topic.
micallef25 2:f10d6fecb345 167 */
micallef25 5:e0d8e5e922f1 168 int mqtt::send_position_msg( MQTT::Message& message, position_msg_t* msg )
micallef25 2:f10d6fecb345 169 {
micallef25 2:f10d6fecb345 170 assert(msg != NULL && client != NULL);
micallef25 2:f10d6fecb345 171
micallef25 5:e0d8e5e922f1 172 // MQTT::Message message;
micallef25 5:e0d8e5e922f1 173
micallef25 5:e0d8e5e922f1 174 #ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 175 mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id);
micallef25 5:e0d8e5e922f1 176 #endif
micallef25 2:f10d6fecb345 177
micallef25 5:e0d8e5e922f1 178 // might be safest to memcopy this? but well if shit breaks then well fix
micallef25 5:e0d8e5e922f1 179 // memcpy(message.payload,msg,sizeof(position_msg_t));
micallef25 2:f10d6fecb345 180 message.payload = (void*)msg;
micallef25 2:f10d6fecb345 181 message.payloadlen = sizeof(position_msg_t);
micallef25 2:f10d6fecb345 182 message.qos = MQTT::QOS1;
micallef25 2:f10d6fecb345 183
micallef25 2:f10d6fecb345 184 int rc = client->publish(POSITION_TOPIC,message);
micallef25 2:f10d6fecb345 185 assert(rc == 0);
micallef25 2:f10d6fecb345 186
micallef25 5:e0d8e5e922f1 187 // client->yield(1000);
micallef25 2:f10d6fecb345 188 return 0;
micallef25 2:f10d6fecb345 189 }
micallef25 2:f10d6fecb345 190
micallef25 5:e0d8e5e922f1 191 int mqtt::send_road_msg( MQTT::Message& message, road_msg_t* msg )
MannyK 4:64c6fc70ddb7 192 {
MannyK 4:64c6fc70ddb7 193 assert(msg != NULL && client != NULL);
MannyK 4:64c6fc70ddb7 194
micallef25 5:e0d8e5e922f1 195 // MQTT::Message message;
micallef25 5:e0d8e5e922f1 196
micallef25 5:e0d8e5e922f1 197 #ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 198 mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_id);
micallef25 5:e0d8e5e922f1 199 #endif
MannyK 4:64c6fc70ddb7 200
MannyK 4:64c6fc70ddb7 201 // might be safest to memcopy thius seems to work
MannyK 4:64c6fc70ddb7 202 //memcpy(message.payload,msg,sizeof(position_msg_t));
MannyK 4:64c6fc70ddb7 203 message.payload = (void*)msg;
micallef25 5:e0d8e5e922f1 204 message.payloadlen = sizeof(road_msg_t);
MannyK 4:64c6fc70ddb7 205 message.qos = MQTT::QOS1;
MannyK 4:64c6fc70ddb7 206
MannyK 4:64c6fc70ddb7 207 int rc = client->publish(ROAD_TOPIC,message);
MannyK 4:64c6fc70ddb7 208 assert(rc == 0);
MannyK 4:64c6fc70ddb7 209
micallef25 5:e0d8e5e922f1 210 // client->yield(1000);
MannyK 4:64c6fc70ddb7 211 return 0;
MannyK 4:64c6fc70ddb7 212 }
MannyK 4:64c6fc70ddb7 213
micallef25 2:f10d6fecb345 214 void mqtt::bringup_network() {
micallef25 2:f10d6fecb345 215
micallef25 2:f10d6fecb345 216 //
micallef25 2:f10d6fecb345 217 WiFiInterface* wifi = setup_wifi();
micallef25 2:f10d6fecb345 218 assert(wifi != NULL);
micallef25 2:f10d6fecb345 219
micallef25 2:f10d6fecb345 220 // Create the network object needed by the MQTT client
micallef25 2:f10d6fecb345 221 new_network = new MQTTNetwork(wifi);
micallef25 2:f10d6fecb345 222
micallef25 2:f10d6fecb345 223 // get client
micallef25 2:f10d6fecb345 224 client = setup_mqtt(*new_network);
micallef25 2:f10d6fecb345 225 assert(client != NULL);
micallef25 2:f10d6fecb345 226
micallef25 2:f10d6fecb345 227 // Subscribe to a topic / register a callback
micallef25 5:e0d8e5e922f1 228 mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC);
micallef25 2:f10d6fecb345 229 int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived);
micallef25 2:f10d6fecb345 230 assert(rc == 0);
micallef25 2:f10d6fecb345 231
micallef25 5:e0d8e5e922f1 232 // Subscribe to a topic / register a callback
micallef25 5:e0d8e5e922f1 233 mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC);
micallef25 5:e0d8e5e922f1 234 rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived);
micallef25 5:e0d8e5e922f1 235 assert(rc == 0);
micallef25 5:e0d8e5e922f1 236
micallef25 2:f10d6fecb345 237 // make a road based of mqtt id
micallef25 2:f10d6fecb345 238 mqtt_id = 0;
micallef25 2:f10d6fecb345 239 if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){
micallef25 2:f10d6fecb345 240 mqtt_id = 0;
micallef25 2:f10d6fecb345 241 }
micallef25 2:f10d6fecb345 242 else{
micallef25 2:f10d6fecb345 243 mqtt_id = 1;
micallef25 2:f10d6fecb345 244 }
micallef25 2:f10d6fecb345 245
micallef25 5:e0d8e5e922f1 246 mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id);
micallef25 2:f10d6fecb345 247 }
micallef25 2:f10d6fecb345 248
micallef25 2:f10d6fecb345 249
micallef25 2:f10d6fecb345 250 // manage callbacks from mqtt
micallef25 2:f10d6fecb345 251 void mqtt::manage_network()
micallef25 2:f10d6fecb345 252 {
micallef25 5:e0d8e5e922f1 253 MQTT::Message mqtt_message;
micallef25 2:f10d6fecb345 254 while(true)
micallef25 2:f10d6fecb345 255 {
micallef25 5:e0d8e5e922f1 256 while(!position_queue.empty())
micallef25 5:e0d8e5e922f1 257 {
micallef25 5:e0d8e5e922f1 258
micallef25 5:e0d8e5e922f1 259 //#ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 260 // mqtt_pc.printf(MQTT_TAG"getting position msg\r\n");
micallef25 5:e0d8e5e922f1 261 //#endif
micallef25 5:e0d8e5e922f1 262 //
micallef25 5:e0d8e5e922f1 263 osEvent evt = position_queue.get();
micallef25 5:e0d8e5e922f1 264 assert(evt.status == osEventMessage);
micallef25 2:f10d6fecb345 265
micallef25 5:e0d8e5e922f1 266 //
micallef25 5:e0d8e5e922f1 267 position_msg_t *message = (position_msg_t*)evt.value.p;
micallef25 5:e0d8e5e922f1 268 assert(message != NULL);
micallef25 5:e0d8e5e922f1 269 send_position_msg(mqtt_message,message);
micallef25 5:e0d8e5e922f1 270 }
MannyK 4:64c6fc70ddb7 271
micallef25 5:e0d8e5e922f1 272 if (!road_to_network_queue.empty())
micallef25 5:e0d8e5e922f1 273 {
micallef25 5:e0d8e5e922f1 274
micallef25 5:e0d8e5e922f1 275 //#ifdef DEBUG_MQTT
micallef25 5:e0d8e5e922f1 276 // mqtt_pc.printf(MQTT_TAG"getting road info\r\n");
micallef25 5:e0d8e5e922f1 277 //#endif
micallef25 5:e0d8e5e922f1 278 osEvent revt = road_to_network_queue.get();
MannyK 4:64c6fc70ddb7 279 assert(revt.status == osEventMessage);
micallef25 5:e0d8e5e922f1 280
micallef25 5:e0d8e5e922f1 281 //
MannyK 4:64c6fc70ddb7 282 road_msg_t *road_msg = (road_msg_t*)revt.value.p;
micallef25 5:e0d8e5e922f1 283 assert(road_msg != NULL);
micallef25 5:e0d8e5e922f1 284 send_road_msg(mqtt_message,road_msg);
MannyK 4:64c6fc70ddb7 285 }
micallef25 5:e0d8e5e922f1 286
micallef25 5:e0d8e5e922f1 287 client->yield(10);
micallef25 2:f10d6fecb345 288 }
micallef25 2:f10d6fecb345 289 }
micallef25 2:f10d6fecb345 290
micallef25 2:f10d6fecb345 291 //
micallef25 2:f10d6fecb345 292 // clean up goes here
micallef25 2:f10d6fecb345 293 void mqtt::shutdown_network()
micallef25 2:f10d6fecb345 294 {
micallef25 2:f10d6fecb345 295 //
micallef25 5:e0d8e5e922f1 296 mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM);
micallef25 2:f10d6fecb345 297
micallef25 2:f10d6fecb345 298 //
micallef25 2:f10d6fecb345 299 client->disconnect();
micallef25 2:f10d6fecb345 300 delete new_network;
micallef25 2:f10d6fecb345 301
micallef25 2:f10d6fecb345 302 //
micallef25 2:f10d6fecb345 303 thread->terminate();
micallef25 2:f10d6fecb345 304 delete thread;
micallef25 2:f10d6fecb345 305 }
micallef25 2:f10d6fecb345 306
micallef25 2:f10d6fecb345 307 // launch network manager thread
micallef25 2:f10d6fecb345 308 // responsible for pub sub
micallef25 2:f10d6fecb345 309 void mqtt::setup_network()
micallef25 2:f10d6fecb345 310 {
micallef25 2:f10d6fecb345 311 // bring up network if anything bad happens we will assert and crash
micallef25 2:f10d6fecb345 312 bringup_network();
micallef25 2:f10d6fecb345 313
micallef25 2:f10d6fecb345 314 // create a new thread thats sole purpose in life is to call client yield
micallef25 2:f10d6fecb345 315 // what a sad life for a thread
micallef25 2:f10d6fecb345 316 // calling yield is necessary as we will not get any messages for the mqtt
micallef25 2:f10d6fecb345 317 // serverwithout it
micallef25 2:f10d6fecb345 318 thread = new Thread();
micallef25 2:f10d6fecb345 319 assert(thread != NULL);
micallef25 2:f10d6fecb345 320 thread->start( callback(this,&mqtt::manage_network) );
micallef25 2:f10d6fecb345 321
micallef25 2:f10d6fecb345 322 }