Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: test_api.py
- Revision:
- 43:2a7da56ebd24
- Parent:
- 41:2a77626a4c21
diff -r 2cf3f29fece1 -r 2a7da56ebd24 test_api.py
--- a/test_api.py Mon Nov 06 13:17:14 2017 -0600
+++ b/test_api.py Tue Sep 25 13:43:09 2018 -0500
@@ -16,6 +16,8 @@
Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com>
"""
+from __future__ import print_function
+import six
import os
import re
@@ -28,14 +30,17 @@
import datetime
import threading
import ctypes
-from types import ListType
+import functools
from colorama import Fore, Back, Style
-from prettytable import PrettyTable
-from copy import copy
+from prettytable import PrettyTable, HEADER
+from copy import copy, deepcopy
from time import sleep, time
-from Queue import Queue, Empty
-from os.path import join, exists, basename, relpath
+try:
+ from Queue import Queue, Empty
+except ImportError:
+ from queue import Queue, Empty
+from os.path import join, exists, basename, relpath, isdir, isfile
from threading import Thread, Lock
from multiprocessing import Pool, cpu_count
from subprocess import Popen, PIPE
@@ -49,7 +54,8 @@
from tools.utils import NotSupportedException
from tools.utils import construct_enum
from tools.memap import MemapParser
-from tools.targets import TARGET_MAP
+from tools.targets import TARGET_MAP, Target
+from tools.config import Config
import tools.test_configs as TestConfig
from tools.test_db import BaseDBAccess
from tools.build_api import build_project, build_mbed_libs, build_lib
@@ -60,8 +66,8 @@
from tools.build_api import create_result
from tools.build_api import add_result_to_report
from tools.build_api import prepare_toolchain
-from tools.build_api import scan_resources
from tools.build_api import get_config
+from tools.resources import Resources, MbedIgnoreSet, IGNORE_FILENAME
from tools.libraries import LIBRARIES, LIBRARY_MAP
from tools.options import extract_profile
from tools.toolchains import TOOLCHAIN_PATHS
@@ -71,7 +77,8 @@
from tools.utils import argparse_uppercase_type
from tools.utils import argparse_lowercase_type
from tools.utils import argparse_many
-from tools.utils import get_path_depth
+from tools.notifier.mock import MockNotifier
+from tools.notifier.term import TerminalNotifier
import tools.host_tests.host_tests_plugins as host_tests_plugins
@@ -100,7 +107,7 @@
self.active = False
try:
self.proc.terminate()
- except Exception, _:
+ except Exception:
pass
@@ -120,12 +127,14 @@
# Human readable summary
if not self.single_test.opts_suppress_summary:
# prints well-formed summary with results (SQL table like)
- print self.single_test.generate_test_summary(test_summary, shuffle_seed)
+ print(self.single_test.generate_test_summary(test_summary,
+ shuffle_seed))
if self.single_test.opts_test_x_toolchain_summary:
# prints well-formed summary with results (SQL table like)
# table shows text x toolchain test result matrix
- print self.single_test.generate_test_summary_by_target(test_summary, shuffle_seed)
- print "Completed in %.2f sec"% (elapsed_time)
+ print(self.single_test.generate_test_summary_by_target(
+ test_summary, shuffle_seed))
+ print("Completed in %.2f sec"% (elapsed_time))
class SingleTestRunner(object):
@@ -360,31 +369,40 @@
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
- print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found'% (target))
+ print(self.logger.log_line(
+ self.logger.LogType.NOTIF,
+ 'Skipped tests for %s target. Target platform not found' %
+ (target)))
continue
- clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean or self.opts_clean else None
+ clean_mbed_libs_options = (self.opts_goanna_for_mbed_sdk or
+ self.opts_clean or clean)
profile = extract_profile(self.opts_parser, self.opts, toolchain)
stats_depth = self.opts.stats_depth or 2
-
try:
- build_mbed_libs_result = build_mbed_libs(T,
- toolchain,
- clean=clean_mbed_libs_options,
- verbose=self.opts_verbose,
- jobs=self.opts_jobs,
- report=build_report,
- properties=build_properties,
- build_profile=profile)
+ build_mbed_libs_result = build_mbed_libs(
+ T, toolchain,
+ clean=clean_mbed_libs_options,
+ jobs=self.opts_jobs,
+ report=build_report,
+ properties=build_properties,
+ build_profile=profile,
+ notify=TerminalNotifier())
if not build_mbed_libs_result:
- print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target'% (T.name, toolchain))
+ print(self.logger.log_line(
+ self.logger.LogType.NOTIF,
+ 'Skipped tests for %s target. Toolchain %s is not '
+ 'supported for this target'% (T.name, toolchain)))
continue
except ToolException:
- print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building MBED libs for %s using %s'% (target, toolchain))
+ print(self.logger.log_line(
+ self.logger.LogType.ERROR,
+ 'There were errors while building MBED libs for %s using %s'
+ % (target, toolchain)))
continue
build_dir = join(BUILD_DIR, "test", target, toolchain)
@@ -402,16 +420,22 @@
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
- self.db_logger.update_build_id_info(self.db_logger_build_id, _shuffle_seed=self.shuffle_random_func())
+ self.db_logger.update_build_id_info(
+ self.db_logger_build_id,
+ _shuffle_seed=self.shuffle_random_func())
self.db_logger.disconnect();
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
# Update MUTs and Test Specification in database
- self.db_logger.update_build_id_info(self.db_logger_build_id, _muts=self.muts, _test_spec=self.test_spec)
+ self.db_logger.update_build_id_info(
+ self.db_logger_build_id,
+ _muts=self.muts, _test_spec=self.test_spec)
# Update Extra information in database (some options passed to test suite)
- self.db_logger.update_build_id_info(self.db_logger_build_id, _extra=json.dumps(self.dump_options()))
+ self.db_logger.update_build_id_info(
+ self.db_logger_build_id,
+ _extra=json.dumps(self.dump_options()))
self.db_logger.disconnect();
valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated)
@@ -441,15 +465,17 @@
build_lib(lib_id,
T,
toolchain,
- verbose=self.opts_verbose,
clean=clean_mbed_libs_options,
jobs=self.opts_jobs,
report=build_report,
properties=build_properties,
- build_profile=profile)
+ build_profile=profile,
+ notify=TerminalNotifier())
except ToolException:
- print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building library %s'% (lib_id))
+ print(self.logger.log_line(
+ self.logger.LogType.ERROR,
+ 'There were errors while building library %s' % lib_id))
continue
@@ -483,31 +509,40 @@
project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
try:
- path = build_project(test.source_dir, join(build_dir, test_id), T,
+ path = build_project(
+ test.source_dir, join(build_dir, test_id), T,
toolchain, test.dependencies, clean=clean_project_options,
- verbose=self.opts_verbose, name=project_name, macros=MACROS,
+ name=project_name, macros=MACROS,
inc_dirs=INC_DIRS, jobs=self.opts_jobs, report=build_report,
properties=build_properties, project_id=test_id,
project_description=test.get_description(),
- build_profile=profile, stats_depth=stats_depth)
+ build_profile=profile, stats_depth=stats_depth,
+ notify=TerminalNotifier(),
+ )
- except Exception, e:
+ except Exception as e:
project_name_str = project_name if project_name is not None else test_id
test_result = self.TEST_RESULT_FAIL
if isinstance(e, ToolException):
- print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str))
+ print(self.logger.log_line(
+ self.logger.LogType.ERROR,
+ 'There were errors while building project %s' %
+ project_name_str))
test_result = self.TEST_RESULT_BUILD_FAILED
elif isinstance(e, NotSupportedException):
- print self.logger.log_line(self.logger.LogType.INFO, 'The project %s is not supported'% (project_name_str))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Project %s is not supported' % project_name_str))
test_result = self.TEST_RESULT_NOT_SUPPORTED
# Append test results to global test summary
self.test_summary.append(
- (test_result, target, toolchain, test_id, test.get_description(), 0, 0, '-')
+ (test_result, target, toolchain, test_id,
+ test.get_description(), 0, 0, '-')
)
# Add detailed test result to test summary structure
@@ -603,7 +638,7 @@
# in separate threads do not collide.
# Inside execute_thread_slice() function function handle() will be called to
# get information about available MUTs (per target).
- for target, toolchains in self.test_spec['targets'].iteritems():
+ for target, toolchains in self.test_spec['targets'].items():
self.test_suite_properties_ext[target] = {}
t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, self.build_report, self.build_properties))
t.daemon = True
@@ -614,7 +649,7 @@
q.get() # t.join() would block some threads because we should not wait in any order for thread end
else:
# Serialized (not parallel) test execution
- for target, toolchains in self.test_spec['targets'].iteritems():
+ for target, toolchains in self.test_spec['targets'].items():
if target not in self.test_suite_properties_ext:
self.test_suite_properties_ext[target] = {}
@@ -642,23 +677,33 @@
if self.opts_test_only_peripheral and not test.peripherals:
if self.opts_verbose_skipped_tests:
- print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Common test skipped for target %s' % target))
continue
- if self.opts_peripheral_by_names and test.peripherals and not len([i for i in test.peripherals if i in self.opts_peripheral_by_names]):
+ if (self.opts_peripheral_by_names and test.peripherals and
+ not any((i in self.opts_peripheral_by_names)
+ for i in test.peripherals)):
# We will skip tests not forced with -p option
if self.opts_verbose_skipped_tests:
- print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Common test skipped for target %s' % target))
continue
if self.opts_test_only_common and test.peripherals:
if self.opts_verbose_skipped_tests:
- print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Peripheral test skipped for target %s' % target))
continue
if not include_non_automated and not test.automated:
if self.opts_verbose_skipped_tests:
- print self.logger.log_line(self.logger.LogType.INFO, 'Non automated test skipped for target %s'% (target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Non automated test skipped for target %s' % target))
continue
if test.is_supported(target, toolchain):
@@ -673,9 +718,15 @@
elif not self.is_peripherals_available(target, test.peripherals):
if self.opts_verbose_skipped_tests:
if test.peripherals:
- print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test.peripherals), target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Peripheral %s test skipped for target %s' %
+ (",".join(test.peripherals), target)))
else:
- print self.logger.log_line(self.logger.LogType.INFO, 'Test %s skipped for target %s'% (test_id, target))
+ print(self.logger.log_line(
+ self.logger.LogType.INFO,
+ 'Test %s skipped for target %s' %
+ (test_id, target)))
continue
# The test has made it through all the filters, so add it to the valid tests list
@@ -715,7 +766,7 @@
result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]
pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains
- pt = PrettyTable(pt_cols)
+ pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
for col in pt_cols:
pt.align[col] = "l"
pt.padding_width = 1 # One space between column edges and contents (default)
@@ -743,7 +794,7 @@
result = "Test summary:\n"
# Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
- "Elapsed Time (sec)", "Timeout (sec)", "Loops"])
+ "Elapsed Time (sec)", "Timeout (sec)", "Loops"], junction_char="|", hrules=HEADER)
pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align
@@ -773,7 +824,7 @@
result += "\n"
# Print result count
- result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()])
+ result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()])
shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
@@ -812,7 +863,7 @@
resutl_msg = ""
try:
os.remove(file_path)
- except Exception, e:
+ except Exception as e:
resutl_msg = e
result = False
return result, resutl_msg
@@ -828,7 +879,7 @@
duration = data.get("duration", 10)
if mut is None:
- print "Error: No Mbed available: MUT[%s]" % data['mcu']
+ print("Error: No Mbed available: MUT[%s]" % data['mcu'])
return None
mcu = mut['mcu']
@@ -864,7 +915,7 @@
break
if not found:
- print "Error: mbed not found with MBEDLS: %s" % data['mcu']
+ print("Error: mbed not found with MBEDLS: %s" % data['mcu'])
return None
else:
mut = muts_list[1]
@@ -895,7 +946,7 @@
single_test_result = self.TEST_RESULT_NO_IMAGE
elapsed_time = 0
single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path)
- print single_test_output
+ print(single_test_output)
else:
# Host test execution
start_host_exec_time = time()
@@ -930,8 +981,9 @@
'copy_method' : _copy_method,
}
- print self.print_test_result(single_test_result, target_name_unique, toolchain_name,
- test_id, test_description, elapsed_time, single_timeout)
+ print(self.print_test_result(
+ single_test_result, target_name_unique, toolchain_name, test_id,
+ test_description, elapsed_time, single_timeout))
# Update database entries for ongoing test
if self.db_logger and self.db_logger.is_connected():
@@ -972,7 +1024,7 @@
# Find a suitable MUT:
mut = None
- for id, m in self.muts.iteritems():
+ for id, m in self.muts.items():
if m['mcu'] == data['mcu']:
mut = m
handle_result = self.handle_mut(mut, data, target_name, toolchain_name, test_loops=test_loops)
@@ -1027,7 +1079,7 @@
"""
try:
c = obs.queue.get(block=True, timeout=0.5)
- except Empty, _:
+ except Empty:
c = None
return c
@@ -1060,7 +1112,6 @@
result = property.groups()[0]
return result
- # print "{%s} port:%s disk:%s" % (name, port, disk),
cmd = ["python",
'%s.py'% name,
'-d', disk,
@@ -1083,8 +1134,8 @@
cmd += ["-R", str(reset_tout)]
if verbose:
- print Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET
- print "Test::Output::Start"
+ print(Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET)
+ print("Test::Output::Start")
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
@@ -1138,7 +1189,7 @@
output.append(c)
if verbose:
- print "Test::Output::Finish"
+ print("Test::Output::Finish")
# Stop test process
obs.stop()
@@ -1150,7 +1201,7 @@
"""
if peripherals is not None:
peripherals = set(peripherals)
- for id, mut in self.muts.iteritems():
+ for id, mut in self.muts.items():
# Target MCU name check
if mut["mcu"] != target_mcu_name:
continue
@@ -1205,9 +1256,10 @@
line_no = 1
for json_line in data_file:
if line_no + 5 >= line: # Print last few lines before error
- print 'Line %d:\t'%line_no + json_line, # Prints line
+ print('Line %d:\t'%line_no + json_line)
if line_no == line:
- print ' ' * len('Line %d:'%line_no) + '\t', '-' * (column-1) + '^'
+ print('%s\t%s^' (' ' * len('Line %d:' % line_no),
+ '-' * (column - 1)))
break
line_no += 1
@@ -1244,18 +1296,19 @@
result = json.load(data_file)
except ValueError as json_error_msg:
result = None
- print 'JSON file %s parsing failed. Reason: %s' % (json_spec_filename, json_error_msg)
+ print('JSON file %s parsing failed. Reason: %s' %
+ (json_spec_filename, json_error_msg))
# We can print where error occurred inside JSON file if we can parse exception msg
json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg))
if json_format_defect_pos is not None:
line = json_format_defect_pos[0]
column = json_format_defect_pos[1]
- print
+ print()
show_json_file_format_error(json_spec_filename, line, column)
except IOError as fileopen_error_msg:
- print 'JSON file %s not opened. Reason: %s'% (json_spec_filename, fileopen_error_msg)
- print
+ print('JSON file %s not opened. Reason: %s\n'%
+ (json_spec_filename, fileopen_error_msg))
if verbose and result:
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(result)
@@ -1275,7 +1328,7 @@
# Prepare pretty table object to display all MUTs
pt_cols = ["index"] + muts_info_cols
- pt = PrettyTable(pt_cols)
+ pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
for col in pt_cols:
pt.align[col] = "l"
@@ -1290,7 +1343,7 @@
if add_row:
for col in muts_info_cols:
cell_val = mut_info[col] if col in mut_info else None
- if type(cell_val) == ListType:
+ if isinstance(cell_val, list):
cell_val = join_delim.join(cell_val)
row.append(cell_val)
pt.add_row(row)
@@ -1313,7 +1366,7 @@
# Prepare pretty table object to display test specification
pt_cols = ["mcu"] + sorted(toolchains_info_cols)
- pt = PrettyTable(pt_cols)
+ pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
for col in pt_cols:
pt.align[col] = "l"
@@ -1402,7 +1455,7 @@
'duration'] if cols is None else cols
# All tests status table print
- pt = PrettyTable(test_properties)
+ pt = PrettyTable(test_properties, junction_char="|", hrules=HEADER)
for col in test_properties:
pt.align[col] = "l"
pt.align['duration'] = "r"
@@ -1423,7 +1476,7 @@
for col in test_properties:
col_value = test[col]
- if type(test[col]) == ListType:
+ if isinstance(test[col], list):
col_value = join_delim.join(test[col])
elif test[col] == None:
col_value = "-"
@@ -1442,7 +1495,7 @@
if result_summary and not platform_filter:
# Automation result summary
test_id_cols = ['automated', 'all', 'percent [%]', 'progress']
- pt = PrettyTable(test_id_cols)
+ pt = PrettyTable(test_id_cols, junction_char="|", hrules=HEADER)
pt.align['automated'] = "r"
pt.align['all'] = "r"
pt.align['percent [%]'] = "r"
@@ -1456,7 +1509,7 @@
# Test automation coverage table print
test_id_cols = ['id', 'automated', 'all', 'percent [%]', 'progress']
- pt = PrettyTable(test_id_cols)
+ pt = PrettyTable(test_id_cols, junction_char="|", hrules=HEADER)
pt.align['id'] = "l"
pt.align['automated'] = "r"
pt.align['all'] = "r"
@@ -1502,13 +1555,14 @@
# Human readable summary
if not single_test.opts_suppress_summary:
# prints well-formed summary with results (SQL table like)
- print single_test.generate_test_summary(test_summary, shuffle_seed)
+ print(single_test.generate_test_summary(test_summary, shuffle_seed))
if single_test.opts_test_x_toolchain_summary:
# prints well-formed summary with results (SQL table like)
# table shows text x toolchain test result matrix
- print single_test.generate_test_summary_by_target(test_summary, shuffle_seed)
+ print(single_test.generate_test_summary_by_target(test_summary,
+ shuffle_seed))
- print "Completed in %.2f sec"% (elapsed_time)
+ print("Completed in %.2f sec" % elapsed_time)
print
# Write summary of the builds
@@ -1628,19 +1682,19 @@
# Let's try to connect
db_ = factory_db_logger(db_url)
if db_ is not None:
- print "Connecting to database '%s'..."% db_url,
+ print("Connecting to database '%s'..." % db_url)
db_.connect(host, username, password, db_name)
if db_.is_connected():
- print "ok"
- print "Detecting database..."
- print db_.detect_database(verbose=True)
- print "Disconnecting...",
+ print("ok")
+ print("Detecting database...")
+ print(db_.detect_database(verbose=True))
+ print("Disconnecting...")
db_.disconnect()
- print "done"
+ print("done")
else:
- print "Database type '%s' unknown"% db_type
+ print("Database type '%s' unknown" % db_type)
else:
- print "Parse error: '%s' - DB Url error"% (db_url)
+ print("Parse error: '%s' - DB Url error" % db_url)
def get_module_avail(module_name):
@@ -2003,7 +2057,7 @@
"""Finds the path to a test configuration file
config_name: path to a custom configuration file OR mbed OS interface "ethernet, wifi_odin, etc"
target_name: name of target to determing if mbed OS interface given is valid
- returns path to config, boolean of whether it is a module or mbed OS interface
+ returns path to config, will return None if no valid config is found
"""
# If they passed in a full path
if exists(config_name):
@@ -2012,64 +2066,106 @@
# Otherwise find the path to configuration file based on mbed OS interface
return TestConfig.get_config_path(config_name, target_name)
-def find_tests(base_dir, target_name, toolchain_name, app_config=None):
+
+def find_tests(base_dir, target_name, toolchain_name, icetea, greentea, app_config=None):
""" Finds all tests in a directory recursively
- base_dir: path to the directory to scan for tests (ex. 'path/to/project')
- target_name: name of the target to use for scanning (ex. 'K64F')
- toolchain_name: name of the toolchain to use for scanning (ex. 'GCC_ARM')
- options: Compile options to pass to the toolchain (ex. ['debug-info'])
- app_config - location of a chosen mbed_app.json file
+ :param base_dir: path to the directory to scan for tests (ex. 'path/to/project')
+ :param target_name: name of the target to use for scanning (ex. 'K64F')
+ :param toolchain_name: name of the toolchain to use for scanning (ex. 'GCC_ARM')
+ :param icetea: icetea enabled
+ :param greentea: greentea enabled
+ :param app_config - location of a chosen mbed_app.json file
+
+ returns a dictionary where keys are the test name, and the values are
+ lists of paths needed to biuld the test.
"""
+ # Temporary structure: tests referenced by (name, base, group, case) tuple
tests = {}
+ # List of common folders: (predicate function, path) tuple
+ commons = []
- # Prepare the toolchain
- toolchain = prepare_toolchain([base_dir], None, target_name, toolchain_name,
- silent=True, app_config=app_config)
+ config = Config(target_name, base_dir, app_config)
# Scan the directory for paths to probe for 'TESTS' folders
- base_resources = scan_resources([base_dir], toolchain)
+ base_resources = Resources(MockNotifier(), collect_ignores=True)
+ base_resources.scan_with_config(base_dir, config)
- dirs = base_resources.inc_dirs
- for directory in dirs:
- subdirs = os.listdir(directory)
+ if greentea:
+ dirs = [d for d in base_resources.ignored_dirs if basename(d) == 'TESTS']
+ ignoreset = MbedIgnoreSet()
- # If the directory contains a subdirectory called 'TESTS', scan it for test cases
- if 'TESTS' in subdirs:
- walk_base_dir = join(directory, 'TESTS')
- test_resources = toolchain.scan_resources(walk_base_dir, base_path=base_dir)
-
- # Loop through all subdirectories
- for d in test_resources.inc_dirs:
+ for directory in dirs:
+ ignorefile = join(directory, IGNORE_FILENAME)
+ if isfile(ignorefile):
+ ignoreset.add_mbedignore(directory, ignorefile)
+ for test_group_directory in os.listdir(directory):
+ grp_dir = join(directory, test_group_directory)
+ if not isdir(grp_dir) or ignoreset.is_ignored(grp_dir):
+ continue
+ grpignorefile = join(grp_dir, IGNORE_FILENAME)
+ if isfile(grpignorefile):
+ ignoreset.add_mbedignore(grp_dir, grpignorefile)
+ for test_case_directory in os.listdir(grp_dir):
+ d = join(directory, test_group_directory, test_case_directory)
+ if not isdir(d) or ignoreset.is_ignored(d):
+ continue
+ special_dirs = ['host_tests', 'COMMON']
+ if test_group_directory not in special_dirs and test_case_directory not in special_dirs:
+ test_name = test_path_to_name(d, base_dir)
+ tests[(test_name, directory, test_group_directory, test_case_directory)] = [d]
+ if test_case_directory == 'COMMON':
+ def predicate(base_pred, group_pred, name_base_group_case):
+ (name, base, group, case) = name_base_group_case
+ return base == base_pred and group == group_pred
- # If the test case folder is not called 'host_tests' and it is
- # located two folders down from the main 'TESTS' folder (ex. TESTS/testgroup/testcase)
- # then add it to the tests
- path_depth = get_path_depth(relpath(d, walk_base_dir))
- if path_depth == 2:
- test_group_directory_path, test_case_directory = os.path.split(d)
- test_group_directory = os.path.basename(test_group_directory_path)
+ commons.append((functools.partial(predicate, directory, test_group_directory), d))
+ if test_group_directory == 'COMMON':
+ def predicate(base_pred, name_base_group_case):
+ (name, base, group, case) = name_base_group_case
+ return base == base_pred
+
+ commons.append((functools.partial(predicate, directory), grp_dir))
- # Check to make sure discoverd folder is not in a host test directory
- if test_case_directory != 'host_tests' and test_group_directory != 'host_tests':
- test_name = test_path_to_name(d, base_dir)
- tests[test_name] = d
+ if icetea:
+ dirs = [d for d in base_resources.ignored_dirs if basename(d) == 'TEST_APPS']
+ for directory in dirs:
+ if not isdir(directory):
+ continue
+ for subdir in os.listdir(directory):
+ d = join(directory, subdir)
+ if not isdir(d):
+ continue
+ if 'device' == subdir:
+ for test_dir in os.listdir(d):
+ test_dir_path = join(d, test_dir)
+ test_name = test_path_to_name(test_dir_path, base_dir)
+ tests[(test_name, directory, subdir, test_dir)] = [test_dir_path]
- return tests
+ # Apply common directories
+ for pred, path in commons:
+ for test_identity, test_paths in six.iteritems(tests):
+ if pred(test_identity):
+ test_paths.append(path)
+
+ # Drop identity besides name
+ return {name: paths for (name, _, _, _), paths in six.iteritems(tests)}
+
def print_tests(tests, format="list", sort=True):
"""Given a dictionary of tests (as returned from "find_tests"), print them
in the specified format"""
if format == "list":
for test_name in sorted(tests.keys()):
- test_path = tests[test_name]
- print "Test Case:"
- print " Name: %s" % test_name
- print " Path: %s" % test_path
+ test_path = tests[test_name][0]
+ print("Test Case:")
+ print(" Name: %s" % test_name)
+ print(" Path: %s" % test_path)
elif format == "json":
- print json.dumps(tests, indent=2)
+ print(json.dumps({test_name: test_path[0] for test_name, test_paths
+ in tests}, indent=2))
else:
- print "Unknown format '%s'" % format
+ print("Unknown format '%s'" % format)
sys.exit(1)
def norm_relative_path(path, start):
@@ -2104,22 +2200,22 @@
}
# Use parent TOOLCHAIN_PATHS variable
- for key, value in kwargs['toolchain_paths'].iteritems():
+ for key, value in kwargs['toolchain_paths'].items():
TOOLCHAIN_PATHS[key] = value
del kwargs['toolchain_paths']
try:
- bin_file = build_project(*args, **kwargs)
+ bin_file, _ = build_project(*args, **kwargs)
ret['result'] = True
ret['bin_file'] = bin_file
ret['kwargs'] = kwargs
- except NotSupportedException, e:
+ except NotSupportedException as e:
ret['reason'] = e
- except ToolException, e:
+ except ToolException as e:
ret['reason'] = e
- except KeyboardInterrupt, e:
+ except KeyboardInterrupt as e:
ret['reason'] = e
except:
# Print unhandled exceptions here
@@ -2130,10 +2226,10 @@
def build_tests(tests, base_source_paths, build_path, target, toolchain_name,
- clean=False, notify=None, verbose=False, jobs=1, macros=None,
+ clean=False, notify=None, jobs=1, macros=None,
silent=False, report=None, properties=None,
continue_on_build_fail=False, app_config=None,
- build_profile=None, stats_depth=None):
+ build_profile=None, stats_depth=None, ignore=None):
"""Given the data structure from 'find_tests' and the typical build parameters,
build all the tests
@@ -2143,8 +2239,12 @@
execution_directory = "."
base_path = norm_relative_path(build_path, execution_directory)
- target_name = target if isinstance(target, str) else target.name
- cfg, _, _ = get_config(base_source_paths, target_name, toolchain_name)
+ if isinstance(target, Target):
+ target_name = target.name
+ else:
+ target_name = target
+ target = TARGET_MAP[target_name]
+ cfg, _, _ = get_config(base_source_paths, target, app_config=app_config)
baud_rate = 9600
if 'platform.stdio-baud-rate' in cfg:
@@ -2156,7 +2256,8 @@
"base_path": base_path,
"baud_rate": baud_rate,
"binary_type": "bootable",
- "tests": {}
+ "tests": {},
+ "test_apps": {}
}
result = True
@@ -2164,13 +2265,16 @@
jobs_count = int(jobs if jobs else cpu_count())
p = Pool(processes=jobs_count)
results = []
- for test_name, test_path in tests.iteritems():
- test_build_path = os.path.join(build_path, test_path)
- src_path = base_source_paths + [test_path]
+ for test_name, test_paths in tests.items():
+ if not isinstance(test_paths, list):
+ test_paths = [test_paths]
+
+ test_build_path = os.path.join(build_path, test_paths[0])
+ src_paths = base_source_paths + test_paths
bin_file = None
- test_case_folder_name = os.path.basename(test_path)
+ test_case_folder_name = os.path.basename(test_paths[0])
- args = (src_path, test_build_path, target, toolchain_name)
+ args = (src_paths, test_build_path, deepcopy(target), toolchain_name)
kwargs = {
'jobs': 1,
'clean': clean,
@@ -2179,12 +2283,11 @@
'project_id': test_name,
'report': report,
'properties': properties,
- 'verbose': verbose,
'app_config': app_config,
'build_profile': build_profile,
- 'silent': True,
'toolchain_paths': TOOLCHAIN_PATHS,
- 'stats_depth': stats_depth
+ 'stats_depth': stats_depth,
+ 'notify': MockNotifier()
}
results.append(p.apply_async(build_test_worker, args, kwargs))
@@ -2207,9 +2310,15 @@
worker_result = r.get()
results.remove(r)
+ # Push all deferred notifications out to the actual notifier
+ new_notify = deepcopy(notify)
+ for message in worker_result['kwargs']['notify'].messages:
+ new_notify.notify(message)
+
# Take report from the kwargs and merge it into existing report
if report:
report_entry = worker_result['kwargs']['report'][target_name][toolchain_name]
+ report_entry[worker_result['kwargs']['project_id'].upper()][0][0]['output'] = new_notify.get_output()
for test_key in report_entry.keys():
report[target_name][toolchain_name][test_key] = report_entry[test_key]
@@ -2220,13 +2329,15 @@
result = False
break
+
# Adding binary path to test build result
if ('result' in worker_result and
worker_result['result'] and
'bin_file' in worker_result):
bin_file = norm_relative_path(worker_result['bin_file'], execution_directory)
- test_build['tests'][worker_result['kwargs']['project_id']] = {
+ test_key = 'test_apps' if 'test_apps-' in worker_result['kwargs']['project_id'] else 'tests'
+ test_build[test_key][worker_result['kwargs']['project_id']] = {
"binaries": [
{
"path": bin_file
@@ -2235,9 +2346,7 @@
}
test_key = worker_result['kwargs']['project_id'].upper()
- if report:
- print report[target_name][toolchain_name][test_key][0][0]['output'].rstrip()
- print 'Image: %s\n' % bin_file
+ print('Image: %s\n' % bin_file)
except:
if p._taskqueue.queue:
@@ -2271,4 +2380,4 @@
def test_spec_from_test_builds(test_builds):
return {
"builds": test_builds
- }
+ }
\ No newline at end of file