E=MC / telemetry-master
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers parser.py Source File

parser.py

00001 from collections import deque
00002 from numbers import Number
00003 import struct
00004 import time
00005 
00006 # TODO: MASSIVE REFACTORING EVERYWHERE
00007 
00008 # lifted from https://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
def enum(*sequential, **named):
    """Build a lightweight enumeration class.

    Positional names are numbered 0, 1, 2, ... in the order given. Keyword
    arguments add explicit name => value members and override a positional
    name spelled the same way.

    Returns a new class called 'Enum' with one class attribute per member.
    """
    members = {}
    for index, name in enumerate(sequential):
        members[name] = index
    members.update(named)
    return type('Enum', (), members)
00012 
# Global constants defined by the telemetry protocol.
# TODO: parse constants from cpp header

# Start-of-frame marker: the byte sequence written before every transmitted
# packet and searched for by the receive state machine.
SOF_BYTE = [0x05, 0x39]

# Packet opcodes (first byte of a packet); used as keys of opcodes_registry.
OPCODE_HEADER = 0x81
OPCODE_DATA = 0x01

# Data ID value that ends the list of entries in a packet payload.
DATAID_TERMINATOR = 0x00

# Data type codes (first byte of a data definition); keys of datatype_registry.
DATATYPE_NUMERIC = 0x01
DATATYPE_NUMERIC_ARRAY = 0x02

# Values of the numeric 'subtype' record. SINT is declared here but not yet
# handled by the (de)serialization functions in this file.
NUMERIC_SUBTYPE_UINT = 0x01
NUMERIC_SUBTYPE_SINT = 0x02
NUMERIC_SUBTYPE_FLOAT = 0x03

# Record ID value that ends the list of key-value records (KVRs) in a header.
RECORDID_TERMINATOR = 0x00
00030 
00031 # Deserialization functions that (destructively) read data from the input "stream".
def deserialize_uint8(byte_stream):
  """Remove and return the next byte of byte_stream as an unsigned integer.

  byte_stream must support popleft() (e.g. a deque of ints); running out of
  data surfaces as whatever popleft() raises.
  """
  # TODO: handle overflow
  value = byte_stream.popleft()
  return value
00035 
def deserialize_bool(byte_stream):
  """Remove the next byte of byte_stream and return it as a bool.

  Zero decodes to False; any other byte value decodes to True.
  """
  # TODO: handle overflow
  return byte_stream.popleft() != 0
00039 
def deserialize_uint16(byte_stream):
  """Remove two bytes from byte_stream and return them as a big-endian uint16."""
  # TODO: handle overflow
  high_byte = byte_stream.popleft()
  low_byte = byte_stream.popleft()
  return (high_byte << 8) | low_byte
00043 
def deserialize_uint32(byte_stream):
  """Remove four bytes from byte_stream and return them as a big-endian uint32."""
  # TODO: handle overflow
  value = 0
  for _ in range(4):
    # most significant byte arrives first
    value = (value << 8) | byte_stream.popleft()
  return value
00050   
def deserialize_float(byte_stream):
  """Remove four bytes from byte_stream and decode them as a float.

  The bytes are interpreted with struct format '!f' (network byte order,
  32-bit float).
  """
  # TODO: handle overflow
  raw = [byte_stream.popleft() for _ in range(4)]
  (value,) = struct.unpack('!f', bytearray(raw))
  return value
00058 
def deserialize_numeric(byte_stream, subtype, length):
  """Destructively read one numeric value from byte_stream.

  subtype selects the encoding (NUMERIC_SUBTYPE_*) and length is the value's
  size in bytes. Unsigned integers are read big-endian over `length` bytes;
  floats are only supported with a length of 4.

  Raises UnknownNumericSubtype for an unsupported float length or any other
  subtype.
  """
  if subtype == NUMERIC_SUBTYPE_UINT:
    accumulated = 0
    bytes_left = length
    while bytes_left > 0:
      accumulated = (accumulated << 8) | deserialize_uint8(byte_stream)
      bytes_left -= 1
    return accumulated

  # TODO: add support for sint
  if subtype != NUMERIC_SUBTYPE_FLOAT:
    raise UnknownNumericSubtype("Unknown subtype %02x" % subtype)

  if length != 4:
    raise UnknownNumericSubtype("Unknown float length %02x" % length)
  return deserialize_float(byte_stream)
