Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers serial_test.py Source File

serial_test.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2009-2016, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 
00019 from __future__ import absolute_import
00020 from __future__ import division
00021 import queue
00022 import functools
00023 import serial
00024 import threading
00025 import time
00026 
00027 ERROR_TIMEOUT_SECONDS = 10.0
00028 
00029 
00030 def _same(d1, d2):
00031     #Do a string or bytearray compare 
00032     if d1 != d2:
00033         return False
00034     return True
00035 
# Standard baud rates exercised by the serial test, lowest first.
# Source of the list:
# http://digital.ni.com/public.nsf/allkb/D37754FFA24F7C3F86256706005B9BE7
standard_baud = [
    9600, 14400, 19200, 28800, 38400,
    # TODO - add 56000 here once daplink-validation supports 56000 on nrf5x
    57600, 115200,
]
# The timing stress test skips the first three (slowest) entries
timing_test_baud = standard_baud[3:]
00048 
00049 
def calc_timeout(length, baud):
    """Calculate a timeout for transferring data at a given baud rate

    Positional arguments:
        length - size of the data to be sent
        baud - baud rate the data is sent at

    Return:
        Timeout in seconds as a float.  The estimate budgets 12 bit-times
        per unit of length plus a fixed 0.2 second margin, which is
        slightly more time than is strictly needed, to account for
        latency and various configurations.

    Raises ZeroDivisionError if baud is 0.
    """
    bit_times = 12 * float(length)
    transfer_seconds = bit_times / float(baud)
    return transfer_seconds + 0.2
00062 
00063 
class SerialTester(object):
    """Helper object to buffer serial and setup baud

    The serial port is opened once in the constructor and kept open for the
    lifetime of the object.  Writes are queued and carried out by a
    background thread, so the caller can read from the port while a write
    is still in progress.  Use the object as a context manager: leaving the
    ``with`` block stops the writer thread and closes the port.
    """

    def __init__(self, port):
        """Open the serial port and start the background writer thread

        Positional arguments:
            port - the serial port to open, passed on to serial.Serial
        """
        self.raw_serial = serial.Serial(
            port=port,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=None,
            xonxoff=False,
            rtscts=False,
            write_timeout=None,
            dsrdtr=False,
            inter_byte_timeout=None,
            exclusive=None,
        )
        self.raw_serial.write_timeout = ERROR_TIMEOUT_SECONDS
        # Queue of zero-argument callables; None tells the thread to stop
        self._queue = queue.Queue()
        self._write_thread = threading.Thread(target=self._serial_main)
        self._write_thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exception_type, value, traceback):
        """Stop the writer thread and close the serial port

        Raises AssertionError if the writer thread has not finished within
        ERROR_TIMEOUT_SECONDS.  Always returns False so that an exception
        raised inside the ``with`` block is propagated.
        """
        self._queue.put(None)
        self._write_thread.join(ERROR_TIMEOUT_SECONDS)
        # Thread.isAlive() was removed in Python 3.9; is_alive() is available
        # on every supported version.  Raise explicitly so the check is not
        # stripped when running with -O.
        if self._write_thread.is_alive():
            raise AssertionError("Thread join failed")
        self.raw_serial.close()
        self.raw_serial = None
        return False

    def new_session_with_baud(self, baud, parent_test):
        """Start a new session by restarting target and setting baud

        Positional arguments:
            baud - baud rate to switch the target and the local port to
            parent_test - test-info object; a "Set Baud" subtest is created
                from it and receives any failure message

        Return:
            True if the target answered every step of the handshake as
            expected, False otherwise.  On failure the read timeout is
            left at 1.0 second.
        """
        test_info = parent_test.create_subtest("Set Baud")

        # The handshake is performed at 115200 with a short read timeout
        self.raw_serial.baudrate = 115200
        self.raw_serial.timeout = 1.0

        # Reset the target
        self.raw_serial.reset_output_buffer()
        self.raw_serial.reset_input_buffer()
        self.raw_serial.send_break()

        # Wait until the target is initialized.  Responses are compared as
        # bytes: decoding them first would raise UnicodeDecodeError on line
        # noise instead of reporting a test failure.
        expected_resp = b"{init}"
        resp = self.read(len(expected_resp))
        if not _same(resp, expected_resp):
            test_info.failure("Fail on init: %s" % resp)
            return False

        # Change baudrate to that of the first test; the target is expected
        # to send the command back unchanged
        command = ("{baud:%i}" % baud).encode()
        self.write(command)
        resp = self.read(len(command))
        if not _same(resp, command):
            test_info.failure("Fail on baud command: %s" % resp)
            return False

        # Update baud of local serial port
        self.raw_serial.baudrate = baud

        # Read the response indicating that the baudrate
        # on the target has changed
        expected_resp = b"{change}"
        resp = self.read(len(expected_resp))
        if not _same(resp, expected_resp):
            test_info.failure("Fail on baud change %s" % resp)
            return False

        # Set default timeout
        self.raw_serial.timeout = ERROR_TIMEOUT_SECONDS

        # Success
        return True

    def read(self, length):
        """Read up to length bytes of serial data in the calling thread"""
        return self.raw_serial.read(length)

    def write(self, data):
        """Write serial port data in the background

        The write is only queued here; call flush() to wait for it.
        """
        func = functools.partial(self.raw_serial.write, data)
        self._queue.put(func)

    def set_read_timeout(self, timeout):
        """Set timeout for read operations

        No writes may be waiting in the queue when this is called.
        """
        assert self._queue.empty(), "Queue must be empty to change timeout"
        self.raw_serial.timeout = timeout

    def flush(self):
        """Wait for all writes to complete"""
        self._queue.join()
        assert self._queue.empty()

    def _serial_main(self):
        """Write helper thread

        Runs queued tasks in order until a None task is received.
        """
        while True:
            task = self._queue.get(True)
            try:
                if task is None:
                    # End of processing is an empty task
                    break
                task()
            except serial.SerialTimeoutException:
                # A write that timed out is dropped on purpose; the caller
                # notices the missing data when it reads back the response
                pass
            finally:
                # Always balance the get() above, even when a task raises
                # unexpectedly, so this task does not block flush()
                self._queue.task_done()
