Clone of official tools
Diff: test_api.py
- Revision:
- 43:2a7da56ebd24
- Parent:
- 41:2a77626a4c21
--- a/test_api.py Mon Nov 06 13:17:14 2017 -0600 +++ b/test_api.py Tue Sep 25 13:43:09 2018 -0500 @@ -16,6 +16,8 @@ Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com> """ +from __future__ import print_function +import six import os import re @@ -28,14 +30,17 @@ import datetime import threading import ctypes -from types import ListType +import functools from colorama import Fore, Back, Style -from prettytable import PrettyTable -from copy import copy +from prettytable import PrettyTable, HEADER +from copy import copy, deepcopy from time import sleep, time -from Queue import Queue, Empty -from os.path import join, exists, basename, relpath +try: + from Queue import Queue, Empty +except ImportError: + from queue import Queue, Empty +from os.path import join, exists, basename, relpath, isdir, isfile from threading import Thread, Lock from multiprocessing import Pool, cpu_count from subprocess import Popen, PIPE @@ -49,7 +54,8 @@ from tools.utils import NotSupportedException from tools.utils import construct_enum from tools.memap import MemapParser -from tools.targets import TARGET_MAP +from tools.targets import TARGET_MAP, Target +from tools.config import Config import tools.test_configs as TestConfig from tools.test_db import BaseDBAccess from tools.build_api import build_project, build_mbed_libs, build_lib @@ -60,8 +66,8 @@ from tools.build_api import create_result from tools.build_api import add_result_to_report from tools.build_api import prepare_toolchain -from tools.build_api import scan_resources from tools.build_api import get_config +from tools.resources import Resources, MbedIgnoreSet, IGNORE_FILENAME from tools.libraries import LIBRARIES, LIBRARY_MAP from tools.options import extract_profile from tools.toolchains import TOOLCHAIN_PATHS @@ -71,7 +77,8 @@ from tools.utils import argparse_uppercase_type from tools.utils import argparse_lowercase_type from tools.utils import argparse_many -from tools.utils import get_path_depth +from tools.notifier.mock import MockNotifier +from tools.notifier.term import TerminalNotifier import tools.host_tests.host_tests_plugins as host_tests_plugins @@ -100,7 +107,7 @@ self.active = False try: self.proc.terminate() - except Exception, _: + except Exception: pass @@ -120,12 +127,14 @@ # Human readable summary if not self.single_test.opts_suppress_summary: # prints well-formed summary with results (SQL table like) - print self.single_test.generate_test_summary(test_summary, shuffle_seed) + print(self.single_test.generate_test_summary(test_summary, + shuffle_seed)) if self.single_test.opts_test_x_toolchain_summary: # prints well-formed summary with results (SQL table like) # table shows text x toolchain test result matrix - print self.single_test.generate_test_summary_by_target(test_summary, shuffle_seed) - print "Completed in %.2f sec"% (elapsed_time) + print(self.single_test.generate_test_summary_by_target( + test_summary, shuffle_seed)) + print("Completed in %.2f sec"% (elapsed_time)) class SingleTestRunner(object): @@ -360,31 +369,40 @@ # print '=== %s::%s ===' % (target, toolchain) # Let's build our test if target not in TARGET_MAP: - print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found'% (target)) + print(self.logger.log_line( + self.logger.LogType.NOTIF, + 'Skipped tests for %s target. Target platform not found' % + (target))) continue - clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean or self.opts_clean else None + clean_mbed_libs_options = (self.opts_goanna_for_mbed_sdk or + self.opts_clean or clean) profile = extract_profile(self.opts_parser, self.opts, toolchain) stats_depth = self.opts.stats_depth or 2 - try: - build_mbed_libs_result = build_mbed_libs(T, - toolchain, - clean=clean_mbed_libs_options, - verbose=self.opts_verbose, - jobs=self.opts_jobs, - report=build_report, - properties=build_properties, - build_profile=profile) + build_mbed_libs_result = build_mbed_libs( + T, toolchain, + clean=clean_mbed_libs_options, + jobs=self.opts_jobs, + report=build_report, + properties=build_properties, + build_profile=profile, + notify=TerminalNotifier()) if not build_mbed_libs_result: - print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target'% (T.name, toolchain)) + print(self.logger.log_line( + self.logger.LogType.NOTIF, + 'Skipped tests for %s target. Toolchain %s is not ' + 'supported for this target'% (T.name, toolchain))) continue except ToolException: - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building MBED libs for %s using %s'% (target, toolchain)) + print(self.logger.log_line( + self.logger.LogType.ERROR, + 'There were errors while building MBED libs for %s using %s' + % (target, toolchain))) continue build_dir = join(BUILD_DIR, "test", target, toolchain) @@ -402,16 +420,22 @@ if self.db_logger: self.db_logger.reconnect(); if self.db_logger.is_connected(): - self.db_logger.update_build_id_info(self.db_logger_build_id, _shuffle_seed=self.shuffle_random_func()) + self.db_logger.update_build_id_info( + self.db_logger_build_id, + _shuffle_seed=self.shuffle_random_func()) self.db_logger.disconnect(); if self.db_logger: self.db_logger.reconnect(); if self.db_logger.is_connected(): # Update MUTs and Test Specification in database - self.db_logger.update_build_id_info(self.db_logger_build_id, _muts=self.muts, _test_spec=self.test_spec) + self.db_logger.update_build_id_info( + self.db_logger_build_id, + _muts=self.muts, _test_spec=self.test_spec) # Update Extra information in database (some options passed to test suite) - self.db_logger.update_build_id_info(self.db_logger_build_id, _extra=json.dumps(self.dump_options())) + self.db_logger.update_build_id_info( + self.db_logger_build_id, + _extra=json.dumps(self.dump_options())) self.db_logger.disconnect(); valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated) @@ -441,15 +465,17 @@ build_lib(lib_id, T, toolchain, - verbose=self.opts_verbose, clean=clean_mbed_libs_options, jobs=self.opts_jobs, report=build_report, properties=build_properties, - build_profile=profile) + build_profile=profile, + notify=TerminalNotifier()) except ToolException: - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building library %s'% (lib_id)) + print(self.logger.log_line( + self.logger.LogType.ERROR, + 'There were errors while building library %s' % lib_id)) continue @@ -483,31 +509,40 @@ project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None try: - path = build_project(test.source_dir, join(build_dir, test_id), T, + path = build_project( + test.source_dir, join(build_dir, test_id), T, toolchain, test.dependencies, clean=clean_project_options, - verbose=self.opts_verbose, name=project_name, macros=MACROS, + name=project_name, macros=MACROS, inc_dirs=INC_DIRS, jobs=self.opts_jobs, report=build_report, properties=build_properties, project_id=test_id, project_description=test.get_description(), - build_profile=profile, stats_depth=stats_depth) + build_profile=profile, stats_depth=stats_depth, + notify=TerminalNotifier(), + ) - except Exception, e: + except Exception as e: project_name_str = project_name if project_name is not None else test_id test_result = self.TEST_RESULT_FAIL if isinstance(e, ToolException): - print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str)) + print(self.logger.log_line( + self.logger.LogType.ERROR, + 'There were errors while building project %s' % + project_name_str)) test_result = self.TEST_RESULT_BUILD_FAILED elif isinstance(e, NotSupportedException): - print self.logger.log_line(self.logger.LogType.INFO, 'The project %s is not supported'% (project_name_str)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Project %s is not supported' % project_name_str)) test_result = self.TEST_RESULT_NOT_SUPPORTED # Append test results to global test summary self.test_summary.append( - (test_result, target, toolchain, test_id, test.get_description(), 0, 0, '-') + (test_result, target, toolchain, test_id, + test.get_description(), 0, 0, '-') ) # Add detailed test result to test summary structure @@ -603,7 +638,7 @@ # in separate threads do not collide. # Inside execute_thread_slice() function function handle() will be called to # get information about available MUTs (per target). - for target, toolchains in self.test_spec['targets'].iteritems(): + for target, toolchains in self.test_spec['targets'].items(): self.test_suite_properties_ext[target] = {} t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)) t.daemon = True @@ -614,7 +649,7 @@ q.get() # t.join() would block some threads because we should not wait in any order for thread end else: # Serialized (not parallel) test execution - for target, toolchains in self.test_spec['targets'].iteritems(): + for target, toolchains in self.test_spec['targets'].items(): if target not in self.test_suite_properties_ext: self.test_suite_properties_ext[target] = {} @@ -642,23 +677,33 @@ if self.opts_test_only_peripheral and not test.peripherals: if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Common test skipped for target %s' % target)) continue - if self.opts_peripheral_by_names and test.peripherals and not len([i for i in test.peripherals if i in self.opts_peripheral_by_names]): + if (self.opts_peripheral_by_names and test.peripherals and + not any((i in self.opts_peripheral_by_names) + for i in test.peripherals)): # We will skip tests not forced with -p option if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Common test skipped for target %s' % target)) continue if self.opts_test_only_common and test.peripherals: if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Peripheral test skipped for target %s' % target)) continue if not include_non_automated and not test.automated: if self.opts_verbose_skipped_tests: - print self.logger.log_line(self.logger.LogType.INFO, 'Non automated test skipped for target %s'% (target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Non automated test skipped for target %s' % target)) continue if test.is_supported(target, toolchain): @@ -673,9 +718,15 @@ elif not self.is_peripherals_available(target, test.peripherals): if self.opts_verbose_skipped_tests: if test.peripherals: - print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test.peripherals), target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Peripheral %s test skipped for target %s' % + (",".join(test.peripherals), target))) else: - print self.logger.log_line(self.logger.LogType.INFO, 'Test %s skipped for target %s'% (test_id, target)) + print(self.logger.log_line( + self.logger.LogType.INFO, + 'Test %s skipped for target %s' % + (test_id, target))) continue # The test has made it through all the filters, so add it to the valid tests list @@ -715,7 +766,7 @@ result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX] pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains - pt = PrettyTable(pt_cols) + pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER) for col in pt_cols: pt.align[col] = "l" pt.padding_width = 1 # One space between column edges and contents (default) @@ -743,7 +794,7 @@ result = "Test summary:\n" # Pretty table package is used to print results pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description", - "Elapsed Time (sec)", "Timeout (sec)", "Loops"]) + "Elapsed Time (sec)", "Timeout (sec)", "Loops"], junction_char="|", hrules=HEADER) pt.align["Result"] = "l" # Left align pt.align["Target"] = "l" # Left align pt.align["Toolchain"] = "l" # Left align @@ -773,7 +824,7 @@ result += "\n" # Print result count - result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()]) + result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()]) shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND, shuffle_seed if shuffle_seed else self.shuffle_random_seed) result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') @@ -812,7 +863,7 @@ resutl_msg = "" try: os.remove(file_path) - except Exception, e: + except Exception as e: resutl_msg = e result = False return result, resutl_msg @@ -828,7 +879,7 @@ duration = data.get("duration", 10) if mut is None: - print "Error: No Mbed available: MUT[%s]" % data['mcu'] + print("Error: No Mbed available: MUT[%s]" % data['mcu']) return None mcu = mut['mcu'] @@ -864,7 +915,7 @@ break if not found: - print "Error: mbed not found with MBEDLS: %s" % data['mcu'] + print("Error: mbed not found with MBEDLS: %s" % data['mcu']) return None else: mut = muts_list[1] @@ -895,7 +946,7 @@ single_test_result = self.TEST_RESULT_NO_IMAGE elapsed_time = 0 single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path) - print single_test_output + print(single_test_output) else: # Host test execution start_host_exec_time = time() @@ -930,8 +981,9 @@ 'copy_method' : _copy_method, } - print self.print_test_result(single_test_result, target_name_unique, toolchain_name, - test_id, test_description, elapsed_time, single_timeout) + print(self.print_test_result( + single_test_result, target_name_unique, toolchain_name, test_id, + test_description, elapsed_time, single_timeout)) # Update database entries for ongoing test if self.db_logger and self.db_logger.is_connected(): @@ -972,7 +1024,7 @@ # Find a suitable MUT: mut = None - for id, m in self.muts.iteritems(): + for id, m in self.muts.items(): if m['mcu'] == data['mcu']: mut = m handle_result = self.handle_mut(mut, data, target_name, toolchain_name, test_loops=test_loops) @@ -1027,7 +1079,7 @@ """ try: c = obs.queue.get(block=True, timeout=0.5) - except Empty, _: + except Empty: c = None return c @@ -1060,7 +1112,6 @@ result = property.groups()[0] return result - # print "{%s} port:%s disk:%s" % (name, port, disk), cmd = ["python", '%s.py'% name, '-d', disk, @@ -1083,8 +1134,8 @@ cmd += ["-R", str(reset_tout)] if verbose: - print Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET - print "Test::Output::Start" + print(Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET) + print("Test::Output::Start") proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS) obs = ProcessObserver(proc) @@ -1138,7 +1189,7 @@ output.append(c) if verbose: - print "Test::Output::Finish" + print("Test::Output::Finish") # Stop test process obs.stop() @@ -1150,7 +1201,7 @@ """ if peripherals is not None: peripherals = set(peripherals) - for id, mut in self.muts.iteritems(): + for id, mut in self.muts.items(): # Target MCU name check if mut["mcu"] != target_mcu_name: continue @@ -1205,9 +1256,10 @@ line_no = 1 for json_line in data_file: if line_no + 5 >= line: # Print last few lines before error - print 'Line %d:\t'%line_no + json_line, # Prints line + print('Line %d:\t'%line_no + json_line) if line_no == line: - print ' ' * len('Line %d:'%line_no) + '\t', '-' * (column-1) + '^' + print('%s\t%s^' (' ' * len('Line %d:' % line_no), + '-' * (column - 1))) break line_no += 1 @@ -1244,18 +1296,19 @@ result = json.load(data_file) except ValueError as json_error_msg: result = None - print 'JSON file %s parsing failed. Reason: %s' % (json_spec_filename, json_error_msg) + print('JSON file %s parsing failed. Reason: %s' % + (json_spec_filename, json_error_msg)) # We can print where error occurred inside JSON file if we can parse exception msg json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg)) if json_format_defect_pos is not None: line = json_format_defect_pos[0] column = json_format_defect_pos[1] - print + print() show_json_file_format_error(json_spec_filename, line, column) except IOError as fileopen_error_msg: - print 'JSON file %s not opened. Reason: %s'% (json_spec_filename, fileopen_error_msg) - print + print('JSON file %s not opened. Reason: %s\n'% + (json_spec_filename, fileopen_error_msg)) if verbose and result: pp = pprint.PrettyPrinter(indent=4) pp.pprint(result) @@ -1275,7 +1328,7 @@ # Prepare pretty table object to display all MUTs pt_cols = ["index"] + muts_info_cols - pt = PrettyTable(pt_cols) + pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER) for col in pt_cols: pt.align[col] = "l" @@ -1290,7 +1343,7 @@ if add_row: for col in muts_info_cols: cell_val = mut_info[col] if col in mut_info else None - if type(cell_val) == ListType: + if isinstance(cell_val, list): cell_val = join_delim.join(cell_val) row.append(cell_val) pt.add_row(row) @@ -1313,7 +1366,7 @@ # Prepare pretty table object to display test specification pt_cols = ["mcu"] + sorted(toolchains_info_cols) - pt = PrettyTable(pt_cols) + pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER) for col in pt_cols: pt.align[col] = "l" @@ -1402,7 +1455,7 @@ 'duration'] if cols is None else cols # All tests status table print - pt = PrettyTable(test_properties) + pt = PrettyTable(test_properties, junction_char="|", hrules=HEADER) for col in test_properties: pt.align[col] = "l" pt.align['duration'] = "r" @@ -1423,7 +1476,7 @@ for col in test_properties: col_value = test[col] - if type(test[col]) == ListType: + if isinstance(test[col], list): col_value = join_delim.join(test[col]) elif test[col] == None: col_value = "-" @@ -1442,7 +1495,7 @@ if result_summary and not platform_filter: # Automation result summary test_id_cols = ['automated', 'all', 'percent [%]', 'progress'] - pt = PrettyTable(test_id_cols) + pt = PrettyTable(test_id_cols, junction_char="|", hrules=HEADER) pt.align['automated'] = "r" pt.align['all'] = "r" pt.align['percent [%]'] = "r" @@ -1456,7 +1509,7 @@ # Test automation coverage table print test_id_cols = ['id', 'automated', 'all', 'percent [%]', 'progress'] - pt = PrettyTable(test_id_cols) + pt = PrettyTable(test_id_cols, junction_char="|", hrules=HEADER) pt.align['id'] = "l" pt.align['automated'] = "r" pt.align['all'] = "r" @@ -1502,13 +1555,14 @@ # Human readable summary if not single_test.opts_suppress_summary: # prints well-formed summary with results (SQL table like) - print single_test.generate_test_summary(test_summary, shuffle_seed) + print(single_test.generate_test_summary(test_summary, shuffle_seed)) if single_test.opts_test_x_toolchain_summary: # prints well-formed summary with results (SQL table like) # table shows text x toolchain test result matrix - print single_test.generate_test_summary_by_target(test_summary, shuffle_seed) + print(single_test.generate_test_summary_by_target(test_summary, + shuffle_seed)) - print "Completed in %.2f sec"% (elapsed_time) + print("Completed in %.2f sec" % elapsed_time) print # Write summary of the builds @@ -1628,19 +1682,19 @@ # Let's try to connect db_ = factory_db_logger(db_url) if db_ is not None: - print "Connecting to database '%s'..."% db_url, + print("Connecting to database '%s'..." % db_url) db_.connect(host, username, password, db_name) if db_.is_connected(): - print "ok" - print "Detecting database..." - print db_.detect_database(verbose=True) - print "Disconnecting...", + print("ok") + print("Detecting database...") + print(db_.detect_database(verbose=True)) + print("Disconnecting...") db_.disconnect() - print "done" + print("done") else: - print "Database type '%s' unknown"% db_type + print("Database type '%s' unknown" % db_type) else: - print "Parse error: '%s' - DB Url error"% (db_url) + print("Parse error: '%s' - DB Url error" % db_url) def get_module_avail(module_name): @@ -2003,7 +2057,7 @@ """Finds the path to a test configuration file config_name: path to a custom configuration file OR mbed OS interface "ethernet, wifi_odin, etc" target_name: name of target to determing if mbed OS interface given is valid - returns path to config, boolean of whether it is a module or mbed OS interface + returns path to config, will return None if no valid config is found """ # If they passed in a full path if exists(config_name): @@ -2012,64 +2066,106 @@ # Otherwise find the path to configuration file based on mbed OS interface return TestConfig.get_config_path(config_name, target_name) -def find_tests(base_dir, target_name, toolchain_name, app_config=None): + +def find_tests(base_dir, target_name, toolchain_name, icetea, greentea, app_config=None): """ Finds all tests in a directory recursively - base_dir: path to the directory to scan for tests (ex. 'path/to/project') - target_name: name of the target to use for scanning (ex. 'K64F') - toolchain_name: name of the toolchain to use for scanning (ex. 'GCC_ARM') - options: Compile options to pass to the toolchain (ex. ['debug-info']) - app_config - location of a chosen mbed_app.json file + :param base_dir: path to the directory to scan for tests (ex. 'path/to/project') + :param target_name: name of the target to use for scanning (ex. 'K64F') + :param toolchain_name: name of the toolchain to use for scanning (ex. 'GCC_ARM') + :param icetea: icetea enabled + :param greentea: greentea enabled + :param app_config - location of a chosen mbed_app.json file + + returns a dictionary where keys are the test name, and the values are + lists of paths needed to biuld the test. """ + # Temporary structure: tests referenced by (name, base, group, case) tuple tests = {} + # List of common folders: (predicate function, path) tuple + commons = [] - # Prepare the toolchain - toolchain = prepare_toolchain([base_dir], None, target_name, toolchain_name, - silent=True, app_config=app_config) + config = Config(target_name, base_dir, app_config) # Scan the directory for paths to probe for 'TESTS' folders - base_resources = scan_resources([base_dir], toolchain) + base_resources = Resources(MockNotifier(), collect_ignores=True) + base_resources.scan_with_config(base_dir, config) - dirs = base_resources.inc_dirs - for directory in dirs: - subdirs = os.listdir(directory) + if greentea: + dirs = [d for d in base_resources.ignored_dirs if basename(d) == 'TESTS'] + ignoreset = MbedIgnoreSet() - # If the directory contains a subdirectory called 'TESTS', scan it for test cases - if 'TESTS' in subdirs: - walk_base_dir = join(directory, 'TESTS') - test_resources = toolchain.scan_resources(walk_base_dir, base_path=base_dir) - - # Loop through all subdirectories - for d in test_resources.inc_dirs: + for directory in dirs: + ignorefile = join(directory, IGNORE_FILENAME) + if isfile(ignorefile): + ignoreset.add_mbedignore(directory, ignorefile) + for test_group_directory in os.listdir(directory): + grp_dir = join(directory, test_group_directory) + if not isdir(grp_dir) or ignoreset.is_ignored(grp_dir): + continue + grpignorefile = join(grp_dir, IGNORE_FILENAME) + if isfile(grpignorefile): + ignoreset.add_mbedignore(grp_dir, grpignorefile) + for test_case_directory in os.listdir(grp_dir): + d = join(directory, test_group_directory, test_case_directory) + if not isdir(d) or ignoreset.is_ignored(d): + continue + special_dirs = ['host_tests', 'COMMON'] + if test_group_directory not in special_dirs and test_case_directory not in special_dirs: + test_name = test_path_to_name(d, base_dir) + tests[(test_name, directory, test_group_directory, test_case_directory)] = [d] + if test_case_directory == 'COMMON': + def predicate(base_pred, group_pred, name_base_group_case): + (name, base, group, case) = name_base_group_case + return base == base_pred and group == group_pred - # If the test case folder is not called 'host_tests' and it is - # located two folders down from the main 'TESTS' folder (ex. TESTS/testgroup/testcase) - # then add it to the tests - path_depth = get_path_depth(relpath(d, walk_base_dir)) - if path_depth == 2: - test_group_directory_path, test_case_directory = os.path.split(d) - test_group_directory = os.path.basename(test_group_directory_path) + commons.append((functools.partial(predicate, directory, test_group_directory), d)) + if test_group_directory == 'COMMON': + def predicate(base_pred, name_base_group_case): + (name, base, group, case) = name_base_group_case + return base == base_pred + + commons.append((functools.partial(predicate, directory), grp_dir)) - # Check to make sure discoverd folder is not in a host test directory - if test_case_directory != 'host_tests' and test_group_directory != 'host_tests': - test_name = test_path_to_name(d, base_dir) - tests[test_name] = d + if icetea: + dirs = [d for d in base_resources.ignored_dirs if basename(d) == 'TEST_APPS'] + for directory in dirs: + if not isdir(directory): + continue + for subdir in os.listdir(directory): + d = join(directory, subdir) + if not isdir(d): + continue + if 'device' == subdir: + for test_dir in os.listdir(d): + test_dir_path = join(d, test_dir) + test_name = test_path_to_name(test_dir_path, base_dir) + tests[(test_name, directory, subdir, test_dir)] = [test_dir_path] - return tests + # Apply common directories + for pred, path in commons: + for test_identity, test_paths in six.iteritems(tests): + if pred(test_identity): + test_paths.append(path) + + # Drop identity besides name + return {name: paths for (name, _, _, _), paths in six.iteritems(tests)} + def print_tests(tests, format="list", sort=True): """Given a dictionary of tests (as returned from "find_tests"), print them in the specified format""" if format == "list": for test_name in sorted(tests.keys()): - test_path = tests[test_name] - print "Test Case:" - print " Name: %s" % test_name - print " Path: %s" % test_path + test_path = tests[test_name][0] + print("Test Case:") + print(" Name: %s" % test_name) + print(" Path: %s" % test_path) elif format == "json": - print json.dumps(tests, indent=2) + print(json.dumps({test_name: test_path[0] for test_name, test_paths + in tests}, indent=2)) else: - print "Unknown format '%s'" % format + print("Unknown format '%s'" % format) sys.exit(1) def norm_relative_path(path, start): @@ -2104,22 +2200,22 @@ } # Use parent TOOLCHAIN_PATHS variable - for key, value in kwargs['toolchain_paths'].iteritems(): + for key, value in kwargs['toolchain_paths'].items(): TOOLCHAIN_PATHS[key] = value del kwargs['toolchain_paths'] try: - bin_file = build_project(*args, **kwargs) + bin_file, _ = build_project(*args, **kwargs) ret['result'] = True ret['bin_file'] = bin_file ret['kwargs'] = kwargs - except NotSupportedException, e: + except NotSupportedException as e: ret['reason'] = e - except ToolException, e: + except ToolException as e: ret['reason'] = e - except KeyboardInterrupt, e: + except KeyboardInterrupt as e: ret['reason'] = e except: # Print unhandled exceptions here @@ -2130,10 +2226,10 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name, - clean=False, notify=None, verbose=False, jobs=1, macros=None, + clean=False, notify=None, jobs=1, macros=None, silent=False, report=None, properties=None, continue_on_build_fail=False, app_config=None, - build_profile=None, stats_depth=None): + build_profile=None, stats_depth=None, ignore=None): """Given the data structure from 'find_tests' and the typical build parameters, build all the tests @@ -2143,8 +2239,12 @@ execution_directory = "." base_path = norm_relative_path(build_path, execution_directory) - target_name = target if isinstance(target, str) else target.name - cfg, _, _ = get_config(base_source_paths, target_name, toolchain_name) + if isinstance(target, Target): + target_name = target.name + else: + target_name = target + target = TARGET_MAP[target_name] + cfg, _, _ = get_config(base_source_paths, target, app_config=app_config) baud_rate = 9600 if 'platform.stdio-baud-rate' in cfg: @@ -2156,7 +2256,8 @@ "base_path": base_path, "baud_rate": baud_rate, "binary_type": "bootable", - "tests": {} + "tests": {}, + "test_apps": {} } result = True @@ -2164,13 +2265,16 @@ jobs_count = int(jobs if jobs else cpu_count()) p = Pool(processes=jobs_count) results = [] - for test_name, test_path in tests.iteritems(): - test_build_path = os.path.join(build_path, test_path) - src_path = base_source_paths + [test_path] + for test_name, test_paths in tests.items(): + if not isinstance(test_paths, list): + test_paths = [test_paths] + + test_build_path = os.path.join(build_path, test_paths[0]) + src_paths = base_source_paths + test_paths bin_file = None - test_case_folder_name = os.path.basename(test_path) + test_case_folder_name = os.path.basename(test_paths[0]) - args = (src_path, test_build_path, target, toolchain_name) + args = (src_paths, test_build_path, deepcopy(target), toolchain_name) kwargs = { 'jobs': 1, 'clean': clean, @@ -2179,12 +2283,11 @@ 'project_id': test_name, 'report': report, 'properties': properties, - 'verbose': verbose, 'app_config': app_config, 'build_profile': build_profile, - 'silent': True, 'toolchain_paths': TOOLCHAIN_PATHS, - 'stats_depth': stats_depth + 'stats_depth': stats_depth, + 'notify': MockNotifier() } results.append(p.apply_async(build_test_worker, args, kwargs)) @@ -2207,9 +2310,15 @@ worker_result = r.get() results.remove(r) + # Push all deferred notifications out to the actual notifier + new_notify = deepcopy(notify) + for message in worker_result['kwargs']['notify'].messages: + new_notify.notify(message) + # Take report from the kwargs and merge it into existing report if report: report_entry = worker_result['kwargs']['report'][target_name][toolchain_name] + report_entry[worker_result['kwargs']['project_id'].upper()][0][0]['output'] = new_notify.get_output() for test_key in report_entry.keys(): report[target_name][toolchain_name][test_key] = report_entry[test_key] @@ -2220,13 +2329,15 @@ result = False break + # Adding binary path to test build result if ('result' in worker_result and worker_result['result'] and 'bin_file' in worker_result): bin_file = norm_relative_path(worker_result['bin_file'], execution_directory) - test_build['tests'][worker_result['kwargs']['project_id']] = { + test_key = 'test_apps' if 'test_apps-' in worker_result['kwargs']['project_id'] else 'tests' + test_build[test_key][worker_result['kwargs']['project_id']] = { "binaries": [ { "path": bin_file @@ -2235,9 +2346,7 @@ } test_key = worker_result['kwargs']['project_id'].upper() - if report: - print report[target_name][toolchain_name][test_key][0][0]['output'].rstrip() - print 'Image: %s\n' % bin_file + print('Image: %s\n' % bin_file) except: if p._taskqueue.queue: @@ -2271,4 +2380,4 @@ def test_spec_from_test_builds(test_builds): return { "builds": test_builds - } + } \ No newline at end of file