00075 
def deserialize_numeric_from_def(data_def, count=None):
  """Build a deserializer for numeric values described by data_def.

  data_def is an object whose 'subtype' and 'length' attributes are looked up
  when the returned function is called (not when it is created), so they may
  be filled in after this call. count, if given, is the integer number of
  consecutive values to read.

  Returns a function taking a byte stream and returning a single value when
  count is None, otherwise a list of count values.

  The returned function raises MissingKvrError if 'subtype' or 'length' has
  not been set on data_def by the time it is called.
  """
  def deserialize_numeric_inner(byte_stream):
    # Whether these are set depends on the order of records in the incoming
    # stream, so report a decoding error rather than using assert (which is
    # stripped under -O and is not a TelemetryDeserializationError).
    for required in ('subtype', 'length'):
      if not hasattr(data_def, required):
        raise MissingKvrError("Numeric value read before '%s' was decoded"
                              % required)
    if count is None:
      return deserialize_numeric(byte_stream, data_def.subtype, data_def.length)
    return [deserialize_numeric(byte_stream, data_def.subtype, data_def.length)
            for _ in range(count)]
  return deserialize_numeric_inner
00091 
def deserialize_string(byte_stream):
  """Destructively read a zero-terminated string from byte_stream.

  Bytes are consumed up to and including the first zero byte; each preceding
  byte becomes one character via chr(). The terminator is not part of the
  returned string.
  """
  # TODO: handle overflow
  chars = []
  while True:
    code = byte_stream.popleft()
    if not code:
      break
    chars.append(chr(code))
  return "".join(chars)
00100 
00101 
00102 
def serialize_uint8(value):
  """Return value packed as one unsigned byte.

  Raises ValueError if value is not an int in the range 0..255.
  """
  if not isinstance(value, int) or not 0 <= value <= 0xFF:
    # (value,) keeps tuple inputs from being expanded as format arguments,
    # which would raise TypeError instead of the intended ValueError
    raise ValueError("Invalid uint8: %s" % (value,))
  return struct.pack('!B', value)
00107 
def serialize_uint16(value):
  """Return value packed as a big-endian unsigned 16-bit integer.

  Raises ValueError if value is not an int in the range 0..65535.
  """
  if not isinstance(value, int) or not 0 <= value <= 0xFFFF:
    # (value,) keeps tuple inputs from being expanded as format arguments,
    # which would raise TypeError instead of the intended ValueError
    raise ValueError("Invalid uint16: %s" % (value,))
  return struct.pack('!H', value)
00112 
def serialize_uint32(value):
  """Return value packed as a big-endian unsigned 32-bit integer.

  Raises ValueError if value is not an int in the range 0..2**32-1.
  """
  if not isinstance(value, int) or not 0 <= value <= 0xFFFFFFFF:
    # (value,) keeps tuple inputs from being expanded as format arguments,
    # which would raise TypeError instead of the intended ValueError
    raise ValueError("Invalid uint32: %s" % (value,))
  return struct.pack('!L', value)
00117 
def serialize_float(value):
  """Return value packed with struct format '!f' (network order, 32-bit float).

  Raises ValueError if value is not a number or cannot be represented as a
  32-bit float (e.g. a complex number or a magnitude that is too large).
  """
  if not isinstance(value, Number):
    # (value,) keeps tuple inputs from being expanded as format arguments
    raise ValueError("Invalid float: %s" % (value,))
  try:
    return struct.pack('!f', value)
  except (struct.error, OverflowError, TypeError) as e:
    # surface packing failures as the same ValueError the other serializers use
    raise ValueError("Invalid float: %s (%s)" % (value, e))
