Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mqtt.cpp
00001 /* mbed Microcontroller Library 00002 * Copyright (c) 2018 ARM Limited 00003 * SPDX-License-Identifier: Apache-2.0 00004 */ 00005 00006 #include "mbed.h" 00007 #include "MQTTNetwork.h" 00008 #include "MQTTClient.h" 00009 #include "MQTTmbed.h" 00010 #include "mqtt.h" 00011 #include <assert.h> 00012 #include "AccCar.h" 00013 00014 // topics interested in 00015 #define POSITION_TOPIC "MQTT_Position0x25" 00016 #define ROAD_TOPIC "MQTT_Road0x25" 00017 #define CONTROL_TOPIC "MQTT_Control0x25" 00018 00019 //#define DEBUG_MQTT 00020 #define MQTT_TAG "[MQTT] " 00021 00022 Serial mqtt_pc (USBTX, USBRX); 00023 00024 00025 // empty constructor 00026 mqtt::mqtt() 00027 { 00028 00029 } 00030 00031 /* 00032 This function sets up the wifi module and connects it to the SSID 00033 configured in the configuration file. It also prints out the MAC address 00034 of the module, which is needed if you are trying to use campus wifi. 00035 This function returns NULL if there are any issues. 00036 */ 00037 WiFiInterface* mqtt::setup_wifi() { 00038 // Get a handle to the WiFi module 00039 WiFiInterface* wifi = WiFiInterface::get_default_instance(); 00040 00041 // Connect the module to the wifi, based on the SSID and password 00042 // specified in the mbed_app.json configuration file 00043 // If you are using AirPennNet-Device, this will not succeed until the MAC 00044 // address (printed shortly after this) is registered 00045 mqtt_pc.printf(MQTT_TAG"Connecting to wifi "DELIM); 00046 int rc = wifi->connect(MBED_CONF_APP_WIFI_SSID, MBED_CONF_APP_WIFI_PASSWORD, NSAPI_SECURITY_WPA_WPA2); 00047 00048 // Print out the MAC address of the wifi module. The MAC address is 00049 // needed to register the device with AirPennNet-Device, so that you 00050 // can use the campus wifi 00051 mqtt_pc.printf(MQTT_TAG"MAC Address: "); 00052 mqtt_pc.printf(wifi->get_mac_address()); 00053 mqtt_pc.printf("\r\n"); 00054 00055 if (rc != 0) { 00056 mqtt_pc.printf(MQTT_TAG"Problem connecting to wifi "DELIM); 00057 return NULL; 00058 } else { 00059 mqtt_pc.printf(MQTT_TAG"Wifi connected "DELIM); 00060 } 00061 00062 return wifi; 00063 } 00064 00065 /* 00066 This function creates the MQTT client and connects it to the MQTT broker 00067 that we have setup for the course. If there are any errors with the 00068 connection, it will return NULL 00069 */ 00070 MQTT::Client<MQTTNetwork, Countdown>* mqtt::setup_mqtt(MQTTNetwork& network) { 00071 // the hostname and port point to a Google Cloud MQTT server we setup for 00072 // this project 00073 const char* hostname = "34.68.206.11"; 00074 int port = 1883; 00075 00076 // Create the underlying network connection to the MQTT server 00077 mqtt_pc.printf(MQTT_TAG"Connecting to %s:%d "DELIM, hostname, port); 00078 int rc = network.connect(hostname, port); 00079 if (rc != 0) { 00080 mqtt_pc.printf(MQTT_TAG"There was an error with the TCP connect: %d "DELIM, rc); 00081 return NULL; 00082 } 00083 00084 mqtt_pc.printf(MQTT_TAG"Connected to %s:%d "DELIM, hostname, port); 00085 00086 // Connect the MQTT client to the server 00087 MQTT::Client<MQTTNetwork, Countdown>* temp_client = new MQTT::Client<MQTTNetwork, Countdown>(network); 00088 rc = temp_client->connect(); 00089 if (rc != 0) { 00090 mqtt_pc.printf(MQTT_TAG"There was an error with the MQTT connect: %d "DELIM, rc); 00091 return NULL; 00092 } 00093 00094 mqtt_pc.printf(MQTT_TAG"MQTT connect successful! "DELIM); 00095 00096 return temp_client; 00097 } 00098 00099 /* 00100 This function is the callback for when a message is received from the 00101 MQTT broker. You register different callback functions with different 00102 topic subscriptions 00103 */ 00104 void mqtt::control_message_arrived(MQTT::MessageData& md) 00105 { 00106 MQTT::Message &message = md.message; 00107 // make message receiver responsible for freeeing 00108 control_msg_t* msg = new control_msg_t; 00109 assert(msg != NULL && message.payloadlen == sizeof(control_msg_t)); 00110 00111 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? 00112 memcpy(msg,message.payload,message.payloadlen); 00113 00114 if( msg->road_id == mqtt::instance()->mqtt_id ) 00115 { 00116 #ifdef DEBUG_MQTT 00117 //mqtt_pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); 00118 mqtt_pc.printf(MQTT_TAG"rcvd control id %d "DELIM,msg->road_id); 00119 #endif 00120 // add our message to the queue no fucking clue what happens internally to 00121 // the message memory thanks mbed os 5 documentation 00122 mqtt::instance()->add_to_control_queue(msg->car_id,msg); 00123 } 00124 00125 else 00126 { 00127 #ifdef DEBUG_MQTT 00128 mqtt_pc.printf(MQTT_TAG"ignoring control msg "DELIM,msg->road_id); 00129 #endif 00130 delete msg; 00131 } 00132 } 00133 00134 void mqtt::road_message_arrived(MQTT::MessageData& md) 00135 { 00136 MQTT::Message &message = md.message; 00137 // make message... I will endeavor to free this one the receiver side 00138 // from example sender allocates and receiver frees so well stick 00139 // with that paradigm 00140 road_msg_t* msg = new road_msg_t; 00141 assert(msg != NULL && message.payloadlen == sizeof(road_msg_t)); 00142 00143 // copy to our new pointer for some reason just taking the payload ptr is bad for mbed? 00144 memcpy(msg,message.payload,message.payloadlen); 00145 00146 if( msg->road_id != mqtt::instance()->mqtt_id ) 00147 { 00148 #ifdef DEBUG_MQTT 00149 mqtt_pc.printf(MQTT_TAG"rcvd road %d "DELIM,msg->road_clock); 00150 #endif 00151 // add our message to the queue no fucking clue what happens internally to 00152 // the message memory thanks mbed os 5 documentation 00153 mqtt::instance()->add_to_network_to_road_queue(msg); 00154 } 00155 00156 else{ 00157 #ifdef DEBUG_MQTT 00158 mqtt_pc.printf(MQTT_TAG"ignoring road message %d "DELIM,msg->road_id); 00159 #endif 00160 delete msg; 00161 } 00162 00163 } 00164 00165 /* 00166 This function sends a message to the test topic. 00167 */ 00168 int mqtt::send_position_msg( MQTT::Message& message, position_msg_t* msg ) 00169 { 00170 assert(msg != NULL && client != NULL); 00171 00172 #ifdef DEBUG_MQTT 00173 mqtt_pc.printf(MQTT_TAG"sending position msg %d "DELIM,msg->road_id); 00174 #endif 00175 00176 MQTT::Message tmessage; 00177 00178 tmessage.payload = (void*)msg; 00179 tmessage.payloadlen = sizeof(position_msg_t); 00180 tmessage.qos = MQTT::QOS1; 00181 00182 int rc = client->publish(POSITION_TOPIC,tmessage); 00183 assert(rc == 0); 00184 00185 return 0; 00186 } 00187 00188 int mqtt::send_road_msg( MQTT::Message& message, road_msg_t* msg ) 00189 { 00190 assert(msg != NULL && client != NULL); 00191 00192 #ifdef DEBUG_MQTT 00193 mqtt_pc.printf(MQTT_TAG"sending road msg %d "DELIM,msg->road_clock); 00194 #endif 00195 MQTT::Message tmessage; 00196 00197 tmessage.payload = (void*)msg; 00198 tmessage.payloadlen = sizeof(road_msg_t); 00199 tmessage.qos = MQTT::QOS1; 00200 00201 int rc = client->publish(ROAD_TOPIC,tmessage); 00202 assert(rc == 0); 00203 00204 return 0; 00205 } 00206 00207 void mqtt::bringup_network() { 00208 00209 // 00210 WiFiInterface* wifi = setup_wifi(); 00211 assert(wifi != NULL); 00212 00213 // Create the network object needed by the MQTT client 00214 new_network = new MQTTNetwork(wifi); 00215 00216 // get client 00217 client = setup_mqtt(*new_network); 00218 assert(client != NULL); 00219 00220 // Subscribe to a topic / register a callback 00221 mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, CONTROL_TOPIC); 00222 int rc = client->subscribe(CONTROL_TOPIC, MQTT::QOS1, control_message_arrived); 00223 assert(rc == 0); 00224 00225 // Subscribe to a topic / register a callback 00226 mqtt_pc.printf(MQTT_TAG"Subscribing to topic %s "DELIM, ROAD_TOPIC); 00227 rc = client->subscribe(ROAD_TOPIC, MQTT::QOS1, road_message_arrived); 00228 assert(rc == 0); 00229 00230 // make a road based of mqtt id 00231 mqtt_id = 0; 00232 if(strcmp(wifi->get_mac_address(),"2c:3a:e8:0b:8e:77") == 0){ 00233 mqtt_id = 0; 00234 } 00235 else{ 00236 mqtt_id = 1; 00237 } 00238 00239 mqtt_pc.printf(MQTT_TAG"Subscribed, Setup complete! road %d "DELIM,mqtt_id); 00240 } 00241 00242 00243 // manage callbacks from mqtt 00244 void mqtt::manage_network() 00245 { 00246 MQTT::Message mqtt_message; 00247 while(true) 00248 { 00249 while(!position_queue.empty()) 00250 { 00251 00252 //#ifdef DEBUG_MQTT 00253 // mqtt_pc.printf(MQTT_TAG"getting position msg\r\n"); 00254 //#endif 00255 // 00256 osEvent evt = position_queue.get(); 00257 assert(evt.status == osEventMessage); 00258 00259 // 00260 position_msg_t *message = (position_msg_t*)evt.value.p; 00261 assert(message != NULL); 00262 send_position_msg(mqtt_message,message); 00263 } 00264 00265 if (!road_to_network_queue.empty()) 00266 { 00267 00268 //#ifdef DEBUG_MQTT 00269 // mqtt_pc.printf(MQTT_TAG"getting road info\r\n"); 00270 //#endif 00271 osEvent revt = road_to_network_queue.get(); 00272 assert(revt.status == osEventMessage); 00273 00274 // 00275 road_msg_t *road_msg = (road_msg_t*)revt.value.p; 00276 assert(road_msg != NULL); 00277 send_road_msg(mqtt_message,road_msg); 00278 delete road_msg; 00279 } 00280 00281 client->yield(10); 00282 } 00283 } 00284 00285 // 00286 // clean up goes here 00287 void mqtt::shutdown_network() 00288 { 00289 // 00290 mqtt_pc.printf(MQTT_TAG"shutting down mbed "DELIM); 00291 00292 // 00293 client->disconnect(); 00294 delete new_network; 00295 00296 // 00297 thread->terminate(); 00298 delete thread; 00299 } 00300 00301 // launch network manager thread 00302 // responsible for pub sub 00303 void mqtt::setup_network() 00304 { 00305 // bring up network if anything bad happens we will assert and crash 00306 bringup_network(); 00307 00308 // create a new thread thats sole purpose in life is to call client yield 00309 // what a sad life for a thread 00310 // calling yield is necessary as we will not get any messages for the mqtt 00311 // serverwithout it 00312 thread = new Thread(); 00313 assert(thread != NULL); 00314 thread->start( callback(this,&mqtt::manage_network) ); 00315 00316 } 00317 00318 // before the next run make sure 00319 // that everything is cleaned up properly 00320 // the control queue may have one entry due to 00321 // the last iteration ending 00322 void mqtt::clear_queues() 00323 { 00324 while(!position_queue.empty()) 00325 { 00326 osEvent revt = position_queue.get(); 00327 assert(revt.status == osEventMessage); 00328 //printf("cleaned up queue\n"); 00329 // 00330 position_msg_t *msg = (position_msg_t*)revt.value.p; 00331 delete msg; 00332 } 00333 for(int i = 0; i < 5; i++) 00334 { 00335 while(!control_queue[i].empty()) 00336 { 00337 osEvent revt = control_queue[i].get(); 00338 assert(revt.status == osEventMessage); 00339 //printf("cleaned up control queue\n"); 00340 // 00341 control_msg_t *msg = (control_msg_t*)revt.value.p; 00342 delete msg; 00343 } 00344 } 00345 00346 while(!network_to_road_queue.empty()) 00347 { 00348 osEvent evt = network_to_road_queue.get(); 00349 assert(evt.status == osEventMessage); 00350 road_msg_t *message = (road_msg_t*)evt.value.p; 00351 delete message; 00352 } 00353 while(!road_to_network_queue.empty()) 00354 { 00355 osEvent evt = road_to_network_queue.get(); 00356 assert(evt.status == osEventMessage); 00357 road_msg_t *message = (road_msg_t*)evt.value.p; 00358 delete message; 00359 } 00360 }
Generated on Fri Jul 29 2022 03:44:50 by
1.7.2