Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mqtt.h
00001 #ifndef _MQTT_H_ 00002 #define _MQTT_H_ 00003 00004 #include "mbed.h" 00005 #include "MQTTNetwork.h" 00006 #include "MQTTClient.h" 00007 #include "MQTTmbed.h" 00008 #include <assert.h> 00009 00010 #define MAX_CARS_ON_ROAD 5 00011 00012 //#define WINDOWS 00013 00014 #ifdef WINDOWS 00015 #define DELIM "\r\n" 00016 #else 00017 #define DELIM "\n" 00018 #endif 00019 00020 //#define PUBLISH_ONLY 00021 //#define SAFE_CRITICAL 00022 enum drive_state_t{ 00023 NORMAL_STATE = 0, 00024 STOPPED_STATE, 00025 CROSSING_STATE 00026 }; 00027 00028 typedef struct position_msg{ 00029 int position; 00030 int speed; 00031 int car_id; 00032 int road_id; 00033 int counter; // for debugging and ensuring everyone is synchronized 00034 //drive_state_t state; // for debugging 00035 }position_msg_t; 00036 00037 typedef struct road_msg{ 00038 int road_id; 00039 int road_clock; 00040 int simulating; 00041 }road_msg_t; 00042 00043 typedef struct control_msg{ 00044 int speed; 00045 int car_id; 00046 int road_id; 00047 int counter; // for debugging synchronization 00048 }control_msg_t; 00049 00050 00051 class mqtt{ 00052 00053 private: 00054 // static functions for control message callbacks 00055 static void control_message_arrived(MQTT::MessageData& md); 00056 static void road_message_arrived(MQTT::MessageData& md); 00057 00058 // functions for creatin of mqtt and wifi bring up 00059 WiFiInterface* setup_wifi(); 00060 int send_position_msg(MQTT::Message& message,position_msg_t* msg); 00061 int send_road_msg(MQTT::Message& message,road_msg_t* msg); 00062 void manage_network(); 00063 void bringup_network(); 00064 00065 // data structures needed for creation of network 00066 MQTT::Client<MQTTNetwork, Countdown>* client; 00067 MQTT::Client<MQTTNetwork, Countdown>* setup_mqtt(MQTTNetwork& network); 00068 MQTTNetwork* new_network; 00069 00070 // thread that will manage the mqtt network status 00071 Thread* thread; 00072 00073 // queue for sending position msessages 00074 Queue<position_msg_t, 15> 
position_queue; 00075 00076 // queue for sending road msessages 00077 Queue<road_msg_t, 3> road_to_network_queue; 00078 00079 // queue for sending road msessages 00080 Queue<road_msg_t, 3> network_to_road_queue; 00081 00082 // queue for control message passsing between mqtt and car threads 00083 Queue<control_msg_t, 3> control_queue[MAX_CARS_ON_ROAD]; 00084 00085 00086 public: 00087 // singleton instance for managing network 00088 // we can only have one network so this should prohibit accidental 00089 // creations of multiple objects 00090 static mqtt* mqtt_singleton; 00091 00092 // 00093 int mqtt_id; 00094 00095 // empty constructor 00096 mqtt(); 00097 00098 // setup and tear down API's 00099 void setup_network(); 00100 void shutdown_network(); 00101 void clear_queues(); 00102 00103 static mqtt *instance() 00104 { 00105 if (!mqtt_singleton) 00106 mqtt_singleton = new mqtt; 00107 return mqtt_singleton; 00108 } 00109 00110 // adding to a queue to be sent on mqtt network 00111 inline void add_to_position_queue(position_msg_t* msg) 00112 { 00113 position_queue.put(msg,osWaitForever); 00114 } 00115 00116 // adding to road queue to be sent on mqtt network 00117 inline void add_to_road_to_network_queue(road_msg_t* msg) 00118 { 00119 road_to_network_queue.put(msg,osWaitForever); 00120 } 00121 00122 // adding to road queue to be sent on mqtt network 00123 inline void add_to_network_to_road_queue(road_msg_t* msg) 00124 { 00125 network_to_road_queue.put(msg,osWaitForever); 00126 } 00127 00128 00129 // adding to a queue to be sent on mqtt network 00130 inline void add_to_control_queue(int car_id,control_msg_t* msg) 00131 { 00132 control_queue[car_id].put(msg,osWaitForever); 00133 } 00134 00135 // adding to a queue to be sent on mqtt network 00136 inline control_msg_t* get_control_msg(int car_id) 00137 { 00138 // handle the case if no message was received in time 00139 #ifdef SAFETY_CRITICAL 00140 if(control_queue[car_id].empty()) 00141 { 00142 return NULL; 00143 } 00144 
#endif 00145 osEvent evt = control_queue[car_id].get(osWaitForever); 00146 assert(evt.status == osEventMessage); 00147 control_msg_t *message = (control_msg_t*)evt.value.p; 00148 return message; 00149 } 00150 00151 // we are publishing and subscring to the road message 00152 // thus we need to ensure we are not grabbing our own 00153 inline road_msg_t* get_network_to_road_msg() 00154 { 00155 // get message from queue 00156 osEvent evt = network_to_road_queue.get(osWaitForever); 00157 assert(evt.status == osEventMessage); 00158 road_msg_t *message = (road_msg_t*)evt.value.p; 00159 // we handle ignoring messages in callback 00160 // delete the message and let the cars go 00161 assert(mqtt_id != message->road_id ); 00162 return message; 00163 } 00164 00165 }; 00166 00167 00168 #endif
Generated on Fri Jul 29 2022 03:44:50 by
1.7.2