00122 
def serialize_numeric(value, subtype, length):
  """Serialize value as the numeric type given by subtype and byte length.

  Unsigned integers are supported with lengths 1, 2 and 4; floats with
  length 4. Returns the packed bytes.

  Raises ValueError for an unsupported subtype/length combination, or when
  the selected serializer rejects value.
  """
  if subtype == NUMERIC_SUBTYPE_UINT:
    uint_serializers = {
      1: serialize_uint8,
      2: serialize_uint16,
      4: serialize_uint32,
    }
    serializer = uint_serializers.get(length)
    if serializer is None:
      raise ValueError("Unknown uint length %02x" % length)
    return serializer(value)

  if subtype == NUMERIC_SUBTYPE_FLOAT:
    if length != 4:
      raise ValueError("Unknown float length %02x" % length)
    return serialize_float(value)

  raise ValueError("Unknown subtype %02x" % subtype)
00140 
00141 
00142 
# Width of the packet length field that follows the start-of-frame bytes; the
# receive state machine accumulates this many bytes, most significant first.
PACKET_LENGTH_BYTES = 2 # number of bytes in the packet length field
00144 
class TelemetryDeserializationError(Exception):
  """Base class for errors raised while decoding telemetry data."""

class NoRecordIdError(TelemetryDeserializationError):
  """Raised when a header contains an unknown record ID, or lacks a record
  that the data definition requires."""
class MissingKvrError(TelemetryDeserializationError):
  """Error type for a missing key-value record (KVR)."""
00152 
datatype_registry = {}  # data type code => TelemetryData subclass
class TelemetryData (object):
  """Abstract base class for telemetry data ID definitions.

  An instance describes one data ID announced in a header: its names, units
  and (in subclasses) how its values are encoded.
  """
  def __repr__(self):
    parts = [self.__class__.__name__]
    for record_name, _ in self.get_kvrs_dict().values():
      parts.append(" %s=%s" % (record_name, repr(getattr(self, record_name))))
    return "".join(parts)

  def get_kvrs_dict(self):
    """Returns a dict of record id => (record name, deserialization function)
    known by this class. The record name doubles as the instance attribute
    the decoded record is stored under.
    """
    kvrs = {}
    kvrs[0x01] = ('internal_name', deserialize_string)
    kvrs[0x02] = ('display_name', deserialize_string)
    kvrs[0x03] = ('units', deserialize_string)
    # TODO: make more robust by allowing defaults / partial parses
    # add record 0x08 = freeze
    return kvrs

  @staticmethod
  def decode_header(data_id, byte_stream):
    """Decodes one data definition from the telemetry stream.

    The first byte of byte_stream (peeked, not consumed here) selects the
    TelemetryData subclass from datatype_registry; that subclass is then
    constructed from the stream and returned. Raises NoOpcodeError when the
    data type is not registered.
    """
    datatype = byte_stream[0]
    data_cls = datatype_registry.get(datatype)
    if data_cls is None:
      raise NoOpcodeError("No datatype %02x" % datatype)
    return data_cls(data_id, byte_stream)

  def __init__(self, data_id, byte_stream):
    self.data_id = data_id
    self.data_type = deserialize_uint8(byte_stream)

    # defaults for the records every definition has; KVRs in the stream
    # overwrite them
    default_name = "%02x" % data_id
    self.internal_name = default_name
    self.display_name = default_name
    self.units = ""

    self.latest_value = None

    self.decode_kvrs(byte_stream)

  def decode_kvrs(self, byte_stream):
    """Destructively reads a sequence of KVRs from the input stream until the
    terminator record ID, storing each known record as an instance attribute.

    Raises NoRecordIdError for an unknown record ID, and also when a record
    listed by get_kvrs_dict() has neither been read nor defaulted.
    """
    known_records = self.get_kvrs_dict()
    record_id = deserialize_uint8(byte_stream)
    while record_id != RECORDID_TERMINATOR:
      if record_id not in known_records:
        raise NoRecordIdError("No RecordId %02x in %s" % (record_id, self.__class__.__name__))
      attr_name, read_record = known_records[record_id]
      setattr(self, attr_name, read_record(byte_stream))
      record_id = deserialize_uint8(byte_stream)

    # check that all KVRs have been read in / defaulted
    for record_id, (attr_name, _) in known_records.items():
      if not hasattr(self, attr_name):
        raise NoRecordIdError("%s missing RecordId %02x (%s) in header" % (self.__class__.__name__, record_id, attr_name))

  def deserialize_data(self, byte_stream):
    """Destructively reads in the data of this type from the input stream.
    Must be provided by subclasses.
    """
    raise NotImplementedError

  def serialize_data(self, value):
    """Returns the serialized version (as bytes) of this data given a value.
    Can raise a ValueError if there is a conversion issue.
    Must be provided by subclasses.
    """
    raise NotImplementedError

  def get_latest_value(self):
    """Returns the value last stored with set_latest_value (initially None)."""
    return self.latest_value

  def set_latest_value(self, value):
    """Stores value as the most recent value for this data ID."""
    self.latest_value = value
