Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
test_api.py
00001 """ 00002 mbed SDK 00003 Copyright (c) 2011-2014 ARM Limited 00004 00005 Licensed under the Apache License, Version 2.0 (the "License"); 00006 you may not use this file except in compliance with the License. 00007 You may obtain a copy of the License at 00008 00009 http://www.apache.org/licenses/LICENSE-2.0 00010 00011 Unless required by applicable law or agreed to in writing, software 00012 distributed under the License is distributed on an "AS IS" BASIS, 00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 See the License for the specific language governing permissions and 00015 limitations under the License. 00016 00017 Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com> 00018 """ 00019 from __future__ import print_function 00020 00021 import os 00022 import re 00023 import sys 00024 import json 00025 import uuid 00026 import pprint 00027 import random 00028 import argparse 00029 import datetime 00030 import threading 00031 import ctypes 00032 import functools 00033 from colorama import Fore, Back, Style 00034 from prettytable import PrettyTable 00035 from copy import copy, deepcopy 00036 00037 from time import sleep, time 00038 try: 00039 from Queue import Queue, Empty 00040 except ImportError: 00041 from queue import Queue, Empty 00042 from os.path import join, exists, basename, relpath 00043 from threading import Thread, Lock 00044 from multiprocessing import Pool, cpu_count 00045 from subprocess import Popen, PIPE 00046 00047 # Imports related to mbed build api 00048 from tools.tests import TESTS 00049 from tools.tests import TEST_MAP 00050 from tools.paths import BUILD_DIR 00051 from tools.paths import HOST_TESTS 00052 from tools.utils import ToolException 00053 from tools.utils import NotSupportedException 00054 from tools.utils import construct_enum 00055 from tools.memap import MemapParser 00056 from tools.targets import TARGET_MAP, Target 00057 import tools.test_configs as TestConfig 00058 from tools.test_db import BaseDBAccess 00059 from tools.build_api import build_project, build_mbed_libs, build_lib 00060 from tools.build_api import get_target_supported_toolchains 00061 from tools.build_api import write_build_report 00062 from tools.build_api import prep_report 00063 from tools.build_api import prep_properties 00064 from tools.build_api import create_result 00065 from tools.build_api import add_result_to_report 00066 from tools.build_api import prepare_toolchain 00067 from tools.build_api import scan_resources 00068 from tools.build_api import get_config 00069 from tools.libraries import LIBRARIES, LIBRARY_MAP 00070 from tools.options import extract_profile 00071 from tools.toolchains import TOOLCHAIN_PATHS 00072 from tools.toolchains import TOOLCHAINS 00073 from tools.test_exporters import ReportExporter, ResultExporterType 00074 from tools.utils import argparse_filestring_type 00075 from tools.utils import argparse_uppercase_type 00076 from tools.utils import argparse_lowercase_type 00077 from tools.utils import argparse_many 00078 from tools.notifier.mock import MockNotifier 00079 00080 import tools.host_tests.host_tests_plugins as host_tests_plugins 00081 00082 try: 00083 import mbed_lstools 00084 from tools.compliance.ioper_runner import get_available_oper_test_scopes 00085 except: 00086 pass 00087 00088 00089 class ProcessObserver(Thread): 00090 def __init__(self, proc): 00091 Thread.__init__(self) 00092 self.proc = proc 00093 self.queue = Queue() 00094 self.daemon = True 00095 self.active = True 00096 self.start() 00097 00098 def run(self): 00099 while self.active: 00100 c = self.proc.stdout.read(1) 00101 self.queue.put(c) 00102 00103 def stop(self): 00104 self.active = False 00105 try: 00106 self.proc.terminate() 00107 except Exception: 00108 pass 00109 00110 00111 class SingleTestExecutor (threading.Thread): 00112 """ Example: Single test class in separate thread usage 00113 """ 00114 def __init__(self, single_test): 00115 self.single_test = single_test 00116 threading.Thread.__init__(self) 00117 00118 def run(self): 00119 start = time() 00120 # Execute tests depending on options and filter applied 00121 test_summary, shuffle_seed, test_summary_ext, test_suite_properties_ext = self.single_test .execute() 00122 elapsed_time = time() - start 00123 00124 # Human readable summary 00125 if not self.single_test .opts_suppress_summary: 00126 # prints well-formed summary with results (SQL table like) 00127 print(self.single_test .generate_test_summary(test_summary, 00128 shuffle_seed)) 00129 if self.single_test .opts_test_x_toolchain_summary: 00130 # prints well-formed summary with results (SQL table like) 00131 # table shows text x toolchain test result matrix 00132 print(self.single_test .generate_test_summary_by_target( 00133 test_summary, shuffle_seed)) 00134 print("Completed in %.2f sec"% (elapsed_time)) 00135 00136 00137 class SingleTestRunner (object): 00138 """ Object wrapper for single test run which may involve multiple MUTs 00139 """ 00140 RE_DETECT_TESTCASE_RESULT = None 00141 00142 # Return codes for test script 00143 TEST_RESULT_OK = "OK" 00144 TEST_RESULT_FAIL = "FAIL" 00145 TEST_RESULT_ERROR = "ERROR" 00146 TEST_RESULT_UNDEF = "UNDEF" 00147 TEST_RESULT_IOERR_COPY = "IOERR_COPY" 00148 TEST_RESULT_IOERR_DISK = "IOERR_DISK" 00149 TEST_RESULT_IOERR_SERIAL = "IOERR_SERIAL" 00150 TEST_RESULT_TIMEOUT = "TIMEOUT" 00151 TEST_RESULT_NO_IMAGE = "NO_IMAGE" 00152 TEST_RESULT_MBED_ASSERT = "MBED_ASSERT" 00153 TEST_RESULT_BUILD_FAILED = "BUILD_FAILED" 00154 TEST_RESULT_NOT_SUPPORTED = "NOT_SUPPORTED" 00155 00156 GLOBAL_LOOPS_COUNT = 1 # How many times each test should be repeated 00157 TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id 00158 TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count} 00159 00160 muts = {} # MUTs descriptor (from external file) 00161 test_spec = {} # Test specification (from external file) 00162 00163 # mbed test suite -> SingleTestRunner 00164 TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK, 00165 "failure" : TEST_RESULT_FAIL, 00166 "error" : TEST_RESULT_ERROR, 00167 "ioerr_copy" : TEST_RESULT_IOERR_COPY, 00168 "ioerr_disk" : TEST_RESULT_IOERR_DISK, 00169 "ioerr_serial" : TEST_RESULT_IOERR_SERIAL, 00170 "timeout" : TEST_RESULT_TIMEOUT, 00171 "no_image" : TEST_RESULT_NO_IMAGE, 00172 "end" : TEST_RESULT_UNDEF, 00173 "mbed_assert" : TEST_RESULT_MBED_ASSERT, 00174 "build_failed" : TEST_RESULT_BUILD_FAILED, 00175 "not_supproted" : TEST_RESULT_NOT_SUPPORTED 00176 } 00177 00178 def __init__ (self, 00179 _global_loops_count=1, 00180 _test_loops_list=None, 00181 _muts={}, 00182 _clean=False, 00183 _parser=None, 00184 _opts=None, 00185 _opts_db_url=None, 00186 _opts_log_file_name=None, 00187 _opts_report_html_file_name=None, 00188 _opts_report_junit_file_name=None, 00189 _opts_report_build_file_name=None, 00190 _opts_report_text_file_name=None, 00191 _opts_build_report={}, 00192 _opts_build_properties={}, 00193 _test_spec={}, 00194 _opts_goanna_for_mbed_sdk=None, 00195 _opts_goanna_for_tests=None, 00196 _opts_shuffle_test_order=False, 00197 _opts_shuffle_test_seed=None, 00198 _opts_test_by_names=None, 00199 _opts_peripheral_by_names=None, 00200 _opts_test_only_peripheral=False, 00201 _opts_test_only_common=False, 00202 _opts_verbose_skipped_tests=False, 00203 _opts_verbose_test_result_only=False, 00204 _opts_verbose=False, 00205 _opts_firmware_global_name=None, 00206 _opts_only_build_tests=False, 00207 _opts_parallel_test_exec=False, 00208 _opts_suppress_summary=False, 00209 _opts_test_x_toolchain_summary=False, 00210 _opts_copy_method=None, 00211 _opts_mut_reset_type=None, 00212 _opts_jobs=None, 00213 _opts_waterfall_test=None, 00214 _opts_consolidate_waterfall_test=None, 00215 _opts_extend_test_timeout=None, 00216 _opts_auto_detect=None, 00217 _opts_include_non_automated=False): 00218 """ Let's try hard to init this object 00219 """ 00220 from colorama import init 00221 init() 00222 00223 PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING .keys()) + ")\\}" 00224 self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN) 00225 # Settings related to test loops counters 00226 try: 00227 _global_loops_count = int(_global_loops_count) 00228 except: 00229 _global_loops_count = 1 00230 if _global_loops_count < 1: 00231 _global_loops_count = 1 00232 self.GLOBAL_LOOPS_COUNT = _global_loops_count 00233 self.TEST_LOOPS_LIST = _test_loops_list if _test_loops_list else [] 00234 self.TEST_LOOPS_DICT = self.test_loop_list_to_dict (_test_loops_list) 00235 00236 self.shuffle_random_seed = 0.0 00237 self.SHUFFLE_SEED_ROUND = 10 00238 00239 # MUT list and test specification storage 00240 self.muts = _muts 00241 self.test_spec = _test_spec 00242 00243 # Settings passed e.g. from command line 00244 self.opts_db_url = _opts_db_url 00245 self.opts_log_file_name = _opts_log_file_name 00246 self.opts_report_html_file_name = _opts_report_html_file_name 00247 self.opts_report_junit_file_name = _opts_report_junit_file_name 00248 self.opts_report_build_file_name = _opts_report_build_file_name 00249 self.opts_report_text_file_name = _opts_report_text_file_name 00250 self.opts_goanna_for_mbed_sdk = _opts_goanna_for_mbed_sdk 00251 self.opts_goanna_for_tests = _opts_goanna_for_tests 00252 self.opts_shuffle_test_order = _opts_shuffle_test_order 00253 self.opts_shuffle_test_seed = _opts_shuffle_test_seed 00254 self.opts_test_by_names = _opts_test_by_names 00255 self.opts_peripheral_by_names = _opts_peripheral_by_names 00256 self.opts_test_only_peripheral = _opts_test_only_peripheral 00257 self.opts_test_only_common = _opts_test_only_common 00258 self.opts_verbose_skipped_tests = _opts_verbose_skipped_tests 00259 self.opts_verbose_test_result_only = _opts_verbose_test_result_only 00260 self.opts_verbose = _opts_verbose 00261 self.opts_firmware_global_name = _opts_firmware_global_name 00262 self.opts_only_build_tests = _opts_only_build_tests 00263 self.opts_parallel_test_exec = _opts_parallel_test_exec 00264 self.opts_suppress_summary = _opts_suppress_summary 00265 self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary 00266 self.opts_copy_method = _opts_copy_method 00267 self.opts_mut_reset_type = _opts_mut_reset_type 00268 self.opts_jobs = _opts_jobs if _opts_jobs is not None else 1 00269 self.opts_waterfall_test = _opts_waterfall_test 00270 self.opts_consolidate_waterfall_test = _opts_consolidate_waterfall_test 00271 self.opts_extend_test_timeout = _opts_extend_test_timeout 00272 self.opts_clean = _clean 00273 self.opts_parser = _parser 00274 self.opts = _opts 00275 self.opts_auto_detect = _opts_auto_detect 00276 self.opts_include_non_automated = _opts_include_non_automated 00277 00278 self.build_report = _opts_build_report 00279 self.build_properties = _opts_build_properties 00280 00281 # File / screen logger initialization 00282 self.logger = CLITestLogger(file_name=self.opts_log_file_name ) # Default test logger 00283 00284 # Database related initializations 00285 self.db_logger = factory_db_logger(self.opts_db_url ) 00286 self.db_logger_build_id = None # Build ID (database index of build_id table) 00287 # Let's connect to database to set up credentials and confirm database is ready 00288 if self.db_logger : 00289 self.db_logger .connect_url(self.opts_db_url ) # Save db access info inside db_logger object 00290 if self.db_logger .is_connected(): 00291 # Get hostname and uname so we can use it as build description 00292 # when creating new build_id in external database 00293 (_hostname, _uname) = self.db_logger .get_hostname() 00294 _host_location = os.path.dirname(os.path.abspath(__file__)) 00295 build_id_type = None if self.opts_only_build_tests is None else self.db_logger .BUILD_ID_TYPE_BUILD_ONLY 00296 self.db_logger_build_id = self.db_logger .get_next_build_id(_hostname, desc=_uname, location=_host_location, type=build_id_type) 00297 self.db_logger .disconnect() 00298 00299 def dump_options (self): 00300 """ Function returns data structure with common settings passed to SingelTestRunner 00301 It can be used for example to fill _extra fields in database storing test suite single run data 00302 Example: 00303 data = self.dump_options() 00304 or 00305 data_str = json.dumps(self.dump_options()) 00306 """ 00307 result = {"db_url" : str(self.opts_db_url ), 00308 "log_file_name" : str(self.opts_log_file_name ), 00309 "shuffle_test_order" : str(self.opts_shuffle_test_order ), 00310 "shuffle_test_seed" : str(self.opts_shuffle_test_seed ), 00311 "test_by_names" : str(self.opts_test_by_names ), 00312 "peripheral_by_names" : str(self.opts_peripheral_by_names ), 00313 "test_only_peripheral" : str(self.opts_test_only_peripheral ), 00314 "test_only_common" : str(self.opts_test_only_common ), 00315 "verbose" : str(self.opts_verbose ), 00316 "firmware_global_name" : str(self.opts_firmware_global_name ), 00317 "only_build_tests" : str(self.opts_only_build_tests ), 00318 "copy_method" : str(self.opts_copy_method ), 00319 "mut_reset_type" : str(self.opts_mut_reset_type ), 00320 "jobs" : str(self.opts_jobs ), 00321 "extend_test_timeout" : str(self.opts_extend_test_timeout ), 00322 "_dummy" : '' 00323 } 00324 return result 00325 00326 def shuffle_random_func(self): 00327 return self.shuffle_random_seed 00328 00329 def is_shuffle_seed_float (self): 00330 """ return true if function parameter can be converted to float 00331 """ 00332 result = True 00333 try: 00334 float(self.shuffle_random_seed ) 00335 except ValueError: 00336 result = False 00337 return result 00338 00339 # This will store target / toolchain specific properties 00340 test_suite_properties_ext = {} # target : toolchain 00341 # Here we store test results 00342 test_summary = [] 00343 # Here we store test results in extended data structure 00344 test_summary_ext = {} 00345 execute_thread_slice_lock = Lock() 00346 00347 def execute_thread_slice(self, q, target, toolchains, clean, test_ids, build_report, build_properties): 00348 for toolchain in toolchains: 00349 tt_id = "%s::%s" % (toolchain, target) 00350 00351 T = TARGET_MAP[target] 00352 00353 # print target, toolchain 00354 # Test suite properties returned to external tools like CI 00355 test_suite_properties = { 00356 'jobs': self.opts_jobs , 00357 'clean': clean, 00358 'target': target, 00359 'vendor': T.extra_labels[0], 00360 'test_ids': ', '.join(test_ids), 00361 'toolchain': toolchain, 00362 'shuffle_random_seed': self.shuffle_random_seed 00363 } 00364 00365 00366 # print '=== %s::%s ===' % (target, toolchain) 00367 # Let's build our test 00368 if target not in TARGET_MAP: 00369 print(self.logger .log_line( 00370 self.logger .LogType.NOTIF, 00371 'Skipped tests for %s target. Target platform not found' % 00372 (target))) 00373 continue 00374 00375 clean_mbed_libs_options = (self.opts_goanna_for_mbed_sdk or 00376 self.opts_clean or clean) 00377 00378 profile = extract_profile(self.opts_parser , self.opts , toolchain) 00379 stats_depth = self.opts .stats_depth or 2 00380 00381 try: 00382 build_mbed_libs_result = build_mbed_libs( 00383 T, toolchain, 00384 clean=clean_mbed_libs_options, 00385 verbose=self.opts_verbose , 00386 jobs=self.opts_jobs , 00387 report=build_report, 00388 properties=build_properties, 00389 build_profile=profile) 00390 00391 if not build_mbed_libs_result: 00392 print(self.logger .log_line( 00393 self.logger .LogType.NOTIF, 00394 'Skipped tests for %s target. Toolchain %s is not ' 00395 'supported for this target'% (T.name, toolchain))) 00396 continue 00397 00398 except ToolException: 00399 print(self.logger .log_line( 00400 self.logger .LogType.ERROR, 00401 'There were errors while building MBED libs for %s using %s' 00402 % (target, toolchain))) 00403 continue 00404 00405 build_dir = join(BUILD_DIR, "test", target, toolchain) 00406 00407 test_suite_properties['build_mbed_libs_result'] = build_mbed_libs_result 00408 test_suite_properties['build_dir'] = build_dir 00409 test_suite_properties['skipped'] = [] 00410 00411 # Enumerate through all tests and shuffle test order if requested 00412 test_map_keys = sorted(TEST_MAP.keys()) 00413 00414 if self.opts_shuffle_test_order : 00415 random.shuffle(test_map_keys, self.shuffle_random_func ) 00416 # Update database with shuffle seed f applicable 00417 if self.db_logger : 00418 self.db_logger .reconnect(); 00419 if self.db_logger .is_connected(): 00420 self.db_logger .update_build_id_info( 00421 self.db_logger_build_id , 00422 _shuffle_seed=self.shuffle_random_func ()) 00423 self.db_logger .disconnect(); 00424 00425 if self.db_logger : 00426 self.db_logger .reconnect(); 00427 if self.db_logger .is_connected(): 00428 # Update MUTs and Test Specification in database 00429 self.db_logger .update_build_id_info( 00430 self.db_logger_build_id , 00431 _muts=self.muts , _test_spec=self.test_spec ) 00432 # Update Extra information in database (some options passed to test suite) 00433 self.db_logger .update_build_id_info( 00434 self.db_logger_build_id , 00435 _extra=json.dumps(self.dump_options ())) 00436 self.db_logger .disconnect(); 00437 00438 valid_test_map_keys = self.get_valid_tests (test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated ) 00439 skipped_test_map_keys = self.get_skipped_tests (test_map_keys, valid_test_map_keys) 00440 00441 for skipped_test_id in skipped_test_map_keys: 00442 test_suite_properties['skipped'].append(skipped_test_id) 00443 00444 00445 # First pass through all tests and determine which libraries need to be built 00446 libraries = [] 00447 for test_id in valid_test_map_keys: 00448 test = TEST_MAP[test_id] 00449 00450 # Detect which lib should be added to test 00451 # Some libs have to compiled like RTOS or ETH 00452 for lib in LIBRARIES: 00453 if lib['build_dir'] in test.dependencies and lib['id'] not in libraries: 00454 libraries.append(lib['id']) 00455 00456 00457 clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None 00458 00459 # Build all required libraries 00460 for lib_id in libraries: 00461 try: 00462 build_lib(lib_id, 00463 T, 00464 toolchain, 00465 verbose=self.opts_verbose , 00466 clean=clean_mbed_libs_options, 00467 jobs=self.opts_jobs , 00468 report=build_report, 00469 properties=build_properties, 00470 build_profile=profile) 00471 00472 except ToolException: 00473 print(self.logger .log_line( 00474 self.logger .LogType.ERROR, 00475 'There were errors while building library %s' % lib_id)) 00476 continue 00477 00478 00479 for test_id in valid_test_map_keys: 00480 test = TEST_MAP[test_id] 00481 00482 test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries) 00483 00484 # TODO: move this 2 below loops to separate function 00485 INC_DIRS = [] 00486 for lib_id in libraries: 00487 if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']: 00488 INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext']) 00489 00490 MACROS = [] 00491 for lib_id in libraries: 00492 if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']: 00493 MACROS.extend(LIBRARY_MAP[lib_id]['macros']) 00494 MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target) 00495 MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id) 00496 test_uuid = uuid.uuid4() 00497 MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid)) 00498 00499 # Prepare extended test results data structure (it can be used to generate detailed test report) 00500 if target not in self.test_summary_ext : 00501 self.test_summary_ext [target] = {} # test_summary_ext : toolchain 00502 if toolchain not in self.test_summary_ext [target]: 00503 self.test_summary_ext [target][toolchain] = {} # test_summary_ext : toolchain : target 00504 00505 tt_test_id = "%s::%s::%s" % (toolchain, target, test_id) # For logging only 00506 00507 project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None 00508 try: 00509 path = build_project(test.source_dir, join(build_dir, test_id), T, 00510 toolchain, test.dependencies, clean=clean_project_options, 00511 verbose=self.opts_verbose , name=project_name, macros=MACROS, 00512 inc_dirs=INC_DIRS, jobs=self.opts_jobs , report=build_report, 00513 properties=build_properties, project_id=test_id, 00514 project_description=test.get_description(), 00515 build_profile=profile, stats_depth=stats_depth) 00516 00517 except Exception as e: 00518 project_name_str = project_name if project_name is not None else test_id 00519 00520 00521 test_result = self.TEST_RESULT_FAIL 00522 00523 if isinstance(e, ToolException): 00524 print(self.logger .log_line( 00525 self.logger .LogType.ERROR, 00526 'There were errors while building project %s' % 00527 project_name_str)) 00528 test_result = self.TEST_RESULT_BUILD_FAILED 00529 elif isinstance(e, NotSupportedException): 00530 print(elf.logger.log_line( 00531 self.logger .LogType.INFO, 00532 'Project %s is not supported' % project_name_str)) 00533 test_result = self.TEST_RESULT_NOT_SUPPORTED 00534 00535 00536 # Append test results to global test summary 00537 self.test_summary .append( 00538 (test_result, target, toolchain, test_id, 00539 test.get_description(), 0, 0, '-') 00540 ) 00541 00542 # Add detailed test result to test summary structure 00543 if test_id not in self.test_summary_ext [target][toolchain]: 00544 self.test_summary_ext [target][toolchain][test_id] = [] 00545 00546 self.test_summary_ext [target][toolchain][test_id].append({ 0: { 00547 'result' : test_result, 00548 'output' : '', 00549 'target_name' : target, 00550 'target_name_unique': target, 00551 'toolchain_name' : toolchain, 00552 'id' : test_id, 00553 'description' : test.get_description(), 00554 'elapsed_time' : 0, 00555 'duration' : 0, 00556 'copy_method' : None 00557 }}) 00558 continue 00559 00560 if self.opts_only_build_tests : 00561 # With this option we are skipping testing phase 00562 continue 00563 00564 # Test duration can be increased by global value 00565 test_duration = test.duration 00566 if self.opts_extend_test_timeout is not None: 00567 test_duration += self.opts_extend_test_timeout 00568 00569 # For an automated test the duration act as a timeout after 00570 # which the test gets interrupted 00571 test_spec = self.shape_test_request (target, path, test_id, test_duration) 00572 test_loops = self.get_test_loop_count (test_id) 00573 00574 test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration 00575 test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops 00576 test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path 00577 00578 # read MUTs, test specification and perform tests 00579 handle_results = self.handle (test_spec, target, toolchain, test_loops=test_loops) 00580 00581 if handle_results is None: 00582 continue 00583 00584 for handle_result in handle_results: 00585 if handle_result: 00586 single_test_result, detailed_test_results = handle_result 00587 else: 00588 continue 00589 00590 # Append test results to global test summary 00591 if single_test_result is not None: 00592 self.test_summary .append(single_test_result) 00593 00594 # Add detailed test result to test summary structure 00595 if target not in self.test_summary_ext [target][toolchain]: 00596 if test_id not in self.test_summary_ext [target][toolchain]: 00597 self.test_summary_ext [target][toolchain][test_id] = [] 00598 00599 append_test_result = detailed_test_results 00600 00601 # If waterfall and consolidate-waterfall options are enabled, 00602 # only include the last test result in the report. 00603 if self.opts_waterfall_test and self.opts_consolidate_waterfall_test : 00604 append_test_result = {0: detailed_test_results[len(detailed_test_results) - 1]} 00605 00606 self.test_summary_ext [target][toolchain][test_id].append(append_test_result) 00607 00608 test_suite_properties['skipped'] = ', '.join(test_suite_properties['skipped']) 00609 self.test_suite_properties_ext [target][toolchain] = test_suite_properties 00610 00611 q.put(target + '_'.join(toolchains)) 00612 return 00613 00614 def execute(self): 00615 clean = self.test_spec .get('clean', False) 00616 test_ids = self.test_spec .get('test_ids', []) 00617 q = Queue() 00618 00619 # Generate seed for shuffle if seed is not provided in 00620 self.shuffle_random_seed = round(random.random(), self.SHUFFLE_SEED_ROUND ) 00621 if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float (): 00622 self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed ), self.SHUFFLE_SEED_ROUND ) 00623 00624 00625 if self.opts_parallel_test_exec : 00626 ################################################################### 00627 # Experimental, parallel test execution per singletest instance. 00628 ################################################################### 00629 execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests 00630 # Note: We are building here in parallel for each target separately! 00631 # So we are not building the same thing multiple times and compilers 00632 # in separate threads do not collide. 00633 # Inside execute_thread_slice() function function handle() will be called to 00634 # get information about available MUTs (per target). 00635 for target, toolchains in self.test_spec ['targets'].items(): 00636 self.test_suite_properties_ext [target] = {} 00637 t = threading.Thread(target=self.execute_thread_slice , args = (q, target, toolchains, clean, test_ids, self.build_report , self.build_properties )) 00638 t.daemon = True 00639 t.start() 00640 execute_threads.append(t) 00641 00642 for t in execute_threads: 00643 q.get() # t.join() would block some threads because we should not wait in any order for thread end 00644 else: 00645 # Serialized (not parallel) test execution 00646 for target, toolchains in self.test_spec ['targets'].items(): 00647 if target not in self.test_suite_properties_ext : 00648 self.test_suite_properties_ext [target] = {} 00649 00650 self.execute_thread_slice (q, target, toolchains, clean, test_ids, self.build_report , self.build_properties ) 00651 q.get() 00652 00653 if self.db_logger : 00654 self.db_logger .reconnect(); 00655 if self.db_logger .is_connected(): 00656 self.db_logger .update_build_id_info(self.db_logger_build_id , _status_fk=self.db_logger .BUILD_ID_STATUS_COMPLETED) 00657 self.db_logger .disconnect(); 00658 00659 return self.test_summary , self.shuffle_random_seed , self.test_summary_ext , self.test_suite_properties_ext , self.build_report , self.build_properties 00660 00661 def get_valid_tests(self, test_map_keys, target, toolchain, test_ids, include_non_automated): 00662 valid_test_map_keys = [] 00663 00664 for test_id in test_map_keys: 00665 test = TEST_MAP[test_id] 00666 if self.opts_test_by_names and test_id not in self.opts_test_by_names : 00667 continue 00668 00669 if test_ids and test_id not in test_ids: 00670 continue 00671 00672 if self.opts_test_only_peripheral and not test.peripherals: 00673 if self.opts_verbose_skipped_tests : 00674 print(self.logger .log_line( 00675 self.logger .LogType.INFO, 00676 'Common test skipped for target %s' % target)) 00677 continue 00678 00679 if (self.opts_peripheral_by_names and test.peripherals and 00680 not any((i in self.opts_peripheral_by_names ) 00681 for i in test.peripherals)): 00682 # We will skip tests not forced with -p option 00683 if self.opts_verbose_skipped_tests : 00684 print(self.logger .log_line( 00685 self.logger .LogType.INFO, 00686 'Common test skipped for target %s' % target)) 00687 continue 00688 00689 if self.opts_test_only_common and test.peripherals: 00690 if self.opts_verbose_skipped_tests : 00691 print(self.logger .log_line( 00692 self.logger .LogType.INFO, 00693 'Peripheral test skipped for target %s' % target)) 00694 continue 00695 00696 if not include_non_automated and not test.automated: 00697 if self.opts_verbose_skipped_tests : 00698 print(self.logger .log_line( 00699 self.logger .LogType.INFO, 00700 'Non automated test skipped for target %s' % target)) 00701 continue 00702 00703 if test.is_supported(target, toolchain): 00704 if test.peripherals is None and self.opts_only_build_tests : 00705 # When users are using 'build only flag' and test do not have 00706 # specified peripherals we can allow test building by default 00707 pass 00708 elif self.opts_peripheral_by_names and test_id not in self.opts_peripheral_by_names : 00709 # If we force peripheral with option -p we expect test 00710 # to pass even if peripheral is not in MUTs file. 00711 pass 00712 elif not self.is_peripherals_available (target, test.peripherals): 00713 if self.opts_verbose_skipped_tests : 00714 if test.peripherals: 00715 print(self.logger .log_line( 00716 self.logger .LogType.INFO, 00717 'Peripheral %s test skipped for target %s' % 00718 (",".join(test.peripherals), target))) 00719 else: 00720 print(self.logger .log_line( 00721 self.logger .LogType.INFO, 00722 'Test %s skipped for target %s' % 00723 (test_id, target))) 00724 continue 00725 00726 # The test has made it through all the filters, so add it to the valid tests list 00727 valid_test_map_keys.append(test_id) 00728 00729 return valid_test_map_keys 00730 00731 def get_skipped_tests(self, all_test_map_keys, valid_test_map_keys): 00732 # NOTE: This will not preserve order 00733 return list(set(all_test_map_keys) - set(valid_test_map_keys)) 00734 00735 def generate_test_summary_by_target (self, test_summary, shuffle_seed=None): 00736 """ Prints well-formed summary with results (SQL table like) 00737 table shows text x toolchain test result matrix 00738 """ 00739 RESULT_INDEX = 0 00740 TARGET_INDEX = 1 00741 TOOLCHAIN_INDEX = 2 00742 TEST_INDEX = 3 00743 DESC_INDEX = 4 00744 00745 unique_targets = get_unique_value_from_summary(test_summary, TARGET_INDEX) 00746 unique_tests = get_unique_value_from_summary(test_summary, TEST_INDEX) 00747 unique_test_desc = get_unique_value_from_summary_ext(test_summary, TEST_INDEX, DESC_INDEX) 00748 unique_toolchains = get_unique_value_from_summary(test_summary, TOOLCHAIN_INDEX) 00749 00750 result = "Test summary:\n" 00751 for target in unique_targets: 00752 result_dict = {} # test : { toolchain : result } 00753 unique_target_toolchains = [] 00754 for test in test_summary: 00755 if test[TARGET_INDEX] == target: 00756 if test[TOOLCHAIN_INDEX] not in unique_target_toolchains: 00757 unique_target_toolchains.append(test[TOOLCHAIN_INDEX]) 00758 if test[TEST_INDEX] not in result_dict: 00759 result_dict[test[TEST_INDEX]] = {} 00760 result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX] 00761 00762 pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains 00763 pt = PrettyTable(pt_cols) 00764 for col in pt_cols: 00765 pt.align[col] = "l" 00766 pt.padding_width = 1 # One space between column edges and contents (default) 00767 00768 for test in unique_tests: 00769 if test in result_dict: 00770 test_results = result_dict[test] 00771 if test in unique_test_desc: 00772 row = [target, test, unique_test_desc[test]] 00773 for toolchain in unique_toolchains: 00774 if toolchain in test_results: 00775 row.append(test_results[toolchain]) 00776 pt.add_row(row) 00777 result += pt.get_string() 00778 shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND , 00779 shuffle_seed if shuffle_seed else self.shuffle_random_seed ) 00780 result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') 00781 return result 00782 00783 def generate_test_summary (self, test_summary, shuffle_seed=None): 00784 """ Prints well-formed summary with results (SQL table like) 00785 table shows target x test results matrix across 00786 """ 00787 success_code = 0 # Success code that can be leter returned to 00788 result = "Test summary:\n" 00789 # Pretty table package is used to print results 00790 pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description", 00791 "Elapsed Time (sec)", "Timeout (sec)", "Loops"]) 00792 pt.align["Result"] = "l" # Left align 00793 pt.align["Target"] = "l" # Left align 00794 pt.align["Toolchain"] = "l" # Left align 00795 pt.align["Test ID"] = "l" # Left align 00796 pt.align["Test Description"] = "l" # Left align 00797 pt.padding_width = 1 # One space between column edges and contents (default) 00798 00799 result_dict = {self.TEST_RESULT_OK : 0, 00800 self.TEST_RESULT_FAIL : 0, 00801 self.TEST_RESULT_ERROR : 0, 00802 self.TEST_RESULT_UNDEF : 0, 00803 self.TEST_RESULT_IOERR_COPY : 0, 00804 self.TEST_RESULT_IOERR_DISK : 0, 00805 self.TEST_RESULT_IOERR_SERIAL : 0, 00806 self.TEST_RESULT_NO_IMAGE : 0, 00807 self.TEST_RESULT_TIMEOUT : 0, 00808 self.TEST_RESULT_MBED_ASSERT : 0, 00809 self.TEST_RESULT_BUILD_FAILED : 0, 00810 self.TEST_RESULT_NOT_SUPPORTED : 0 00811 } 00812 00813 for test in test_summary: 00814 if test[0] in result_dict: 00815 result_dict[test[0]] += 1 00816 pt.add_row(test) 00817 result += pt.get_string() 00818 result += "\n" 00819 00820 # Print result count 00821 result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()]) 00822 shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND , 00823 shuffle_seed if shuffle_seed else self.shuffle_random_seed ) 00824 result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') 00825 return result 00826 00827 def test_loop_list_to_dict (self, test_loops_str): 00828 """ Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count} 00829 """ 00830 result = {} 00831 if test_loops_str: 00832 test_loops = test_loops_str 00833 for test_loop in test_loops: 00834 test_loop_count = test_loop.split('=') 00835 if len(test_loop_count) == 2: 00836 _test_id, _test_loops = test_loop_count 00837 try: 00838 _test_loops = int(_test_loops) 00839 except: 00840 continue 00841 result[_test_id] = _test_loops 00842 return result 00843 00844 def get_test_loop_count (self, test_id): 00845 """ This function returns no. of loops per test (deducted by test_id_. 00846 If test is not in list of redefined loop counts it will use default value. 00847 """ 00848 result = self.GLOBAL_LOOPS_COUNT 00849 if test_id in self.TEST_LOOPS_DICT : 00850 result = self.TEST_LOOPS_DICT [test_id] 00851 return result 00852 00853 def delete_file (self, file_path): 00854 """ Remove file from the system 00855 """ 00856 result = True 00857 resutl_msg = "" 00858 try: 00859 os.remove(file_path) 00860 except Exception as e: 00861 resutl_msg = e 00862 result = False 00863 return result, resutl_msg 00864 00865 def handle_mut (self, mut, data, target_name, toolchain_name, test_loops=1): 00866 """ Test is being invoked for given MUT. 00867 """ 00868 # Get test information, image and test timeout 00869 test_id = data['test_id'] 00870 test = TEST_MAP[test_id] 00871 test_description = TEST_MAP[test_id].get_description() 00872 image = data["image"] 00873 duration = data.get("duration", 10) 00874 00875 if mut is None: 00876 print("Error: No Mbed available: MUT[%s]" % data['mcu']) 00877 return None 00878 00879 mcu = mut['mcu'] 00880 copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc. 00881 00882 if self.db_logger : 00883 self.db_logger .reconnect() 00884 00885 selected_copy_method = self.opts_copy_method if copy_method is None else copy_method 00886 00887 # Tests can be looped so test results must be stored for the same test 00888 test_all_result = [] 00889 # Test results for one test ran few times 00890 detailed_test_results = {} # { Loop_number: { results ... } } 00891 00892 for test_index in range(test_loops): 00893 00894 # If mbedls is available and we are auto detecting MUT info, 00895 # update MUT info (mounting may changed) 00896 if get_module_avail('mbed_lstools') and self.opts_auto_detect : 00897 platform_name_filter = [mcu] 00898 muts_list = {} 00899 found = False 00900 00901 for i in range(0, 60): 00902 print('Looking for %s with MBEDLS' % mcu) 00903 muts_list = get_autodetected_MUTS_list(platform_name_filter=platform_name_filter) 00904 00905 if 1 not in muts_list: 00906 sleep(3) 00907 else: 00908 found = True 00909 break 00910 00911 if not found: 00912 print("Error: mbed not found with MBEDLS: %s" % data['mcu']) 00913 return None 00914 else: 00915 mut = muts_list[1] 00916 00917 disk = mut.get('disk') 00918 port = mut.get('port') 00919 00920 if disk is None or port is None: 00921 return None 00922 00923 target_by_mcu = TARGET_MAP[mut['mcu']] 00924 target_name_unique = mut['mcu_unique'] if 'mcu_unique' in mut else mut['mcu'] 00925 # Some extra stuff can be declared in MUTs structure 00926 reset_type = mut.get('reset_type') # reboot.txt, reset.txt, shutdown.txt 00927 reset_tout = mut.get('reset_tout') # COPY_IMAGE -> RESET_PROC -> SLEEP(RESET_TOUT) 00928 00929 # When the build and test system were separate, this was relative to a 00930 # base network folder base path: join(NETWORK_BASE_PATH, ) 00931 image_path = image 00932 00933 # Host test execution 00934 start_host_exec_time = time() 00935 00936 single_test_result = self.TEST_RESULT_UNDEF # single test run result 00937 _copy_method = selected_copy_method 00938 00939 if not exists(image_path): 00940 single_test_result = self.TEST_RESULT_NO_IMAGE 00941 elapsed_time = 0 00942 single_test_output = self.logger .log_line(self.logger .LogType.ERROR, 'Image file does not exist: %s'% image_path) 00943 print(single_test_output) 00944 else: 00945 # Host test execution 00946 start_host_exec_time = time() 00947 00948 host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose 00949 host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type 00950 host_test_result = self.run_host_test (test.host_test, 00951 image_path, disk, port, duration, 00952 micro=target_name, 00953 verbose=host_test_verbose, 00954 reset=host_test_reset, 00955 reset_tout=reset_tout, 00956 copy_method=selected_copy_method, 00957 program_cycle_s=target_by_mcu.program_cycle_s) 00958 single_test_result, single_test_output, single_testduration, single_timeout = host_test_result 00959 00960 # Store test result 00961 test_all_result.append(single_test_result) 00962 total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset 00963 elapsed_time = single_testduration # TIme of single test case execution after reset 00964 00965 detailed_test_results[test_index] = { 00966 'result' : single_test_result, 00967 'output' : single_test_output, 00968 'target_name' : target_name, 00969 'target_name_unique' : target_name_unique, 00970 'toolchain_name' : toolchain_name, 00971 'id' : test_id, 00972 'description' : test_description, 00973 'elapsed_time' : round(elapsed_time, 2), 00974 'duration' : single_timeout, 00975 'copy_method' : _copy_method, 00976 } 00977 00978 print(self.print_test_result ( 00979 single_test_result, target_name_unique, toolchain_name, test_id, 00980 test_description, elapsed_time, single_timeout)) 00981 00982 # Update database entries for ongoing test 00983 if self.db_logger and self.db_logger .is_connected(): 00984 test_type = 'SingleTest' 00985 self.db_logger .insert_test_entry(self.db_logger_build_id , 00986 target_name, 00987 toolchain_name, 00988 test_type, 00989 test_id, 00990 single_test_result, 00991 single_test_output, 00992 elapsed_time, 00993 single_timeout, 00994 test_index) 00995 00996 # If we perform waterfall test we test until we get OK and we stop testing 00997 if self.opts_waterfall_test and single_test_result == self.TEST_RESULT_OK : 00998 break 00999 01000 if self.db_logger : 01001 self.db_logger .disconnect() 01002 01003 return (self.shape_global_test_loop_result (test_all_result, self.opts_waterfall_test and self.opts_consolidate_waterfall_test ), 01004 target_name_unique, 01005 toolchain_name, 01006 test_id, 01007 test_description, 01008 round(elapsed_time, 2), 01009 single_timeout, 01010 self.shape_test_loop_ok_result_count (test_all_result)), detailed_test_results 01011 01012 def handle (self, test_spec, target_name, toolchain_name, test_loops=1): 01013 """ Function determines MUT's mbed disk/port and copies binary to 01014 target. 01015 """ 01016 handle_results = [] 01017 data = json.loads(test_spec) 01018 01019 # Find a suitable MUT: 01020 mut = None 01021 for id, m in self.muts .items(): 01022 if m['mcu'] == data['mcu']: 01023 mut = m 01024 handle_result = self.handle_mut (mut, data, target_name, toolchain_name, test_loops=test_loops) 01025 handle_results.append(handle_result) 01026 01027 return handle_results 01028 01029 def print_test_result (self, test_result, target_name, toolchain_name, 01030 test_id, test_description, elapsed_time, duration): 01031 """ Use specific convention to print test result and related data 01032 """ 01033 tokens = [] 01034 tokens.append("TargetTest") 01035 tokens.append(target_name) 01036 tokens.append(toolchain_name) 01037 tokens.append(test_id) 01038 tokens.append(test_description) 01039 separator = "::" 01040 time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration) 01041 result = separator.join(tokens) + " [" + test_result +"]" + time_info 01042 return Fore.MAGENTA + result + Fore.RESET 01043 01044 def shape_test_loop_ok_result_count (self, test_all_result): 01045 """ Reformats list of results to simple string 01046 """ 01047 test_loop_count = len(test_all_result) 01048 test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK ) 01049 return "%d/%d"% (test_loop_ok_result, test_loop_count) 01050 01051 def shape_global_test_loop_result (self, test_all_result, waterfall_and_consolidate): 01052 """ Reformats list of results to simple string 01053 """ 01054 result = self.TEST_RESULT_FAIL 01055 01056 if all(test_all_result[0] == res for res in test_all_result): 01057 result = test_all_result[0] 01058 elif waterfall_and_consolidate and any(res == self.TEST_RESULT_OK for res in test_all_result): 01059 result = self.TEST_RESULT_OK 01060 01061 return result 01062 01063 def run_host_test (self, name, image_path, disk, port, duration, 01064 micro=None, reset=None, reset_tout=None, 01065 verbose=False, copy_method=None, program_cycle_s=None): 01066 """ Function creates new process with host test configured with particular test case. 01067 Function also is pooling for serial port activity from process to catch all data 01068 printed by test runner and host test during test execution 01069 """ 01070 01071 def get_char_from_queue(obs): 01072 """ Get character from queue safe way 01073 """ 01074 try: 01075 c = obs.queue.get(block=True, timeout=0.5) 01076 except Empty: 01077 c = None 01078 return c 01079 01080 def filter_queue_char(c): 01081 """ Filters out non ASCII characters from serial port 01082 """ 01083 if ord(c) not in range(128): 01084 c = ' ' 01085 return c 01086 01087 def get_test_result(output): 01088 """ Parse test 'output' data 01089 """ 01090 result = self.TEST_RESULT_TIMEOUT 01091 for line in "".join(output).splitlines(): 01092 search_result = self.RE_DETECT_TESTCASE_RESULT .search(line) 01093 if search_result and len(search_result.groups()): 01094 result = self.TEST_RESULT_MAPPING [search_result.groups(0)[0]] 01095 break 01096 return result 01097 01098 def get_auto_property_value(property_name, line): 01099 """ Scans auto detection line from MUT and returns scanned parameter 'property_name' 01100 Returns string 01101 """ 01102 result = None 01103 if re.search("HOST: Property '%s'"% property_name, line) is not None: 01104 property = re.search("HOST: Property '%s' = '([\w\d _]+)'"% property_name, line) 01105 if property is not None and len(property.groups()) == 1: 01106 result = property.groups()[0] 01107 return result 01108 01109 cmd = ["python", 01110 '%s.py'% name, 01111 '-d', disk, 01112 '-f', '"%s"'% image_path, 01113 '-p', port, 01114 '-t', str(duration), 01115 '-C', str(program_cycle_s)] 01116 01117 if get_module_avail('mbed_lstools') and self.opts_auto_detect : 01118 cmd += ['--auto'] 01119 01120 # Add extra parameters to host_test 01121 if copy_method is not None: 01122 cmd += ["-c", copy_method] 01123 if micro is not None: 01124 cmd += ["-m", micro] 01125 if reset is not None: 01126 cmd += ["-r", reset] 01127 if reset_tout is not None: 01128 cmd += ["-R", str(reset_tout)] 01129 01130 if verbose: 01131 print(Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET) 01132 print("Test::Output::Start") 01133 01134 proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS) 01135 obs = ProcessObserver(proc) 01136 update_once_flag = {} # Stores flags checking if some auto-parameter was already set 01137 line = '' 01138 output = [] 01139 start_time = time() 01140 while (time() - start_time) < (2 * duration): 01141 c = get_char_from_queue(obs) 01142 if c: 01143 if verbose: 01144 sys.stdout.write(c) 01145 c = filter_queue_char(c) 01146 output.append(c) 01147 # Give the mbed under test a way to communicate the end of the test 01148 if c in ['\n', '\r']: 01149 01150 # Checking for auto-detection information from the test about MUT reset moment 01151 if 'reset_target' not in update_once_flag and "HOST: Reset target..." in line: 01152 # We will update this marker only once to prevent multiple time resets 01153 update_once_flag['reset_target'] = True 01154 start_time = time() 01155 01156 # Checking for auto-detection information from the test about timeout 01157 auto_timeout_val = get_auto_property_value('timeout', line) 01158 if 'timeout' not in update_once_flag and auto_timeout_val is not None: 01159 # We will update this marker only once to prevent multiple time resets 01160 update_once_flag['timeout'] = True 01161 duration = int(auto_timeout_val) 01162 01163 # Detect mbed assert: 01164 if 'mbed assertation failed: ' in line: 01165 output.append('{{mbed_assert}}') 01166 break 01167 01168 # Check for test end 01169 if '{end}' in line: 01170 break 01171 line = '' 01172 else: 01173 line += c 01174 end_time = time() 01175 testcase_duration = end_time - start_time # Test case duration from reset to {end} 01176 01177 c = get_char_from_queue(obs) 01178 01179 if c: 01180 if verbose: 01181 sys.stdout.write(c) 01182 c = filter_queue_char(c) 01183 output.append(c) 01184 01185 if verbose: 01186 print("Test::Output::Finish") 01187 # Stop test process 01188 obs.stop() 01189 01190 result = get_test_result(output) 01191 return (result, "".join(output), testcase_duration, duration) 01192 01193 def is_peripherals_available (self, target_mcu_name, peripherals=None): 01194 """ Checks if specified target should run specific peripheral test case defined in MUTs file 01195 """ 01196 if peripherals is not None: 01197 peripherals = set(peripherals) 01198 for id, mut in self.muts .items(): 01199 # Target MCU name check 01200 if mut["mcu"] != target_mcu_name: 01201 continue 01202 # Peripherals check 01203 if peripherals is not None: 01204 if 'peripherals' not in mut: 01205 continue 01206 if not peripherals.issubset(set(mut['peripherals'])): 01207 continue 01208 return True 01209 return False 01210 01211 def shape_test_request (self, mcu, image_path, test_id, duration=10): 01212 """ Function prepares JSON structure describing test specification 01213 """ 01214 test_spec = { 01215 "mcu": mcu, 01216 "image": image_path, 01217 "duration": duration, 01218 "test_id": test_id, 01219 } 01220 return json.dumps(test_spec) 01221 01222 01223 def get_unique_value_from_summary (test_summary, index): 01224 """ Gets list of unique target names 01225 """ 01226 result = [] 01227 for test in test_summary: 01228 target_name = test[index] 01229 if target_name not in result: 01230 result.append(target_name) 01231 return sorted(result) 01232 01233 01234 def get_unique_value_from_summary_ext (test_summary, index_key, index_val): 01235 """ Gets list of unique target names and return dictionary 01236 """ 01237 result = {} 01238 for test in test_summary: 01239 key = test[index_key] 01240 val = test[index_val] 01241 if key not in result: 01242 result[key] = val 01243 return result 01244 01245 01246 def show_json_file_format_error (json_spec_filename, line, column): 01247 """ Prints JSON broken content 01248 """ 01249 with open(json_spec_filename) as data_file: 01250 line_no = 1 01251 for json_line in data_file: 01252 if line_no + 5 >= line: # Print last few lines before error 01253 print('Line %d:\t'%line_no + json_line) 01254 if line_no == line: 01255 print('%s\t%s^' (' ' * len('Line %d:' % line_no), 01256 '-' * (column - 1))) 01257 break 01258 line_no += 1 01259 01260 01261 def json_format_error_defect_pos (json_error_msg): 01262 """ Gets first error line and column in JSON file format. 01263 Parsed from exception thrown by json.loads() string 01264 """ 01265 result = None 01266 line, column = 0, 0 01267 # Line value search 01268 line_search = re.search('line [0-9]+', json_error_msg) 01269 if line_search is not None: 01270 ls = line_search.group().split(' ') 01271 if len(ls) == 2: 01272 line = int(ls[1]) 01273 # Column position search 01274 column_search = re.search('column [0-9]+', json_error_msg) 01275 if column_search is not None: 01276 cs = column_search.group().split(' ') 01277 if len(cs) == 2: 01278 column = int(cs[1]) 01279 result = [line, column] 01280 return result 01281 01282 01283 def get_json_data_from_file (json_spec_filename, verbose=False): 01284 """ Loads from file JSON formatted string to data structure 01285 """ 01286 result = None 01287 try: 01288 with open(json_spec_filename) as data_file: 01289 try: 01290 result = json.load(data_file) 01291 except ValueError as json_error_msg: 01292 result = None 01293 print('JSON file %s parsing failed. Reason: %s' % 01294 (json_spec_filename, json_error_msg)) 01295 # We can print where error occurred inside JSON file if we can parse exception msg 01296 json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg)) 01297 if json_format_defect_pos is not None: 01298 line = json_format_defect_pos[0] 01299 column = json_format_defect_pos[1] 01300 print() 01301 show_json_file_format_error(json_spec_filename, line, column) 01302 01303 except IOError as fileopen_error_msg: 01304 print('JSON file %s not opened. Reason: %s\n'% 01305 (json_spec_filename, fileopen_error_msg)) 01306 if verbose and result: 01307 pp = pprint.PrettyPrinter(indent=4) 01308 pp.pprint(result) 01309 return result 01310 01311 01312 def print_muts_configuration_from_json (json_data, join_delim=", ", platform_filter=None): 01313 """ Prints MUTs configuration passed to test script for verboseness 01314 """ 01315 muts_info_cols = [] 01316 # We need to check all unique properties for each defined MUT 01317 for k in json_data: 01318 mut_info = json_data[k] 01319 for mut_property in mut_info: 01320 if mut_property not in muts_info_cols: 01321 muts_info_cols.append(mut_property) 01322 01323 # Prepare pretty table object to display all MUTs 01324 pt_cols = ["index"] + muts_info_cols 01325 pt = PrettyTable(pt_cols) 01326 for col in pt_cols: 01327 pt.align[col] = "l" 01328 01329 # Add rows to pretty print object 01330 for k in json_data: 01331 row = [k] 01332 mut_info = json_data[k] 01333 01334 add_row = True 01335 if platform_filter and 'mcu' in mut_info: 01336 add_row = re.search(platform_filter, mut_info['mcu']) is not None 01337 if add_row: 01338 for col in muts_info_cols: 01339 cell_val = mut_info[col] if col in mut_info else None 01340 if isinstance(cell_val, list): 01341 cell_val = join_delim.join(cell_val) 01342 row.append(cell_val) 01343 pt.add_row(row) 01344 return pt.get_string() 01345 01346 01347 def print_test_configuration_from_json (json_data, join_delim=", "): 01348 """ Prints test specification configuration passed to test script for verboseness 01349 """ 01350 toolchains_info_cols = [] 01351 # We need to check all toolchains for each device 01352 for k in json_data: 01353 # k should be 'targets' 01354 targets = json_data[k] 01355 for target in targets: 01356 toolchains = targets[target] 01357 for toolchain in toolchains: 01358 if toolchain not in toolchains_info_cols: 01359 toolchains_info_cols.append(toolchain) 01360 01361 # Prepare pretty table object to display test specification 01362 pt_cols = ["mcu"] + sorted(toolchains_info_cols) 01363 pt = PrettyTable(pt_cols) 01364 for col in pt_cols: 01365 pt.align[col] = "l" 01366 01367 # { target : [conflicted toolchains] } 01368 toolchain_conflicts = {} 01369 toolchain_path_conflicts = [] 01370 for k in json_data: 01371 # k should be 'targets' 01372 targets = json_data[k] 01373 for target in targets: 01374 target_supported_toolchains = get_target_supported_toolchains(target) 01375 if not target_supported_toolchains: 01376 target_supported_toolchains = [] 01377 target_name = target if target in TARGET_MAP else "%s*"% target 01378 row = [target_name] 01379 toolchains = targets[target] 01380 01381 for toolchain in sorted(toolchains_info_cols): 01382 # Check for conflicts: target vs toolchain 01383 conflict = False 01384 conflict_path = False 01385 if toolchain in toolchains: 01386 if toolchain not in target_supported_toolchains: 01387 conflict = True 01388 if target not in toolchain_conflicts: 01389 toolchain_conflicts[target] = [] 01390 toolchain_conflicts[target].append(toolchain) 01391 # Add marker inside table about target usage / conflict 01392 cell_val = 'Yes' if toolchain in toolchains else '-' 01393 if conflict: 01394 cell_val += '*' 01395 # Check for conflicts: toolchain vs toolchain path 01396 if toolchain in TOOLCHAIN_PATHS: 01397 toolchain_path = TOOLCHAIN_PATHS[toolchain] 01398 if not os.path.isdir(toolchain_path): 01399 conflict_path = True 01400 if toolchain not in toolchain_path_conflicts: 01401 toolchain_path_conflicts.append(toolchain) 01402 if conflict_path: 01403 cell_val += '#' 01404 row.append(cell_val) 01405 pt.add_row(row) 01406 01407 # generate result string 01408 result = pt.get_string() # Test specification table 01409 if toolchain_conflicts or toolchain_path_conflicts: 01410 result += "\n" 01411 result += "Toolchain conflicts:\n" 01412 for target in toolchain_conflicts: 01413 if target not in TARGET_MAP: 01414 result += "\t* Target %s unknown\n"% (target) 01415 conflict_target_list = join_delim.join(toolchain_conflicts[target]) 01416 sufix = 's' if len(toolchain_conflicts[target]) > 1 else '' 01417 result += "\t* Target %s does not support %s toolchain%s\n"% (target, conflict_target_list, sufix) 01418 01419 for toolchain in toolchain_path_conflicts: 01420 # Let's check toolchain configuration 01421 if toolchain in TOOLCHAIN_PATHS: 01422 toolchain_path = TOOLCHAIN_PATHS[toolchain] 01423 if not os.path.isdir(toolchain_path): 01424 result += "\t# Toolchain %s path not found: %s\n"% (toolchain, toolchain_path) 01425 return result 01426 01427 01428 def get_avail_tests_summary_table (cols=None, result_summary=True, join_delim=',',platform_filter=None): 01429 """ Generates table summary with all test cases and additional test cases 01430 information using pretty print functionality. Allows test suite user to 01431 see test cases 01432 """ 01433 # get all unique test ID prefixes 01434 unique_test_id = [] 01435 for test in TESTS: 01436 split = test['id'].split('_')[:-1] 01437 test_id_prefix = '_'.join(split) 01438 if test_id_prefix not in unique_test_id: 01439 unique_test_id.append(test_id_prefix) 01440 unique_test_id.sort() 01441 counter_dict_test_id_types = dict((t, 0) for t in unique_test_id) 01442 counter_dict_test_id_types_all = dict((t, 0) for t in unique_test_id) 01443 01444 test_properties = ['id', 01445 'automated', 01446 'description', 01447 'peripherals', 01448 'host_test', 01449 'duration'] if cols is None else cols 01450 01451 # All tests status table print 01452 pt = PrettyTable(test_properties) 01453 for col in test_properties: 01454 pt.align[col] = "l" 01455 pt.align['duration'] = "r" 01456 01457 counter_all = 0 01458 counter_automated = 0 01459 pt.padding_width = 1 # One space between column edges and contents (default) 01460 01461 for test_id in sorted(TEST_MAP.keys()): 01462 if platform_filter is not None: 01463 # FIlter out platforms using regex 01464 if re.search(platform_filter, test_id) is None: 01465 continue 01466 row = [] 01467 test = TEST_MAP[test_id] 01468 split = test_id.split('_')[:-1] 01469 test_id_prefix = '_'.join(split) 01470 01471 for col in test_properties: 01472 col_value = test[col] 01473 if isinstance(test[col], list): 01474 col_value = join_delim.join(test[col]) 01475 elif test[col] == None: 01476 col_value = "-" 01477 01478 row.append(col_value) 01479 if test['automated'] == True: 01480 counter_dict_test_id_types[test_id_prefix] += 1 01481 counter_automated += 1 01482 pt.add_row(row) 01483 # Update counters 01484 counter_all += 1 01485 counter_dict_test_id_types_all[test_id_prefix] += 1 01486 result = pt.get_string() 01487 result += "\n\n" 01488 01489 if result_summary and not platform_filter: 01490 # Automation result summary 01491 test_id_cols = ['automated', 'all', 'percent [%]', 'progress'] 01492 pt = PrettyTable(test_id_cols) 01493 pt.align['automated'] = "r" 01494 pt.align['all'] = "r" 01495 pt.align['percent [%]'] = "r" 01496 01497 percent_progress = round(100.0 * counter_automated / float(counter_all), 1) 01498 str_progress = progress_bar(percent_progress, 75) 01499 pt.add_row([counter_automated, counter_all, percent_progress, str_progress]) 01500 result += "Automation coverage:\n" 01501 result += pt.get_string() 01502 result += "\n\n" 01503 01504 # Test automation coverage table print 01505 test_id_cols = ['id', 'automated', 'all', 'percent [%]', 'progress'] 01506 pt = PrettyTable(test_id_cols) 01507 pt.align['id'] = "l" 01508 pt.align['automated'] = "r" 01509 pt.align['all'] = "r" 01510 pt.align['percent [%]'] = "r" 01511 for unique_id in unique_test_id: 01512 # print "\t\t%s: %d / %d" % (unique_id, counter_dict_test_id_types[unique_id], counter_dict_test_id_types_all[unique_id]) 01513 percent_progress = round(100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1) 01514 str_progress = progress_bar(percent_progress, 75) 01515 row = [unique_id, 01516 counter_dict_test_id_types[unique_id], 01517 counter_dict_test_id_types_all[unique_id], 01518 percent_progress, 01519 "[" + str_progress + "]"] 01520 pt.add_row(row) 01521 result += "Test automation coverage:\n" 01522 result += pt.get_string() 01523 result += "\n\n" 01524 return result 01525 01526 01527 def progress_bar (percent_progress, saturation=0): 01528 """ This function creates progress bar with optional simple saturation mark 01529 """ 01530 step = int(percent_progress / 2) # Scale by to (scale: 1 - 50) 01531 str_progress = '#' * step + '.' * int(50 - step) 01532 c = '!' if str_progress[38] == '.' else '|' 01533 if saturation > 0: 01534 saturation = saturation / 2 01535 str_progress = str_progress[:saturation] + c + str_progress[saturation:] 01536 return str_progress 01537 01538 01539 def singletest_in_cli_mode (single_test): 01540 """ Runs SingleTestRunner object in CLI (Command line interface) mode 01541 01542 @return returns success code (0 == success) for building and running tests 01543 """ 01544 start = time() 01545 # Execute tests depending on options and filter applied 01546 test_summary, shuffle_seed, test_summary_ext, test_suite_properties_ext, build_report, build_properties = single_test.execute() 01547 elapsed_time = time() - start 01548 01549 # Human readable summary 01550 if not single_test.opts_suppress_summary: 01551 # prints well-formed summary with results (SQL table like) 01552 print(single_test.generate_test_summary(test_summary, shuffle_seed)) 01553 if single_test.opts_test_x_toolchain_summary: 01554 # prints well-formed summary with results (SQL table like) 01555 # table shows text x toolchain test result matrix 01556 print(single_test.generate_test_summary_by_target(test_summary, 01557 shuffle_seed)) 01558 01559 print("Completed in %.2f sec" % elapsed_time) 01560 print 01561 # Write summary of the builds 01562 01563 print_report_exporter = ReportExporter(ResultExporterType.PRINT, package="build") 01564 status = print_report_exporter.report(build_report) 01565 01566 # Store extra reports in files 01567 if single_test.opts_report_html_file_name: 01568 # Export results in form of HTML report to separate file 01569 report_exporter = ReportExporter(ResultExporterType.HTML) 01570 report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name, test_suite_properties=test_suite_properties_ext) 01571 if single_test.opts_report_junit_file_name: 01572 # Export results in form of JUnit XML report to separate file 01573 report_exporter = ReportExporter(ResultExporterType.JUNIT) 01574 report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name, test_suite_properties=test_suite_properties_ext) 01575 if single_test.opts_report_text_file_name: 01576 # Export results in form of a text file 01577 report_exporter = ReportExporter(ResultExporterType.TEXT) 01578 report_exporter.report_to_file(test_summary_ext, single_test.opts_report_text_file_name, test_suite_properties=test_suite_properties_ext) 01579 if single_test.opts_report_build_file_name: 01580 # Export build results as html report to sparate file 01581 report_exporter = ReportExporter(ResultExporterType.JUNIT, package="build") 01582 report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name, test_suite_properties=build_properties) 01583 01584 # Returns True if no build failures of the test projects or their dependencies 01585 return status 01586 01587 class TestLogger (): 01588 """ Super-class for logging and printing ongoing events for test suite pass 01589 """ 01590 def __init__ (self, store_log=True): 01591 """ We can control if logger actually stores log in memory 01592 or just handled all log entries immediately 01593 """ 01594 self.log = [] 01595 self.log_to_file = False 01596 self.log_file_name = None 01597 self.store_log = store_log 01598 01599 self.LogType = construct_enum(INFO='Info', 01600 WARN='Warning', 01601 NOTIF='Notification', 01602 ERROR='Error', 01603 EXCEPT='Exception') 01604 01605 self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file 01606 APPEND=2) # Append to existing log file 01607 01608 def log_line (self, LogType, log_line, timestamp=True, line_delim='\n'): 01609 """ Log one line of text 01610 """ 01611 log_timestamp = time() 01612 log_entry = {'log_type' : LogType, 01613 'log_timestamp' : log_timestamp, 01614 'log_line' : log_line, 01615 '_future' : None 01616 } 01617 # Store log in memory 01618 if self.store_log : 01619 self.log .append(log_entry) 01620 return log_entry 01621 01622 01623 class CLITestLogger (TestLogger ): 01624 """ Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed 01625 """ 01626 def __init__(self, store_log=True, file_name=None): 01627 TestLogger.__init__(self) 01628 self.log_file_name = file_name 01629 #self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time 01630 self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only 01631 01632 def log_print (self, log_entry, timestamp=True): 01633 """ Prints on screen formatted log entry 01634 """ 01635 ts = log_entry['log_timestamp'] 01636 timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT ) if timestamp else '' 01637 log_line_str = "%(log_type)s: %(log_line)s"% (log_entry) 01638 return timestamp_str + log_line_str 01639 01640 def log_line (self, LogType, log_line, timestamp=True, line_delim='\n'): 01641 """ Logs line, if log file output was specified log line will be appended 01642 at the end of log file 01643 """ 01644 log_entry = TestLogger.log_line(self, LogType, log_line) 01645 log_line_str = self.log_print (log_entry, timestamp) 01646 if self.log_file_name is not None: 01647 try: 01648 with open(self.log_file_name , 'a') as f: 01649 f.write(log_line_str + line_delim) 01650 except IOError: 01651 pass 01652 return log_line_str 01653 01654 01655 def factory_db_logger (db_url): 01656 """ Factory database driver depending on database type supplied in database connection string db_url 01657 """ 01658 if db_url is not None: 01659 from tools.test_mysql import MySQLDBAccess 01660 connection_info = BaseDBAccess().parse_db_connection_string(db_url) 01661 if connection_info is not None: 01662 (db_type, username, password, host, db_name) = BaseDBAccess().parse_db_connection_string(db_url) 01663 if db_type == 'mysql': 01664 return MySQLDBAccess() 01665 return None 01666 01667 01668 def detect_database_verbose (db_url): 01669 """ uses verbose mode (prints) database detection sequence to check it database connection string is valid 01670 """ 01671 result = BaseDBAccess().parse_db_connection_string(db_url) 01672 if result is not None: 01673 # Parsing passed 01674 (db_type, username, password, host, db_name) = result 01675 #print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result 01676 # Let's try to connect 01677 db_ = factory_db_logger(db_url) 01678 if db_ is not None: 01679 print("Connecting to database '%s'..." % db_url) 01680 db_.connect(host, username, password, db_name) 01681 if db_.is_connected(): 01682 print("ok") 01683 print("Detecting database...") 01684 print(db_.detect_database(verbose=True)) 01685 print("Disconnecting...") 01686 db_.disconnect() 01687 print("done") 01688 else: 01689 print("Database type '%s' unknown" % db_type) 01690 else: 01691 print("Parse error: '%s' - DB Url error" % db_url) 01692 01693 01694 def get_module_avail (module_name): 01695 """ This function returns True if module_name is already imported module 01696 """ 01697 return module_name in sys.modules.keys() 01698 01699 def get_autodetected_MUTS_list(platform_name_filter=None): 01700 oldError = None 01701 if os.name == 'nt': 01702 # Disable Windows error box temporarily 01703 oldError = ctypes.windll.kernel32.SetErrorMode(1) #note that SEM_FAILCRITICALERRORS = 1 01704 01705 mbeds = mbed_lstools.create() 01706 detect_muts_list = mbeds.list_mbeds() 01707 01708 if os.name == 'nt': 01709 ctypes.windll.kernel32.SetErrorMode(oldError) 01710 01711 return get_autodetected_MUTS(detect_muts_list, platform_name_filter=platform_name_filter) 01712 01713 def get_autodetected_MUTS (mbeds_list, platform_name_filter=None): 01714 """ Function detects all connected to host mbed-enabled devices and generates artificial MUTS file. 01715 If function fails to auto-detect devices it will return empty dictionary. 01716 01717 if get_module_avail('mbed_lstools'): 01718 mbeds = mbed_lstools.create() 01719 mbeds_list = mbeds.list_mbeds() 01720 01721 @param mbeds_list list of mbeds captured from mbed_lstools 01722 @param platform_name You can filter 'platform_name' with list of filtered targets from 'platform_name_filter' 01723 """ 01724 result = {} # Should be in muts_all.json format 01725 # Align mbeds_list from mbed_lstools to MUT file format (JSON dictionary with muts) 01726 # mbeds_list = [{'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:', 'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}] 01727 index = 1 01728 for mut in mbeds_list: 01729 # Filter the MUTS if a filter is specified 01730 01731 if platform_name_filter and not mut['platform_name'] in platform_name_filter: 01732 continue 01733 01734 # For mcu_unique - we are assigning 'platform_name_unique' value from mbedls output (if its existing) 01735 # if not we are creating our own unique value (last few chars from platform's target_id). 01736 m = {'mcu': mut['platform_name'], 01737 'mcu_unique' : mut['platform_name_unique'] if 'platform_name_unique' in mut else "%s[%s]" % (mut['platform_name'], mut['target_id'][-4:]), 01738 'port': mut['serial_port'], 01739 'disk': mut['mount_point'], 01740 'peripherals': [] # No peripheral detection 01741 } 01742 if index not in result: 01743 result[index] = {} 01744 result[index] = m 01745 index += 1 01746 return result 01747 01748 01749 def get_autodetected_TEST_SPEC (mbeds_list, 01750 use_default_toolchain=True, 01751 use_supported_toolchains=False, 01752 toolchain_filter=None, 01753 platform_name_filter=None): 01754 """ Function detects all connected to host mbed-enabled devices and generates artificial test_spec file. 01755 If function fails to auto-detect devices it will return empty 'targets' test_spec description. 01756 01757 use_default_toolchain - if True add default toolchain to test_spec 01758 use_supported_toolchains - if True add all supported toolchains to test_spec 01759 toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec 01760 """ 01761 result = {'targets': {} } 01762 01763 for mut in mbeds_list: 01764 mcu = mut['mcu'] 01765 if platform_name_filter is None or (platform_name_filter and mut['mcu'] in platform_name_filter): 01766 if mcu in TARGET_MAP: 01767 default_toolchain = TARGET_MAP[mcu].default_toolchain 01768 supported_toolchains = TARGET_MAP[mcu].supported_toolchains 01769 01770 # Decide which toolchains should be added to test specification toolchain pool for each target 01771 toolchains = [] 01772 if use_default_toolchain: 01773 toolchains.append(default_toolchain) 01774 if use_supported_toolchains: 01775 toolchains += supported_toolchains 01776 if toolchain_filter is not None: 01777 all_toolchains = supported_toolchains + [default_toolchain] 01778 for toolchain in toolchain_filter: 01779 if toolchain in all_toolchains: 01780 toolchains.append(toolchain) 01781 01782 result['targets'][mcu] = list(set(toolchains)) 01783 return result 01784 01785 01786 def get_default_test_options_parser (): 01787 """ Get common test script options used by CLI, web services etc. 01788 """ 01789 parser = argparse.ArgumentParser() 01790 parser.add_argument('-i', '--tests', 01791 dest='test_spec_filename', 01792 metavar="FILE", 01793 type=argparse_filestring_type, 01794 help='Points to file with test specification') 01795 01796 parser.add_argument('-M', '--MUTS', 01797 dest='muts_spec_filename', 01798 metavar="FILE", 01799 type=argparse_filestring_type, 01800 help='Points to file with MUTs specification (overwrites settings.py and private_settings.py)') 01801 01802 parser.add_argument("-j", "--jobs", 01803 dest='jobs', 01804 metavar="NUMBER", 01805 type=int, 01806 help="Define number of compilation jobs. Default value is 1") 01807 01808 if get_module_avail('mbed_lstools'): 01809 # Additional features available when mbed_lstools is installed on host and imported 01810 # mbed_lstools allow users to detect connected to host mbed-enabled devices 01811 parser.add_argument('--auto', 01812 dest='auto_detect', 01813 action="store_true", 01814 help='Use mbed-ls module to detect all connected mbed devices') 01815 01816 toolchain_list = list(TOOLCHAINS) + ["DEFAULT", "ALL"] 01817 parser.add_argument('--tc', 01818 dest='toolchains_filter', 01819 type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")), 01820 help="Toolchain filter for --auto argument. Use toolchains names separated by comma, 'default' or 'all' to select toolchains") 01821 01822 test_scopes = ','.join(["'%s'" % n for n in get_available_oper_test_scopes()]) 01823 parser.add_argument('--oper', 01824 dest='operability_checks', 01825 type=argparse_lowercase_type(get_available_oper_test_scopes(), "scopes"), 01826 help='Perform interoperability tests between host and connected mbed devices. Available test scopes are: %s' % test_scopes) 01827 01828 parser.add_argument('--clean', 01829 dest='clean', 01830 action="store_true", 01831 help='Clean the build directory') 01832 01833 parser.add_argument('-P', '--only-peripherals', 01834 dest='test_only_peripheral', 01835 default=False, 01836 action="store_true", 01837 help='Test only peripheral declared for MUT and skip common tests') 01838 01839 parser.add_argument("--profile", dest="profile", action="append", 01840 type=argparse_filestring_type, 01841 default=[]) 01842 01843 parser.add_argument('-C', '--only-commons', 01844 dest='test_only_common', 01845 default=False, 01846 action="store_true", 01847 help='Test only board internals. Skip perpherials tests and perform common tests') 01848 01849 parser.add_argument('-n', '--test-by-names', 01850 dest='test_by_names', 01851 type=argparse_many(str), 01852 help='Runs only test enumerated it this switch. Use comma to separate test case names') 01853 01854 parser.add_argument('-p', '--peripheral-by-names', 01855 dest='peripheral_by_names', 01856 type=argparse_many(str), 01857 help='Forces discovery of particular peripherals. Use comma to separate peripheral names') 01858 01859 copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod') 01860 copy_methods_str = "Plugin support: " + ', '.join(copy_methods) 01861 01862 parser.add_argument('-c', '--copy-method', 01863 dest='copy_method', 01864 type=argparse_uppercase_type(copy_methods, "flash method"), 01865 help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s"% copy_methods_str) 01866 01867 reset_methods = host_tests_plugins.get_plugin_caps('ResetMethod') 01868 reset_methods_str = "Plugin support: " + ', '.join(reset_methods) 01869 01870 parser.add_argument('-r', '--reset-type', 01871 dest='mut_reset_type', 01872 default=None, 01873 type=argparse_uppercase_type(reset_methods, "reset method"), 01874 help='Extra reset method used to reset MUT by host test script. %s'% reset_methods_str) 01875 01876 parser.add_argument('-g', '--goanna-for-tests', 01877 dest='goanna_for_tests', 01878 action="store_true", 01879 help='Run Goanna static analyse tool for tests. (Project will be rebuilded)') 01880 01881 parser.add_argument('-G', '--goanna-for-sdk', 01882 dest='goanna_for_mbed_sdk', 01883 action="store_true", 01884 help='Run Goanna static analyse tool for mbed SDK (Project will be rebuilded)') 01885 01886 parser.add_argument('-s', '--suppress-summary', 01887 dest='suppress_summary', 01888 default=False, 01889 action="store_true", 01890 help='Suppresses display of wellformatted table with test results') 01891 01892 parser.add_argument('-t', '--test-summary', 01893 dest='test_x_toolchain_summary', 01894 default=False, 01895 action="store_true", 01896 help='Displays wellformatted table with test x toolchain test result per target') 01897 01898 parser.add_argument('-A', '--test-automation-report', 01899 dest='test_automation_report', 01900 default=False, 01901 action="store_true", 01902 help='Prints information about all tests and exits') 01903 01904 parser.add_argument('-R', '--test-case-report', 01905 dest='test_case_report', 01906 default=False, 01907 action="store_true", 01908 help='Prints information about all test cases and exits') 01909 01910 parser.add_argument("-S", "--supported-toolchains", 01911 action="store_true", 01912 dest="supported_toolchains", 01913 default=False, 01914 help="Displays supported matrix of MCUs and toolchains") 01915 01916 parser.add_argument("-O", "--only-build", 01917 action="store_true", 01918 dest="only_build_tests", 01919 default=False, 01920 help="Only build tests, skips actual test procedures (flashing etc.)") 01921 01922 parser.add_argument('--parallel', 01923 dest='parallel_test_exec', 01924 default=False, 01925 action="store_true", 01926 help='Experimental, you execute test runners for connected to your host MUTs in parallel (speeds up test result collection)') 01927 01928 parser.add_argument('--config', 01929 dest='verbose_test_configuration_only', 01930 default=False, 01931 action="store_true", 01932 help='Displays full test specification and MUTs configration and exits') 01933 01934 parser.add_argument('--loops', 01935 dest='test_loops_list', 01936 type=argparse_many(str), 01937 help='Set no. of loops per test. Format: TEST_1=1,TEST_2=2,TEST_3=3') 01938 01939 parser.add_argument('--global-loops', 01940 dest='test_global_loops_value', 01941 type=int, 01942 help='Set global number of test loops per test. Default value is set 1') 01943 01944 parser.add_argument('--consolidate-waterfall', 01945 dest='consolidate_waterfall_test', 01946 default=False, 01947 action="store_true", 01948 help='Used with --waterfall argument. Adds only one test to report reflecting outcome of waterfall test.') 01949 01950 parser.add_argument('-W', '--waterfall', 01951 dest='waterfall_test', 01952 default=False, 01953 action="store_true", 01954 help='Used with --loops or --global-loops arguments. Tests until OK result occurs and assumes test passed') 01955 01956 parser.add_argument('-N', '--firmware-name', 01957 dest='firmware_global_name', 01958 help='Set global name for all produced projects. Note, proper file extension will be added by buid scripts') 01959 01960 parser.add_argument('-u', '--shuffle', 01961 dest='shuffle_test_order', 01962 default=False, 01963 action="store_true", 01964 help='Shuffles test execution order') 01965 01966 parser.add_argument('--shuffle-seed', 01967 dest='shuffle_test_seed', 01968 default=None, 01969 help='Shuffle seed (If you want to reproduce your shuffle order please use seed provided in test summary)') 01970 01971 parser.add_argument('-f', '--filter', 01972 dest='general_filter_regex', 01973 type=argparse_many(str), 01974 default=None, 01975 help='For some commands you can use filter to filter out results') 01976 01977 parser.add_argument('--inc-timeout', 01978 dest='extend_test_timeout', 01979 metavar="NUMBER", 01980 type=int, 01981 help='You can increase global timeout for each test by specifying additional test timeout in seconds') 01982 01983 parser.add_argument('--db', 01984 dest='db_url', 01985 help='This specifies what database test suite uses to store its state. To pass DB connection info use database connection string. Example: \'mysql://username:password@127.0.0.1/db_name\'') 01986 01987 parser.add_argument('-l', '--log', 01988 dest='log_file_name', 01989 help='Log events to external file (note not all console entries may be visible in log file)') 01990 01991 parser.add_argument('--report-html', 01992 dest='report_html_file_name', 01993 help='You can log test suite results in form of HTML report') 01994 01995 parser.add_argument('--report-junit', 01996 dest='report_junit_file_name', 01997 help='You can log test suite results in form of JUnit compliant XML report') 01998 01999 parser.add_argument("--report-build", 02000 dest="report_build_file_name", 02001 help="Output the build results to a junit xml file") 02002 02003 parser.add_argument("--report-text", 02004 dest="report_text_file_name", 02005 help="Output the build results to a text file") 02006 02007 parser.add_argument('--verbose-skipped', 02008 dest='verbose_skipped_tests', 02009 default=False, 02010 action="store_true", 02011 help='Prints some extra information about skipped tests') 02012 02013 parser.add_argument('-V', '--verbose-test-result', 02014 dest='verbose_test_result_only', 02015 default=False, 02016 action="store_true", 02017 help='Prints test serial output') 02018 02019 parser.add_argument('-v', '--verbose', 02020 dest='verbose', 02021 default=False, 02022 action="store_true", 02023 help='Verbose mode (prints some extra information)') 02024 02025 parser.add_argument('--version', 02026 dest='version', 02027 default=False, 02028 action="store_true", 02029 help='Prints script version and exits') 02030 02031 parser.add_argument('--stats-depth', 02032 dest='stats_depth', 02033 default=2, 02034 type=int, 02035 help="Depth level for static memory report") 02036 return parser 02037 02038 def test_path_to_name (path, base): 02039 """Change all slashes in a path into hyphens 02040 This creates a unique cross-platform test name based on the path 02041 This can eventually be overriden by a to-be-determined meta-data mechanism""" 02042 name_parts = [] 02043 head, tail = os.path.split(relpath(path,base)) 02044 while (tail and tail != "."): 02045 name_parts.insert(0, tail) 02046 head, tail = os.path.split(head) 02047 02048 return "-".join(name_parts).lower() 02049 02050 def get_test_config (config_name, target_name): 02051 """Finds the path to a test configuration file 02052 config_name: path to a custom configuration file OR mbed OS interface "ethernet, wifi_odin, etc" 02053 target_name: name of target to determing if mbed OS interface given is valid 02054 returns path to config, will return None if no valid config is found 02055 """ 02056 # If they passed in a full path 02057 if exists(config_name): 02058 # This is a module config 02059 return config_name 02060 # Otherwise find the path to configuration file based on mbed OS interface 02061 return TestConfig.get_config_path(config_name, target_name) 02062 02063 def find_tests (base_dir, target_name, toolchain_name, app_config=None): 02064 """ Finds all tests in a directory recursively 02065 base_dir: path to the directory to scan for tests (ex. 'path/to/project') 02066 target_name: name of the target to use for scanning (ex. 'K64F') 02067 toolchain_name: name of the toolchain to use for scanning (ex. 'GCC_ARM') 02068 options: Compile options to pass to the toolchain (ex. ['debug-info']) 02069 app_config - location of a chosen mbed_app.json file 02070 02071 returns a dictionary where keys are the test name, and the values are 02072 lists of paths needed to biuld the test. 02073 """ 02074 02075 # Temporary structure: tests referenced by (name, base, group, case) tuple 02076 tests = {} 02077 # List of common folders: (predicate function, path) tuple 02078 commons = [] 02079 02080 # Prepare the toolchain 02081 toolchain = prepare_toolchain([base_dir], None, target_name, toolchain_name, 02082 app_config=app_config) 02083 02084 # Scan the directory for paths to probe for 'TESTS' folders 02085 base_resources = scan_resources([base_dir], toolchain) 02086 02087 dirs = base_resources.inc_dirs 02088 for directory in dirs: 02089 subdirs = os.listdir(directory) 02090 02091 # If the directory contains a subdirectory called 'TESTS', scan it for test cases 02092 if 'TESTS' in subdirs: 02093 walk_base_dir = join(directory, 'TESTS') 02094 test_resources = toolchain.scan_resources(walk_base_dir, base_path=base_dir) 02095 02096 # Loop through all subdirectories 02097 for d in test_resources.inc_dirs: 02098 02099 # If the test case folder is not called 'host_tests' or 'COMMON' and it is 02100 # located two folders down from the main 'TESTS' folder (ex. TESTS/testgroup/testcase) 02101 # then add it to the tests 02102 relative_path = relpath(d, walk_base_dir) 02103 relative_path_parts = os.path.normpath(relative_path).split(os.sep) 02104 if len(relative_path_parts) == 2: 02105 test_group_directory_path, test_case_directory = os.path.split(d) 02106 test_group_directory = os.path.basename(test_group_directory_path) 02107 02108 # Check to make sure discoverd folder is not in a host test directory or common directory 02109 special_dirs = ['host_tests', 'COMMON'] 02110 if test_group_directory not in special_dirs and test_case_directory not in special_dirs: 02111 test_name = test_path_to_name(d, base_dir) 02112 tests[(test_name, walk_base_dir, test_group_directory, test_case_directory)] = [d] 02113 02114 # Also find any COMMON paths, we'll add these later once we find all the base tests 02115 if 'COMMON' in relative_path_parts: 02116 if relative_path_parts[0] != 'COMMON': 02117 def predicate(base_pred, group_pred, name_base_group_case): 02118 (name, base, group, case) = name_base_group_case 02119 return base == base_pred and group == group_pred 02120 commons.append((functools.partial(predicate, walk_base_dir, relative_path_parts[0]), d)) 02121 else: 02122 def predicate(base_pred, name_base_group_case): 02123 (name, base, group, case) = name_base_group_case 02124 return base == base_pred 02125 commons.append((functools.partial(predicate, walk_base_dir), d)) 02126 02127 # Apply common directories 02128 for pred, path in commons: 02129 for test_identity, test_paths in tests.iteritems(): 02130 if pred(test_identity): 02131 test_paths.append(path) 02132 02133 # Drop identity besides name 02134 return {name: paths for (name, _, _, _), paths in tests.iteritems()} 02135 02136 def print_tests (tests, format="list", sort=True): 02137 """Given a dictionary of tests (as returned from "find_tests"), print them 02138 in the specified format""" 02139 if format == "list": 02140 for test_name in sorted(tests.keys()): 02141 test_path = tests[test_name][0] 02142 print("Test Case:") 02143 print(" Name: %s" % test_name) 02144 print(" Path: %s" % test_path) 02145 elif format == "json": 02146 print(json.dumps({test_name: test_path[0] for test_name, test_paths 02147 in tests}, indent=2)) 02148 else: 02149 print("Unknown format '%s'" % format) 02150 sys.exit(1) 02151 02152 def norm_relative_path (path, start): 02153 """This function will create a normalized, relative path. It mimics the 02154 python os.path.relpath function, but also normalizes a Windows-syle path 02155 that use backslashes to a Unix style path that uses forward slashes.""" 02156 path = os.path.normpath(path) 02157 path = os.path.relpath(path, start) 02158 path = path.replace("\\", "/") 02159 return path 02160 02161 02162 def build_test_worker (*args, **kwargs): 02163 """This is a worker function for the parallel building of tests. The `args` 02164 and `kwargs` are passed directly to `build_project`. It returns a dictionary 02165 with the following structure: 02166 02167 { 02168 'result': `True` if no exceptions were thrown, `False` otherwise 02169 'reason': Instance of exception that was thrown on failure 02170 'bin_file': Path to the created binary if `build_project` was 02171 successful. Not present otherwise 02172 'kwargs': The keyword arguments that were passed to `build_project`. 02173 This includes arguments that were modified (ex. report) 02174 } 02175 """ 02176 bin_file = None 02177 ret = { 02178 'result': False, 02179 'args': args, 02180 'kwargs': kwargs 02181 } 02182 02183 # Use parent TOOLCHAIN_PATHS variable 02184 for key, value in kwargs['toolchain_paths'].items(): 02185 TOOLCHAIN_PATHS[key] = value 02186 02187 del kwargs['toolchain_paths'] 02188 02189 try: 02190 bin_file = build_project(*args, **kwargs) 02191 ret['result'] = True 02192 ret['bin_file'] = bin_file 02193 ret['kwargs'] = kwargs 02194 02195 except NotSupportedException as e: 02196 ret['reason'] = e 02197 except ToolException as e: 02198 ret['reason'] = e 02199 except KeyboardInterrupt as e: 02200 ret['reason'] = e 02201 except: 02202 # Print unhandled exceptions here 02203 import traceback 02204 traceback.print_exc(file=sys.stdout) 02205 02206 return ret 02207 02208 02209 def build_tests (tests, base_source_paths, build_path, target, toolchain_name, 02210 clean=False, notify=None, jobs=1, macros=None, 02211 silent=False, report=None, properties=None, 02212 continue_on_build_fail=False, app_config=None, 02213 build_profile=None, stats_depth=None): 02214 """Given the data structure from 'find_tests' and the typical build parameters, 02215 build all the tests 02216 02217 Returns a tuple of the build result (True or False) followed by the test 02218 build data structure""" 02219 02220 execution_directory = "." 02221 base_path = norm_relative_path(build_path, execution_directory) 02222 02223 target_name = target.name if isinstance(target, Target) else target 02224 cfg, _, _ = get_config(base_source_paths, target_name, toolchain_name, app_config=app_config) 02225 02226 baud_rate = 9600 02227 if 'platform.stdio-baud-rate' in cfg: 02228 baud_rate = cfg['platform.stdio-baud-rate'].value 02229 02230 test_build = { 02231 "platform": target_name, 02232 "toolchain": toolchain_name, 02233 "base_path": base_path, 02234 "baud_rate": baud_rate, 02235 "binary_type": "bootable", 02236 "tests": {} 02237 } 02238 02239 result = True 02240 02241 jobs_count = int(jobs if jobs else cpu_count()) 02242 p = Pool(processes=jobs_count) 02243 results = [] 02244 for test_name, test_paths in tests.items(): 02245 if not isinstance(test_paths, list): 02246 test_paths = [test_paths] 02247 02248 test_build_path = os.path.join(build_path, test_paths[0]) 02249 src_paths = base_source_paths + test_paths 02250 bin_file = None 02251 test_case_folder_name = os.path.basename(test_paths[0]) 02252 02253 args = (src_paths, test_build_path, target, toolchain_name) 02254 kwargs = { 02255 'jobs': 1, 02256 'clean': clean, 02257 'macros': macros, 02258 'name': test_case_folder_name, 02259 'project_id': test_name, 02260 'report': report, 02261 'properties': properties, 02262 'app_config': app_config, 02263 'build_profile': build_profile, 02264 'toolchain_paths': TOOLCHAIN_PATHS, 02265 'stats_depth': stats_depth, 02266 'notify': MockNotifier() 02267 } 02268 02269 results.append(p.apply_async(build_test_worker, args, kwargs)) 02270 02271 p.close() 02272 result = True 02273 itr = 0 02274 while len(results): 02275 itr += 1 02276 if itr > 360000: 02277 p.terminate() 02278 p.join() 02279 raise ToolException("Compile did not finish in 10 minutes") 02280 else: 02281 sleep(0.01) 02282 pending = 0 02283 for r in results: 02284 if r.ready() is True: 02285 try: 02286 worker_result = r.get() 02287 results.remove(r) 02288 02289 # Push all deferred notifications out to the actual notifier 02290 new_notify = deepcopy(notify) 02291 for message in worker_result['kwargs']['notify'].messages: 02292 new_notify.notify(message) 02293 02294 # Take report from the kwargs and merge it into existing report 02295 if report: 02296 report_entry = worker_result['kwargs']['report'][target_name][toolchain_name] 02297 report_entry[worker_result['kwargs']['project_id'].upper()][0][0]['output'] = new_notify.get_output() 02298 for test_key in report_entry.keys(): 02299 report[target_name][toolchain_name][test_key] = report_entry[test_key] 02300 02301 # Set the overall result to a failure if a build failure occurred 02302 if ('reason' in worker_result and 02303 not worker_result['reason'] and 02304 not isinstance(worker_result['reason'], NotSupportedException)): 02305 result = False 02306 break 02307 02308 02309 # Adding binary path to test build result 02310 if ('result' in worker_result and 02311 worker_result['result'] and 02312 'bin_file' in worker_result): 02313 bin_file = norm_relative_path(worker_result['bin_file'], execution_directory) 02314 02315 test_build['tests'][worker_result['kwargs']['project_id']] = { 02316 "binaries": [ 02317 { 02318 "path": bin_file 02319 } 02320 ] 02321 } 02322 02323 test_key = worker_result['kwargs']['project_id'].upper() 02324 print('Image: %s\n' % bin_file) 02325 02326 except: 02327 if p._taskqueue.queue: 02328 p._taskqueue.queue.clear() 02329 sleep(0.5) 02330 p.terminate() 02331 p.join() 02332 raise 02333 else: 02334 pending += 1 02335 if pending >= jobs_count: 02336 break 02337 02338 # Break as soon as possible if there is a failure and we are not 02339 # continuing on build failures 02340 if not result and not continue_on_build_fail: 02341 if p._taskqueue.queue: 02342 p._taskqueue.queue.clear() 02343 sleep(0.5) 02344 p.terminate() 02345 break 02346 02347 p.join() 02348 02349 test_builds = {} 02350 test_builds["%s-%s" % (target_name, toolchain_name)] = test_build 02351 02352 return result, test_builds 02353 02354 02355 def test_spec_from_test_builds(test_builds): 02356 return { 02357 "builds": test_builds 02358 }
Generated on Tue Jul 12 2022 18:18:51 by
