Emanuel Kuflik / Mbed OS smat_controller

Dependencies:   MQTT

mqtt.cpp

Committer:
micallef25
Date:
2019-12-10
Revision:
5:e0d8e5e922f1
Parent:
4:64c6fc70ddb7
Child:
6:6cb13ac483e0

File content as of revision 5:e0d8e5e922f1:

/* mbed Microcontroller Library
 * Copyright (c) 2018 ARM Limited
 * SPDX-License-Identifier: Apache-2.0
 */

#include "mbed.h"
#include "MQTTNetwork.h"
#include "MQTTClient.h"
#include "MQTTmbed.h"
#include "mqtt.h"
#include <assert.h>
#include "AccCar.h"

// topics interested in 
#define POSITION_TOPIC "MQTT_Position0x25"
#define ROAD_TOPIC "MQTT_Road0x25"
#define CONTROL_TOPIC "MQTT_Control0x25"

//#define DEBUG_MQTT
#define MQTT_TAG "[MQTT] "
 
Serial mqtt_pc (USBTX, USBRX);


// empty constructor
mqtt::mqtt()
{
    
}

/*
    This function sets up the wifi module and connects it to the SSID 
    configured in the configuration file. It also prints out the MAC address 
    of the module, which is needed if you are trying to use campus wifi.
    This function returns NULL if there are any issues.
*/
WiFiInterface* mqtt::setup_wifi() {
    // Get a handle to the WiFi module
    WiFiInterface* wifi = WiFiInterface::get_default_instance();
    
    // Connect the module to the wifi, based on the SSID and password 
    // specified in the mbed_app.json configuration file
    // If you are using AirPennNet-Device, this will not succeed until the MAC
    // address (printed shortly after this) is registered
    mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM);
    int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2);
    
    // Print out the MAC address of the wifi module. The MAC address is 
    // needed to register the device with AirPennNet-Device, so that you
    // can use the campus wifi
    mqtt_pc.printf(MQTT_TAG"MAC Address: ");
    mqtt_pc.printf(wifi->get_mac_address());
    mqtt_pc.printf("\r\n");
    
    if (rc != 0) {
        mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM);   
        return NULL;
    } else {
        mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM);  
    }
      
    return wifi;
}

/*
    This function creates the MQTT client and connects it to the MQTT broker
    that we have setup for the course. If there are any errors with the 
    connection, it will return NULL
*/
MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) {
    // the hostname and port point to a Google Cloud MQTT server we setup for
    // this project
    const char* hostname = "34.68.206.11";
    int port = 1883;
    
    // Create the underlying network connection to the MQTT server
    mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port);
    int rc = network.connect(hostname, port);
    if (rc != 0) {
        mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc);
        return NULL;
    }
    
    mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port);
        
    // Connect the MQTT client to the server
    MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network);
    rc = temp_client->connect();
    if (rc != 0) {
        mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc);
        return NULL;
    }
    
    mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM);
    
    return temp_client;
}

/*
    This function is the callback for when a message is received from the 
    MQTT broker. You register different callback functions with different
    topic subscriptions
*/
void mqtt::control_message_arrived(MQTT::MessageData& md)
{
    MQTT::Message &message = md.message;
    // make message receiver responsible for freeeing
    control_msg_t* msg = new control_msg_t;
    assert(msg != NULL && message.payloadlen == sizeof(control_msg_t));
    
    // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
    memcpy(msg,message.payload,message.payloadlen);
    
    if( msg->road_id == mqtt::instance()->mqtt_id )
    {
#ifdef DEBUG_MQTT  
    //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
    mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id);
#endif 
        // add our message to the queue no fucking clue what happens internally to
        // the message memory thanks mbed os 5 documentation
        mqtt::instance()->add_to_control_queue(msg->car_id,msg);
    }
    
    else
    {
#ifdef DEBUG_MQTT 
       mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id);
#endif
       delete msg;
    }
}