00163 
def test_serial(workspace, parent_test):
    """Test the serial port endpoint

    Requirements:
        -daplink-validation must be loaded for the target.

    Positional arguments:
        workspace - object whose ``board`` attribute provides
            get_serial_port() and refresh()
        parent_test - test-info object; a "Serial test" subtest is created
            from it and receives every result

    Return:
        None.  Results are reported through the subtest's info() and
        failure() methods.
    """
    test_info = parent_test.create_subtest("Serial test")
    board = workspace.board
    port = board.get_serial_port()
    test_info.info("Testing serial port %s" % port)

    # Note: OSX sends a break command when a serial port is closed.
    # To avoid problems while testing keep the serial port open the
    # whole time.  Use the property 'baudrate' to change the baud
    # instead of opening a new instance.

    with SerialTester(port) as sp:

        # Generate a 4KB block of dummy data
        # and test supported baud rates
        test_data = bytearray(range(256)) * 16
        for baud in standard_baud:

            test_info.info("Testing baud %i" % baud)
            success = sp.new_session_with_baud(baud, test_info)
            if not success:
                test_info.failure("Unable to setup session")
                continue

            # Perform test
            sp.write(test_data)
            resp = sp.read(len(test_data))
            if _same(test_data, resp):
                test_info.info("Pass")
            else:
                test_info.failure("Fail on baud %s" % baud)

        # Timing stress test - send data at critical points
        # in time like right as the transmitter is turned off
        # ------------------
        # Test sequence
        # 1. Send a block of data (vary size for the test)
        # 2. Wait until 1 byte is read back
        # 3. Write 1 byte
        # 4. Read back all data
        # The 4KB block from above is reused; only its first bytes are sent.
        for baud in timing_test_baud:

            test_info.info("Timing test baud %i" % baud)
            success = sp.new_session_with_baud(baud, test_info)
            if not success:
                test_info.failure("Unable to setup session")
                continue

            test_pass = True
            for data_size in range(1, 10):
                # data_size bytes go out first, then one trailing byte
                data = test_data[0:data_size + 1]
                for _ in range(0, 1000):
                    resp = bytearray()

                    sp.write(data[0:data_size])
                    resp += sp.read(1)
                    sp.write(data[-1:])
                    resp += sp.read(data_size)
                    sp.flush()
                    if not _same(data, resp):
                        test_pass = False
                        test_info.info("fail size - %s" % data_size)
                        # Stops this size only; larger sizes are still run
                        break

            if test_pass:
                test_info.info("Pass")
            else:
                test_info.failure("Fail on timing test with baud %s"
                                  % baud)

        # Setting change smoke test - reconfigure settings while
        # in the middle of a transfer and verify nothing bad
        test_data = bytearray(range(128))
        if sp.new_session_with_baud(115200, test_info):
            sp.set_read_timeout(0)
            for baud in standard_baud:
                sp.raw_serial.baudrate = baud
                sp.write(test_data)
                # Sleep for about half the transfer (10 bit-times per byte)
                # so the next baud change lands in the middle of it
                xfer_time = float(len(test_data) * 10) / float(baud)
                time.sleep(xfer_time / 2)
                # Discard data
                sp.read(1024)
            # Read any leftover data
            sp.flush()
            sp.raw_serial.baudrate = 115200
            sp.set_read_timeout(1.0)
            sp.read(len(test_data) * len(standard_baud))
        else:
            # Same handling as the baud loops above: report and skip
            test_info.failure("Unable to setup session")

        # Generate a 8 KB block of dummy data
        # and test a large block transfer
        test_data = bytearray(range(256)) * 32
        if sp.new_session_with_baud(115200, test_info):
            sp.write(test_data)
            resp = sp.read(len(test_data))
            if _same(resp, test_data):
                test_info.info("Block test passed")
            else:
                test_info.failure("Block test failed")
        else:
            test_info.failure("Unable to setup session")

        # Refresh to check for asserts
        board.refresh(test_info)
00273         sp.write(test_data)
00274         resp = sp.read(len(test_data))
00275         if _same(resp, test_data):
00276             test_info.info("Block test passed")
00277         else:
00278             test_info.failure("Block test failed")
00279 
00280         # Refresh to check for asserts
00281         board.refresh(test_info)