Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
daplink_board.py
00001 # 00002 # DAPLink Interface Firmware 00003 # Copyright (c) 2009-2016, ARM Limited, All Rights Reserved 00004 # SPDX-License-Identifier: Apache-2.0 00005 # 00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may 00007 # not use this file except in compliance with the License. 00008 # You may obtain a copy of the License at 00009 # 00010 # http://www.apache.org/licenses/LICENSE-2.0 00011 # 00012 # Unless required by applicable law or agreed to in writing, software 00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 # See the License for the specific language governing permissions and 00016 # limitations under the License. 00017 # 00018 00019 from __future__ import absolute_import 00020 00021 import os 00022 import re 00023 import time 00024 import subprocess 00025 import sys 00026 import binascii 00027 import itertools 00028 import mbed_lstools 00029 import info 00030 import test_daplink 00031 from test_info import TestInfoStub, TestInfo 00032 from intelhex import IntelHex 00033 from pyocd.core.helpers import ConnectHelper 00034 00035 FILE_IGNORE_PATTERN_LIST = [ 00036 re.compile("\\._\\.Trashes") 00037 ] 00038 00039 00040 # This prevents the following error message from getting 00041 # displayed on windows if the mbed dismounts unexpectedly 00042 # during a transfer: 00043 # There is no disk in the drive. Please insert a disk into 00044 # drive \Device<Harddiskx><rdrive> 00045 def disable_popup(): 00046 if sys.platform.startswith("win"): 00047 # pylint: disable=invalid-name 00048 import ctypes 00049 SEM_FAILCRITICALERRORS = 1 00050 GetErrorMode = \ 00051 ctypes.windll.kernel32.GetErrorMode # @UndefinedVariable 00052 GetErrorMode.restype = ctypes.c_uint 00053 GetErrorMode.argtypes = [] 00054 SetErrorMode = \ 00055 ctypes.windll.kernel32.SetErrorMode # @UndefinedVariable 00056 SetErrorMode.restype = ctypes.c_uint 00057 SetErrorMode.argtypes = [ctypes.c_uint] 00058 00059 err_mode = GetErrorMode() 00060 err_mode |= SEM_FAILCRITICALERRORS 00061 SetErrorMode(err_mode) 00062 00063 disable_popup() 00064 00065 00066 def get_all_attached_daplink_boards(): 00067 all_boards = [] 00068 lstools = mbed_lstools.create() 00069 mbed_list = lstools.list_mbeds() 00070 for mbed in mbed_list: 00071 unique_id = mbed['target_id'] 00072 board = DaplinkBoard(unique_id) 00073 if board._mode is not None: #Valid daplink should have set this mode 00074 all_boards.append(board) 00075 else: 00076 print("Warning: DAPLink tests cannot be done on board %s" % board.unique_id) 00077 return all_boards 00078 00079 00080 def _unique_id_to_host_id(unique_id): 00081 """Return the chip id unique to the daplink host procesor 00082 00083 Unique ID has the following fomat 00084 Board ID - 4 bytes 00085 Version - 4 bytes 00086 Host ID - Everything else 00087 """ 00088 return unique_id[8:8 + 32] 00089 00090 00091 def _get_board_endpoints(unique_id): 00092 """Return a tuple of unique_id, serial_port, mount_point""" 00093 lstools = mbed_lstools.create() 00094 mbed_list = lstools.list_mbeds() 00095 00096 host_id = _unique_id_to_host_id(unique_id) 00097 for mbed in mbed_list: 00098 mbed_unique_id = mbed['target_id'] 00099 mbed_serial_port = mbed['serial_port'] 00100 mbed_mount_point = mbed['mount_point'] 00101 mbed_host_id = _unique_id_to_host_id(mbed_unique_id) 00102 if mbed_host_id == host_id: 00103 return mbed_unique_id, mbed_serial_port, mbed_mount_point 00104 return None 00105 00106 00107 def _ranges(i): 00108 for _, b in itertools.groupby(enumerate(i), lambda x_y: x_y[1] - x_y[0]): 00109 b = list(b) 00110 yield b[0][1], b[-1][1] 00111 00112 00113 def _parse_kvp_file(file_path, parent_test=None): 00114 """Parse details.txt and return True if successful""" 00115 test_info = None 00116 kvp = {} 00117 if parent_test is not None: 00118 test_info = parent_test.create_subtest('parse_kvp_file') 00119 line_format = re.compile("^([a-zA-Z0-9 ]+): +(.+)$") 00120 if not os.path.isfile(file_path): 00121 return kvp 00122 00123 with open(file_path, "r") as file_handle: 00124 for line in file_handle: 00125 if len(line) <= 0: 00126 if test_info is not None: 00127 test_info.failure("Empty line in %s" % file_path) 00128 continue 00129 00130 if line[0] == '#': 00131 # The line is a comment 00132 continue 00133 00134 match = line_format.match(line) 00135 if match is None: 00136 if test_info is not None: 00137 test_info.failure("Invalid line: %s" % line) 00138 continue 00139 00140 key = match.group(1) 00141 key = key.lower().replace(" ", "_") 00142 value = match.group(2) 00143 value = value.lower() 00144 value = value.strip() 00145 if key in kvp: 00146 if test_info is not None: 00147 test_info.failure("Duplicate key %s" % key) 00148 continue 00149 kvp[key] = value 00150 return kvp 00151 00152 00153 def _compute_crc(hex_file_path): 00154 # Read in hex file 00155 new_hex_file = IntelHex() 00156 new_hex_file.padding = 0xFF 00157 new_hex_file.fromfile(hex_file_path, format='hex') 00158 00159 # Get the starting and ending address 00160 addresses = new_hex_file.addresses() 00161 addresses.sort() 00162 start_end_pairs = list(_ranges(addresses)) 00163 regions = len(start_end_pairs) 00164 assert regions == 1, ("Error - only 1 region allowed in " 00165 "hex file %i found." % regions) 00166 start, end = start_end_pairs[0] 00167 00168 # Compute checksum over the range (don't include data at location of crc) 00169 size = end - start + 1 00170 crc_size = size - 4 00171 data = new_hex_file.tobinarray(start=start, size=crc_size) 00172 data_crc32 = binascii.crc32(data) & 0xFFFFFFFF 00173 00174 # Grab the crc from the image 00175 embedded_crc32 = (((new_hex_file[end - 3] & 0xFF) << 0) | 00176 ((new_hex_file[end - 2] & 0xFF) << 8) | 00177 ((new_hex_file[end - 1] & 0xFF) << 16) | 00178 ((new_hex_file[end - 0] & 0xFF) << 24)) 00179 return data_crc32, embedded_crc32 00180 00181 00182 def _run_chkdsk(drive): 00183 args = ["chkdsk", drive] 00184 process = subprocess.Popen(args, stdin=subprocess.PIPE, 00185 stdout=subprocess.PIPE, 00186 stderr=subprocess.PIPE) 00187 process.communicate(input=bytearray('n\r\n',encoding='ascii')) # Answer no if prompted 00188 process.wait() 00189 return process.returncode 00190 00191 00192 class AssertInfo(object): 00193 00194 def __init__(self, file_name, line_number): 00195 self._file = file_name 00196 self._line = line_number 00197 00198 @property 00199 def file(self): 00200 return self._file 00201 00202 @property 00203 def line(self): 00204 return self._line 00205 00206 00207 class DaplinkBoard(object): 00208 00209 MODE_IF = "interface" 00210 MODE_BL = "bootloader" 00211 00212 # Keys for details.txt 00213 KEY_UNIQUE_ID = "unique_id" 00214 KEY_HIC_ID = "hic_id" 00215 KEY_MODE = "daplink_mode" 00216 KEY_BL_VERSION = "bootloader_version" 00217 KEY_IF_VERSION = "interface_version" 00218 KEY_GIT_SHA = "git_sha" 00219 KEY_LOCAL_MODS = "local_mods" 00220 KEY_USB_INTERFACES = "usb_interfaces" 00221 KEY_BL_CRC = "bootloader_crc" 00222 KEY_IF_CRC = "interface_crc" 00223 KEY_REMOUNT_COUNT = "remount_count" 00224 00225 def __init__(self, unique_id): 00226 00227 self.unique_id = unique_id 00228 self.details_txt = None 00229 self._mode = None 00230 self._remount_count = None 00231 self._assert = None 00232 self._check_fs_on_remount = False 00233 self._manage_assert = False 00234 self.update_board_info() 00235 00236 def __str__(self): 00237 return "Name=%s Unique ID=%s" % (self.name, self.get_unique_id()) 00238 00239 def get_unique_id(self): 00240 return self.unique_id 00241 00242 def get_board_id(self): 00243 return self.board_id 00244 00245 @property 00246 def hic_id(self): 00247 return self._hic_id 00248 00249 @property 00250 def name(self): 00251 if self.board_id in info.BOARD_ID_TO_BUILD_TARGET: 00252 board_target = info.BOARD_ID_TO_BUILD_TARGET[self.board_id] 00253 else: 00254 board_target = "Unknown" 00255 return board_target 00256 00257 def get_serial_port(self): 00258 return self.serial_port 00259 00260 def get_mount_point(self): 00261 return self.mount_point 00262 00263 def get_connected(self): 00264 """Check if the board is connected""" 00265 return os.path.isdir(self.mount_point) 00266 00267 def get_failure_message_and_type(self): 00268 """Get the failure message and types from fail.txt 00269 00270 return None if there there is no failure 00271 """ 00272 error = None 00273 error_type = None 00274 fail_file = self.get_file_path('FAIL.TXT') 00275 if not self.get_connected(): 00276 raise Exception('Board not connected') 00277 if os.path.isfile(fail_file): 00278 with open(fail_file, 'r') as fail_file_handle: 00279 msg = fail_file_handle.read() 00280 lines = msg.splitlines() 00281 if len(lines) == 2: 00282 if lines[0].startswith('error: '): 00283 error = lines[0][7:] 00284 else: 00285 raise Exception('Can not parse error line in FAIL.TXT') 00286 if lines[1].startswith('type: '): 00287 error_type = lines[1][6:] 00288 else: 00289 raise Exception('Can not parse type line in FAIL.TXT') 00290 else: 00291 raise Exception('Wrong number of lines in FAIL.TXT, expected: 2') 00292 return error, error_type 00293 00294 def get_assert_info(self): 00295 """Return an AssertInfo if an assert occurred, else None""" 00296 return self._assert 00297 00298 def get_mode(self): 00299 """Return either MODE_IF or MODE_BL""" 00300 assert ((self._mode is DaplinkBoard.MODE_BL) or 00301 (self._mode is DaplinkBoard.MODE_IF)) 00302 return self._mode 00303 00304 def get_file_path(self, file_name): 00305 """Convenience function to the path to a file on the drive""" 00306 return os.path.normpath(self.mount_point + os.sep + file_name) 00307 00308 def refresh(self, parent_test): 00309 """Remount driver to get updated contents""" 00310 refresh_filename = self.get_file_path('REFRESH.ACT') 00311 with open(refresh_filename, 'wb') as _: 00312 pass 00313 self.wait_for_remount(parent_test) 00314 00315 def set_mode(self, mode, parent_test=None): 00316 """Set the mode to either MODE_IF or MODE_BL""" 00317 assert ((mode is DaplinkBoard.MODE_BL) or 00318 (mode is DaplinkBoard.MODE_IF)) 00319 if parent_test is None: 00320 parent_test = TestInfoStub() 00321 test_info = parent_test.create_subtest('set_mode') 00322 current_mode = self.get_mode() 00323 if current_mode is mode: 00324 # No mode change needed 00325 return 00326 00327 if mode is self.MODE_BL: 00328 test_info.info("changing mode IF -> BL") 00329 # Create file to enter BL mode 00330 start_bl_path = self.get_file_path('START_BL.ACT') 00331 with open(start_bl_path, 'wb') as _: pass 00332 elif mode is self.MODE_IF: 00333 test_info.info("changing mode BL -> IF") 00334 # Create file to enter IF mode 00335 start_if_path = self.get_file_path('START_IF.ACT') 00336 with open(start_if_path, 'wb') as _: pass 00337 else: 00338 test_info.warning("Board is in unknown mode") 00339 self.wait_for_remount(test_info) 00340 00341 new_mode = self.get_mode() 00342 if new_mode != mode: 00343 test_info.failure("Board in wrong mode: %s" % new_mode) 00344 raise Exception("Could not change board mode") 00345 00346 def set_check_fs_on_remount(self, enabled): 00347 assert isinstance(enabled, bool) 00348 self._check_fs_on_remount = enabled 00349 self.set_assert_auto_manage(enabled) 00350 00351 def set_assert_auto_manage(self, enabled): 00352 assert isinstance(enabled, bool) 00353 self.clear_assert() 00354 self._manage_assert = enabled 00355 00356 def clear_assert(self): 00357 assert_path = self.get_file_path("ASSERT.TXT") 00358 if os.path.isfile(assert_path): 00359 os.remove(assert_path) 00360 self.wait_for_remount(TestInfoStub()) 00361 00362 def run_board_test(self, parent_test): 00363 test_daplink.daplink_test(self, parent_test) 00364 00365 def read_target_memory(self, addr, size, resume=True): 00366 assert self.get_mode() == self.MODE_IF 00367 with ConnectHelper.session_with_chosen_probe(unique_id=self.get_unique_id(), 00368 resume_on_disconnect=resume) as session: 00369 data = session.target.read_memory_block8(addr, size) 00370 return bytearray(data) 00371 00372 def test_fs(self, parent_test): 00373 """Check if the raw filesystem is valid""" 00374 if sys.platform.startswith("win"): 00375 test_info = parent_test.create_subtest('test_fs') 00376 returncode = _run_chkdsk(self.mount_point) 00377 test_info.info('chkdsk returned %s' % returncode) 00378 if returncode != 0: 00379 test_info.failure('Disk corrupt') 00380 00381 # Windows 8/10 workaround - rerun chkdsk until disk caching is on 00382 # Notes about this problem: 00383 # - This is less likely to occur when the "storage" service is 00384 # turned off and/or you are running as administrator 00385 # - When creating a directory with os.mkdir the 00386 # following error occurs: "WindowsError: [Error 1392] The 00387 # file or directory is corrupted and unreadable: '<directory>'" 00388 # - When creating a file with open(<filename>, "wb") the 00389 # following error occurs: "OError: [Errno 22] invalid 00390 # mode ('wb') or filename: '<filename>'" 00391 # - When a file or directory is created on the drive in explorer 00392 # and you preform a refresh, the newly created file or 00393 # directory disappears 00394 persist_test_dir = self.get_file_path("persist_test_dir") 00395 for _ in range(10): 00396 try: 00397 os.mkdir(persist_test_dir) 00398 except EnvironmentError as exception: 00399 test_info.info("cache check exception %s" % exception) 00400 if os.path.exists(persist_test_dir): 00401 os.rmdir(persist_test_dir) 00402 break 00403 test_info.info("running checkdisk to re-enable caching") 00404 _run_chkdsk(self.mount_point) 00405 else: 00406 raise Exception("Unable to re-enable caching") 00407 00408 # TODO - as a future improvement add linux and mac support 00409 00410 # Tests for the following: 00411 # 1. Correct files present -TODO 00412 # 2. Contents of file are valid ascii 00413 # 3. Each line ends with \r\n 00414 # 4. There is no whitespace at the end of the line 00415 # 5. Each file ends with \r\n 00416 def test_fs_contents(self, parent_test): 00417 """Check if the file contents are valid""" 00418 test_info = parent_test.create_subtest('test_fs_contents') 00419 non_ascii = b'[^\x20-\x7F\r\n]' 00420 non_cr_lf = b'\r[^\n]|[^\r]\n' 00421 trail_white = b'(?:\ \r|\ \n)' 00422 end_of_file = b'\r\n$' 00423 files = os.listdir(self.mount_point) 00424 non_ascii_re = re.compile(non_ascii) 00425 non_cr_lf_re = re.compile(non_cr_lf) 00426 trail_white_re = re.compile(trail_white) 00427 end_of_file_re = re.compile(end_of_file) 00428 for filename in files: 00429 filepath = self.get_file_path(filename) 00430 if not os.path.isfile(filepath): 00431 test_info.info("Skipping non file item %s" % filepath) 00432 continue 00433 skip = False 00434 for pattern in FILE_IGNORE_PATTERN_LIST: 00435 if pattern.match(filename): 00436 skip = True 00437 break 00438 if skip: 00439 continue 00440 00441 with open(filepath, 'rb') as file_handle: 00442 file_contents = file_handle.read() 00443 if non_ascii_re.search(file_contents): 00444 test_info.failure("Non ascii characters in %s" % filepath) 00445 elif non_cr_lf_re.search(file_contents): 00446 test_info.failure("File has non-standard line endings %s" % 00447 filepath) 00448 elif trail_white_re.search(file_contents): 00449 test_info.warning("File trailing whitespace %s" % 00450 filepath) 00451 elif end_of_file_re.search(file_contents) is None: 00452 test_info.warning("No newline at end of file %s" % 00453 filepath) 00454 else: 00455 test_info.info("File %s valid" % filepath) 00456 00457 self.test_details_txt(test_info) 00458 00459 def load_interface(self, filepath, parent_test): 00460 """Load an interface binary or hex""" 00461 assert isinstance(filepath, str), "Invalid bootloader image!" 00462 assert isinstance(parent_test, TestInfo), "Invalid parent test object!" 00463 00464 test_info = parent_test.create_subtest('load_interface') 00465 self.set_mode(self.MODE_BL, test_info) 00466 00467 data_crc, crc_in_image = _compute_crc(filepath) 00468 assert data_crc == crc_in_image, ("CRC in interface is wrong " 00469 "expected 0x%x, found 0x%x" % 00470 (data_crc, crc_in_image)) 00471 00472 filename = os.path.basename(filepath) 00473 with open(filepath, 'rb') as firmware_file: 00474 data = firmware_file.read() 00475 out_file = self.get_file_path(filename) 00476 start = time.time() 00477 with open(out_file, 'wb') as firmware_file: 00478 firmware_file.write(data) 00479 stop = time.time() 00480 test_info.info("programming took %s s" % (stop - start)) 00481 self.wait_for_remount(test_info) 00482 00483 # Check the CRC 00484 self.set_mode(self.MODE_IF, test_info) 00485 if DaplinkBoard.KEY_IF_CRC not in self.details_txt: 00486 test_info.failure("No interface CRC in details.txt") 00487 return 00488 details_crc = int(self.details_txt[DaplinkBoard.KEY_IF_CRC], 0) 00489 test_info.info("Interface crc: 0x%x" % details_crc) 00490 if data_crc != details_crc: 00491 test_info.failure("Interface CRC is wrong") 00492 00493 def load_bootloader(self, filepath, parent_test): 00494 """Load a bootloader binary or hex""" 00495 assert isinstance(filepath, str), "Invalid bootloader image!" 00496 assert isinstance(parent_test, TestInfo), "Invalid parent test object!" 00497 00498 test_info = parent_test.create_subtest('load_bootloader') 00499 self.set_mode(self.MODE_IF, test_info) 00500 00501 # Check image CRC 00502 data_crc, crc_in_image = _compute_crc(filepath) 00503 assert data_crc == crc_in_image, ("CRC in bootloader is wrong " 00504 "expected 0x%x, found 0x%x" % 00505 (data_crc, crc_in_image)) 00506 00507 filename = os.path.basename(filepath) 00508 with open(filepath, 'rb') as firmware_file: 00509 data = firmware_file.read() 00510 out_file = self.get_file_path(filename) 00511 start = time.time() 00512 with open(out_file, 'wb') as firmware_file: 00513 firmware_file.write(data) 00514 stop = time.time() 00515 test_info.info("programming took %s s" % (stop - start)) 00516 self.wait_for_remount(test_info) 00517 00518 # Check the CRC 00519 self.set_mode(self.MODE_IF, test_info) 00520 if DaplinkBoard.KEY_BL_CRC not in self.details_txt: 00521 test_info.failure("No bootloader CRC in details.txt") 00522 return 00523 details_crc = int(self.details_txt[DaplinkBoard.KEY_BL_CRC], 0) 00524 test_info.info("Bootloader crc: 0x%x" % details_crc) 00525 if data_crc != details_crc: 00526 test_info.failure("Bootloader CRC is wrong") 00527 00528 def wait_for_remount(self, parent_test, wait_time=600): 00529 mode = self._mode 00530 count = self._remount_count 00531 test_info = parent_test.create_subtest('wait_for_remount') 00532 00533 elapsed = 0 00534 start = time.time() 00535 remounted = False 00536 while os.path.isdir(self.mount_point): 00537 if self.update_board_info(False): #check info if it is already mounted 00538 if mode is not None and self._mode is not None and mode is not self._mode: 00539 remounted = True 00540 test_info.info("already remounted with change mode") 00541 break 00542 elif count is not None and self._remount_count is not None and count != self._remount_count: 00543 remounted = True 00544 test_info.info("already remounted with change mount count") 00545 break 00546 if elapsed > wait_time: 00547 raise Exception("Dismount timed out") 00548 time.sleep(0.1) 00549 elapsed += 0.2 00550 else: 00551 stop = time.time() 00552 test_info.info("unmount took %s s" % (stop - start)) 00553 elapsed = 0 00554 start = time.time() 00555 00556 while not remounted: 00557 if self.update_board_info(False): 00558 if os.path.isdir(self.mount_point): 00559 # Information returned by mbed-ls could be old. 00560 # Only break from the loop if the second call to 00561 # mbed-ls returns the same mount point. 00562 tmp_mount = self.mount_point 00563 if self.update_board_info(False): 00564 if tmp_mount == self.mount_point: 00565 break 00566 if elapsed > wait_time: 00567 raise Exception("Mount timed out") 00568 time.sleep(0.1) 00569 elapsed += 0.1 00570 stop = time.time() 00571 test_info.info("mount took %s s" % (stop - start)) 00572 00573 if count is not None and self._remount_count is not None: 00574 expected_count = (0 if mode is not self._mode 00575 else (count + 1) & 0xFFFFFFFF) 00576 if expected_count != self._remount_count: 00577 test_info.failure('Expected remount count of %s got %s' % 00578 (expected_count, self._remount_count)) 00579 00580 # If enabled check the filesystem 00581 if self._check_fs_on_remount: 00582 self.test_fs(parent_test) 00583 self.test_fs_contents(parent_test) 00584 self.test_details_txt(parent_test) 00585 if self._manage_assert: 00586 if self._assert is not None: 00587 test_info.failure('Assert on line %s in file %s' % 00588 (self._assert.line, self._assert.file)) 00589 self.clear_assert() 00590 00591 def update_board_info(self, exptn_on_fail=True): 00592 """Update board info 00593 00594 Update all board information variables that could 00595 change when remounting or changing modes. 00596 Note - before this function is set self.unique_id 00597 must be set. 00598 """ 00599 00600 try: 00601 endpoints = _get_board_endpoints(self.unique_id) 00602 if endpoints is None: 00603 if exptn_on_fail: 00604 raise Exception("Could not update board info: %s" % 00605 self.unique_id) 00606 return False 00607 self.unique_id, self.serial_port, self.mount_point = endpoints 00608 # Serial port can be missing 00609 if self.unique_id is None: 00610 if exptn_on_fail: 00611 raise Exception("Mount point is null") 00612 return False 00613 if self.mount_point is None: 00614 if exptn_on_fail: 00615 raise Exception("Mount point is null") 00616 return False 00617 self.board_id = int(self.unique_id[0:4], 16) 00618 self._hic_id = int(self.unique_id[-8:], 16) 00619 00620 # Note - Some legacy boards might not have details.txt 00621 details_txt_path = self.get_file_path("details.txt") 00622 self.details_txt = _parse_kvp_file(details_txt_path) 00623 self._parse_assert_txt() 00624 00625 self._remount_count = None 00626 if DaplinkBoard.KEY_REMOUNT_COUNT in self.details_txt: 00627 self._remount_count = int(self.details_txt[DaplinkBoard.KEY_REMOUNT_COUNT]) 00628 self._mode = None 00629 if DaplinkBoard.KEY_MODE in self.details_txt: 00630 DETAILS_TO_MODE = { 00631 "interface": DaplinkBoard.MODE_IF, 00632 "bootloader": DaplinkBoard.MODE_BL, 00633 } 00634 mode_str = self.details_txt[DaplinkBoard.KEY_MODE] 00635 self._mode = DETAILS_TO_MODE[mode_str] 00636 else: 00637 #check for race condition here 00638 return False 00639 return True 00640 except Exception as e: 00641 if exptn_on_fail: 00642 raise e 00643 else: 00644 return False 00645 00646 def test_details_txt(self, parent_test): 00647 """Check that details.txt has all requied fields""" 00648 test_info = parent_test.create_subtest('test_details_txt') 00649 required_key_and_format = { 00650 DaplinkBoard.KEY_UNIQUE_ID: re.compile("^[a-f0-9]{48}$"), 00651 DaplinkBoard.KEY_HIC_ID: re.compile("^[a-f0-9]{8}$"), 00652 DaplinkBoard.KEY_GIT_SHA: re.compile("^[a-f0-9]{40}$"), 00653 DaplinkBoard.KEY_LOCAL_MODS: re.compile("^[01]{1}$"), 00654 DaplinkBoard.KEY_USB_INTERFACES: re.compile("^.+$"), 00655 DaplinkBoard.KEY_MODE: re.compile("(interface|bootloader)"), 00656 } 00657 optional_key_and_format = { 00658 DaplinkBoard.KEY_BL_VERSION: re.compile("^[0-9]{4}$"), 00659 DaplinkBoard.KEY_IF_VERSION: re.compile("^[0-9]{4}$"), 00660 DaplinkBoard.KEY_BL_CRC: re.compile("^0x[a-f0-9]{8}$"), 00661 DaplinkBoard.KEY_IF_CRC: re.compile("^0x[a-f0-9]{8}$"), 00662 } 00663 # 1. keys and values are alphanumeric 00664 # 2. no duplicate keys 00665 # 3. format is key : value 00666 # 4. required keys are present 00667 # 5. optional keys have the expected format 00668 details_txt_path = self.get_file_path("details.txt") 00669 details_txt = _parse_kvp_file(details_txt_path, test_info) 00670 if not details_txt: 00671 test_info.failure("Could not parse details.txt") 00672 return 00673 00674 # Check for required keys 00675 for key in required_key_and_format: 00676 if key not in details_txt: 00677 test_info.failure("Missing detail.txt entry: %s" % key) 00678 continue 00679 00680 value = details_txt[key] 00681 pattern = required_key_and_format[key] 00682 if pattern.match(value) is None: 00683 test_info.failure("Bad format detail.txt %s: %s" % 00684 (key, value)) 00685 00686 # Check format of optional values 00687 for key in optional_key_and_format: 00688 if key not in details_txt: 00689 continue 00690 00691 value = details_txt[key] 00692 pattern = optional_key_and_format[key] 00693 if pattern.match(value) is None: 00694 test_info.failure("Bad format detail.txt %s: %s" % 00695 (key, value)) 00696 00697 # Check details.txt contents 00698 details_unique_id = None 00699 details_hic_id = None 00700 if DaplinkBoard.KEY_UNIQUE_ID in details_txt: 00701 details_unique_id = details_txt[DaplinkBoard.KEY_UNIQUE_ID] 00702 if DaplinkBoard.KEY_HIC_ID in details_txt: 00703 details_hic_id = details_txt[DaplinkBoard.KEY_HIC_ID] 00704 if details_unique_id is not None: 00705 if details_unique_id != self.unique_id: 00706 test_info.failure("Unique ID mismatch in details.txt " 00707 "details.txt=%s, usb=%s" % 00708 (details_unique_id, self.unique_id)) 00709 if details_hic_id is not None: 00710 usb_hic = details_unique_id[-8:] 00711 if details_hic_id != usb_hic: 00712 test_info.failure("HIC ID is not the last 8 " 00713 "digits of unique ID " 00714 "details.txt=%s, usb=%s" % 00715 (details_hic_id, usb_hic)) 00716 00717 def _parse_assert_txt(self): 00718 file_path = self.get_file_path("ASSERT.TXT") 00719 if not os.path.isfile(file_path): 00720 self._assert = None 00721 return 00722 00723 assert_table = _parse_kvp_file(file_path) 00724 assert "file" in assert_table 00725 assert "line" in assert_table 00726 00727 self._assert = AssertInfo(assert_table["file"], assert_table['line'])
Generated on Tue Jul 12 2022 15:37:15 by