void mqtt::road_message_arrived(MQTT::MessageData& md)
{
    MQTT::Message &message = md.message;
    // make message... I will endeavor to free this one the receiver side 
    // from example sender allocates and receiver frees so well stick 
    // with that paradigm
    road_msg_t* msg = new road_msg_t;
    assert(msg != NULL && message.payloadlen == sizeof(road_msg_t));
    
    // copy to our new pointer for some reason just taking the payload ptr is bad for mbed?
    memcpy(msg,message.payload,message.payloadlen);
    
    if( msg->road_id != mqtt::instance()->mqtt_id )
    {
#ifdef DEBUG_MQTT  
    mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_id);
#endif
        // add our message to the queue no fucking clue what happens internally to
        // the message memory thanks mbed os 5 documentation
        mqtt::instance()->add_to_network_to_road_queue(msg);
    }
    
    else{
#ifdef DEBUG_MQTT
       mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id);
#endif
       delete msg;
    }
    
}

/*
    This function sends a message to the test topic.
*/
int mqtt::send_position_msg(  MQTT::Message& message, position_msg_t* msg ) 
{
    assert(msg != NULL && client != NULL);
    
//    MQTT::Message message;
    
#ifdef DEBUG_MQTT
    mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id);
#endif

    // might be safest to memcopy this? but well if shit breaks then well fix
//    memcpy(message.payload,msg,sizeof(position_msg_t));
    message.payload = (void*)msg;
    message.payloadlen = sizeof(position_msg_t); 
    message.qos = MQTT::QOS1;

    int rc = client->publish(POSITION_TOPIC,message);
    assert(rc == 0);  

//    client->yield(1000);
    return 0;
}

int mqtt::send_road_msg(  MQTT::Message& message, road_msg_t* msg ) 
{
    assert(msg != NULL && client != NULL);
    
//    MQTT::Message message;
    
#ifdef DEBUG_MQTT
    mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_id);
#endif

    // might be safest to memcopy thius seems to work
    //memcpy(message.payload,msg,sizeof(position_msg_t));
    message.payload = (void*)msg;
    message.payloadlen = sizeof(road_msg_t); 
    message.qos = MQTT::QOS1;

    int rc = client->publish(ROAD_TOPIC,message);
    assert(rc == 0);  

//    client->yield(1000);
    return 0;
}

void mqtt::bringup_network() {
    
    // 
    WiFiInterface* wifi = setup_wifi();
    assert(wifi != NULL);
    
    // Create the network object needed by the MQTT client
    new_network = new MQTTNetwork(wifi);
    
    // get client
    client = setup_mqtt(*new_network);
    assert(client != NULL);

    // Subscribe to a topic / register a callback  
    mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC);
    int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived);
    assert(rc == 0);
    
    // Subscribe to a topic / register a callback  
    mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC);
    rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived);
    assert(rc == 0);
    
    // make a road based of mqtt id
    mqtt_id = 0;
    if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:75:06") == 0){
        mqtt_id = 0;
    }
    else{
        mqtt_id = 1;
    }
    
    mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id);
}


// manage callbacks from mqtt
void mqtt::manage_network()
{
    MQTT::Message mqtt_message;
    while(true)
    {
        while(!position_queue.empty()) 
        {
            
//#ifdef DEBUG_MQTT
//    mqtt_pc.printf(MQTT_TAG"getting position msg\r\n");
//#endif           
            //
            osEvent evt = position_queue.get();
            assert(evt.status == osEventMessage); 
        
            //
            position_msg_t *message = (position_msg_t*)evt.value.p;
            assert(message != NULL);
            send_position_msg(mqtt_message,message);
        }
        
        if (!road_to_network_queue.empty())
        {
            
//#ifdef DEBUG_MQTT
//    mqtt_pc.printf(MQTT_TAG"getting road info\r\n");
//#endif
            osEvent revt = road_to_network_queue.get();
            assert(revt.status == osEventMessage);
            
            //
            road_msg_t *road_msg = (road_msg_t*)revt.value.p;
            assert(road_msg != NULL);
            send_road_msg(mqtt_message,road_msg);
        }

      client->yield(10);
    }
}

//
// clean up goes here
void mqtt::shutdown_network()
{    
    //
    mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM);
    
    //
    client->disconnect();
    delete new_network;
    
    //
    thread->terminate();
    delete thread;
}

// launch network manager thread
// responsible for pub sub 
void mqtt::setup_network()
{
    // bring up network if anything bad happens we will assert and crash
    bringup_network();
    
    // create a new thread thats sole purpose in life is to call client yield
    // what a sad life for a thread
    // calling yield is necessary as we will not get any messages for the mqtt 
    // serverwithout it
    thread = new Thread();
    assert(thread != NULL);
    thread->start( callback(this,&mqtt::manage_network) );
    
}