00235   
00236   
00237   
class UnknownNumericSubtype(TelemetryDeserializationError):
  """Raised when a numeric value uses a subtype or length that cannot be decoded.

  Derives from TelemetryDeserializationError so that code handling
  deserialization failures (such as the serial receiver) also handles this
  one instead of letting it propagate as an unrelated exception.
  """
00240   
class NumericData(TelemetryData):
  """Data definition for a single numeric value."""
  def get_kvrs_dict(self):
    """Returns the base records plus the numeric ones: subtype (0x40),
    length (0x41) and limits (0x42, two values of this numeric type).
    """
    kvrs = dict(super(NumericData, self).get_kvrs_dict())
    kvrs[0x40] = ('subtype', deserialize_uint8)
    kvrs[0x41] = ('length', deserialize_uint8)
    kvrs[0x42] = ('limits', deserialize_numeric_from_def(self, count=2))
    return kvrs

  def deserialize_data(self, byte_stream):
    """Destructively reads one value of this definition's subtype and length."""
    subtype, length = self.subtype, self.length
    return deserialize_numeric(byte_stream, subtype, length)

  def serialize_data(self, value):
    """Returns value serialized with this definition's subtype and length."""
    subtype, length = self.subtype, self.length
    return serialize_numeric(value, subtype, length)

datatype_registry[DATATYPE_NUMERIC] = NumericData
00258 
class NumericArray(TelemetryData):
  """Data definition for a fixed-length array of numeric values."""
  def get_kvrs_dict(self):
    """Returns the base records plus subtype (0x40), length (0x41), limits
    (0x42, two values of this numeric type) and element count (0x50).
    """
    kvrs = dict(super(NumericArray, self).get_kvrs_dict())
    kvrs[0x40] = ('subtype', deserialize_uint8)
    kvrs[0x41] = ('length', deserialize_uint8)
    kvrs[0x42] = ('limits', deserialize_numeric_from_def(self, count=2))
    kvrs[0x50] = ('count', deserialize_uint32)
    return kvrs

  def deserialize_data(self, byte_stream):
    """Destructively reads self.count values and returns them as a list."""
    return [deserialize_numeric(byte_stream, self.subtype, self.length)
            for _ in range(self.count)]

  def serialize_data(self, value):
    """Returns the elements of value serialized back to back.

    Raises ValueError when value does not hold exactly self.count elements.
    """
    actual = len(value)
    if actual != self.count:
      raise ValueError("Length mismatch: got %i, expected %i"
                       % (actual, self.count))
    return b"".join(serialize_numeric(elt, self.subtype, self.length)
                    for elt in value)

datatype_registry[DATATYPE_NUMERIC_ARRAY] = NumericArray
00286 
class PacketSizeError(TelemetryDeserializationError):
  """Raised when a packet's contents do not match its size (e.g. unused bytes
  remain after decoding)."""
class NoOpcodeError(TelemetryDeserializationError):
  """Raised when a packet opcode or data type code is not registered."""
class DuplicateDataIdError(TelemetryDeserializationError):
  """Raised when a header defines the same data ID more than once."""
