Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
plotter.py
00001 from __future__ import print_function 00002 import ast 00003 from collections import deque 00004 import csv 00005 import datetime 00006 00007 from tkinter import * 00008 import tkinter.simpledialog as simpledialog 00009 00010 from matplotlib import pyplot as plt 00011 import matplotlib.animation as animation 00012 import numpy as np 00013 import serial 00014 00015 from telemetry.parser import TelemetrySerial, DataPacket, HeaderPacket, NumericData, NumericArray 00016 00017 class BasePlot (object): 00018 """Base class / interface definition for telemetry plotter plots with a 00019 dependent variable vs. an scrolling independent variable (like time). 00020 """ 00021 def __init__ (self, subplot, indep_def, dep_def, indep_span): 00022 """Constructor. 00023 00024 Arguments: 00025 subplot -- matplotlib subplot to draw stuff onto 00026 indep_def -- TelemetryData of the independent variable 00027 dep_def -- TelemetryData of the dependent variable 00028 indep_span -- the span of the independent variable to keep track of 00029 """ 00030 self.indep_span = indep_span 00031 self.indep_id = indep_def.data_id 00032 self.dep_id = dep_def.data_id 00033 00034 self.dep_def = dep_def 00035 00036 if dep_def.limits[0] != dep_def.limits[1]: 00037 self.limits = dep_def.limits 00038 else: 00039 self.limits = None 00040 00041 self.subplot = subplot 00042 00043 def update_from_packet (self, packet): 00044 """Updates my internal data structures from a received packet. Should not do 00045 rendering - this may be called multiple times per visible update. 00046 """ 00047 raise NotImplementedError 00048 00049 def update_show (self, packet): 00050 """Render my data. This is separated from update_from_packet to allow 00051 multiple packets to be processed while doing only one time-consuming render. 
00052 """ 00053 raise NotImplementedError 00054 00055 def get_name(self): 00056 return self.dep_def .display_name 00057 00058 def get_dep_def(self): 00059 return self.dep_def 00060 00061 class NumericPlot (BasePlot ): 00062 """A plot of a single numeric dependent variable vs. a single independent 00063 variable 00064 """ 00065 def __init__ (self, subplot, indep_def, dep_def, indep_span): 00066 super(NumericPlot, self).__init__(subplot, indep_def, dep_def, indep_span) 00067 self.line, = subplot.plot([0]) 00068 00069 self.indep_data = deque() 00070 self.dep_data = deque() 00071 00072 def update_from_packet (self, packet): 00073 assert isinstance(packet, DataPacket) 00074 indep_val = packet.get_data_by_id(self.indep_id) 00075 dep_val = packet.get_data_by_id(self.dep_id) 00076 00077 if indep_val is not None and dep_val is not None: 00078 self.indep_data .append(indep_val) 00079 self.dep_data .append(dep_val) 00080 00081 indep_cutoff = indep_val - self.indep_span 00082 00083 while self.indep_data [0] < indep_cutoff or self.indep_data [0] > indep_val: 00084 self.indep_data .popleft() 00085 self.dep_data .popleft() 00086 00087 def update_show(self): 00088 self.line.set_xdata(self.indep_data ) 00089 self.line.set_ydata(self.dep_data ) 00090 00091 if self.limits is not None: 00092 minlim = self.limits[0] 00093 maxlim = self.limits[1] 00094 else: 00095 if not self.dep_data : 00096 return 00097 minlim = min(self.dep_data ) 00098 maxlim = max(self.dep_data ) 00099 if minlim < 0 and maxlim < 0: 00100 maxlim = 0 00101 elif minlim > 0 and maxlim > 0: 00102 minlim = 0 00103 rangelim = maxlim - minlim 00104 minlim -= rangelim / 20 # TODO make variable padding 00105 maxlim += rangelim / 20 00106 self.subplot.set_ylim(minlim, maxlim) 00107 00108 class WaterfallPlot(BasePlot ): 00109 def __init__ (self, subplot, indep_def, dep_def, indep_span): 00110 super(WaterfallPlot, self).__init__(subplot, indep_def, dep_def, indep_span) 00111 self.count = dep_def.count 00112 00113 self.x_mesh = 
[0] * (self.count + 1) 00114 self.x_mesh = np.array([self.x_mesh]) 00115 00116 self.y_array = range(0, self.count + 1) 00117 self.y_array = list(map(lambda x: x - 0.5, self.y_array)) 00118 self.y_mesh = np.array([self.y_array]) 00119 00120 self.data_array = None 00121 self.indep_data = deque() 00122 self.quad = None 00123 00124 def update_from_packet (self, packet): 00125 assert isinstance(packet, DataPacket) 00126 indep_val = packet.get_data_by_id(self.indep_id) 00127 dep_val = packet.get_data_by_id(self.dep_id) 00128 00129 if indep_val is not None and dep_val is not None: 00130 self.x_mesh = np.vstack([self.x_mesh, np.array([[indep_val] * (self.count + 1)])]) 00131 self.y_mesh = np.vstack([self.y_mesh, np.array(self.y_array)]) 00132 if self.data_array is None: 00133 self.data_array = np.array([dep_val]) 00134 else: 00135 self.data_array = np.vstack([self.data_array, dep_val]) 00136 00137 indep_cutoff = indep_val - self.indep_span 00138 00139 while self.x_mesh[0][0] < indep_cutoff or self.x_mesh[0][0] > indep_val: 00140 self.x_mesh = np.delete(self.x_mesh, (0), axis=0) 00141 self.y_mesh = np.delete(self.y_mesh, (0), axis=0) 00142 self.data_array = np.delete(self.data_array, (0), axis=0) 00143 00144 def update_show(self): 00145 if self.quad is not None: 00146 self.quad.remove() 00147 del self.quad 00148 00149 if self.limits is not None: 00150 self.quad = self.subplot.pcolorfast(self.x_mesh, self.y_mesh, self.data_array, 00151 cmap='gray', vmin=self.limits[0], vmax=self.limits[1], 00152 interpolation='None') 00153 else: 00154 self.quad = self.subplot.pcolorfast(self.x_mesh, self.y_mesh, self.data_array, 00155 cmap='gray', interpolation='None') 00156 00157 plot_registry = {} 00158 plot_registry[NumericData] = NumericPlot 00159 plot_registry[NumericArray] = WaterfallPlot 00160 00161 00162 def subplots_from_header(packet, figure, indep_def, indep_span=10000): 00163 """Instantiate subplots and plots from a received telemetry HeaderPacket. 
00164 The default implementation creates a new plot for each dependent variable, 00165 but you can customize it to do better things. 00166 00167 Arguments: 00168 packet -- HeaderPacket object 00169 figure -- matplotlib figure to draw on 00170 indep_name -- internal_name of the independent variable. 00171 indep_span -- span of the independent variable to display. 00172 00173 Returns: a dict of matplotlib subpolots to list of contained BasePLot objects. 00174 """ 00175 assert isinstance(packet, HeaderPacket) 00176 00177 if indep_def is None: 00178 print("No independent variable") 00179 return [], [] 00180 00181 data_defs = [] 00182 for _, data_def in reversed(sorted(packet.get_data_defs().items())): 00183 if data_def != indep_def: 00184 data_defs.append(data_def) 00185 00186 plots_dict = {} 00187 00188 for plot_idx, data_def in enumerate(data_defs): 00189 ax = figure.add_subplot(len(data_defs), 1, len(data_defs)-plot_idx) 00190 ax.set_title("%s: %s (%s)" 00191 % (data_def.internal_name, data_def.display_name, data_def.units)) 00192 if plot_idx != 0: 00193 plt.setp(ax.get_xticklabels(), visible=False) 00194 00195 assert data_def.__class__ in plot_registry, "Unable to handle TelemetryData type %s" % data_def.__class__.__name__ 00196 plot = plot_registry[data_def.__class__](ax, indep_def, data_def, 00197 indep_span) 00198 plots_dict[ax] = [plot] 00199 00200 print("Found dependent data %s" % data_def.internal_name) 00201 00202 return plots_dict 00203 00204 class CsvLogger(object): 00205 def __init__ (self, name, header_packet): 00206 csv_header = [] 00207 internal_name_dict = {} 00208 display_name_dict = {} 00209 units_dict = {} 00210 00211 for _, data_def in sorted(header_packet.get_data_defs().items()): 00212 csv_header.append(data_def.data_id) 00213 internal_name_dict[data_def.data_id] = data_def.internal_name 00214 display_name_dict[data_def.data_id] = data_def.display_name 00215 units_dict[data_def.data_id] = data_def.units 00216 csv_header.append("data") # for 
non-telemetry data 00217 00218 self.csv_file = open(name, 'w', newline='') 00219 self.csv_writer = csv.DictWriter(self.csv_file, fieldnames=csv_header) 00220 00221 self.csv_writer.writerow(internal_name_dict) 00222 self.csv_writer.writerow(display_name_dict) 00223 self.csv_writer.writerow(units_dict) 00224 00225 self.pending_data = "" 00226 00227 def add_char(self, data): 00228 if data == '\n' or data == '\r': 00229 if self.pending_data: 00230 self.csv_writer.writerow({'data': self.pending_data}) 00231 self.pending_data = '' 00232 else: 00233 self.pending_data += data 00234 00235 def write_data(self, data_packet): 00236 if self.pending_data: 00237 self.csv_writer.writerow({'data': self.pending_data}) 00238 self.pending_data = "" 00239 self.csv_writer.writerow(data_packet.get_data_dict()) 00240 00241 def finish(self): 00242 if self.pending_data: 00243 self.csv_writer.writerow({'data': self.pending_data}) 00244 self.csv_file.close() 00245 00246 if __name__ == "__main__": 00247 import argparse 00248 parser = argparse.ArgumentParser(description='Telemetry data plotter.') 00249 parser.add_argument('port', metavar='p', help='serial port to receive on') 00250 parser.add_argument('--baud', metavar='b', type=int, default=38400, 00251 help='serial baud rate') 00252 parser.add_argument('--indep_name', metavar='i', default='time', 00253 help='internal name of independent axis') 00254 parser.add_argument('--span', metavar='s', type=int, default=10000, 00255 help='independent variable axis span') 00256 parser.add_argument('--log_prefix', metavar='f', default='telemetry', 00257 help='filename prefix for logging output, set to empty to disable logging') 00258 args = parser.parse_args() 00259 00260 telemetry = TelemetrySerial(serial.Serial(args.port, args.baud)) 00261 00262 fig = plt.figure() 00263 00264 # note: mutable elements are in lists to allow access from nested functions 00265 indep_def = [None] # note: data ID 0 is invalid 00266 latest_indep = [0] 00267 plots_dict = [[]] 
00268 00269 csv_logger = [None] 00270 00271 def update(data): 00272 telemetry.process_rx() 00273 plot_updated = False 00274 00275 while True: 00276 packet = telemetry.next_rx_packet() 00277 if not packet: 00278 break 00279 00280 if isinstance(packet, HeaderPacket): 00281 fig.clf() 00282 00283 # get independent variable data ID 00284 indep_def[0] = None 00285 for _, data_def in packet.get_data_defs().items(): 00286 if data_def.internal_name == args.indep_name: 00287 indep_def[0] = data_def 00288 # TODO warn on missing indep id or duplicates 00289 00290 # instantiate plots 00291 plots_dict[0] = subplots_from_header(packet, fig, indep_def[0], args.span) 00292 00293 # prepare CSV file and headers 00294 timestring = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") 00295 filename = '%s-%s.csv' % (args.log_prefix, timestring) 00296 if csv_logger[0] is not None: 00297 csv_logger[0].finish() 00298 if args.log_prefix: 00299 csv_logger[0] = CsvLogger(filename, packet) 00300 00301 elif isinstance(packet, DataPacket): 00302 if indep_def[0] is not None: 00303 indep_value = packet.get_data_by_id(indep_def[0].data_id) 00304 if indep_value is not None: 00305 latest_indep[0] = indep_value 00306 for plot_list in plots_dict[0].values(): 00307 for plot in plot_list: 00308 plot.update_from_packet(packet) 00309 plot_updated = True 00310 00311 if csv_logger[0]: 00312 csv_logger[0].write_data(packet) 00313 00314 else: 00315 raise Exception("Unknown received packet %s" % repr(packet)) 00316 00317 while True: 00318 next_byte = telemetry.next_rx_byte() 00319 if next_byte is None: 00320 break 00321 try: 00322 print(chr(next_byte), end='') 00323 if csv_logger[0]: 00324 csv_logger[0].add_char(chr(next_byte)) 00325 except UnicodeEncodeError: 00326 pass 00327 00328 if plot_updated: 00329 for plot_list in plots_dict[0].values(): 00330 for plot in plot_list: 00331 plot.update_show() 00332 for subplot in plots_dict[0].keys(): 00333 subplot.set_xlim([latest_indep[0] - args.span, latest_indep[0]]) 
00334 00335 def set_plot_dialog(plot): 00336 def set_plot_dialog_inner(): 00337 got = plot.get_dep_def().get_latest_value() 00338 error = "" 00339 while True: 00340 got = simpledialog.askstring("Set remote value", 00341 "Set %s\n%s" % (plot.get_name(), error), 00342 initialvalue=got) 00343 if got is None: 00344 break 00345 try: 00346 parsed = ast.literal_eval(got) 00347 telemetry.transmit_set_packet(plot.get_dep_def(), parsed) 00348 break 00349 except ValueError as e: 00350 error = str(e) 00351 except SyntaxError as e: 00352 error = str(e) 00353 00354 return set_plot_dialog_inner 00355 00356 def on_click(event): 00357 ax = event.inaxes 00358 if event.dblclick: 00359 if ax is None or ax not in plots_dict[0]: 00360 return 00361 plots_list = plots_dict[0][ax] 00362 if len(plots_list) > 1: 00363 menu = Menu(fig.canvas.get_tk_widget(), tearoff=0) 00364 for plot in plots_list: 00365 menu.add_command(label="Set %s" % plot.get_name(), 00366 command=set_plot_dialog(plot)) 00367 menu.post(event.guiEvent.x_root, event.guiEvent.y_root) 00368 elif len(plots_list) == 1: 00369 set_plot_dialog(plots_list[0])() 00370 else: 00371 return 00372 00373 fig.canvas.mpl_connect('button_press_event', on_click) 00374 ani = animation.FuncAnimation(fig, update, interval=30) 00375 plt.show()
Generated on Tue Jul 12 2022 22:03:01 by
