Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
parser.py
"""Parser for the telemetry serial protocol.

A telemetry stream is ordinary serial data with framed packets embedded in it.
Each frame is: the start-of-frame bytes (``SOF_BYTE``), a big-endian packet
length (``PACKET_LENGTH_BYTES`` bytes), then the packet bytes.  Inside the
packet every byte equal to ``SOF_BYTE[0]`` is followed by one stuffing byte
that is not counted in the packet length.

A packet starts with an opcode and a sequence number.  Header packets define
the data IDs (as ``TelemetryData`` objects); data packets carry values for
previously defined data IDs.
"""
from collections import deque
from numbers import Number
import struct
import time

# TODO: MASSIVE REFACTORING EVERYWHERE


# lifted from https://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
def enum(*sequential, **named):
    """Build a simple enum-like class.

    Positional names are numbered 0, 1, 2, ... in order; keyword arguments
    are added with the values given.  Members are class attributes of the
    returned type.
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    return type('Enum', (), enums)


# Global constants defined by the telemetry protocol.
# TODO: parse constants from cpp header
SOF_BYTE = [0x05, 0x39]

OPCODE_HEADER = 0x81
OPCODE_DATA = 0x01

DATAID_TERMINATOR = 0x00

DATATYPE_NUMERIC = 0x01
DATATYPE_NUMERIC_ARRAY = 0x02

NUMERIC_SUBTYPE_UINT = 0x01
NUMERIC_SUBTYPE_SINT = 0x02
NUMERIC_SUBTYPE_FLOAT = 0x03

RECORDID_TERMINATOR = 0x00


# Deserialization functions that (destructively) read data from the input
# "stream", a deque of integer byte values.  They raise IndexError when the
# stream runs out; TelemetryPacket turns that into a PacketSizeError.
def deserialize_uint8(byte_stream):
    """Pop one byte and return it as an unsigned integer."""
    return byte_stream.popleft()


def deserialize_bool(byte_stream):
    """Pop one byte and return False if it is zero, True otherwise."""
    return byte_stream.popleft() != 0


def deserialize_uint16(byte_stream):
    """Pop two bytes and return them as a big-endian unsigned integer."""
    high = byte_stream.popleft()
    return high << 8 | byte_stream.popleft()


def deserialize_uint32(byte_stream):
    """Pop four bytes and return them as a big-endian unsigned integer."""
    value = 0
    for _ in range(4):
        value = value << 8 | byte_stream.popleft()
    return value


def deserialize_float(byte_stream):
    """Pop four bytes and return them as a big-endian 32-bit float."""
    packed = bytearray([byte_stream.popleft() for _ in range(4)])
    return struct.unpack('!f', packed)[0]


def deserialize_numeric(byte_stream, subtype, length):
    """Pop one numeric value of the given subtype and byte length.

    Unsigned integers of any length are read big-endian.  Floats are only
    supported with a length of 4.  Raises UnknownNumericSubtype for any other
    subtype / length combination.
    """
    if subtype == NUMERIC_SUBTYPE_UINT:
        value = 0
        for _ in range(length):
            value = value << 8 | deserialize_uint8(byte_stream)
        return value
    # TODO: add support for sint
    elif subtype == NUMERIC_SUBTYPE_FLOAT:
        if length == 4:
            return deserialize_float(byte_stream)
        raise UnknownNumericSubtype("Unknown float length %02x" % length)
    else:
        raise UnknownNumericSubtype("Unknown subtype %02x" % subtype)


def deserialize_numeric_from_def(data_def, count=None):
    """Return a deserializer that reads numerics shaped like ``data_def``.

    The returned function takes the byte stream and reads values using
    ``data_def.subtype`` and ``data_def.length`` as they are at call time.
    With ``count`` set it returns a list of ``count`` values, otherwise a
    single value.  It raises MissingKvrError if subtype or length have not
    been decoded yet.
    """
    def deserialize_numeric_inner(byte_stream):
        # The record order comes from the wire, so this must be a real check
        # rather than an assert (asserts vanish under ``python -O``).
        if not (hasattr(data_def, 'subtype') and hasattr(data_def, 'length')):
            raise MissingKvrError(
                "Numeric subtype and length must be decoded before records that depend on them")
        if count is None:
            return deserialize_numeric(byte_stream, data_def.subtype, data_def.length)
        return [deserialize_numeric(byte_stream, data_def.subtype, data_def.length)
                for _ in range(count)]
    return deserialize_numeric_inner


def deserialize_string(byte_stream):
    """Pop bytes up to and including a zero byte; return the text before it."""
    chars = []
    data = byte_stream.popleft()
    while data:
        chars.append(chr(data))
        data = byte_stream.popleft()
    return "".join(chars)


def serialize_uint8(value):
    """Return ``value`` packed as one byte; ValueError if not an int in 0..255."""
    if (not isinstance(value, int)) or (value < 0 or value > 255):
        raise ValueError("Invalid uint8: %s" % (value,))
    return struct.pack('!B', value)


def serialize_uint16(value):
    """Return ``value`` packed big-endian in two bytes; ValueError if out of range."""
    if (not isinstance(value, int)) or (value < 0 or value > 65535):
        raise ValueError("Invalid uint16: %s" % (value,))
    return struct.pack('!H', value)


def serialize_uint32(value):
    """Return ``value`` packed big-endian in four bytes; ValueError if out of range."""
    if (not isinstance(value, int)) or (value < 0 or value > 2 ** 32 - 1):
        raise ValueError("Invalid uint32: %s" % (value,))
    return struct.pack('!L', value)


def serialize_float(value):
    """Return ``value`` packed as a big-endian 32-bit float; ValueError if not a number."""
    if not isinstance(value, Number):
        raise ValueError("Invalid float: %s" % (value,))
    return struct.pack('!f', value)


def serialize_numeric(value, subtype, length):
    """Serialize ``value`` as the given numeric subtype and byte length.

    Supports unsigned integers of length 1, 2 or 4 and floats of length 4.
    Raises ValueError for anything else or for an unrepresentable value.
    """
    if subtype == NUMERIC_SUBTYPE_UINT:
        if length == 1:
            return serialize_uint8(value)
        elif length == 2:
            return serialize_uint16(value)
        elif length == 4:
            return serialize_uint32(value)
        raise ValueError("Unknown uint length %02x" % length)
    elif subtype == NUMERIC_SUBTYPE_FLOAT:
        if length == 4:
            return serialize_float(value)
        raise ValueError("Unknown float length %02x" % length)
    else:
        raise ValueError("Unknown subtype %02x" % subtype)


PACKET_LENGTH_BYTES = 2  # number of bytes in the packet length field


class TelemetryDeserializationError(Exception):
    """Base class for every error raised while decoding telemetry."""
    pass


class NoRecordIdError(TelemetryDeserializationError):
    """A header used an unknown record ID, or lacked a required record."""
    pass


class MissingKvrError(TelemetryDeserializationError):
    """A record was decoded before the records it depends on."""
    pass


datatype_registry = {}


class TelemetryData(object):
    """Abstract base class for telemetry data ID definitions.
    """
    def __repr__(self):
        out = self.__class__.__name__
        for record_name, _ in self.get_kvrs_dict().values():
            out += " %s=%s" % (record_name, repr(getattr(self, record_name)))
        return out

    def get_kvrs_dict(self):
        """Returns a dict of record id => (record name, deserialization function) known by this class.
        The record name is used as an instance variable name if the KVR is read in.
        """
        return {
            0x01: ('internal_name', deserialize_string),
            0x02: ('display_name', deserialize_string),
            0x03: ('units', deserialize_string),
            # TODO: make more robust by allowing defaults / partial parses
            # add record 0x08 = freeze
        }

    @staticmethod
    def decode_header(data_id, byte_stream):
        """Decodes a data header from the telemetry stream, automatically detecting and returning
        the correct TelemetryData subclass object.

        The data type byte is only peeked here; the subclass constructor
        consumes it.  Raises NoOpcodeError for an unregistered data type.
        """
        opcode = byte_stream[0]
        if opcode not in datatype_registry:
            raise NoOpcodeError("No datatype %02x" % opcode)
        data_cls = datatype_registry[opcode]
        return data_cls(data_id, byte_stream)

    def __init__(self, data_id, byte_stream):
        """Read the data type byte and the KVRs that follow it from ``byte_stream``."""
        self.data_id = data_id
        self.data_type = deserialize_uint8(byte_stream)

        # Defaults for records the header is allowed to omit.
        self.internal_name = "%02x" % data_id
        self.display_name = self.internal_name
        self.units = ""

        self.latest_value = None

        self.decode_kvrs(byte_stream)

    def decode_kvrs(self, byte_stream):
        """Destructively reads in a sequence of KVRs from the input stream, writing
        the known ones as instance variables and throwing exceptions on unknowns.

        Raises NoRecordIdError for an unknown record ID, and also when a known
        record without a default was not present in the header.
        """
        kvrs_dict = self.get_kvrs_dict()
        while True:
            record_id = deserialize_uint8(byte_stream)
            if record_id == RECORDID_TERMINATOR:
                break
            elif record_id not in kvrs_dict:
                raise NoRecordIdError("No RecordId %02x in %s" % (record_id, self.__class__.__name__))
            record_name, record_deserializer = kvrs_dict[record_id]
            setattr(self, record_name, record_deserializer(byte_stream))

        # check that all KVRs have been read in / defaulted
        for record_id, record_desc in kvrs_dict.items():
            record_name, _ = record_desc
            if not hasattr(self, record_name):
                raise NoRecordIdError("%s missing RecordId %02x (%s) in header"
                                      % (self.__class__.__name__, record_id, record_name))

    def deserialize_data(self, byte_stream):
        """Destructively reads in the data of this type from the input stream.
        """
        raise NotImplementedError

    def serialize_data(self, value):
        """Returns the serialized version (as bytes) of this data given a value.
        Can raise a ValueError if there is a conversion issue.
        """
        raise NotImplementedError

    def get_latest_value(self):
        """Return the value stored by the last set_latest_value call (None initially)."""
        return self.latest_value

    def set_latest_value(self, value):
        """Store ``value`` as the most recent value of this data ID."""
        self.latest_value = value


class UnknownNumericSubtype(TelemetryDeserializationError):
    """A numeric subtype / length combination that cannot be decoded.

    Part of the TelemetryDeserializationError family so that the serial
    receiver reports it instead of crashing.
    """
    pass


class NumericData(TelemetryData):
    """A single numeric value described by subtype, byte length and limits."""
    def get_kvrs_dict(self):
        newdict = super(NumericData, self).get_kvrs_dict().copy()
        newdict.update({
            0x40: ('subtype', deserialize_uint8),
            0x41: ('length', deserialize_uint8),
            0x42: ('limits', deserialize_numeric_from_def(self, count=2)),
        })
        return newdict

    def deserialize_data(self, byte_stream):
        return deserialize_numeric(byte_stream, self.subtype, self.length)

    def serialize_data(self, value):
        return serialize_numeric(value, self.subtype, self.length)


datatype_registry[DATATYPE_NUMERIC] = NumericData


class NumericArray(TelemetryData):
    """A fixed-size array of ``count`` numeric values of one subtype and length."""
    def get_kvrs_dict(self):
        newdict = super(NumericArray, self).get_kvrs_dict().copy()
        newdict.update({
            0x40: ('subtype', deserialize_uint8),
            0x41: ('length', deserialize_uint8),
            0x42: ('limits', deserialize_numeric_from_def(self, count=2)),
            0x50: ('count', deserialize_uint32),
        })
        return newdict

    def deserialize_data(self, byte_stream):
        return [deserialize_numeric(byte_stream, self.subtype, self.length)
                for _ in range(self.count)]

    def serialize_data(self, value):
        if len(value) != self.count:
            raise ValueError("Length mismatch: got %i, expected %i"
                             % (len(value), self.count))
        return b"".join(serialize_numeric(elt, self.subtype, self.length)
                        for elt in value)


datatype_registry[DATATYPE_NUMERIC_ARRAY] = NumericArray


class PacketSizeError(TelemetryDeserializationError):
    """The packet was shorter or longer than its contents require."""
    pass


class NoOpcodeError(TelemetryDeserializationError):
    """The packet opcode or data type is not registered."""
    pass


class DuplicateDataIdError(TelemetryDeserializationError):
    """A header packet defined the same data ID twice."""
    pass


class UndefinedDataIdError(TelemetryDeserializationError):
    """A data packet referred to a data ID the context does not define."""
    pass


opcodes_registry = {}


class TelemetryPacket(object):
    """Abstract base class for telemetry packets.
    """
    @staticmethod
    def decode(byte_stream, context):
        """Decode one packet, choosing the subclass from the leading opcode.

        ``byte_stream`` is a deque holding exactly the packet bytes and is
        consumed.  Raises PacketSizeError for an empty, truncated or overlong
        packet and NoOpcodeError for an unregistered opcode.
        """
        if not byte_stream:
            raise PacketSizeError("Empty packet")
        opcode = byte_stream[0]
        if opcode not in opcodes_registry:
            raise NoOpcodeError("No opcode %02x" % opcode)
        packet_cls = opcodes_registry[opcode]
        return packet_cls(byte_stream, context)

    def __init__(self, byte_stream, context):
        try:
            self.opcode = deserialize_uint8(byte_stream)
            self.sequence = deserialize_uint8(byte_stream)
            self.decode_payload(byte_stream, context)
        except IndexError:
            # The deserializers pop from the deque and raise IndexError when
            # it is exhausted, i.e. the packet was cut short.
            raise PacketSizeError("Packet ended before it was fully decoded")
        if len(byte_stream) > 0:
            raise PacketSizeError("%i unused bytes in packet" % len(byte_stream))

    def decode_payload(self, byte_stream, context):
        """Decode everything after the opcode and sequence bytes (subclass hook)."""
        raise NotImplementedError


class HeaderPacket(TelemetryPacket):
    """Packet that defines the data IDs used by later data packets."""
    def __repr__(self):
        return "[%i]Header: %s" % (self.sequence, repr(self.data))

    def decode_payload(self, byte_stream, context):
        self.data = {}
        while True:
            data_id = deserialize_uint8(byte_stream)
            if data_id == DATAID_TERMINATOR:
                break
            elif data_id in self.data:
                raise DuplicateDataIdError("Duplicate DataId %02x" % data_id)
            self.data[data_id] = TelemetryData.decode_header(data_id, byte_stream)

    def get_data_defs(self):
        """Returns the data defs defined in this header as a dict of data ID to
        TelemetryData objects.
        """
        return self.data

    def get_data_names(self):
        """Return the internal names of all data defs in this header."""
        return [data_def.internal_name for data_def in self.data.values()]


opcodes_registry[OPCODE_HEADER] = HeaderPacket


class DataPacket(TelemetryPacket):
    """Packet carrying values for data IDs defined by the current context."""
    def __repr__(self):
        return "[%i]Data: %s" % (self.sequence, repr(self.data))

    def decode_payload(self, byte_stream, context):
        self.data = {}
        while True:
            data_id = deserialize_uint8(byte_stream)
            if data_id == DATAID_TERMINATOR:
                break
            data_def = context.get_data_def(data_id)
            if not data_def:
                raise UndefinedDataIdError("Received DataId %02x not defined in header" % data_id)
            data_value = data_def.deserialize_data(byte_stream)
            # The definition also remembers the value it received last.
            data_def.set_latest_value(data_value)
            self.data[data_def.data_id] = data_value

    def get_data_dict(self):
        """Return the decoded values as a dict of data ID to value."""
        return self.data

    def get_data_by_id(self, data_id):
        """Return the value for ``data_id``, or None if this packet has none."""
        return self.data.get(data_id)


opcodes_registry[OPCODE_DATA] = DataPacket


class TelemetryContext(object):
    """Context for telemetry communications, containing the setup information in
    the header.
    """
    def __init__(self, data_defs):
        """``data_defs`` maps data ID to TelemetryData object."""
        self.data_defs = data_defs

    def get_data_def(self, data_id):
        """Return the definition for ``data_id``, or None if it is not defined."""
        if data_id in self.data_defs:
            return self.data_defs[data_id]
        return None


class TelemetrySerial(object):
    """Telemetry serial receiver state machine. Separates out telemetry packets
    from the rest of the stream.
    """
    DecoderState = enum('SOF', 'LENGTH', 'DATA', 'DATA_DESTUFF', 'DATA_DESTUFF_END')
    PACKET_TIMEOUT_THRESHOLD = 0.1  # seconds

    def __init__(self, serial):
        """``serial`` must provide ``inWaiting()``, ``read()`` and ``write()``."""
        self.serial = serial

        self.rx_packets = deque()  # queued decoded packets

        # Nothing is defined until the first header packet arrives.
        self.context = TelemetryContext({})

        # decoder state machine variables
        self.decoder_state = self.DecoderState.SOF  # expected next byte
        self.decoder_pos = 0  # position within decoder_state
        self.packet_length = 0  # expected packet length
        self.packet_buffer = deque()

        self.data_buffer = deque()  # received bytes that are not part of a packet

        # decoder packet timeout variables
        self.last_loop_received = False
        self.last_receive_time = time.time()

    def process_rx(self):
        """Consume all waiting serial bytes and advance the decoder.

        Decoded packets are queued for next_rx_packet(); bytes outside of
        packets are queued for next_rx_byte().  A header packet replaces the
        current context.  Decoding errors are printed and the packet dropped.
        A partially received packet is dropped once no byte has arrived for
        longer than PACKET_TIMEOUT_THRESHOLD.
        """
        if ((not self.last_loop_received)
                and (time.time() - self.last_receive_time > self.PACKET_TIMEOUT_THRESHOLD)
                and (self.decoder_state != self.DecoderState.SOF and self.decoder_pos > 0)):
            self.decoder_state = self.DecoderState.SOF
            self.decoder_pos = 0
            self.packet_buffer = deque()
            print("Packet timed out; dropping")

        self.last_loop_received = False

        while self.serial.inWaiting():
            self.last_loop_received = True
            self.last_receive_time = time.time()

            rx_byte = ord(self.serial.read())
            if self.decoder_state == self.DecoderState.SOF:
                # Bytes are held back while they might be part of a SOF.
                self.packet_buffer.append(rx_byte)

                if rx_byte == SOF_BYTE[self.decoder_pos]:
                    self.decoder_pos += 1
                    if self.decoder_pos == len(SOF_BYTE):
                        self.packet_length = 0
                        self.decoder_pos = 0
                        self.decoder_state = self.DecoderState.LENGTH
                elif rx_byte == SOF_BYTE[0]:
                    # The partial match failed, but this byte can itself begin
                    # a SOF: release only the bytes before it, so a stray
                    # SOF_BYTE[0] directly ahead of a frame does not hide it.
                    self.packet_buffer.pop()
                    self.data_buffer.extend(self.packet_buffer)
                    self.packet_buffer = deque([rx_byte])
                    self.decoder_pos = 1
                else:
                    # Not a SOF after all: everything held back is plain data.
                    self.data_buffer.extend(self.packet_buffer)
                    self.packet_buffer = deque()
                    self.decoder_pos = 0
            elif self.decoder_state == self.DecoderState.LENGTH:
                self.packet_length = self.packet_length << 8 | rx_byte
                self.decoder_pos += 1
                if self.decoder_pos == PACKET_LENGTH_BYTES:
                    self.packet_buffer = deque()
                    self.decoder_pos = 0
                    if self.packet_length == 0:
                        # The DATA state only finishes after receiving a byte,
                        # so a zero-length frame would never complete there.
                        print("Deserialization error: %s"
                              % repr(PacketSizeError("Zero-length packet")))
                        self.decoder_state = self.DecoderState.SOF
                    else:
                        self.decoder_state = self.DecoderState.DATA
            elif self.decoder_state == self.DecoderState.DATA:
                self.packet_buffer.append(rx_byte)
                self.decoder_pos += 1
                if self.decoder_pos == self.packet_length:
                    try:
                        decoded = TelemetryPacket.decode(self.packet_buffer, self.context)

                        if isinstance(decoded, HeaderPacket):
                            self.context = TelemetryContext(decoded.get_data_defs())

                        self.rx_packets.append(decoded)
                    except TelemetryDeserializationError as e:
                        print("Deserialization error: %s" % repr(e))  # TODO prettier cleaner

                    self.packet_buffer = deque()

                    self.decoder_pos = 0
                    # A final SOF_BYTE[0] is still followed by its stuffing byte.
                    if rx_byte == SOF_BYTE[0]:
                        self.decoder_state = self.DecoderState.DATA_DESTUFF_END
                    else:
                        self.decoder_state = self.DecoderState.SOF
                elif rx_byte == SOF_BYTE[0]:
                    self.decoder_state = self.DecoderState.DATA_DESTUFF
            elif self.decoder_state == self.DecoderState.DATA_DESTUFF:
                # Discard the stuffing byte and carry on with the packet.
                self.decoder_state = self.DecoderState.DATA
            elif self.decoder_state == self.DecoderState.DATA_DESTUFF_END:
                # Discard the stuffing byte after the packet's last byte.
                self.decoder_state = self.DecoderState.SOF
            else:
                raise RuntimeError("Unknown DecoderState")

    def transmit_set_packet(self, data_def, value):
        """Send a data packet that sets ``data_def`` to ``value``.

        Raises ValueError if the value cannot be serialized for ``data_def``.
        """
        packet = bytearray()
        packet += serialize_uint8(OPCODE_DATA)
        packet += serialize_uint8(data_def.data_id)
        packet += data_def.serialize_data(value)
        packet += serialize_uint8(DATAID_TERMINATOR)
        self.transmit_packet(packet)

    def transmit_packet(self, packet):
        """Frame ``packet`` (SOF, length, byte stuffing) and write it to the serial port.

        The length field holds the unstuffed packet length.
        """
        header = bytearray()
        for elt in SOF_BYTE:
            header += serialize_uint8(elt)
        header += serialize_uint16(len(packet))

        modified_packet = bytearray()
        for packet_byte in packet:
            modified_packet.append(packet_byte)
            if packet_byte == SOF_BYTE[0]:
                # Stuff a zero so the receiver cannot mistake data for a SOF.
                modified_packet.append(0x00)

        self.serial.write(header + modified_packet)

        # TODO: add CRC support

    def next_rx_packet(self):
        """Return the oldest decoded packet, or None if none is queued."""
        if self.rx_packets:
            return self.rx_packets.popleft()
        return None

    def next_rx_byte(self):
        """Return the oldest non-packet byte, or None if none is queued."""
        if self.data_buffer:
            return self.data_buffer.popleft()
        return None
Generated on Tue Jul 12 2022 22:03:01 by
