541 smart traffic controller

Dependencies:   MQTT

Committer:
micallef25
Date:
Thu Dec 12 17:25:27 2019 +0000
Revision:
7:fd8e0604faaa
Parent:
6:6cb13ac483e0
stable, passed 1000 simulation regression

Who changed what in which revision?

User | Revision | Line number | New contents of line
micallef25 2:f10d6fecb345 1 #ifndef _MQTT_H_
micallef25 2:f10d6fecb345 2 #define _MQTT_H_
micallef25 2:f10d6fecb345 3
micallef25 2:f10d6fecb345 4 #include "mbed.h"
micallef25 2:f10d6fecb345 5 #include "MQTTNetwork.h"
micallef25 2:f10d6fecb345 6 #include "MQTTClient.h"
micallef25 2:f10d6fecb345 7 #include "MQTTmbed.h"
micallef25 2:f10d6fecb345 8 #include <assert.h>
micallef25 2:f10d6fecb345 9
micallef25 2:f10d6fecb345 10 #define MAX_CARS_ON_ROAD 5
micallef25 2:f10d6fecb345 11
micallef25 5:e0d8e5e922f1 12 //#define WINDOWS
micallef25 5:e0d8e5e922f1 13
micallef25 5:e0d8e5e922f1 14 #ifdef WINDOWS
micallef25 5:e0d8e5e922f1 15 #define DELIM "\r\n"
micallef25 5:e0d8e5e922f1 16 #else
micallef25 5:e0d8e5e922f1 17 #define DELIM "\n"
micallef25 5:e0d8e5e922f1 18 #endif
micallef25 5:e0d8e5e922f1 19
micallef25 7:fd8e0604faaa 20 //#define PUBLISH_ONLY
micallef25 7:fd8e0604faaa 21 //#define SAFETY_CRITICAL
micallef25 2:f10d6fecb345 22 enum drive_state_t{
micallef25 2:f10d6fecb345 23 NORMAL_STATE = 0,
micallef25 2:f10d6fecb345 24 STOPPED_STATE,
micallef25 2:f10d6fecb345 25 CROSSING_STATE
micallef25 2:f10d6fecb345 26 };
micallef25 2:f10d6fecb345 27
micallef25 2:f10d6fecb345 28 typedef struct position_msg{
micallef25 2:f10d6fecb345 29 int position;
micallef25 2:f10d6fecb345 30 int speed;
micallef25 2:f10d6fecb345 31 int car_id;
micallef25 2:f10d6fecb345 32 int road_id;
micallef25 2:f10d6fecb345 33 int counter; // for debugging and ensuring everyone is synchronized
micallef25 5:e0d8e5e922f1 34 //drive_state_t state; // for debugging
micallef25 2:f10d6fecb345 35 }position_msg_t;
micallef25 2:f10d6fecb345 36
MannyK 4:64c6fc70ddb7 37 typedef struct road_msg{
micallef25 5:e0d8e5e922f1 38 int road_id;
micallef25 5:e0d8e5e922f1 39 int road_clock;
micallef25 7:fd8e0604faaa 40 int simulating;
MannyK 4:64c6fc70ddb7 41 }road_msg_t;
MannyK 4:64c6fc70ddb7 42
micallef25 2:f10d6fecb345 43 typedef struct control_msg{
micallef25 2:f10d6fecb345 44 int speed;
micallef25 2:f10d6fecb345 45 int car_id;
micallef25 2:f10d6fecb345 46 int road_id;
micallef25 2:f10d6fecb345 47 int counter; // for debugging synchronization
micallef25 2:f10d6fecb345 48 }control_msg_t;
micallef25 2:f10d6fecb345 49
micallef25 2:f10d6fecb345 50
micallef25 2:f10d6fecb345 51 class mqtt{
micallef25 2:f10d6fecb345 52
micallef25 2:f10d6fecb345 53 private:
micallef25 2:f10d6fecb345 54 // static functions for control message callbacks
micallef25 2:f10d6fecb345 55 static void control_message_arrived(MQTT::MessageData& md);
micallef25 5:e0d8e5e922f1 56 static void road_message_arrived(MQTT::MessageData& md);
micallef25 2:f10d6fecb345 57
micallef25 2:f10d6fecb345 58 // functions for creation of mqtt and wifi bring up
micallef25 2:f10d6fecb345 59 WiFiInterface* setup_wifi();
micallef25 5:e0d8e5e922f1 60 int send_position_msg(MQTT::Message& message,position_msg_t* msg);
micallef25 5:e0d8e5e922f1 61 int send_road_msg(MQTT::Message& message,road_msg_t* msg);
micallef25 2:f10d6fecb345 62 void manage_network();
micallef25 2:f10d6fecb345 63 void bringup_network();
micallef25 2:f10d6fecb345 64
micallef25 2:f10d6fecb345 65 // data structures needed for creation of network
micallef25 2:f10d6fecb345 66 MQTT::Client<MQTTNetwork, Countdown>* client;
micallef25 2:f10d6fecb345 67 MQTT::Client<MQTTNetwork, Countdown>* setup_mqtt(MQTTNetwork& network);
micallef25 2:f10d6fecb345 68 MQTTNetwork* new_network;
micallef25 2:f10d6fecb345 69
micallef25 2:f10d6fecb345 70 // thread that will manage the mqtt network status
micallef25 2:f10d6fecb345 71 Thread* thread;
micallef25 2:f10d6fecb345 72
micallef25 2:f10d6fecb345 73 // queue for sending position messages
micallef25 7:fd8e0604faaa 74 Queue<position_msg_t, 15> position_queue;
micallef25 2:f10d6fecb345 75
MannyK 4:64c6fc70ddb7 76 // queue for sending road messages
micallef25 7:fd8e0604faaa 77 Queue<road_msg_t, 3> road_to_network_queue;
micallef25 5:e0d8e5e922f1 78
micallef25 5:e0d8e5e922f1 79 // queue for road messages passed from the network to the road
micallef25 7:fd8e0604faaa 80 Queue<road_msg_t, 3> network_to_road_queue;
MannyK 4:64c6fc70ddb7 81
micallef25 2:f10d6fecb345 82 // queue for control message passing between mqtt and car threads
micallef25 7:fd8e0604faaa 83 Queue<control_msg_t, 3> control_queue[MAX_CARS_ON_ROAD];
micallef25 2:f10d6fecb345 84
micallef25 2:f10d6fecb345 85
micallef25 2:f10d6fecb345 86 public:
micallef25 2:f10d6fecb345 87 // singleton instance for managing network
micallef25 2:f10d6fecb345 88 // we can only have one network so this should prohibit accidental
micallef25 2:f10d6fecb345 89 // creations of multiple objects
micallef25 2:f10d6fecb345 90 static mqtt* mqtt_singleton;
micallef25 2:f10d6fecb345 91
micallef25 2:f10d6fecb345 92 //
micallef25 2:f10d6fecb345 93 int mqtt_id;
micallef25 2:f10d6fecb345 94
micallef25 2:f10d6fecb345 95 // empty constructor
micallef25 2:f10d6fecb345 96 mqtt();
micallef25 2:f10d6fecb345 97
micallef25 2:f10d6fecb345 98 // setup and tear down API's
micallef25 2:f10d6fecb345 99 void setup_network();
micallef25 5:e0d8e5e922f1 100 void shutdown_network();
micallef25 7:fd8e0604faaa 101 void clear_queues();
micallef25 2:f10d6fecb345 102
micallef25 2:f10d6fecb345 103 static mqtt *instance()
micallef25 2:f10d6fecb345 104 {
micallef25 2:f10d6fecb345 105 if (!mqtt_singleton)
micallef25 2:f10d6fecb345 106 mqtt_singleton = new mqtt;
micallef25 2:f10d6fecb345 107 return mqtt_singleton;
micallef25 2:f10d6fecb345 108 }
micallef25 2:f10d6fecb345 109
micallef25 2:f10d6fecb345 110 // adding to a queue to be sent on mqtt network
micallef25 2:f10d6fecb345 111 inline void add_to_position_queue(position_msg_t* msg)
micallef25 2:f10d6fecb345 112 {
micallef25 7:fd8e0604faaa 113 position_queue.put(msg,osWaitForever);
micallef25 2:f10d6fecb345 114 }
micallef25 2:f10d6fecb345 115
MannyK 4:64c6fc70ddb7 116 // adding to road queue to be sent on mqtt network
micallef25 5:e0d8e5e922f1 117 inline void add_to_road_to_network_queue(road_msg_t* msg)
MannyK 4:64c6fc70ddb7 118 {
micallef25 7:fd8e0604faaa 119 road_to_network_queue.put(msg,osWaitForever);
MannyK 4:64c6fc70ddb7 120 }
MannyK 4:64c6fc70ddb7 121
micallef25 5:e0d8e5e922f1 122 // adding to the queue of road messages passed from the network to the road
micallef25 5:e0d8e5e922f1 123 inline void add_to_network_to_road_queue(road_msg_t* msg)
micallef25 5:e0d8e5e922f1 124 {
micallef25 7:fd8e0604faaa 125 network_to_road_queue.put(msg,osWaitForever);
micallef25 5:e0d8e5e922f1 126 }
micallef25 5:e0d8e5e922f1 127
micallef25 5:e0d8e5e922f1 128
micallef25 2:f10d6fecb345 129 // adding a control message to the given car's control queue
micallef25 2:f10d6fecb345 130 inline void add_to_control_queue(int car_id,control_msg_t* msg)
micallef25 2:f10d6fecb345 131 {
micallef25 7:fd8e0604faaa 132 control_queue[car_id].put(msg,osWaitForever);
micallef25 2:f10d6fecb345 133 }
micallef25 2:f10d6fecb345 134
micallef25 2:f10d6fecb345 135 // taking the next control message from the given car's control queue
micallef25 2:f10d6fecb345 136 inline control_msg_t* get_control_msg(int car_id)
micallef25 2:f10d6fecb345 137 {
micallef25 6:6cb13ac483e0 138 // handle the case if no message was received in time
micallef25 7:fd8e0604faaa 139 #ifdef SAFETY_CRITICAL
micallef25 6:6cb13ac483e0 140 if(control_queue[car_id].empty())
micallef25 6:6cb13ac483e0 141 {
micallef25 6:6cb13ac483e0 142 return NULL;
micallef25 6:6cb13ac483e0 143 }
micallef25 7:fd8e0604faaa 144 #endif
micallef25 7:fd8e0604faaa 145 osEvent evt = control_queue[car_id].get(osWaitForever);
micallef25 2:f10d6fecb345 146 assert(evt.status == osEventMessage);
micallef25 2:f10d6fecb345 147 control_msg_t *message = (control_msg_t*)evt.value.p;
micallef25 2:f10d6fecb345 148 return message;
micallef25 2:f10d6fecb345 149 }
micallef25 2:f10d6fecb345 150
micallef25 5:e0d8e5e922f1 151 // we are publishing and subscribing to the road message
micallef25 5:e0d8e5e922f1 152 // thus we need to ensure we are not grabbing our own
micallef25 5:e0d8e5e922f1 153 inline road_msg_t* get_network_to_road_msg()
MannyK 4:64c6fc70ddb7 154 {
micallef25 5:e0d8e5e922f1 155 // get message from queue
micallef25 7:fd8e0604faaa 156 osEvent evt = network_to_road_queue.get(osWaitForever);
micallef25 5:e0d8e5e922f1 157 assert(evt.status == osEventMessage);
micallef25 5:e0d8e5e922f1 158 road_msg_t *message = (road_msg_t*)evt.value.p;
micallef25 5:e0d8e5e922f1 159 // we handle ignoring messages in callback
micallef25 5:e0d8e5e922f1 160 // delete the message and let the cars go
micallef25 5:e0d8e5e922f1 161 assert(mqtt_id != message->road_id );
micallef25 5:e0d8e5e922f1 162 return message;
MannyK 4:64c6fc70ddb7 163 }
MannyK 4:64c6fc70ddb7 164
micallef25 2:f10d6fecb345 165 };
micallef25 2:f10d6fecb345 166
micallef25 2:f10d6fecb345 167
micallef25 2:f10d6fecb345 168 #endif