Emanuel Kuflik
/
smat_controller
541 smart traffic controller
mqtt.h@7:fd8e0604faaa, 2019-12-12 (annotated)
- Committer:
- micallef25
- Date:
- Thu Dec 12 17:25:27 2019 +0000
- Revision:
- 7:fd8e0604faaa
- Parent:
- 6:6cb13ac483e0
stable, passed 1000 simulation regression
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
micallef25 | 2:f10d6fecb345 | 1 | #ifndef _MQTT_H_ |
micallef25 | 2:f10d6fecb345 | 2 | #define _MQTT_H_ |
micallef25 | 2:f10d6fecb345 | 3 | |
micallef25 | 2:f10d6fecb345 | 4 | #include "mbed.h" |
micallef25 | 2:f10d6fecb345 | 5 | #include "MQTTNetwork.h" |
micallef25 | 2:f10d6fecb345 | 6 | #include "MQTTClient.h" |
micallef25 | 2:f10d6fecb345 | 7 | #include "MQTTmbed.h" |
micallef25 | 2:f10d6fecb345 | 8 | #include <assert.h> |
micallef25 | 2:f10d6fecb345 | 9 | |
micallef25 | 2:f10d6fecb345 | 10 | #define MAX_CARS_ON_ROAD 5 |
micallef25 | 2:f10d6fecb345 | 11 | |
micallef25 | 5:e0d8e5e922f1 | 12 | //#define WINDOWS |
micallef25 | 5:e0d8e5e922f1 | 13 | |
micallef25 | 5:e0d8e5e922f1 | 14 | #ifdef WINDOWS |
micallef25 | 5:e0d8e5e922f1 | 15 | #define DELIM "\r\n" |
micallef25 | 5:e0d8e5e922f1 | 16 | #else |
micallef25 | 5:e0d8e5e922f1 | 17 | #define DELIM "\n" |
micallef25 | 5:e0d8e5e922f1 | 18 | #endif |
micallef25 | 5:e0d8e5e922f1 | 19 | |
micallef25 | 7:fd8e0604faaa | 20 | //#define PUBLISH_ONLY |
micallef25 | 7:fd8e0604faaa | 21 | //#define SAFE_CRITICAL |
micallef25 | 2:f10d6fecb345 | 22 | enum drive_state_t{ |
micallef25 | 2:f10d6fecb345 | 23 | NORMAL_STATE = 0, |
micallef25 | 2:f10d6fecb345 | 24 | STOPPED_STATE, |
micallef25 | 2:f10d6fecb345 | 25 | CROSSING_STATE |
micallef25 | 2:f10d6fecb345 | 26 | }; |
micallef25 | 2:f10d6fecb345 | 27 | |
micallef25 | 2:f10d6fecb345 | 28 | typedef struct position_msg{ |
micallef25 | 2:f10d6fecb345 | 29 | int position; |
micallef25 | 2:f10d6fecb345 | 30 | int speed; |
micallef25 | 2:f10d6fecb345 | 31 | int car_id; |
micallef25 | 2:f10d6fecb345 | 32 | int road_id; |
micallef25 | 2:f10d6fecb345 | 33 | int counter; // for debugging and ensuring everyone is synchronized |
micallef25 | 5:e0d8e5e922f1 | 34 | //drive_state_t state; // for debugging |
micallef25 | 2:f10d6fecb345 | 35 | }position_msg_t; |
micallef25 | 2:f10d6fecb345 | 36 | |
MannyK | 4:64c6fc70ddb7 | 37 | typedef struct road_msg{ |
micallef25 | 5:e0d8e5e922f1 | 38 | int road_id; |
micallef25 | 5:e0d8e5e922f1 | 39 | int road_clock; |
micallef25 | 7:fd8e0604faaa | 40 | int simulating; |
MannyK | 4:64c6fc70ddb7 | 41 | }road_msg_t; |
MannyK | 4:64c6fc70ddb7 | 42 | |
micallef25 | 2:f10d6fecb345 | 43 | typedef struct control_msg{ |
micallef25 | 2:f10d6fecb345 | 44 | int speed; |
micallef25 | 2:f10d6fecb345 | 45 | int car_id; |
micallef25 | 2:f10d6fecb345 | 46 | int road_id; |
micallef25 | 2:f10d6fecb345 | 47 | int counter; // for debugging synchronization |
micallef25 | 2:f10d6fecb345 | 48 | }control_msg_t; |
micallef25 | 2:f10d6fecb345 | 49 | |
micallef25 | 2:f10d6fecb345 | 50 | |
micallef25 | 2:f10d6fecb345 | 51 | class mqtt{ |
micallef25 | 2:f10d6fecb345 | 52 | |
micallef25 | 2:f10d6fecb345 | 53 | private: |
micallef25 | 2:f10d6fecb345 | 54 | // static functions for control message callbacks |
micallef25 | 2:f10d6fecb345 | 55 | static void control_message_arrived(MQTT::MessageData& md); |
micallef25 | 5:e0d8e5e922f1 | 56 | static void road_message_arrived(MQTT::MessageData& md); |
micallef25 | 2:f10d6fecb345 | 57 | |
micallef25 | 2:f10d6fecb345 | 58 | // functions for creatin of mqtt and wifi bring up |
micallef25 | 2:f10d6fecb345 | 59 | WiFiInterface* setup_wifi(); |
micallef25 | 5:e0d8e5e922f1 | 60 | int send_position_msg(MQTT::Message& message,position_msg_t* msg); |
micallef25 | 5:e0d8e5e922f1 | 61 | int send_road_msg(MQTT::Message& message,road_msg_t* msg); |
micallef25 | 2:f10d6fecb345 | 62 | void manage_network(); |
micallef25 | 2:f10d6fecb345 | 63 | void bringup_network(); |
micallef25 | 2:f10d6fecb345 | 64 | |
micallef25 | 2:f10d6fecb345 | 65 | // data structures needed for creation of network |
micallef25 | 2:f10d6fecb345 | 66 | MQTT::Client<MQTTNetwork, Countdown>* client; |
micallef25 | 2:f10d6fecb345 | 67 | MQTT::Client<MQTTNetwork, Countdown>* setup_mqtt(MQTTNetwork& network); |
micallef25 | 2:f10d6fecb345 | 68 | MQTTNetwork* new_network; |
micallef25 | 2:f10d6fecb345 | 69 | |
micallef25 | 2:f10d6fecb345 | 70 | // thread that will manage the mqtt network status |
micallef25 | 2:f10d6fecb345 | 71 | Thread* thread; |
micallef25 | 2:f10d6fecb345 | 72 | |
micallef25 | 2:f10d6fecb345 | 73 | // queue for sending position msessages |
micallef25 | 7:fd8e0604faaa | 74 | Queue<position_msg_t, 15> position_queue; |
micallef25 | 2:f10d6fecb345 | 75 | |
MannyK | 4:64c6fc70ddb7 | 76 | // queue for sending road msessages |
micallef25 | 7:fd8e0604faaa | 77 | Queue<road_msg_t, 3> road_to_network_queue; |
micallef25 | 5:e0d8e5e922f1 | 78 | |
micallef25 | 5:e0d8e5e922f1 | 79 | // queue for sending road msessages |
micallef25 | 7:fd8e0604faaa | 80 | Queue<road_msg_t, 3> network_to_road_queue; |
MannyK | 4:64c6fc70ddb7 | 81 | |
micallef25 | 2:f10d6fecb345 | 82 | // queue for control message passsing between mqtt and car threads |
micallef25 | 7:fd8e0604faaa | 83 | Queue<control_msg_t, 3> control_queue[MAX_CARS_ON_ROAD]; |
micallef25 | 2:f10d6fecb345 | 84 | |
micallef25 | 2:f10d6fecb345 | 85 | |
micallef25 | 2:f10d6fecb345 | 86 | public: |
micallef25 | 2:f10d6fecb345 | 87 | // singleton instance for managing network |
micallef25 | 2:f10d6fecb345 | 88 | // we can only have one network so this should prohibit accidental |
micallef25 | 2:f10d6fecb345 | 89 | // creations of multiple objects |
micallef25 | 2:f10d6fecb345 | 90 | static mqtt* mqtt_singleton; |
micallef25 | 2:f10d6fecb345 | 91 | |
micallef25 | 2:f10d6fecb345 | 92 | // |
micallef25 | 2:f10d6fecb345 | 93 | int mqtt_id; |
micallef25 | 2:f10d6fecb345 | 94 | |
micallef25 | 2:f10d6fecb345 | 95 | // empty constructor |
micallef25 | 2:f10d6fecb345 | 96 | mqtt(); |
micallef25 | 2:f10d6fecb345 | 97 | |
micallef25 | 2:f10d6fecb345 | 98 | // setup and tear down API's |
micallef25 | 2:f10d6fecb345 | 99 | void setup_network(); |
micallef25 | 5:e0d8e5e922f1 | 100 | void shutdown_network(); |
micallef25 | 7:fd8e0604faaa | 101 | void clear_queues(); |
micallef25 | 2:f10d6fecb345 | 102 | |
micallef25 | 2:f10d6fecb345 | 103 | static mqtt *instance() |
micallef25 | 2:f10d6fecb345 | 104 | { |
micallef25 | 2:f10d6fecb345 | 105 | if (!mqtt_singleton) |
micallef25 | 2:f10d6fecb345 | 106 | mqtt_singleton = new mqtt; |
micallef25 | 2:f10d6fecb345 | 107 | return mqtt_singleton; |
micallef25 | 2:f10d6fecb345 | 108 | } |
micallef25 | 2:f10d6fecb345 | 109 | |
micallef25 | 2:f10d6fecb345 | 110 | // adding to a queue to be sent on mqtt network |
micallef25 | 2:f10d6fecb345 | 111 | inline void add_to_position_queue(position_msg_t* msg) |
micallef25 | 2:f10d6fecb345 | 112 | { |
micallef25 | 7:fd8e0604faaa | 113 | position_queue.put(msg,osWaitForever); |
micallef25 | 2:f10d6fecb345 | 114 | } |
micallef25 | 2:f10d6fecb345 | 115 | |
MannyK | 4:64c6fc70ddb7 | 116 | // adding to road queue to be sent on mqtt network |
micallef25 | 5:e0d8e5e922f1 | 117 | inline void add_to_road_to_network_queue(road_msg_t* msg) |
MannyK | 4:64c6fc70ddb7 | 118 | { |
micallef25 | 7:fd8e0604faaa | 119 | road_to_network_queue.put(msg,osWaitForever); |
MannyK | 4:64c6fc70ddb7 | 120 | } |
MannyK | 4:64c6fc70ddb7 | 121 | |
micallef25 | 5:e0d8e5e922f1 | 122 | // adding to road queue to be sent on mqtt network |
micallef25 | 5:e0d8e5e922f1 | 123 | inline void add_to_network_to_road_queue(road_msg_t* msg) |
micallef25 | 5:e0d8e5e922f1 | 124 | { |
micallef25 | 7:fd8e0604faaa | 125 | network_to_road_queue.put(msg,osWaitForever); |
micallef25 | 5:e0d8e5e922f1 | 126 | } |
micallef25 | 5:e0d8e5e922f1 | 127 | |
micallef25 | 5:e0d8e5e922f1 | 128 | |
micallef25 | 2:f10d6fecb345 | 129 | // adding to a queue to be sent on mqtt network |
micallef25 | 2:f10d6fecb345 | 130 | inline void add_to_control_queue(int car_id,control_msg_t* msg) |
micallef25 | 2:f10d6fecb345 | 131 | { |
micallef25 | 7:fd8e0604faaa | 132 | control_queue[car_id].put(msg,osWaitForever); |
micallef25 | 2:f10d6fecb345 | 133 | } |
micallef25 | 2:f10d6fecb345 | 134 | |
micallef25 | 2:f10d6fecb345 | 135 | // adding to a queue to be sent on mqtt network |
micallef25 | 2:f10d6fecb345 | 136 | inline control_msg_t* get_control_msg(int car_id) |
micallef25 | 2:f10d6fecb345 | 137 | { |
micallef25 | 6:6cb13ac483e0 | 138 | // handle the case if no message was received in time |
micallef25 | 7:fd8e0604faaa | 139 | #ifdef SAFETY_CRITICAL |
micallef25 | 6:6cb13ac483e0 | 140 | if(control_queue[car_id].empty()) |
micallef25 | 6:6cb13ac483e0 | 141 | { |
micallef25 | 6:6cb13ac483e0 | 142 | return NULL; |
micallef25 | 6:6cb13ac483e0 | 143 | } |
micallef25 | 7:fd8e0604faaa | 144 | #endif |
micallef25 | 7:fd8e0604faaa | 145 | osEvent evt = control_queue[car_id].get(osWaitForever); |
micallef25 | 2:f10d6fecb345 | 146 | assert(evt.status == osEventMessage); |
micallef25 | 2:f10d6fecb345 | 147 | control_msg_t *message = (control_msg_t*)evt.value.p; |
micallef25 | 2:f10d6fecb345 | 148 | return message; |
micallef25 | 2:f10d6fecb345 | 149 | } |
micallef25 | 2:f10d6fecb345 | 150 | |
micallef25 | 5:e0d8e5e922f1 | 151 | // we are publishing and subscring to the road message |
micallef25 | 5:e0d8e5e922f1 | 152 | // thus we need to ensure we are not grabbing our own |
micallef25 | 5:e0d8e5e922f1 | 153 | inline road_msg_t* get_network_to_road_msg() |
MannyK | 4:64c6fc70ddb7 | 154 | { |
micallef25 | 5:e0d8e5e922f1 | 155 | // get message from queue |
micallef25 | 7:fd8e0604faaa | 156 | osEvent evt = network_to_road_queue.get(osWaitForever); |
micallef25 | 5:e0d8e5e922f1 | 157 | assert(evt.status == osEventMessage); |
micallef25 | 5:e0d8e5e922f1 | 158 | road_msg_t *message = (road_msg_t*)evt.value.p; |
micallef25 | 5:e0d8e5e922f1 | 159 | // we handle ignoring messages in callback |
micallef25 | 5:e0d8e5e922f1 | 160 | // delete the message and let the cars go |
micallef25 | 5:e0d8e5e922f1 | 161 | assert(mqtt_id != message->road_id ); |
micallef25 | 5:e0d8e5e922f1 | 162 | return message; |
MannyK | 4:64c6fc70ddb7 | 163 | } |
MannyK | 4:64c6fc70ddb7 | 164 | |
micallef25 | 2:f10d6fecb345 | 165 | }; |
micallef25 | 2:f10d6fecb345 | 166 | |
micallef25 | 2:f10d6fecb345 | 167 | |
micallef25 | 2:f10d6fecb345 | 168 | #endif |