BA
/
BaBoRo_test2
Backup 1
Diff: mbed-os/tools/host_tests/host_test.py
- Revision:
- 0:02dd72d1d465
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mbed-os/tools/host_tests/host_test.py Tue Apr 24 11:45:18 2018 +0000 @@ -0,0 +1,426 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Check if 'serial' module is installed +try: + from serial import Serial +except ImportError, e: + print "Error: Can't import 'serial' module: %s"% e + exit(-1) + +import os +import re +import types +from sys import stdout +from time import sleep, time +from optparse import OptionParser + +import host_tests_plugins + +# This is a little tricky. We need to add upper directory to path so +# we can find packages we want from the same level as other files do +import sys +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) +from tools.test_api import get_autodetected_MUTS_list +from tools.test_api import get_module_avail + + +class Mbed: + """ Base class for a host driven test + """ + def __init__(self): + parser = OptionParser() + + parser.add_option("-m", "--micro", + dest="micro", + help="The target microcontroller", + metavar="MICRO") + + parser.add_option("-p", "--port", + dest="port", + help="The serial port of the target mbed", + metavar="PORT") + + parser.add_option("-d", "--disk", + dest="disk", + help="The target disk path", + metavar="DISK_PATH") + + parser.add_option("-f", "--image-path", + dest="image_path", + help="Path with target's image", + metavar="IMAGE_PATH") + + parser.add_option("-c", "--copy", + dest="copy_method", + help="Copy method selector", + metavar="COPY_METHOD") + + parser.add_option("-C", "--program_cycle_s", + dest="program_cycle_s", + help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target", + type="float", + metavar="COPY_METHOD") + + parser.add_option("-t", "--timeout", + dest="timeout", + help="Timeout", + metavar="TIMEOUT") + + parser.add_option("-r", "--reset", + dest="forced_reset_type", + help="Forces different type of reset") + + parser.add_option("-R", "--reset-timeout", + dest="forced_reset_timeout", + metavar="NUMBER", + type="int", + help="When forcing a reset using option -r you can set up after reset timeout in seconds") + + parser.add_option('', '--auto', + dest='auto_detect', + metavar=False, + action="store_true", + help='Use mbed-ls module to detect all connected mbed devices') + + (self.options, _) = parser.parse_args() + + self.DEFAULT_RESET_TOUT = 0 + self.DEFAULT_TOUT = 10 + + if self.options.port is None: + raise Exception("The serial port of the target mbed have to be provided as command line arguments") + + # Options related to copy / reset mbed device + self.port = self.options.port + self.disk = self.options.disk + self.image_path = self.options.image_path.strip('"') + self.copy_method = self.options.copy_method + self.program_cycle_s = float(self.options.program_cycle_s) + + self.serial = None + self.serial_baud = 9600 + self.serial_timeout = 1 + + self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout + print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk) + + def init_serial_params(self, serial_baud=9600, serial_timeout=1): + """ Initialize port parameters. + This parameters will be used by self.init_serial() function to open serial port + """ + self.serial_baud = serial_baud + self.serial_timeout = serial_timeout + + def init_serial(self, serial_baud=None, serial_timeout=None): + """ Initialize serial port. + Function will return error is port can't be opened or initialized + """ + # Overload serial port configuration from default to parameters' values if they are specified + serial_baud = serial_baud if serial_baud is not None else self.serial_baud + serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout + + if get_module_avail('mbed_lstools') and self.options.auto_detect: + # Ensure serial port is up-to-date (try to find it 60 times) + found = False + + for i in range(0, 60): + print('Looking for %s with MBEDLS' % self.options.micro) + muts_list = get_autodetected_MUTS_list(platform_name_filter=[self.options.micro]) + + if 1 in muts_list: + mut = muts_list[1] + self.port = mut['port'] + found = True + break + else: + sleep(3) + + if not found: + return False + + # Clear serial port + if self.serial: + self.serial.close() + self.serial = None + + # We will pool for serial to be re-mounted if it was unmounted after device reset + result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking + + # Port can be opened + if result: + self.flush() + return result + + def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25): + """ Functions pools for serial port readiness + """ + result = True + last_error = None + # This loop is used to check for serial port availability due to + # some delays and remounting when devices are being flashed with new software. + for i in range(pooling_loops): + sleep(loop_delay if i else init_delay) + try: + self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout) + except Exception as e: + result = False + last_error = "MBED: %s"% str(e) + stdout.write('.') + stdout.flush() + else: + print "...port ready!" + result = True + break + if not result and last_error: + print last_error + return result + + def set_serial_timeout(self, timeout): + """ Wraps self.mbed.serial object timeout property + """ + result = None + if self.serial: + self.serial.timeout = timeout + result = True + return result + + def serial_read(self, count=1): + """ Wraps self.mbed.serial object read method + """ + result = None + if self.serial: + try: + result = self.serial.read(count) + except: + result = None + return result + + def serial_readline(self, timeout=5): + """ Wraps self.mbed.serial object read method to read one line from serial port + """ + result = '' + start = time() + while (time() - start) < timeout: + if self.serial: + try: + c = self.serial.read(1) + result += c + except Exception as e: + print "MBED: %s"% str(e) + result = None + break + if c == '\n': + break + return result + + def serial_write(self, write_buffer): + """ Wraps self.mbed.serial object write method + """ + result = None + if self.serial: + try: + result = self.serial.write(write_buffer) + except: + result = None + return result + + def reset_timeout(self, timeout): + """ Timeout executed just after reset command is issued + """ + for n in range(0, timeout): + sleep(1) + + def reset(self): + """ Calls proper reset plugin to do the job. + Please refer to host_test_plugins functionality + """ + # Flush serials to get only input after reset + self.flush() + if self.options.forced_reset_type: + result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk) + else: + result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial) + # Give time to wait for the image loading + reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT + self.reset_timeout(reset_tout_s) + return result + + def copy_image(self, image_path=None, disk=None, copy_method=None): + """ Closure for copy_image_raw() method. + Method which is actually copying image to mbed + """ + # Set closure environment + image_path = image_path if image_path is not None else self.image_path + disk = disk if disk is not None else self.disk + copy_method = copy_method if copy_method is not None else self.copy_method + # Call proper copy method + result = self.copy_image_raw(image_path, disk, copy_method) + return result + + def copy_image_raw(self, image_path=None, disk=None, copy_method=None): + """ Copy file depending on method you want to use. Handles exception + and return code from shell copy commands. + """ + # image_path - Where is binary with target's firmware + if copy_method is not None: + # We override 'default' method with 'shell' method + if copy_method == 'default': + copy_method = 'shell' + else: + copy_method = 'shell' + + result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk, program_cycle_s=self.program_cycle_s, target_mcu=self.options.micro) + return result; + + def flush(self): + """ Flush serial ports + """ + result = False + if self.serial: + self.serial.flushInput() + self.serial.flushOutput() + result = True + return result + + +class HostTestResults: + """ Test results set by host tests + """ + def __init__(self): + self.RESULT_SUCCESS = 'success' + self.RESULT_FAILURE = 'failure' + self.RESULT_ERROR = 'error' + self.RESULT_IO_SERIAL = 'ioerr_serial' + self.RESULT_NO_IMAGE = 'no_image' + self.RESULT_IOERR_COPY = "ioerr_copy" + self.RESULT_PASSIVE = "passive" + self.RESULT_NOT_DETECTED = "not_detected" + self.RESULT_MBED_ASSERT = "mbed_assert" + + +import tools.host_tests as host_tests + + +class Test(HostTestResults): + """ Base class for host test's test runner + """ + # Select default host_test supervision (replaced after autodetection) + test_supervisor = host_tests.get_host_test("default") + + def __init__(self): + self.mbed = Mbed() + + def detect_test_config(self, verbose=False): + """ Detects test case configuration + """ + result = {} + while True: + line = self.mbed.serial_readline() + if "{start}" in line: + self.notify("HOST: Start test...") + break + else: + # Detect if this is property from TEST_ENV print + m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1]) + if m and len(m.groups()) == 2: + # This is most likely auto-detection property + result[m.group(1)] = m.group(2) + if verbose: + self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2))) + else: + # We can check if this is TArget Id in mbed specific format + m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1]) + if m2 and len(m2.groups()) == 2: + if verbose: + target_id = m2.group(1) + m2.group(2) + self.notify("HOST: TargetID '%s'"% target_id) + self.notify(line[len(target_id):-1]) + else: + self.notify("HOST: Unknown property: %s"% line.strip()) + return result + + def run(self): + """ Test runner for host test. This function will start executing + test and forward test result via serial port to test suite + """ + # Copy image to device + self.notify("HOST: Copy image onto target...") + result = self.mbed.copy_image() + if not result: + self.print_result(self.RESULT_IOERR_COPY) + + # Initialize and open target's serial port (console) + self.notify("HOST: Initialize serial port...") + result = self.mbed.init_serial() + if not result: + self.print_result(self.RESULT_IO_SERIAL) + + # Reset device + self.notify("HOST: Reset target...") + result = self.mbed.reset() + if not result: + self.print_result(self.RESULT_IO_SERIAL) + + # Run test + try: + CONFIG = self.detect_test_config(verbose=True) # print CONFIG + + if "host_test_name" in CONFIG: + if host_tests.is_host_test(CONFIG["host_test_name"]): + self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"]) + result = self.test_supervisor.test(self) #result = self.test() + + if result is not None: + self.print_result(result) + else: + self.notify("HOST: Passive mode...") + except Exception, e: + print str(e) + self.print_result(self.RESULT_ERROR) + + def setup(self): + """ Setup and check if configuration for test is + correct. E.g. if serial port can be opened. + """ + result = True + if not self.mbed.serial: + result = False + self.print_result(self.RESULT_IO_SERIAL) + return result + + def notify(self, message): + """ On screen notification function + """ + print message + stdout.flush() + + def print_result(self, result): + """ Test result unified printing function + """ + self.notify("\r\n{{%s}}\r\n{{end}}" % result) + + +class DefaultTestSelector(Test): + """ Test class with serial port initialization + """ + def __init__(self): + HostTestResults.__init__(self) + Test.__init__(self) + +if __name__ == '__main__': + DefaultTestSelector().run()