class UndefinedDataIdError(TelemetryDeserializationError):
  """Raised when a data packet refers to a data ID the context does not define."""
00295 
opcodes_registry = {}  # opcode => TelemetryPacket subclass
class TelemetryPacket (object):
  """Abstract base class for telemetry packets.

  A packet starts with an opcode byte and a sequence byte, followed by a
  payload whose layout is defined by the subclass.
  """
  @staticmethod
  def decode(byte_stream, context):
    """Decodes one packet, picking the subclass registered for its opcode.

    byte_stream holds exactly the bytes of one packet and is consumed
    destructively; context is handed to the subclass for payload decoding.

    Raises PacketSizeError for an empty or truncated packet or one with
    leftover bytes, and NoOpcodeError for an unregistered opcode.
    """
    if not byte_stream:
      # peeking at byte_stream[0] would otherwise raise a bare IndexError
      raise PacketSizeError("Empty packet")
    opcode = byte_stream[0]
    if opcode not in opcodes_registry:
      raise NoOpcodeError("No opcode %02x" % opcode)
    packet_cls = opcodes_registry[opcode]
    return packet_cls(byte_stream, context)

  def __init__(self, byte_stream, context):
    try:
      self.opcode = deserialize_uint8(byte_stream)
      self.sequence = deserialize_uint8(byte_stream)
      self.decode_payload(byte_stream, context)
    except IndexError:
      # The deserialize_* helpers pop from the stream without bounds checks,
      # so running out of bytes shows up as IndexError; report it as a
      # deserialization error that callers already handle.
      raise PacketSizeError("Packet ended before it was fully decoded")
    if len(byte_stream) > 0:
      raise PacketSizeError("%i unused bytes in packet" % len(byte_stream))

  def decode_payload(self, byte_stream, context):
    """Destructively decodes the payload; must be provided by subclasses."""
    raise NotImplementedError
00317 
class HeaderPacket(TelemetryPacket):
  """Packet carrying the data definitions, keyed by data ID."""
  def __repr__(self):
    return "[%i]Header: %s" % (self.sequence, repr(self.data))

  def decode_payload(self, byte_stream, context):
    """Reads data definitions until the terminator data ID is found.

    Raises DuplicateDataIdError if a data ID is defined twice.
    """
    self.data = {}
    data_id = deserialize_uint8(byte_stream)
    while data_id != DATAID_TERMINATOR:
      if data_id in self.data:
        raise DuplicateDataIdError("Duplicate DataId %02x" % data_id)
      self.data[data_id] = TelemetryData.decode_header(data_id, byte_stream)
      data_id = deserialize_uint8(byte_stream)

  def get_data_defs(self):
    """Returns the data defs defined in this header as a dict of data ID to
    TelemetryData objects.
    """
    return self.data

  def get_data_names(self):
    """Returns the internal names of all data defs in this header as a list."""
    return [data_def.internal_name for data_def in self.data.values()]

opcodes_registry[OPCODE_HEADER] = HeaderPacket
00345 
class DataPacket(TelemetryPacket):
  """Packet carrying values for data IDs defined by the current context."""
  def __repr__(self):
    return "[%i]Data: %s" % (self.sequence, repr(self.data))

  def decode_payload(self, byte_stream, context):
    """Reads (data ID, value) entries until the terminator data ID is found.

    Each value is decoded by the definition the context holds for its data
    ID, recorded as that definition's latest value, and stored in self.data.
    Raises UndefinedDataIdError for a data ID the context does not know.
    """
    self.data = {}
    data_id = deserialize_uint8(byte_stream)
    while data_id != DATAID_TERMINATOR:
      data_def = context.get_data_def(data_id)
      if not data_def:
        raise UndefinedDataIdError("Received DataId %02x not defined in header" % data_id)
      data_value = data_def.deserialize_data(byte_stream)
      data_def.set_latest_value(data_value)
      self.data[data_def.data_id] = data_value
      data_id = deserialize_uint8(byte_stream)

  def get_data_dict(self):
    """Returns the decoded values as a dict of data ID to value."""
    return self.data

  def get_data_by_id(self, data_id):
    """Returns the value decoded for data_id, or None if the packet has none."""
    return self.data.get(data_id)

