Emanuel Kuflik / Mbed OS smat_controller

Dependencies:   MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqtt.cpp Source File

mqtt.cpp

00001 /* mbed Microcontroller Library
00002  * Copyright (c) 2018 ARM Limited
00003  * SPDX-License-Identifier: Apache-2.0
00004  */
00005 
00006 #include "mbed.h"
00007 #include "MQTTNetwork.h"
00008 #include "MQTTClient.h"
00009 #include "MQTTmbed.h"
00010 #include "mqtt.h"
00011 #include <assert.h>
00012 #include "AccCar.h"
00013 
// topics interested in 
// POSITION_TOPIC and ROAD_TOPIC are published to by send_position_msg() and
// send_road_msg(); ROAD_TOPIC and CONTROL_TOPIC are subscribed to in
// bringup_network().
#define POSITION_TOPIC "MQTT_Position0x25"
#define ROAD_TOPIC "MQTT_Road0x25"
#define CONTROL_TOPIC "MQTT_Control0x25"

// Uncomment to enable the per-message debug prints guarded by
// #ifdef DEBUG_MQTT in the callbacks and send functions below.
//#define DEBUG_MQTT
// Prefix placed in front of every log line printed by this file.
#define MQTT_TAG "[MQTT] "
 
// Serial port on the USBTX/USBRX pins; all log output in this file goes here.
Serial mqtt_pc (USBTX, USBRX);
00023 
00024 
00025 // empty constructor
00026 mqtt::mqtt()
00027 {
00028     
00029 }
00030 
00031 /*
00032     This function sets up the wifi module and connects it to the SSID 
00033     configured in the configuration file. It also prints out the MAC address 
00034     of the module, which is needed if you are trying to use campus wifi.
00035     This function returns NULL if there are any issues.
00036 */
00037 WiFiInterface* mqtt::setup_wifi() {
00038     // Get a handle to the WiFi module
00039     WiFiInterface* wifi = WiFiInterface::get_default_instance();
00040     
00041     // Connect the module to the wifi, based on the SSID and password 
00042     // specified in the mbed_app.json configuration file
00043     // If you are using AirPennNet-Device, this will not succeed until the MAC
00044     // address (printed shortly after this) is registered
00045     mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM);
00046     int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
00047     
00048     // Print out the MAC address of the wifi module. The MAC address is 
00049     // needed to register the device with AirPennNet-Device, so that you
00050     // can use the campus wifi
00051     mqtt_pc.printf(MQTT_TAG"MAC Address: ");
00052     mqtt_pc.printf(wifi->get_mac_address());
00053     mqtt_pc.printf("\r\n");
00054     
00055     if (rc != 0) {
00056         mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM);   
00057         return NULL;
00058     } else {
00059         mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM);  
00060     }
00061       
00062     return wifi;
00063 }
00064 
00065 /*
00066     This function creates the MQTT client and connects it to the MQTT broker
00067     that we have setup for the course. If there are any errors with the 
00068     connection, it will return NULL
00069 */
00070 MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) {
00071     // the hostname and port point to a Google Cloud MQTT server we setup for
00072     // this project
00073     const char* hostname = "34.68.206.11";
00074     int port = 1883;
00075     
00076     // Create the underlying network connection to the MQTT server
00077     mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port);
00078     int rc = network.connect(hostname, port);
00079     if (rc != 0) {
00080         mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc);
00081         return NULL;
00082     }
00083     
00084     mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port);
00085         
00086     // Connect the MQTT client to the server
00087     MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network);
00088     rc = temp_client->connect();
00089     if (rc != 0) {
00090         mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc);
00091         return NULL;
00092     }
00093     
00094     mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM);
00095     
00096     return temp_client;
00097 }
00098 
00099 /*
00100     This function is the callback for when a message is received from the 
00101     MQTT broker. You register different callback functions with different
00102     topic subscriptions
00103 */
00104 void mqtt::control_message_arrived(MQTT::MessageData& md)
00105 {
00106     MQTT::Message &message = md.message;
00107     // make message receiver responsible for freeeing
00108     control_msg_t* msg = new control_msg_t;
00109     assert(msg != NULL && message.payloadlen == sizeof(control_msg_t));
00110     
00111     // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
00112     memcpy(msg,message.payload,message.payloadlen);
00113     
00114     if( msg->road_id == mqtt::instance()->mqtt_id )
00115     {
00116 #ifdef DEBUG_MQTT  
00117     //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
00118     mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id);
00119 #endif 
00120         // add our message to the queue no fucking clue what happens internally to
00121         // the message memory thanks mbed os 5 documentation
00122         mqtt::instance()->add_to_control_queue(msg->car_id,msg);
00123     }
00124     
00125     else
00126     {
00127 #ifdef DEBUG_MQTT 
00128        mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id);
00129 #endif
00130        delete msg;
00131     }
00132 }
00133 
00134 void mqtt::road_message_arrived(MQTT::MessageData& md)
00135 {
00136     MQTT::Message &message = md.message;
00137     // make message... I will endeavor to free this one the receiver side 
00138     // from example sender allocates and receiver frees so well stick 
00139     // with that paradigm
00140     road_msg_t* msg = new road_msg_t;
00141     assert(msg != NULL && message.payloadlen == sizeof(road_msg_t));
00142     
00143     // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
00144     memcpy(msg,message.payload,message.payloadlen);
00145     
00146     if( msg->road_id != mqtt::instance()->mqtt_id )
00147     {
00148 #ifdef DEBUG_MQTT  
00149     mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_clock);
00150 #endif
00151         // add our message to the queue no fucking clue what happens internally to
00152         // the message memory thanks mbed os 5 documentation
00153         mqtt::instance()->add_to_network_to_road_queue(msg);
00154     }
00155     
00156     else{
00157 #ifdef DEBUG_MQTT
00158        mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id);
00159 #endif
00160        delete msg;
00161     }
00162     
00163 }
00164 
00165 /*
00166     This function sends a message to the test topic.
00167 */
00168 int mqtt::send_position_msg(  MQTT::Message& message, position_msg_t* msg ) 
00169 {
00170     assert(msg != NULL && client != NULL);
00171     
00172 #ifdef DEBUG_MQTT
00173     mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id);
00174 #endif
00175     
00176     MQTT::Message tmessage;
00177 
00178     tmessage.payload = (void*)msg;
00179     tmessage.payloadlen = sizeof(position_msg_t); 
00180     tmessage.qos = MQTT::QOS1;
00181 
00182     int rc = client->publish(POSITION_TOPIC,tmessage);
00183     assert(rc == 0);  
00184 
00185     return 0;
00186 }
00187 
00188 int mqtt::send_road_msg(  MQTT::Message& message, road_msg_t* msg ) 
00189 {
00190     assert(msg != NULL && client != NULL);
00191     
00192 #ifdef DEBUG_MQTT
00193     mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_clock);
00194 #endif
00195     MQTT::Message tmessage;
00196 
00197     tmessage.payload = (void*)msg;
00198     tmessage.payloadlen = sizeof(road_msg_t); 
00199     tmessage.qos = MQTT::QOS1;
00200 
00201     int rc = client->publish(ROAD_TOPIC,tmessage);
00202     assert(rc == 0);  
00203     
00204     return 0;
00205 }
00206 
00207 void mqtt::bringup_network() {
00208     
00209     // 
00210     WiFiInterface* wifi = setup_wifi();
00211     assert(wifi != NULL);
00212     
00213     // Create the network object needed by the MQTT client
00214     new_network = new MQTTNetwork(wifi);
00215     
00216     // get client
00217     client = setup_mqtt(*new_network);
00218     assert(client != NULL);
00219 
00220     // Subscribe to a topic / register a callback  
00221     mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC);
00222     int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived);
00223     assert(rc == 0);
00224     
00225     // Subscribe to a topic / register a callback  
00226     mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC);
00227     rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived);
00228     assert(rc == 0);
00229     
00230     // make a road based of mqtt id
00231     mqtt_id = 0;
00232     if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:8e:77") == 0){
00233         mqtt_id = 0;
00234     }
00235     else{
00236         mqtt_id = 1;
00237     }
00238     
00239     mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id);
00240 }
00241 
00242 
00243 // manage callbacks from mqtt
00244 void mqtt::manage_network()
00245 {
00246     MQTT::Message mqtt_message;
00247     while(true)
00248     {
00249         while(!position_queue.empty()) 
00250         {
00251             
00252 //#ifdef DEBUG_MQTT
00253 //    mqtt_pc.printf(MQTT_TAG"getting position msg\r\n");
00254 //#endif           
00255             //
00256             osEvent evt = position_queue.get();
00257             assert(evt.status == osEventMessage); 
00258         
00259             //
00260             position_msg_t *message = (position_msg_t*)evt.value.p;
00261             assert(message != NULL);
00262             send_position_msg(mqtt_message,message);
00263         }
00264         
00265         if (!road_to_network_queue.empty())
00266         {
00267             
00268 //#ifdef DEBUG_MQTT
00269 //    mqtt_pc.printf(MQTT_TAG"getting road info\r\n");
00270 //#endif
00271             osEvent revt = road_to_network_queue.get();
00272             assert(revt.status == osEventMessage);
00273             
00274             //
00275             road_msg_t *road_msg = (road_msg_t*)revt.value.p;
00276             assert(road_msg != NULL);
00277             send_road_msg(mqtt_message,road_msg);
00278             delete road_msg;
00279         }
00280 
00281       client->yield(10);
00282     }
00283 }
00284 
00285 //
00286 // clean up goes here
00287 void mqtt::shutdown_network()
00288 {    
00289     //
00290     mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM);
00291     
00292     //
00293     client->disconnect();
00294     delete new_network;
00295     
00296     //
00297     thread->terminate();
00298     delete thread;
00299 }
00300 
00301 // launch network manager thread
00302 // responsible for pub sub 
00303 void mqtt::setup_network()
00304 {
00305     // bring up network if anything bad happens we will assert and crash
00306     bringup_network();
00307     
00308     // create a new thread thats sole purpose in life is to call client yield
00309     // what a sad life for a thread
00310     // calling yield is necessary as we will not get any messages for the mqtt 
00311     // serverwithout it
00312     thread = new Thread();
00313     assert(thread != NULL);
00314     thread->start( callback(this,&mqtt::manage_network) );
00315     
00316 }
00317 
00318 // before the next run make sure
00319 // that everything is cleaned up properly
00320 // the control queue may have one entry due to 
00321 // the last iteration ending
00322 void mqtt::clear_queues()
00323 {
00324     while(!position_queue.empty())
00325     {
00326         osEvent revt = position_queue.get();
00327         assert(revt.status == osEventMessage);
00328         //printf("cleaned up queue\n");
00329             //
00330         position_msg_t *msg = (position_msg_t*)revt.value.p;
00331         delete msg;     
00332     }
00333     for(int i = 0; i < 5; i++)
00334     {
00335         while(!control_queue[i].empty())
00336         {
00337             osEvent revt = control_queue[i].get();
00338             assert(revt.status == osEventMessage);
00339             //printf("cleaned up control queue\n");
00340             //
00341             control_msg_t *msg = (control_msg_t*)revt.value.p;
00342             delete msg;     
00343         }  
00344     }
00345        
00346     while(!network_to_road_queue.empty())
00347     {
00348         osEvent evt = network_to_road_queue.get();
00349         assert(evt.status == osEventMessage);
00350         road_msg_t *message = (road_msg_t*)evt.value.p;
00351         delete message;     
00352     }  
00353     while(!road_to_network_queue.empty())
00354     {
00355         osEvent evt = road_to_network_queue.get();
00356         assert(evt.status == osEventMessage);
00357         road_msg_t *message = (road_msg_t*)evt.value.p;
00358         delete message;     
00359     }  
00360 }