Dependencies:   MQTT

Committer:
micallef25
Date:
Fri Dec 06 15:47:37 2019 +0000
Revision:
2:f10d6fecb345
Child:
4:64c6fc70ddb7
basic communication with mqtt and car threads

Who changed what in which revision?

User | Revision | Line number | New contents of line
micallef25 2:f10d6fecb345 1 /* mbed Microcontroller Library
micallef25 2:f10d6fecb345 2 * Copyright (c) 2018 ARM Limited
micallef25 2:f10d6fecb345 3 * SPDX-License-Identifier: Apache-2.0
micallef25 2:f10d6fecb345 4 */
micallef25 2:f10d6fecb345 5
micallef25 2:f10d6fecb345 6 #include "mbed.h"
micallef25 2:f10d6fecb345 7 #include "MQTTNetwork.h"
micallef25 2:f10d6fecb345 8 #include "MQTTClient.h"
micallef25 2:f10d6fecb345 9 #include "MQTTmbed.h"
micallef25 2:f10d6fecb345 10 #include "mqtt.h"
micallef25 2:f10d6fecb345 11 #include <assert.h>
micallef25 2:f10d6fecb345 12 #include "AccCar.h"
micallef25 2:f10d6fecb345 13
micallef25 2:f10d6fecb345 14 // topics interested in
micallef25 2:f10d6fecb345 15 #define POSITION_TOPIC "MQTT_Position0x25"
micallef25 2:f10d6fecb345 16 #define CONTROL_TOPIC "MQTT_Control0x25"
micallef25 2:f10d6fecb345 17
micallef25 2:f10d6fecb345 18 #define DEBUG_MQTT
micallef25 2:f10d6fecb345 19
micallef25 2:f10d6fecb345 20 Serial mqtt_pc (USBTX, USBRX);
micallef25 2:f10d6fecb345 21
micallef25 2:f10d6fecb345 22
micallef25 2:f10d6fecb345 23 // empty constructor
micallef25 2:f10d6fecb345 24 mqtt::mqtt()
micallef25 2:f10d6fecb345 25 {
micallef25 2:f10d6fecb345 26
micallef25 2:f10d6fecb345 27 }
micallef25 2:f10d6fecb345 28
micallef25 2:f10d6fecb345 29 /*
micallef25 2:f10d6fecb345 30 This function sets up the wifi module and connects it to the SSID
micallef25 2:f10d6fecb345 31 configured in the configuration file. It also prints out the MAC address
micallef25 2:f10d6fecb345 32 of the module, which is needed if you are trying to use campus wifi.
micallef25 2:f10d6fecb345 33 This function returns NULL if there are any issues.
micallef25 2:f10d6fecb345 34 */
micallef25 2:f10d6fecb345 35 WiFiInterface* mqtt::setup_wifi() {
micallef25 2:f10d6fecb345 36 // Get a handle to the WiFi module
micallef25 2:f10d6fecb345 37 WiFiInterface* wifi = WiFiInterface::get_default_instance();
micallef25 2:f10d6fecb345 38
micallef25 2:f10d6fecb345 39 // Connect the module to the wifi, based on the SSID and password
micallef25 2:f10d6fecb345 40 // specified in the mbed_app.json configuration file
micallef25 2:f10d6fecb345 41 // If you are using AirPennNet-Device, this will not succeed until the MAC
micallef25 2:f10d6fecb345 42 // address (printed shortly after this) is registered
micallef25 2:f10d6fecb345 43 mqtt_pc.printf("Connecting to wifi\r\n");
micallef25 2:f10d6fecb345 44 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
micallef25 2:f10d6fecb345 45
micallef25 2:f10d6fecb345 46 // Print out the MAC address of the wifi module. The MAC address is
micallef25 2:f10d6fecb345 47 // needed to register the device with AirPennNet-Device, so that you
micallef25 2:f10d6fecb345 48 // can use the campus wifi
micallef25 2:f10d6fecb345 49 mqtt_pc.printf("MAC Address: ");
micallef25 2:f10d6fecb345 50 mqtt_pc.printf(wifi->get_mac_address());
micallef25 2:f10d6fecb345 51 mqtt_pc.printf("\r\n");
micallef25 2:f10d6fecb345 52
micallef25 2:f10d6fecb345 53 if (rc != 0) {
micallef25 2:f10d6fecb345 54 mqtt_pc.printf("Problem connecting to wifi\r\n");
micallef25 2:f10d6fecb345 55 return NULL;
micallef25 2:f10d6fecb345 56 } else {
micallef25 2:f10d6fecb345 57 mqtt_pc.printf("Wifi connected\r\n");
micallef25 2:f10d6fecb345 58 }
micallef25 2:f10d6fecb345 59
micallef25 2:f10d6fecb345 60 return wifi;
micallef25 2:f10d6fecb345 61 }
micallef25 2:f10d6fecb345 62
micallef25 2:f10d6fecb345 63 /*
micallef25 2:f10d6fecb345 64 This function creates the MQTT client and connects it to the MQTT broker
micallef25 2:f10d6fecb345 65 that we have setup for the course. If there are any errors with the
micallef25 2:f10d6fecb345 66 connection, it will return NULL
micallef25 2:f10d6fecb345 67 */
micallef25 2:f10d6fecb345 68 MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) {
micallef25 2:f10d6fecb345 69 // the hostname and port point to a Google Cloud MQTT server we setup for
micallef25 2:f10d6fecb345 70 // this project
micallef25 2:f10d6fecb345 71 const char* hostname = "34.68.206.11";
micallef25 2:f10d6fecb345 72 int port = 1883;
micallef25 2:f10d6fecb345 73
micallef25 2:f10d6fecb345 74 // Create the underlying network connection to the MQTT server
micallef25 2:f10d6fecb345 75 mqtt_pc.printf("Connecting to %s:%d\r\n", hostname, port);
micallef25 2:f10d6fecb345 76 int rc = network.connect(hostname, port);
micallef25 2:f10d6fecb345 77 if (rc != 0) {
micallef25 2:f10d6fecb345 78 mqtt_pc.printf("There was an error with the TCP connect: %d\r\n", rc);
micallef25 2:f10d6fecb345 79 return NULL;
micallef25 2:f10d6fecb345 80 }
micallef25 2:f10d6fecb345 81
micallef25 2:f10d6fecb345 82 mqtt_pc.printf("Connected to %s:%d\r\n", hostname, port);
micallef25 2:f10d6fecb345 83
micallef25 2:f10d6fecb345 84 // Connect the MQTT client to the server
micallef25 2:f10d6fecb345 85 MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network);
micallef25 2:f10d6fecb345 86 rc = temp_client->connect();
micallef25 2:f10d6fecb345 87 if (rc != 0) {
micallef25 2:f10d6fecb345 88 mqtt_pc.printf("There was an error with the MQTT connect: %d\r\n", rc);
micallef25 2:f10d6fecb345 89 return NULL;
micallef25 2:f10d6fecb345 90 }
micallef25 2:f10d6fecb345 91
micallef25 2:f10d6fecb345 92 mqtt_pc.printf("MQTT connect successful!\r\n");
micallef25 2:f10d6fecb345 93
micallef25 2:f10d6fecb345 94 return temp_client;
micallef25 2:f10d6fecb345 95 }
micallef25 2:f10d6fecb345 96
micallef25 2:f10d6fecb345 97 /*
micallef25 2:f10d6fecb345 98 This function is the callback for when a message is received from the
micallef25 2:f10d6fecb345 99 MQTT broker. You register different callback functions with different
micallef25 2:f10d6fecb345 100 topic subscriptions
micallef25 2:f10d6fecb345 101 */
micallef25 2:f10d6fecb345 102 void mqtt::control_message_arrived(MQTT::MessageData& md)
micallef25 2:f10d6fecb345 103 {
micallef25 2:f10d6fecb345 104 MQTT::Message &message = md.message;
micallef25 2:f10d6fecb345 105 // make message
micallef25 2:f10d6fecb345 106 control_msg_t* msg = new control_msg_t;
micallef25 2:f10d6fecb345 107 assert(msg != NULL && message.payloadlen == sizeof(control_msg_t));
micallef25 2:f10d6fecb345 108
micallef25 2:f10d6fecb345 109 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
micallef25 2:f10d6fecb345 110 memcpy(msg,message.payload,message.payloadlen);
micallef25 2:f10d6fecb345 111
micallef25 2:f10d6fecb345 112 #ifdef DEBUG_MQTT
micallef25 2:f10d6fecb345 113 //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
micallef25 2:f10d6fecb345 114 mqtt_pc.printf("speed %d id %d \r\n", msg->speed, msg->car_id);
micallef25 2:f10d6fecb345 115 #endif
micallef25 2:f10d6fecb345 116
micallef25 2:f10d6fecb345 117 // add our message to the queue no fucking clue what happens internally to
micallef25 2:f10d6fecb345 118 // the message memory
micallef25 2:f10d6fecb345 119 mqtt::instance()->add_to_control_queue(msg->car_id,msg);
micallef25 2:f10d6fecb345 120
micallef25 2:f10d6fecb345 121 // free message
micallef25 2:f10d6fecb345 122 delete msg;
micallef25 2:f10d6fecb345 123 }
micallef25 2:f10d6fecb345 124
micallef25 2:f10d6fecb345 125 /*
micallef25 2:f10d6fecb345 126 This function sends a message to the test topic.
micallef25 2:f10d6fecb345 127 */
micallef25 2:f10d6fecb345 128 int mqtt::send_position_msg( position_msg_t* msg )
micallef25 2:f10d6fecb345 129 {
micallef25 2:f10d6fecb345 130 assert(msg != NULL && client != NULL);
micallef25 2:f10d6fecb345 131
micallef25 2:f10d6fecb345 132 MQTT::Message message;
micallef25 2:f10d6fecb345 133
micallef25 2:f10d6fecb345 134 // might be safest to memcopy thius seems to work
micallef25 2:f10d6fecb345 135 //memcpy(message.payload,msg,sizeof(position_msg_t));
micallef25 2:f10d6fecb345 136 message.payload = (void*)msg;
micallef25 2:f10d6fecb345 137 message.payloadlen = sizeof(position_msg_t);
micallef25 2:f10d6fecb345 138 message.qos = MQTT::QOS1;
micallef25 2:f10d6fecb345 139
micallef25 2:f10d6fecb345 140 #ifdef DEBUG_MQTT
micallef25 2:f10d6fecb345 141 mqtt_pc.printf("Sending a message!\r\n");
micallef25 2:f10d6fecb345 142 #endif
micallef25 2:f10d6fecb345 143
micallef25 2:f10d6fecb345 144 int rc = client->publish(POSITION_TOPIC,message);
micallef25 2:f10d6fecb345 145 assert(rc == 0);
micallef25 2:f10d6fecb345 146
micallef25 2:f10d6fecb345 147 client->yield(1000);
micallef25 2:f10d6fecb345 148 return 0;
micallef25 2:f10d6fecb345 149 }
micallef25 2:f10d6fecb345 150
micallef25 2:f10d6fecb345 151 void mqtt::bringup_network() {
micallef25 2:f10d6fecb345 152
micallef25 2:f10d6fecb345 153 //
micallef25 2:f10d6fecb345 154 WiFiInterface* wifi = setup_wifi();
micallef25 2:f10d6fecb345 155 assert(wifi != NULL);
micallef25 2:f10d6fecb345 156
micallef25 2:f10d6fecb345 157 // Create the network object needed by the MQTT client
micallef25 2:f10d6fecb345 158 new_network = new MQTTNetwork(wifi);
micallef25 2:f10d6fecb345 159
micallef25 2:f10d6fecb345 160 // get client
micallef25 2:f10d6fecb345 161 client = setup_mqtt(*new_network);
micallef25 2:f10d6fecb345 162 assert(client != NULL);
micallef25 2:f10d6fecb345 163
micallef25 2:f10d6fecb345 164 // Subscribe to a topic / register a callback
micallef25 2:f10d6fecb345 165 mqtt_pc.printf("Subscribing to topic %s\r\n", CONTROL_TOPIC);
micallef25 2:f10d6fecb345 166 int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived);
micallef25 2:f10d6fecb345 167 assert(rc == 0);
micallef25 2:f10d6fecb345 168
micallef25 2:f10d6fecb345 169 // make a road based of mqtt id
micallef25 2:f10d6fecb345 170 mqtt_id = 0;
micallef25 2:f10d6fecb345 171 if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){
micallef25 2:f10d6fecb345 172 mqtt_id = 0;
micallef25 2:f10d6fecb345 173 }
micallef25 2:f10d6fecb345 174 else{
micallef25 2:f10d6fecb345 175 mqtt_id = 1;
micallef25 2:f10d6fecb345 176 }
micallef25 2:f10d6fecb345 177
micallef25 2:f10d6fecb345 178 mqtt_pc.printf("Subscribed, Setup complete!\r\n");
micallef25 2:f10d6fecb345 179 }
micallef25 2:f10d6fecb345 180
micallef25 2:f10d6fecb345 181
micallef25 2:f10d6fecb345 182 // manage callbacks from mqtt
micallef25 2:f10d6fecb345 183 void mqtt::manage_network()
micallef25 2:f10d6fecb345 184 {
micallef25 2:f10d6fecb345 185 while(true)
micallef25 2:f10d6fecb345 186 {
micallef25 2:f10d6fecb345 187 //
micallef25 2:f10d6fecb345 188 osEvent evt = position_queue.get();
micallef25 2:f10d6fecb345 189 assert(evt.status == osEventMessage);
micallef25 2:f10d6fecb345 190
micallef25 2:f10d6fecb345 191 //
micallef25 2:f10d6fecb345 192 position_msg_t *message = (position_msg_t*)evt.value.p;
micallef25 2:f10d6fecb345 193 send_position_msg(message);
micallef25 2:f10d6fecb345 194
micallef25 2:f10d6fecb345 195 // client->yield(100);
micallef25 2:f10d6fecb345 196 }
micallef25 2:f10d6fecb345 197 }
micallef25 2:f10d6fecb345 198
micallef25 2:f10d6fecb345 199 //
micallef25 2:f10d6fecb345 200 // clean up goes here
micallef25 2:f10d6fecb345 201 void mqtt::shutdown_network()
micallef25 2:f10d6fecb345 202 {
micallef25 2:f10d6fecb345 203 //
micallef25 2:f10d6fecb345 204 mqtt_pc.printf("shutting down mbed\r\n");
micallef25 2:f10d6fecb345 205
micallef25 2:f10d6fecb345 206 //
micallef25 2:f10d6fecb345 207 client->disconnect();
micallef25 2:f10d6fecb345 208 delete new_network;
micallef25 2:f10d6fecb345 209
micallef25 2:f10d6fecb345 210 //
micallef25 2:f10d6fecb345 211 thread->terminate();
micallef25 2:f10d6fecb345 212 delete thread;
micallef25 2:f10d6fecb345 213 }
micallef25 2:f10d6fecb345 214
micallef25 2:f10d6fecb345 215 // launch network manager thread
micallef25 2:f10d6fecb345 216 // responsible for pub sub
micallef25 2:f10d6fecb345 217 void mqtt::setup_network()
micallef25 2:f10d6fecb345 218 {
micallef25 2:f10d6fecb345 219 // bring up network if anything bad happens we will assert and crash
micallef25 2:f10d6fecb345 220 bringup_network();
micallef25 2:f10d6fecb345 221
micallef25 2:f10d6fecb345 222 // create a new thread thats sole purpose in life is to call client yield
micallef25 2:f10d6fecb345 223 // what a sad life for a thread
micallef25 2:f10d6fecb345 224 // calling yield is necessary as we will not get any messages for the mqtt
micallef25 2:f10d6fecb345 225 // serverwithout it
micallef25 2:f10d6fecb345 226 thread = new Thread();
micallef25 2:f10d6fecb345 227 assert(thread != NULL);
micallef25 2:f10d6fecb345 228 thread->start( callback(this,&mqtt::manage_network) );
micallef25 2:f10d6fecb345 229
micallef25 2:f10d6fecb345 230 }