opcodes_registry[OPCODE_DATA] = DataPacket
00373 
00374 
00375 
class TelemetryContext (object):
  """Context for telemetry communications: the data definitions announced by
  the header, looked up by data ID.
  """
  def __init__(self, data_defs):
    self.data_defs = data_defs

  def get_data_def(self, data_id):
    """Returns the definition stored under data_id, or None if there is none."""
    known = data_id in self.data_defs
    return self.data_defs[data_id] if known else None
00388 
00389 
00390 
class TelemetrySerial (object):
  """Telemetry serial receiver state machine. Separates out telemetry packets
  from the rest of the stream.

  Bytes that are not part of a telemetry frame are queued for next_rx_byte();
  decoded packets are queued for next_rx_packet().
  """
  # Receiver states:
  #   SOF              - matching the start-of-frame byte sequence
  #   LENGTH           - accumulating the packet length field
  #   DATA             - collecting packet bytes
  #   DATA_DESTUFF     - skipping the byte that follows SOF_BYTE[0] inside a packet
  #   DATA_DESTUFF_END - same, when SOF_BYTE[0] was the packet's last byte
  DecoderState = enum('SOF', 'LENGTH', 'DATA', 'DATA_DESTUFF', 'DATA_DESTUFF_END')
  PACKET_TIMEOUT_THRESHOLD = 0.1  # seconds

  def __init__(self, serial):
    """serial is the port object; this class calls its inWaiting(), read()
    and write() methods.
    """
    self.serial = serial

    self.rx_packets = deque()  # queued decoded packets

    # No definitions are known until a header arrives. A dict (not a list)
    # matches the ID => definition mapping that headers supply later.
    self.context = TelemetryContext({})

    # decoder state machine variables
    self.decoder_state = self.DecoderState.SOF  # expected next byte
    self.decoder_pos = 0  # position within decoder_state
    self.packet_length = 0  # expected packet length
    self.packet_buffer = deque()

    self.data_buffer = deque()  # received bytes that are not telemetry

    # decoder packet timeout variables
    self.last_loop_received = False
    self.last_receive_time = time.time()

  def process_rx(self):
    """Reads every byte currently waiting on the serial port and advances the
    decoder, queueing completed packets and pass-through bytes.

    A partially received packet is dropped when the previous call received
    nothing and more than PACKET_TIMEOUT_THRESHOLD seconds have passed since
    the last byte. Packets that fail to decode are reported with print() and
    discarded.
    """
    if ((not self.last_loop_received)
        and (time.time() - self.last_receive_time > self.PACKET_TIMEOUT_THRESHOLD)
        and (self.decoder_state != self.DecoderState.SOF and self.decoder_pos > 0)):
      self.decoder_state = self.DecoderState.SOF
      self.decoder_pos = 0
      self.packet_buffer = deque()
      print("Packet timed out; dropping")

    self.last_loop_received = False

    while self.serial.inWaiting():
      self.last_loop_received = True
      self.last_receive_time = time.time()

      rx_byte = ord(self.serial.read())
      if self.decoder_state == self.DecoderState.SOF:
        self.packet_buffer.append(rx_byte)

        if rx_byte == SOF_BYTE[self.decoder_pos]:
          self.decoder_pos += 1
          if self.decoder_pos == len(SOF_BYTE):
            self.packet_length = 0
            self.decoder_pos = 0
            self.decoder_state = self.DecoderState.LENGTH
        else:
          # Not a start-of-frame after all: hand the buffered bytes to the
          # pass-through queue. If the mismatching byte is itself the first
          # SOF byte it may begin a real frame (e.g. 05 05 39), so keep it
          # as a fresh candidate instead of flushing it.
          restart = rx_byte == SOF_BYTE[0]
          if restart:
            self.packet_buffer.pop()
          self.data_buffer.extend(self.packet_buffer)
          self.packet_buffer = deque()
          self.decoder_pos = 0
          if restart:
            self.packet_buffer.append(rx_byte)
            self.decoder_pos = 1
      elif self.decoder_state == self.DecoderState.LENGTH:
        self.packet_length = self.packet_length << 8 | rx_byte
        self.decoder_pos += 1
        if self.decoder_pos == PACKET_LENGTH_BYTES:
          self.packet_buffer = deque()
          self.decoder_pos = 0
          if self.packet_length == 0:
            # A zero-length packet has no data bytes; entering DATA would
            # never reach the length and would swallow the stream.
            print("Zero-length packet; dropping")
            self.decoder_state = self.DecoderState.SOF
          else:
            self.decoder_state = self.DecoderState.DATA
      elif self.decoder_state == self.DecoderState.DATA:
        self.packet_buffer.append(rx_byte)
        self.decoder_pos += 1
        if self.decoder_pos == self.packet_length:
          try:
            decoded = TelemetryPacket.decode(self.packet_buffer, self.context)

            if isinstance(decoded, HeaderPacket):
              self.context = TelemetryContext(decoded.get_data_defs())

            self.rx_packets.append(decoded)
          except (TelemetryDeserializationError, UnknownNumericSubtype, IndexError) as e:
            # IndexError: the deserializers ran out of bytes (truncated or
            # corrupt packet). A bad packet must not stop the receiver.
            print("Deserialization error: %s" % repr(e)) # TODO prettier cleaner

          self.packet_buffer = deque()

          self.decoder_pos = 0
          if rx_byte == SOF_BYTE[0]:
            # the final byte is followed by a stuffing byte that must be skipped
            self.decoder_state = self.DecoderState.DATA_DESTUFF_END
          else:
            self.decoder_state = self.DecoderState.SOF
        elif rx_byte == SOF_BYTE[0]:
          self.decoder_state = self.DecoderState.DATA_DESTUFF
      elif self.decoder_state == self.DecoderState.DATA_DESTUFF:
        # discard the stuffing byte and continue with the packet
        self.decoder_state = self.DecoderState.DATA
      elif self.decoder_state == self.DecoderState.DATA_DESTUFF_END:
        # discard the stuffing byte that trails the packet
        self.decoder_state = self.DecoderState.SOF
      else:
        raise RuntimeError("Unknown DecoderState")

  def transmit_set_packet(self, data_def, value):
    """Builds and transmits a data packet that carries value for data_def.

    The packet consists of the data opcode, the data ID, the value serialized
    by data_def, and the data ID terminator. Serialization may raise
    ValueError.
    """
    packet = bytearray()
    packet += serialize_uint8(OPCODE_DATA)
    packet += serialize_uint8(data_def.data_id)
    packet += data_def.serialize_data(value)
    packet += serialize_uint8(DATAID_TERMINATOR)
    self.transmit_packet(packet)

  def transmit_packet(self, packet):
    """Frames packet and writes it to the serial port.

    The frame is the start-of-frame bytes, the 16-bit length of the unstuffed
    packet, then the packet with a 0x00 inserted after every byte equal to
    SOF_BYTE[0].
    """
    header = bytearray()
    for elt in SOF_BYTE:
      header += serialize_uint8(elt)
    header += serialize_uint16(len(packet))

    modified_packet = bytearray()
    for packet_byte in packet:
      modified_packet.append(packet_byte)
      if packet_byte == SOF_BYTE[0]:
        # byte stuffing; the receiver's DATA_DESTUFF states skip this byte
        modified_packet.append(0x00)

    self.serial.write(header + modified_packet)

    # TODO: add CRC support

  def next_rx_packet(self):
    """Removes and returns the oldest decoded packet, or None if none is queued."""
    if self.rx_packets:
      return self.rx_packets.popleft()
    else:
      return None

  def next_rx_byte(self):
    """Removes and returns the oldest pass-through byte, or None if none is queued."""
    if self.data_buffer:
      return self.data_buffer.popleft()
    else:
      return None
00516     else:
00517       return None